[Ocfs2-commits] zab commits r1933 - trunk/fs/ocfs2
svn-commits at oss.oracle.com
svn-commits at oss.oracle.com
Wed Mar 2 14:01:31 CST 2005
Author: zab
Signed-off-by: mfasheh
Date: 2005-03-02 14:01:29 -0600 (Wed, 02 Mar 2005)
New Revision: 1933
Added:
trunk/fs/ocfs2/aio.c
trunk/fs/ocfs2/aio.h
Modified:
trunk/fs/ocfs2/Makefile
trunk/fs/ocfs2/aops.c
trunk/fs/ocfs2/dlmglue.c
trunk/fs/ocfs2/dlmglue.h
trunk/fs/ocfs2/file.c
trunk/fs/ocfs2/file.h
trunk/fs/ocfs2/mmap.c
trunk/fs/ocfs2/mmap.h
trunk/fs/ocfs2/ocfs.h
Log:
More formally address AIO read and write. Primarily this brings DLM locking
to the operations and allows AIO write to extend the file. To avoid
deadlocking between the concurrent locked AIO ops we make waiting in the
DLM for lock levels somewhat async.
o teach dlmglue to register a callback when it goes to wait on a flag
instead of sleeping. refactor sync callers to use callbacks that wake.
o teach the mmap.c lock collection acquisition to return EIOCBQUEUED from
dlmglue and make it restartable.
o while we're in mmap.c, clean up how it treats the 'target' inode by making
all locking take generic inputs from the binode.
o hoist the locking and extending parts of write into helpers
o introduce fs/ocfs2/aio.[ch] which track locking state across retries and
teardown when the iocb's dtor is called
o call blockdev_direct_IO_no_locking() now that we're doing the locking
ourselves
Signed-off-by: mfasheh
Modified: trunk/fs/ocfs2/Makefile
===================================================================
--- trunk/fs/ocfs2/Makefile 2005-03-02 18:17:23 UTC (rev 1932)
+++ trunk/fs/ocfs2/Makefile 2005-03-02 20:01:29 UTC (rev 1933)
@@ -51,6 +51,7 @@
SOURCES = \
24io.c \
+ aio.c \
alloc.c \
aops.c \
buffer_head_io.c \
@@ -86,6 +87,7 @@
ocfs_compat.h \
ocfs_journal.h \
buffer_head_io.h \
+ aio.h \
alloc.h \
dcache.h \
dir.h \
Added: trunk/fs/ocfs2/aio.c
===================================================================
--- trunk/fs/ocfs2/aio.c 2005-03-02 18:17:23 UTC (rev 1932)
+++ trunk/fs/ocfs2/aio.c 2005-03-02 20:01:29 UTC (rev 1933)
@@ -0,0 +1,291 @@
+/* -*- mode: c; c-basic-offset: 8; -*-
+ * vim: noexpandtab sw=8 ts=8 sts=0:
+ *
+ * aio.c
+ *
+ * aio read and write
+ *
+ * Copyright (C) 2002, 2004, 2005 Oracle. All rights reserved.
+ *
+ * This program is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU General Public
+ * License as published by the Free Software Foundation; either
+ * version 2 of the License, or (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public
+ * License along with this program; if not, write to the
+ * Free Software Foundation, Inc., 59 Temple Place - Suite 330,
+ * Boston, MA 021110-1307, USA.
+ */
+
+#include "ocfs_compat.h"
+
+#include <linux/version.h>
+
+#include <linux/fs.h>
+#include <linux/types.h>
+#include <linux/slab.h>
+#include <linux/highmem.h>
+#include <linux/pagemap.h>
+#include <linux/uio.h>
+
+#include "ocfs_log.h"
+#include "ocfs.h"
+#include "ocfs2.h"
+
+#include "alloc.h"
+#include "dir.h"
+#include "dlmglue.h"
+#include "extent_map.h"
+#include "file.h"
+#include "sysfile.h"
+#include "inode.h"
+#include "ioctl.h"
+#include "mmap.h"
+#include "suballoc.h"
+#include "util.h"
+
+struct ocfs2_kiocb_private {
+ struct list_head kp_teardown_item;
+ unsigned kp_have_alloc_sem:1,
+ kp_have_write_locks:1;
+ struct inode *kp_inode;
+ ocfs2_buffer_lock_ctxt kp_ctxt;
+ struct ocfs2_write_lock_info kp_info;
+};
+
+static void okp_teardown_from_list(void *unused);
+static DECLARE_WORK(okp_teardown_work, okp_teardown_from_list, NULL);
+static LIST_HEAD(okp_teardown_list);
+static spinlock_t okp_teardown_lock = SPIN_LOCK_UNLOCKED;
+
+static void okp_teardown(struct ocfs2_kiocb_private *okp)
+{
+ BUG_ON(okp->kp_inode == NULL);
+
+ if (okp->kp_info.wl_unlock_ctxt)
+ ocfs2_unlock_buffer_inodes(&okp->kp_ctxt);
+ if (okp->kp_have_alloc_sem)
+ up_read(&OCFS_I(okp->kp_inode)->ip_alloc_sem);
+
+ iput(okp->kp_inode);
+ kfree(okp);
+}
+
+static void okp_teardown_from_list(void *unused)
+{
+ unsigned long flags;
+ struct list_head *pos, *tmp;
+ struct ocfs2_kiocb_private *okp;
+ LIST_HEAD(my_list);
+
+ spin_lock_irqsave(&okp_teardown_lock, flags);
+ list_splice_init(&okp_teardown_list, &my_list);
+ spin_unlock_irqrestore(&okp_teardown_lock, flags);
+
+ list_for_each_safe(pos, tmp, &my_list) {
+ okp = list_entry(pos, struct ocfs2_kiocb_private,
+ kp_teardown_item);
+ list_del_init(&okp->kp_teardown_item);
+
+ okp_teardown(okp);
+ }
+}
+
+static void ocfs2_ki_dtor(struct kiocb *iocb)
+{
+ struct ocfs2_kiocb_private *okp = iocb->private;
+ unsigned long flags;
+
+ /* okp_alloc only assigns the iocb->private and ->ki_dtor pointers if
+ * it was able to alloc the okp and get an inode reference */
+ BUG_ON(okp == NULL);
+ BUG_ON(okp->kp_inode == NULL);
+ BUG_ON(!list_empty(&okp->kp_teardown_item));
+
+ if (in_interrupt()) {
+ /*
+ * there is very little in the teardown that is interrupt-safe,
+ * push it to keventd
+ */
+ spin_lock_irqsave(&okp_teardown_lock, flags);
+ list_add_tail(&okp->kp_teardown_item, &okp_teardown_list);
+ schedule_work(&okp_teardown_work);
+ spin_unlock_irqrestore(&okp_teardown_lock, flags);
+ } else
+ okp_teardown(okp);
+}
+
+static struct ocfs2_kiocb_private *okp_alloc(struct kiocb *iocb)
+{
+ struct inode *inode = iocb->ki_filp->f_dentry->d_inode;
+ struct ocfs2_kiocb_private *okp;
+
+ okp = kcalloc(1, sizeof(*okp), GFP_KERNEL);
+ if (okp == NULL) {
+ okp = ERR_PTR(-ENOMEM);
+ goto out;
+ }
+
+ /* our dtor only gets registerd if we can guarantee that it holds
+ * a reference to the inode */
+ okp->kp_inode = igrab(inode);
+ if (okp->kp_inode == NULL) {
+ kfree(okp);
+ okp = ERR_PTR(-EINVAL);
+ goto out;
+ }
+
+ iocb->private = okp;
+ iocb->ki_dtor = ocfs2_ki_dtor;
+ INIT_BUFFER_LOCK_CTXT(&okp->kp_ctxt);
+ INIT_LIST_HEAD(&okp->kp_teardown_item);
+out:
+ return okp;
+}
+
+/* this is a hack until 2.6 gets its story straight regarding bubbling up
+ * EIOCBQUEUED and the like. in mainline we'd pass an iocb down and do lots of
+ * is_sync() testing. In suparna's patches the dlm would use waitqueues and
+ * the waiting primatives would test current->wait for sync. until that gets
+ * settled we have a very limited async/cb mechanism in the dlm and have it
+ * call this which triggers a retry. */
+static void ocfs2_aio_kick(int status, unsigned long data)
+{
+ struct kiocb *iocb = (struct kiocb *)data;
+ /* XXX worry about racing with ki_cancel once we set it */
+ kick_iocb(iocb);
+}
+
+/* this is called as iocb->ki_retry so it is careful to only repeat
+ * what is needed */
+ssize_t ocfs2_file_aio_read(struct kiocb *iocb, char *buf, size_t count,
+ loff_t pos)
+{
+ struct ocfs2_kiocb_private *okp = iocb->private;
+ struct file *filp = iocb->ki_filp;
+ struct inode *inode = filp->f_dentry->d_inode;
+ ocfs2_backing_inode *target_binode;
+ int ret;
+
+ if (okp == NULL) {
+ okp = okp_alloc(iocb);
+ if (IS_ERR(okp)) {
+ ret = PTR_ERR(okp);
+ LOG_ERROR_STATUS(ret);
+ goto out;
+ }
+
+ ret = ocfs2_setup_io_locks(inode->i_sb, inode, buf, count,
+ &okp->kp_ctxt, &target_binode);
+ if (ret < 0) {
+ LOG_ERROR_STATUS(ret);
+ goto out;
+ }
+
+ okp->kp_ctxt.b_cb = ocfs2_aio_kick;
+ okp->kp_ctxt.b_cb_data = (unsigned long)iocb;
+ target_binode->ba_lock_data = filp->f_flags & O_DIRECT ? 0 : 1;
+ }
+
+ /* this might return EIOCBQUEUED and we'll come back again to
+ * continue the locking. It's harmless to call it once it has
+ * returned success.. */
+ okp->kp_info.wl_unlock_ctxt = 1; /* re-use the write info path */
+ ret = ocfs2_lock_buffer_inodes(&okp->kp_ctxt, NULL);
+ if (ret < 0) {
+ if (ret != -EINTR)
+ LOG_ERROR_STATUS(ret);
+ goto out;
+ }
+
+ /* hold the ip_alloc_sem across the op */
+ if (!okp->kp_have_alloc_sem) {
+ down_read(&OCFS_I(inode)->ip_alloc_sem);
+ okp->kp_have_alloc_sem = 1;
+ }
+
+ ret = generic_file_aio_read(iocb, buf, count, pos);
+out:
+ /* ki_dtor is always called, no matter what we return. */
+ return ret;
+}
+
+/* this is called as iocb->ki_retry so it is careful to only repeat
+ * what is needed */
+ssize_t ocfs2_file_aio_write(struct kiocb *iocb, const char *buf,
+ size_t count, loff_t pos)
+{
+ struct ocfs2_kiocb_private *okp = iocb->private;
+ struct file *filp = iocb->ki_filp;
+ struct inode *inode = filp->f_dentry->d_inode;
+ int ret = 0;
+ struct iovec local_iov = { .iov_base = (void *)buf,
+ .iov_len = count };
+
+ if (okp == NULL) {
+ okp = okp_alloc(iocb);
+ if (IS_ERR(okp)) {
+ ret = PTR_ERR(okp);
+ LOG_ERROR_STATUS(ret);
+ goto out;
+ }
+
+ okp->kp_ctxt.b_cb = ocfs2_aio_kick;
+ okp->kp_ctxt.b_cb_data = (unsigned long)iocb;
+ }
+
+ if (!okp->kp_have_write_locks) {
+ ret = ocfs_write_lock_maybe_extend(filp, buf, count,
+ &iocb->ki_pos,
+ &okp->kp_info,
+ &okp->kp_ctxt);
+ okp->kp_have_write_locks = 1;
+ if (okp->kp_info.wl_extended) {
+ /*
+ * this is not a particularly nice place to do this but
+ * extending aio in ocfs2 is not yet a priority. it
+ * means that we'll write zeros in the buffered case
+ * before then over-writing them with the real op. It
+ * also sleeps in the aio submission context.
+ */
+ ocfs2_file_finish_extension(inode,
+ !okp->kp_info.wl_newsize,
+ !okp->kp_info.wl_do_direct_io);
+ okp->kp_info.wl_extended = 0;
+ }
+ if (ret) {
+ LOG_ERROR_STATUS(ret);
+ goto out;
+ }
+ }
+
+ /* hold the ip_alloc_sem across the op */
+ if (!okp->kp_have_alloc_sem) {
+ down_read(&OCFS_I(inode)->ip_alloc_sem);
+ okp->kp_have_alloc_sem = 1;
+ }
+
+out:
+ /*
+ * never hold i_sem when we leave this function, nor when we call
+ * g_f_a_w(). we've done all extending and inode field updating under
+ * the i_sem and we hold the ip_alloc_sem for reading across the ops.
+ * ocfs2_direct_IO calls blockdev_direct_IO with NO_LOCKING.
+ */
+ if (okp->kp_info.wl_have_i_sem) {
+ up(&inode->i_sem);
+ okp->kp_info.wl_have_i_sem = 0;
+ }
+ if (ret == 0)
+ ret = generic_file_aio_write_nolock(iocb, &local_iov, 1,
+ &iocb->ki_pos);
+
+ /* ki_dtor is always called, no matter what we return. */
+ return ret;
+}
Added: trunk/fs/ocfs2/aio.h
===================================================================
--- trunk/fs/ocfs2/aio.h 2005-03-02 18:17:23 UTC (rev 1932)
+++ trunk/fs/ocfs2/aio.h 2005-03-02 20:01:29 UTC (rev 1933)
@@ -0,0 +1,36 @@
+/* -*- mode: c; c-basic-offset: 8; -*-
+ * vim: noexpandtab sw=8 ts=8 sts=0:
+ *
+ * aio.h
+ *
+ * Function prototypes
+ *
+ * Copyright (C) 2002, 2004, 2005 Oracle. All rights reserved.
+ *
+ * This program is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU General Public
+ * License as published by the Free Software Foundation; either
+ * version 2 of the License, or (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public
+ * License along with this program; if not, write to the
+ * Free Software Foundation, Inc., 59 Temple Place - Suite 330,
+ * Boston, MA 021110-1307, USA.
+ */
+
+#ifndef OCFS2_AIO_H
+#define OCFS2_AIO_H
+
+#if LINUX_VERSION_CODE >= KERNEL_VERSION(2,6,0)
+ssize_t ocfs2_file_aio_write(struct kiocb *iocb, const char *buf,
+ size_t count, loff_t pos);
+ssize_t ocfs2_file_aio_read(struct kiocb *iocb, char *buf, size_t count,
+ loff_t pos);
+#endif
+
+#endif /* OCFS2_AIO_H */
Modified: trunk/fs/ocfs2/aops.c
===================================================================
--- trunk/fs/ocfs2/aops.c 2005-03-02 18:17:23 UTC (rev 1932)
+++ trunk/fs/ocfs2/aops.c 2005-03-02 20:01:29 UTC (rev 1933)
@@ -576,8 +576,10 @@
LOG_ENTRY ();
/* blockdev_direct_IO checks alignment for us, using */
- ret = blockdev_direct_IO (rw, iocb, inode, inode->i_sb->s_bdev, iov, offset, nr_segs, ocfs_direct_IO_get_blocks, NULL);
-
+ ret = blockdev_direct_IO_no_locking(rw, iocb, inode,
+ inode->i_sb->s_bdev, iov, offset,
+ nr_segs, ocfs_direct_IO_get_blocks,
+ NULL);
LOG_EXIT_INT (ret);
LOG_CLEAR_CONTEXT();
Modified: trunk/fs/ocfs2/dlmglue.c
===================================================================
--- trunk/fs/ocfs2/dlmglue.c 2005-03-02 18:17:23 UTC (rev 1932)
+++ trunk/fs/ocfs2/dlmglue.c 2005-03-02 20:01:29 UTC (rev 1933)
@@ -174,7 +174,9 @@
static int ocfs2_cluster_lock(ocfs_super *osb,
ocfs2_lock_res *lockres,
int level,
- int lkm_flags);
+ int lkm_flags,
+ ocfs2_lock_callback cb,
+ unsigned long cb_data);
static void ocfs2_cluster_unlock(ocfs_super *osb,
ocfs2_lock_res *lockres,
int level);
@@ -323,6 +325,7 @@
res->l_type = type;
res->l_level = LKM_IVMODE;
INIT_LIST_HEAD(&res->l_blocked_list);
+ INIT_LIST_HEAD(&res->l_flag_cb_list);
res->l_priv = priv;
LOG_EXIT();
}
@@ -449,6 +452,37 @@
return new_level;
}
+/* XXX must be called with lockres->l_lock held */
+static void lockres_set_flags(ocfs2_lock_res *lockres, unsigned long newflags)
+{
+ struct list_head *pos, *tmp;
+ struct ocfs2_lockres_flag_callback *fcb;
+
+ lockres->l_flags = newflags;
+
+ list_for_each_safe(pos, tmp, &lockres->l_flag_cb_list) {
+ fcb = list_entry(pos, struct ocfs2_lockres_flag_callback,
+ fc_lockres_item);
+ if ((lockres->l_flags & fcb->fc_flag_mask) !=
+ fcb->fc_flag_goal)
+ continue;
+
+ list_del_init(&fcb->fc_lockres_item);
+ fcb->fc_cb(0, fcb->fc_data);
+ if (fcb->fc_free_once_called)
+ kfree(fcb);
+ }
+}
+
+static void lockres_or_flags(ocfs2_lock_res *lockres, unsigned long or)
+{
+ lockres_set_flags(lockres, lockres->l_flags | or);
+}
+static void lockres_clear_flags(ocfs2_lock_res *lockres, unsigned long clear)
+{
+ lockres_set_flags(lockres, lockres->l_flags & ~clear);
+}
+
static inline void ocfs2_generic_handle_downconvert_action(ocfs2_lock_res *lockres)
{
LOG_ENTRY();
@@ -462,9 +496,9 @@
if (lockres->l_level <=
ocfs2_highest_compat_lock_level(lockres->l_blocking)) {
lockres->l_blocking = LKM_NLMODE;
- lockres->l_flags &= ~OCFS2_LOCK_BLOCKED;
+ lockres_clear_flags(lockres, OCFS2_LOCK_BLOCKED);
}
- lockres->l_flags &= ~OCFS2_LOCK_BUSY;
+ lockres_clear_flags(lockres, OCFS2_LOCK_BUSY);
LOG_EXIT();
}
@@ -511,10 +545,10 @@
* *anything* however should mark ourselves as needing an
* update */
if (lockres->l_level == LKM_NLMODE)
- lockres->l_flags |= OCFS2_LOCK_NEEDS_REFRESH;
+ lockres_or_flags(lockres, OCFS2_LOCK_NEEDS_REFRESH);
lockres->l_level = lockres->l_requested;
- lockres->l_flags &= ~OCFS2_LOCK_BUSY;
+ lockres_clear_flags(lockres, OCFS2_LOCK_BUSY);
LOG_EXIT();
}
@@ -542,11 +576,11 @@
if (lockres->l_requested > LKM_NLMODE &&
!(lockres->l_flags & OCFS2_LOCK_LOCAL))
- lockres->l_flags |= OCFS2_LOCK_NEEDS_REFRESH;
+ lockres_or_flags(lockres, OCFS2_LOCK_NEEDS_REFRESH);
lockres->l_level = lockres->l_requested;
- lockres->l_flags |= OCFS2_LOCK_ATTACHED;
- lockres->l_flags &= ~OCFS2_LOCK_BUSY;
+ lockres_or_flags(lockres, OCFS2_LOCK_ATTACHED);
+ lockres_clear_flags(lockres, OCFS2_LOCK_BUSY);
LOG_EXIT();
}
@@ -570,6 +604,7 @@
OCFS_ASSERT(ocfs2_is_inode_lock(lockres));
spin_lock(&lockres->l_lock);
+
lksb = &(lockres->l_lksb);
if (lksb->status != DLM_NORMAL) {
LOG_ERROR_ARGS("ocfs2_inode_ast_func: lksb status value of %u "
@@ -604,7 +639,7 @@
/* data locking ignores refresh flag for now. */
if (lockres->l_type == OCFS_TYPE_DATA)
- lockres->l_flags &= ~OCFS2_LOCK_NEEDS_REFRESH;
+ lockres_clear_flags(lockres, OCFS2_LOCK_NEEDS_REFRESH);
/* set it to something invalid so if we get called again we
* can catch it. */
@@ -743,7 +778,7 @@
{
LOG_ENTRY();
spin_lock(&lockres->l_lock);
- lockres->l_flags &= ~OCFS2_LOCK_BUSY;
+ lockres_clear_flags(lockres, OCFS2_LOCK_BUSY);
if (convert)
lockres->l_action = OCFS2_AST_INVALID;
else
@@ -773,7 +808,7 @@
lockres->l_action = OCFS2_AST_ATTACH;
lockres->l_requested = level;
- lockres->l_flags |= OCFS2_LOCK_BUSY;
+ lockres_or_flags(lockres, OCFS2_LOCK_BUSY);
spin_unlock(&lockres->l_lock);
status = dlmlock(osb->dlm,
@@ -836,6 +871,18 @@
LOG_EXIT();
}
+static void lockres_add_flag_callback(ocfs2_lock_res *lockres,
+ struct ocfs2_lockres_flag_callback *fcb,
+ unsigned long mask, unsigned long goal)
+{
+ BUG_ON(!list_empty(&fcb->fc_lockres_item));
+ BUG_ON(fcb->fc_cb == NULL);
+
+ list_add_tail(&fcb->fc_lockres_item, &lockres->l_flag_cb_list);
+ fcb->fc_flag_mask = mask;
+ fcb->fc_flag_goal = goal;
+}
+
/* predict what lock level we'll be dropping down to on behalf
* of another node, and return true if the currently wanted
* level will be compatible with it. */
@@ -847,21 +894,58 @@
return wanted <= ocfs2_highest_compat_lock_level(lockres->l_blocking);
}
+/* these are generic and could be used elsewhere */
+struct ocfs2_status_completion {
+ int sc_status;
+ struct completion sc_complete;
+};
+
+static void ocfs2_status_completion_cb(int rc, unsigned long data)
+{
+ struct ocfs2_status_completion *sc;
+
+ sc = (struct ocfs2_status_completion *)data;
+ sc->sc_status = rc;
+ complete(&sc->sc_complete);
+}
+
static int ocfs2_cluster_lock(ocfs_super *osb,
ocfs2_lock_res *lockres,
int level,
- int lkm_flags)
+ int lkm_flags,
+ ocfs2_lock_callback cb,
+ unsigned long cb_data)
{
+ struct ocfs2_lockres_flag_callback _fcb, *fcb = &_fcb;
+ struct ocfs2_status_completion sc;
+ dlm_status status;
int ret;
int catch_signals = 1;
- dlm_status status;
LOG_ENTRY();
+ if (cb != NULL) {
+ fcb = kmalloc(sizeof(*fcb), GFP_NOFS);
+ if (fcb == NULL) {
+ ret = -ENOMEM;
+ goto out;
+ }
+ fcb->fc_cb = cb;
+ fcb->fc_data = cb_data;
+ fcb->fc_free_once_called = 1;
+ } else {
+ init_completion(&sc.sc_complete);
+ fcb->fc_cb = ocfs2_status_completion_cb;
+ fcb->fc_data = (unsigned long)≻
+ fcb->fc_free_once_called = 0;
+ }
+
+ INIT_LIST_HEAD(&fcb->fc_lockres_item);
+
again:
if (catch_signals && signal_pending(current)) {
ret = -EINTR;
- goto bail;
+ goto out;
}
spin_lock(&lockres->l_lock);
@@ -873,10 +957,9 @@
level > lockres->l_level) {
/* is someone sitting in dlm_lock? If so, wait on
* them. */
- spin_unlock(&lockres->l_lock);
-
- ocfs2_wait_on_busy_lock(lockres);
- goto again;
+ lockres_add_flag_callback(lockres, fcb, OCFS2_LOCK_BUSY, 0);
+ ret = -EIOCBQUEUED;
+ goto unlock;
}
if (!(lockres->l_flags & OCFS2_LOCK_ATTACHED)) {
@@ -885,7 +968,7 @@
ret = ocfs2_lock_create(osb, lockres, LKM_NLMODE, 0);
if (ret < 0) {
LOG_ERROR_STATUS(ret);
- goto bail;
+ goto out;
}
goto again;
}
@@ -894,10 +977,9 @@
!ocfs2_may_continue_on_blocked_lock(lockres, level)) {
/* is the lock is currently blocked on behalf of
* another node */
- spin_unlock(&lockres->l_lock);
-
- ocfs2_wait_on_blocked_lock(lockres);
- goto again;
+ lockres_add_flag_callback(lockres, fcb, OCFS2_LOCK_BLOCKED, 0);
+ ret = -EIOCBQUEUED;
+ goto unlock;
}
if (level > lockres->l_level) {
@@ -930,27 +1012,37 @@
ret = -ENOENT;
}
ocfs2_recover_from_dlm_error(lockres, 1);
- goto bail;
+ goto out;
}
dprintk("lock %s, successfull return from dlmlock\n",
lockres->l_name);
- ocfs2_wait_on_busy_lock(lockres);
-
/* At this point we've gone inside the dlm and need to
* complete our work regardless. */
catch_signals = 0;
+
+ /* wait for busy to clear and carry on */
goto again;
}
/* Ok, if we get here then we're good to go. */
ocfs2_inc_holders(lockres, level);
+ ret = 0;
+unlock:
spin_unlock(&lockres->l_lock);
+out:
+ if (ret == -EIOCBQUEUED && fcb->fc_cb == ocfs2_status_completion_cb) {
+ wait_for_completion(&sc.sc_complete);
+ ret = sc.sc_status;
+ if (ret == 0)
+ goto again;
+ }
- ret = 0;
-bail:
+ if (ret && fcb != NULL && fcb != &_fcb)
+ kfree(fcb);
+
LOG_EXIT_STATUS(ret);
return ret;
}
@@ -994,10 +1086,10 @@
lockres = &OCFS_I(inode)->ip_meta_lockres;
OCFS_ASSERT(!(lockres->l_flags & OCFS2_LOCK_ATTACHED));
- lockres->l_flags |= OCFS2_LOCK_LOCAL;
+ lockres_or_flags(lockres, OCFS2_LOCK_LOCAL);
status = ocfs2_lock_create(osb, lockres, LKM_EXMODE, LKM_LOCAL);
- lockres->l_flags &= ~OCFS2_LOCK_LOCAL;
+ lockres_clear_flags(lockres, OCFS2_LOCK_LOCAL);
if (status < 0) {
LOG_ERROR_STATUS(status);
goto bail;
@@ -1005,10 +1097,10 @@
lockres = &OCFS_I(inode)->ip_data_lockres;
OCFS_ASSERT(!(lockres->l_flags & OCFS2_LOCK_ATTACHED));
- lockres->l_flags |= OCFS2_LOCK_LOCAL;
+ lockres_or_flags(lockres, OCFS2_LOCK_LOCAL);
status = ocfs2_lock_create(osb, lockres, LKM_EXMODE, LKM_LOCAL);
- lockres->l_flags &= ~OCFS2_LOCK_LOCAL;
+ lockres_clear_flags(lockres, OCFS2_LOCK_LOCAL);
if (status < 0) {
LOG_ERROR_STATUS(status);
goto bail;
@@ -1037,7 +1129,8 @@
level = write ? LKM_EXMODE : LKM_PRMODE;
- status = ocfs2_cluster_lock(OCFS2_SB(inode->i_sb), lockres, level, 0);
+ status = ocfs2_cluster_lock(OCFS2_SB(inode->i_sb), lockres, level, 0,
+ NULL, 0);
if (status < 0 && status != -EINTR)
LOG_ERROR_STATUS(status);
@@ -1231,7 +1324,7 @@
}
/* Ok, I'll be the one to refresh this lock. */
- lockres->l_flags |= OCFS2_LOCK_REFRESHING;
+ lockres_or_flags(lockres, OCFS2_LOCK_REFRESHING);
spin_unlock(&lockres->l_lock);
status = 1;
@@ -1248,9 +1341,9 @@
LOG_ENTRY();
spin_lock(&lockres->l_lock);
- lockres->l_flags &= ~OCFS2_LOCK_REFRESHING;
+ lockres_clear_flags(lockres, OCFS2_LOCK_REFRESHING);
if (!status)
- lockres->l_flags &= ~OCFS2_LOCK_NEEDS_REFRESH;
+ lockres_clear_flags(lockres, OCFS2_LOCK_NEEDS_REFRESH);
spin_unlock(&lockres->l_lock);
wake_up(&lockres->l_event);
@@ -1320,22 +1413,24 @@
return status;
}
-int ocfs2_meta_lock_flags(struct inode *inode,
- ocfs_journal_handle *handle,
- struct buffer_head **ret_bh,
- int ex,
- int flags)
+/*
+ * returns < 0 error if the callback will never be called, otherwise
+ * the result of the lock will be communicated via the callback.
+ */
+int ocfs2_meta_lock_flags_async(struct inode *inode,
+ struct buffer_head **ret_bh,
+ int ex,
+ int flags,
+ ocfs2_lock_callback cb,
+ unsigned long cb_data)
{
int status, level, dlm_flags;
ocfs2_lock_res *lockres;
ocfs_super *osb = OCFS2_SB(inode->i_sb);
- struct buffer_head *bh = NULL;
+ struct buffer_head *local_bh = NULL;
OCFS_ASSERT(inode);
- if (handle && !ex)
- BUG();
-
LOG_ENTRY();
dprintk("inode %llu, take %s META lock\n", OCFS_I(inode)->ip_blkno,
@@ -1355,7 +1450,8 @@
if (flags & OCFS2_META_LOCK_NOQUEUE)
dlm_flags |= LKM_NOQUEUE;
- status = ocfs2_cluster_lock(osb, lockres, level, dlm_flags);
+ status = ocfs2_cluster_lock(osb, lockres, level, dlm_flags, cb,
+ cb_data);
if (status < 0) {
if (status != -EINTR && status != -EAGAIN)
LOG_ERROR_STATUS(status);
@@ -1371,15 +1467,46 @@
ocfs_node_map_is_empty(osb,
&osb->recovery_map));
- status = ocfs2_meta_lock_update(inode, &bh);
+ /* it's pretty weak to do this possibly sync read here, but until
+ * we have a real async version of it it's as good a place as any */
+ if (ret_bh == NULL)
+ ret_bh = &local_bh;
+ status = ocfs2_meta_lock_update(inode, ret_bh);
if (status < 0) {
LOG_ERROR_STATUS(status);
goto bail;
}
+bail:
+ if (local_bh)
+ brelse(local_bh);
+ LOG_EXIT_STATUS(status);
+ return status;
+}
+
+/* grabs the meta lock synchronusly. */
+int ocfs2_meta_lock_flags(struct inode *inode,
+ ocfs_journal_handle *handle,
+ struct buffer_head **ret_bh,
+ int ex,
+ int flags)
+{
+ struct buffer_head *bh = NULL;
+ int status;
+
+ LOG_ENTRY();
+
+ BUG_ON(handle && !ex);
+
+ status = ocfs2_meta_lock_flags_async(inode, ret_bh, ex, flags,
+ NULL, 0);
+ if (status)
+ goto bail;
+
if (ret_bh && !bh) {
/* caller wants a buffer head but we haven't read it yet. */
- status = ocfs_read_block(osb, OCFS_I(inode)->ip_blkno, &bh,
+ status = ocfs_read_block(OCFS2_SB(inode->i_sb),
+ OCFS_I(inode)->ip_blkno, &bh,
OCFS_BH_CACHED, inode);
if (status < 0) {
LOG_ERROR_STATUS(status);
@@ -1429,7 +1556,7 @@
LOG_ENTRY();
- status = ocfs2_cluster_lock(osb, lockres, level, 0);
+ status = ocfs2_cluster_lock(osb, lockres, level, 0, NULL, 0);
if (status < 0) {
if (status != -EINTR)
LOG_ERROR_STATUS(status);
@@ -1561,7 +1688,7 @@
BUG();
}
lockres->l_unlock_action = OCFS2_UNLOCK_INVALID;
- lockres->l_flags &= ~OCFS2_LOCK_BUSY;
+ lockres_clear_flags(lockres, OCFS2_LOCK_BUSY);
spin_unlock(&lockres->l_lock);
wake_up(&lockres->l_event);
@@ -1588,14 +1715,14 @@
goto bail;
}
- lockres->l_flags &= ~OCFS2_LOCK_ATTACHED;
+ lockres_clear_flags(lockres, OCFS2_LOCK_ATTACHED);
/* make sure we never get here while waiting for an ast to
* fire. */
OCFS_ASSERT(lockres->l_action == OCFS2_AST_INVALID);
/* is this necessary? */
- lockres->l_flags |= OCFS2_LOCK_BUSY;
+ lockres_or_flags(lockres, OCFS2_LOCK_BUSY);
lockres->l_unlock_action = OCFS2_UNLOCK_DROP_LOCK;
spin_unlock(&lockres->l_lock);
@@ -1698,7 +1825,7 @@
lockres->l_action = OCFS2_AST_DOWNCONVERT;
lockres->l_requested = new_level;
- lockres->l_flags |= OCFS2_LOCK_BUSY;
+ lockres_or_flags(lockres, OCFS2_LOCK_BUSY);
spin_unlock(&lockres->l_lock);
if (lvb)
Modified: trunk/fs/ocfs2/dlmglue.h
===================================================================
--- trunk/fs/ocfs2/dlmglue.h 2005-03-02 18:17:23 UTC (rev 1932)
+++ trunk/fs/ocfs2/dlmglue.h 2005-03-02 20:01:29 UTC (rev 1933)
@@ -103,6 +103,12 @@
struct buffer_head **ret_bh,
int ex,
int flags);
+int ocfs2_meta_lock_flags_async(struct inode *inode,
+ struct buffer_head **ret_bh,
+ int ex,
+ int flags,
+ ocfs2_lock_callback cb,
+ unsigned long cb_data);
void ocfs2_meta_unlock(struct inode *inode,
int ex);
int ocfs2_super_lock(ocfs_super *osb,
Modified: trunk/fs/ocfs2/file.c
===================================================================
--- trunk/fs/ocfs2/file.c 2005-03-02 18:17:23 UTC (rev 1932)
+++ trunk/fs/ocfs2/file.c 2005-03-02 20:01:29 UTC (rev 1933)
@@ -38,6 +38,7 @@
#include "ocfs.h"
#include "ocfs2.h"
+#include "aio.h"
#include "alloc.h"
#include "dir.h"
#include "dlmglue.h"
@@ -66,11 +67,6 @@
ocfs2_dinode *fe,
u64 new_size);
-static int ocfs_extend_file(ocfs_super *osb,
- struct inode *inode,
- u64 new_i_size,
- u64 *bytes_extended);
-
int ocfs_sync_inode(struct inode *inode)
{
filemap_fdatawrite(inode->i_mapping);
@@ -189,6 +185,29 @@
sb->s_blocksize_bits;
}
+void ocfs2_file_finish_extension(struct inode *inode, loff_t newsize,
+ unsigned should_zero)
+{
+ LOG_TRACE_STR("Generic_file_write ok, asking for OIN update now");
+ ocfs2_update_inode_size(inode, newsize);
+
+ if (!should_zero) {
+ /*
+ * This leaves dirty data in holes.
+ * Caveat Emptor.
+ */
+ OCFS_I(inode)->ip_mmu_private = newsize;
+ } else {
+ int status = ocfs2_zero_extend(inode);
+ /*
+ * Don't overwrite the result of
+ * generic_file_write
+ */
+ if (status)
+ LOG_ERROR_ARGS("Unable to pre-zero extension of inode (%d)", status);
+ }
+}
+
/*
* ocfs_file_write()
* Linux 2.6 TODO: Remove all O_DIRECT conditionals here, they are no longer
@@ -198,17 +217,11 @@
size_t count, loff_t *ppos)
{
int ret = 0;
- int extended = 0;
ocfs_super *osb = NULL;
struct dentry *dentry = filp->f_dentry;
struct inode *inode = dentry->d_inode;
- int status;
- u64 newsize, bytes_added = 0;
- int do_direct_io = 0;
int sector_size;
- int have_i_sem = 0;
- int level = filp->f_flags & O_APPEND;
- loff_t saved_ppos;
+ struct ocfs2_write_lock_info info = {0, };
DECLARE_BUFFER_LOCK_CTXT(ctxt);
LOG_SET_CONTEXT(WRITE);
@@ -233,159 +246,11 @@
osb = OCFS_SB(inode->i_sb);
sector_size = 1 << osb->s_sectsize_bits;
- down(&inode->i_sem);
- have_i_sem = 1;
-
- ret = ocfs2_setup_io_locks(inode->i_sb, inode, (char *) buf,
- count, &ctxt);
- if (ret < 0) {
- LOG_ERROR_STATUS(ret);
+ ret = ocfs_write_lock_maybe_extend(filp, buf, count, ppos, &info,
+ &ctxt);
+ if (ret)
goto bail;
- }
- /* This will lock everyone in the context who's order puts
- * them before us. */
- ret = ocfs2_lock_buffer_inodes(&ctxt, inode);
- if (ret < 0) {
- if (ret != -EINTR)
- LOG_ERROR_STATUS(ret);
- goto bail;
- }
-
- ctxt.b_lock_data_write = 1;
-lock:
- status = ocfs2_meta_lock(inode, NULL, NULL, level);
- if (status < 0) {
- if (status != -EINTR)
- LOG_ERROR_STATUS(status);
- ret = status;
- goto bail;
- }
- /* to handle extending writes, we do a bit of our own locking
- * here, but we setup the ctxt do unlock for us (as well as
- * handle locking everything else. */
- if (level)
- ctxt.b_lock_meta_write = 1;
-
- /* work on a copy of ppos until we're sure that we won't have
- * to recalculate it due to relocking. */
- saved_ppos = *ppos;
-
- if (filp->f_flags & O_APPEND) {
- saved_ppos = i_size_read(inode);
- LOG_TRACE_ARGS("O_APPEND: inode->i_size=%llu\n", saved_ppos);
-
- /* ugh, work around some applications which open
- * everything O_DIRECT + O_APPEND and really don't
- * mean to use O_DIRECT. */
-#warning this is wrong wrong wrong
- filp->f_flags &= ~O_DIRECT;
- }
-
- if (filp->f_flags & O_DIRECT) {
- /* anything special for o_direct? */
- LOG_TRACE_STR ("O_DIRECT");
- if ((saved_ppos & (sector_size - 1)) ||
- (count & (sector_size - 1)) ||
- ((unsigned long)buf & (sector_size - 1))) {
- do_direct_io = 0;
- filp->f_flags |= O_SYNC;
- } else {
- do_direct_io = 1;
- }
- }
- ctxt.b_lock_direct = do_direct_io;
-
- newsize = count + saved_ppos;
- if (filp->f_flags & O_APPEND)
- newsize = count + i_size_read(inode);
-
- LOG_TRACE_ARGS ("ppos=%llu newsize=%llu cursize=%llu\n",
- saved_ppos, newsize, i_size_read(inode));
-
- if (newsize > i_size_read(inode)) {
- if (!level) {
- /* we want an extend, but need a higher
- * level cluster lock. */
- LOG_TRACE_ARGS("inode %llu, had a PR, looping back "
- "for EX\n", OCFS_I(inode)->ip_blkno);
- ocfs2_meta_unlock(inode, level);
- level = 1;
- goto lock;
- }
- extended = 1;
-
- LOG_TRACE_ARGS("Writing at EOF, will need more allocation: "
- "i_size=%llu, need=%llu\n",
- i_size_read(inode), newsize);
-
- /* If we extend AT ALL here then we update our state
- * and continue the write call, regardless of error --
- * this is basically a short write. */
- status = ocfs_extend_file(osb, inode, newsize, &bytes_added);
- if (status < 0 && (!bytes_added)) {
- if (status != -EINTR && status != -ENOSPC) {
- LOG_ERROR_STATUS (status);
- LOG_ERROR_ARGS("Failed to extend inode %llu "
- "from %llu to %llu",
- OCFS_I(inode)->ip_blkno,
- *ppos, newsize);
- }
- ret = status;
- goto bail_unlock;
- }
-
- /* We need to recalulate newsize and count according
- * to what extend could give us. If we got the whole
- * extend then this doesn't wind up changing the
- * values. */
- newsize = i_size_read(inode) + bytes_added;
- count = newsize - saved_ppos;
-
- if (status < 0 && status != -ENOSPC && status != -EINTR)
- LOG_ERROR_ARGS("status return of %d extending inode "
- "%llu\n", status,
- OCFS_I(inode)->ip_blkno);
- status = 0;
- }
-
- /* we've got whatever cluster lock is appropriate now, so we
- * can stuff *ppos back. */
- *ppos = saved_ppos;
-
- if (!do_direct_io) {
- status = ocfs2_data_lock(inode, 1);
- if (status < 0) {
- if (status != -EINTR)
- LOG_ERROR_STATUS(status);
- ret = status;
-
- /* The write context stuff needs to be updated
- * to understand that we haven't locked for
- * data so that we can just jump down to the
- * generic zeroing code here instead. */
- if (extended) {
- ocfs2_update_inode_size(inode, newsize);
- OCFS_I(inode)->ip_mmu_private = newsize;
- }
- ocfs2_meta_unlock(inode, level);
- goto bail_unlock;
- }
- }
-
- /* Alright, fool the io locking stuff into thinking it's
- * handled our inode for us. We can now count on it to do the
- * unlock for us. */
- ctxt.b_target->ba_locked = 1;
-
- /* This will lock everyone who's order puts them *after* our inode. */
- ret = ocfs2_lock_buffer_inodes(&ctxt, NULL);
- if (ret < 0) {
- if (ret != -EINTR)
- LOG_ERROR_STATUS(ret);
- goto bail_zero;
- }
-
down_read(&OCFS_I(inode)->ip_alloc_sem);
#if LINUX_VERSION_CODE >= KERNEL_VERSION(2,6,0)
/*
@@ -399,7 +264,7 @@
/* ick. seems to be our only way of toggling
* directio for 2.6 */
unsigned int savedflags = filp->f_flags;
- if (do_direct_io)
+ if (info.wl_do_direct_io)
filp->f_flags |= O_DIRECT;
else
filp->f_flags &= ~O_DIRECT;
@@ -407,47 +272,28 @@
filp->f_flags = savedflags;
}
#else
- if (do_direct_io)
+ if (info.wl_do_direct_io)
ret = ocfs_rw_direct (WRITE, filp, (char *) buf, count, ppos);
else
ret = generic_file_write_nolock (filp, buf, count, ppos);
#endif
up_read(&OCFS_I(inode)->ip_alloc_sem);
-bail_zero:
- if (extended) {
- LOG_TRACE_STR
- ("Generic_file_write ok, asking for OIN update now");
- ocfs2_update_inode_size(inode, newsize);
-
- if (do_direct_io) {
- /*
- * This leaves dirty data in holes.
- * Caveat Emptor.
- */
- OCFS_I(inode)->ip_mmu_private = i_size_read(inode);
- } else {
- status = ocfs2_zero_extend(inode);
- /*
- * Don't overwrite the result of
- * generic_file_write
- */
- if (status)
- LOG_ERROR_ARGS("Unable to pre-zero extension of inode (%d)", status);
- }
- }
-
-bail_unlock:
- ocfs2_unlock_buffer_inodes(&ctxt);
-
bail:
- if (have_i_sem)
+ /* we might have to finish up extentions that were performed before
+ * an error was returned by, say, data locking */
+ if (info.wl_extended)
+ ocfs2_file_finish_extension(inode, info.wl_newsize,
+ !info.wl_do_direct_io);
+ if (info.wl_unlock_ctxt)
+ ocfs2_unlock_buffer_inodes(&ctxt);
+ if (info.wl_have_i_sem)
up(&inode->i_sem);
LOG_EXIT_INT (ret);
LOG_CLEAR_CONTEXT();
return ret;
-} /* ocfs_file_write */
+}
/*
* ocfs_file_read()
@@ -461,6 +307,7 @@
ocfs_super *osb = NULL;
struct dentry *dentry = filp->f_dentry;
struct inode *inode = dentry->d_inode;
+ ocfs2_backing_inode *target_binode;
int status = 0;
int do_direct_io = 0;
int sector_size;
@@ -491,14 +338,16 @@
do_direct_io = 1;
}
}
- ctxt.b_lock_direct = do_direct_io;
- ret = ocfs2_setup_io_locks(inode->i_sb, inode, buf, count, &ctxt);
+ ret = ocfs2_setup_io_locks(inode->i_sb, inode, buf, count, &ctxt,
+ &target_binode);
if (ret < 0) {
LOG_ERROR_STATUS(status);
goto bail;
}
+ target_binode->ba_lock_data = do_direct_io ? 0 : 1;
+
ret = ocfs2_lock_buffer_inodes(&ctxt, NULL);
if (ret < 0) {
if (ret != -EINTR)
@@ -553,8 +402,8 @@
.open = ocfs_file_open,
.ioctl = ocfs_ioctl,
#if LINUX_VERSION_CODE >= KERNEL_VERSION(2,6,0)
- .aio_read = generic_file_aio_read,
- .aio_write = generic_file_aio_write,
+ .aio_read = ocfs2_file_aio_read,
+ .aio_write = ocfs2_file_aio_write,
#else
.aio_read = ocfs_aio_read,
.aio_write = ocfs_aio_write,
@@ -909,10 +758,10 @@
* dinode->i_size, NOT how much allocated was actually added to the
* file. It will always be correct, even when we return an error.
*/
-static int ocfs_extend_file(ocfs_super *osb,
- struct inode *inode,
- u64 new_i_size,
- u64 *bytes_extended)
+int ocfs_extend_file(ocfs_super *osb,
+ struct inode *inode,
+ u64 new_i_size,
+ u64 *bytes_extended)
{
int status = 0;
int restart_func = 0;
Modified: trunk/fs/ocfs2/file.h
===================================================================
--- trunk/fs/ocfs2/file.h 2005-03-02 18:17:23 UTC (rev 1932)
+++ trunk/fs/ocfs2/file.h 2005-03-02 20:01:29 UTC (rev 1933)
@@ -45,6 +45,10 @@
enum ocfs2_alloc_restarted *reason);
int ocfs_setattr(struct dentry *dentry, struct iattr *attr);
int ocfs_sync_inode(struct inode *inode);
+int ocfs_extend_file(ocfs_super *osb,
+ struct inode *inode,
+ u64 new_i_size,
+ u64 *bytes_extended);
#if LINUX_VERSION_CODE >= KERNEL_VERSION(2,6,0)
int ocfs_getattr(struct vfsmount *mnt, struct dentry *dentry,
@@ -56,4 +60,7 @@
struct buffer_head *fe_bh,
u64 new_i_size);
+void ocfs2_file_finish_extension(struct inode *inode, loff_t newsize,
+ unsigned should_zero);
+
#endif /* OCFS2_FILE_H */
Modified: trunk/fs/ocfs2/mmap.c
===================================================================
--- trunk/fs/ocfs2/mmap.c 2005-03-02 18:17:23 UTC (rev 1932)
+++ trunk/fs/ocfs2/mmap.c 2005-03-02 20:01:29 UTC (rev 1933)
@@ -49,7 +49,8 @@
static inline struct rb_node * __ocfs2_buffer_lock_ctxt_root(
ocfs2_buffer_lock_ctxt *ctxt);
static int ocfs2_buffer_lock_ctxt_insert(ocfs2_buffer_lock_ctxt *ctxt,
- struct inode *inode);
+ struct inode *inode,
+ ocfs2_backing_inode **binode_ret);
static int ocfs2_fill_ctxt_from_buf(struct super_block *sb,
struct inode *target_inode,
char *buf,
@@ -163,7 +164,8 @@
}
static int ocfs2_buffer_lock_ctxt_insert(ocfs2_buffer_lock_ctxt *ctxt,
- struct inode *inode)
+ struct inode *inode,
+ ocfs2_backing_inode **binode_ret)
{
u64 blkno;
ocfs2_backing_inode *tmp, *binode;
@@ -194,14 +196,15 @@
return 0; /* Don't insert duplicates */
}
- binode = kmalloc(sizeof(ocfs2_backing_inode), GFP_KERNEL);
+ binode = kcalloc(1, sizeof(ocfs2_backing_inode), GFP_KERNEL);
if (!binode)
return -ENOMEM;
- memset(binode, 0, sizeof(ocfs2_backing_inode));
binode->ba_inode = inode;
- binode->ba_locked = 0;
ocfs2_init_io_marker(&binode->ba_task);
+ if (binode_ret)
+ *binode_ret = binode;
+
rb_link_node(&binode->ba_node, parent, p);
rb_insert_color(&binode->ba_node, &ctxt->b_inodes);
@@ -231,7 +234,8 @@
if (inode->i_sb == sb &&
inode != target_inode) {
status = ocfs2_buffer_lock_ctxt_insert(ctxt,
- inode);
+ inode,
+ NULL);
if (status < 0)
goto bail;
}
@@ -246,12 +250,12 @@
struct inode *target_inode,
char *buf,
size_t size,
- ocfs2_buffer_lock_ctxt *ctxt)
+ ocfs2_buffer_lock_ctxt *ctxt,
+ ocfs2_backing_inode **target_binode)
{
int skip_sem = current->flags & PF_DUMPCORE;
int status;
struct mm_struct *mm = current->mm;
- struct rb_node *first;
OCFS_ASSERT(mm);
@@ -260,15 +264,12 @@
OCFS_ASSERT(!__ocfs2_buffer_lock_ctxt_root(ctxt));
- /* We always insert target because it might not be backing
- part of the buffer - but it needs to be in there so that
- it's lock gets ordered with everything else */
- status = ocfs2_buffer_lock_ctxt_insert(ctxt, target_inode);
+ /* We always insert target because it might not be backing part of the
+ * buffer - but it needs to be in there so that it's lock gets ordered
+ * with everything else */
+ status = ocfs2_buffer_lock_ctxt_insert(ctxt, target_inode,
+ target_binode);
if (!status) {
- /* The assert above guarantees that this will work. */
- ctxt->b_target = rb_entry(__ocfs2_buffer_lock_ctxt_root(ctxt),
- ocfs2_backing_inode, ba_node);
-
/* Now fill the tree with any inodes that back this
* buffer. If target inode is in there, it will be
* skipped over. */
@@ -285,14 +286,46 @@
goto bail;
}
- first = rb_first(&ctxt->b_inodes);
- ctxt->b_head = rb_entry(first, ocfs2_backing_inode, ba_node);
-
status = 0;
bail:
return status;
}
+/* starting from pos, which can be null for the first call, give the
+ * next buffer that needs unlocking. we return null when there are none
+ * left or we see last_inode */
+static ocfs2_backing_inode *ocfs2_next_unlocked(ocfs2_buffer_lock_ctxt *ctxt,
+ struct inode *last_inode,
+ ocfs2_backing_inode *pos)
+{
+ ocfs2_backing_inode *binode = NULL;
+ struct rb_node *node = NULL;
+
+ if (pos == NULL) {
+ if (ctxt->b_next_unlocked)
+ binode = ctxt->b_next_unlocked;
+ else
+ node = rb_first(&ctxt->b_inodes);
+ } else
+ node = rb_next(&pos->ba_node);
+
+ if (node)
+ binode = rb_entry(node, ocfs2_backing_inode, ba_node);
+
+ if (binode && last_inode && binode->ba_inode == last_inode)
+ binode = NULL;
+
+ /* this is just an optimization to skip nodes in the tree
+ * that we've already seen. If we're moving from one we've locked
+ * to one we haven't then we mark this node in the ctxt so that
+ * we'll return to it in a future after, say, hitting last_inode
+ * or EIOCBQUEUED in lock_buffer_inodes */
+ if (pos && pos->ba_locked && binode)
+ ctxt->b_next_unlocked = binode;
+
+ return binode;
+}
+
/* Will take locks on all inodes in the ctxt up until 'last_inode'. If
* last_inode is NULL, then we take locks on everything. We mark lock
* status on the context so we skip any that have already been
@@ -303,43 +336,40 @@
int ocfs2_lock_buffer_inodes(ocfs2_buffer_lock_ctxt *ctxt,
struct inode *last_inode)
{
- int status, meta_level, data_level;
- ocfs2_backing_inode *binode;
+ int status, data_level;
+ ocfs2_backing_inode *binode = NULL;
struct inode *inode;
- struct rb_node *node;
- binode = ctxt->b_head;
-
- while(binode) {
+ while((binode = ocfs2_next_unlocked(ctxt, last_inode, binode))) {
+ /* the tricksy caller might have locked inodes themselves
+ * between calls. */
+ if (binode->ba_locked)
+ continue;
inode = binode->ba_inode;
- if (inode == last_inode)
- break;
- if (binode->ba_locked)
- goto skip_locking;
-
- meta_level = 0;
- if (ocfs2_buffer_lock_is_target(ctxt, inode))
- meta_level = ctxt->b_lock_meta_write;
-
- status = ocfs2_meta_lock(inode, NULL, NULL, meta_level);
- if (status < 0) {
- if (status != -EINTR)
- LOG_ERROR_STATUS(status);
- goto bail;
+ if (!binode->ba_meta_locked) {
+ status = ocfs2_meta_lock_flags_async(inode, NULL,
+ binode->ba_lock_meta_level,
+ 0,
+ ctxt->b_cb,
+ ctxt->b_cb_data);
+ if (status < 0) {
+ if (status != -EINTR)
+ LOG_ERROR_STATUS(status);
+ goto bail;
+ }
+ binode->ba_meta_locked = 1;
}
- /* If we're doing direct IO, then skip data locking on
- * the target. */
- if (!ocfs2_buffer_lock_is_target(ctxt, inode) ||
- !ctxt->b_lock_direct) {
- data_level = 0;
- if (ocfs2_buffer_lock_is_target(ctxt, inode))
- data_level = ctxt->b_lock_data_write;
-
+ /* ba_lock_data isn't set for direct io */
+ if (binode->ba_lock_data) {
+ data_level = binode->ba_lock_data_level;
status = ocfs2_data_lock(inode, data_level);
if (status < 0) {
- ocfs2_meta_unlock(inode, meta_level);
+ if (status == -EIOCBQUEUED)
+ goto bail;
+ ocfs2_meta_unlock(inode,
+ binode->ba_lock_meta_level);
if (status != -EINTR)
LOG_ERROR_STATUS(status);
@@ -347,17 +377,9 @@
}
}
ocfs2_add_io_marker(inode, &binode->ba_task);
-
binode->ba_locked = 1;
-skip_locking:
- node = rb_next(&binode->ba_node);
- binode = NULL;
- if (node)
- binode = rb_entry(node, ocfs2_backing_inode, ba_node);
}
- ctxt->b_head = binode;
-
status = 0;
bail:
return status;
@@ -365,44 +387,233 @@
void ocfs2_unlock_buffer_inodes(ocfs2_buffer_lock_ctxt *ctxt)
{
- int level;
ocfs2_backing_inode *binode;
- struct inode *inode;
- struct rb_node *node, *tmp;
+ struct rb_node *node;
- node = rb_first(&ctxt->b_inodes);
- while(node) {
+ /* dlm locks don't mask ints.. this should be lower down */
+ BUG_ON(in_interrupt());
+
+ /* unlock in reverse order to minimize waking forward lockers */
+ while ((node = rb_last(&ctxt->b_inodes)) != NULL) {
binode = rb_entry(node, ocfs2_backing_inode, ba_node);
- if (!binode->ba_locked)
- goto skip_unlock;
- inode = binode->ba_inode;
- ocfs2_del_io_marker(inode, &binode->ba_task);
+ ocfs2_del_io_marker(binode->ba_inode, &binode->ba_task);
- if (!ocfs2_buffer_lock_is_target(ctxt, inode) ||
- !ctxt->b_lock_direct) {
- level = 0;
- if (ocfs2_buffer_lock_is_target(ctxt, inode))
- level = ctxt->b_lock_data_write;
+ if (binode->ba_locked && binode->ba_lock_data)
+ ocfs2_data_unlock(binode->ba_inode,
+ binode->ba_lock_data_level);
- ocfs2_data_unlock(inode, level);
+ if (binode->ba_locked || binode->ba_meta_locked)
+ ocfs2_meta_unlock(binode->ba_inode,
+ binode->ba_lock_meta_level);
+
+ rb_erase(node, &ctxt->b_inodes);
+ kfree(binode);
+ }
+
+ ctxt->b_next_unlocked = NULL;
+}
+
+/*
+ * This builds up the locking state that will be used by a write. both normal
+ * file writes and AIO writes come in through here. This function does no
+ * teardown on its own. The caller must examine the info struct to see if it
+ * needs to release locks or i_sem, etc. This function is also restartable in
+ * that it can return EIOCBQUEUED if it would have blocked in the dlm. It
+ * stores its partial progress in the info struct so the caller can call back
+ * in when it thinks the dlm won't block any more. Thus, the caller must zero
+ * the info struct before calling in the first time.
+ */
+ssize_t ocfs_write_lock_maybe_extend(struct file *filp, const char *buf,
+ size_t count, loff_t *ppos,
+ struct ocfs2_write_lock_info *info,
+ ocfs2_buffer_lock_ctxt *ctxt)
+{
+ int ret = 0;
+ ocfs_super *osb = NULL;
+ struct dentry *dentry = filp->f_dentry;
+ struct inode *inode = dentry->d_inode;
+ int status;
+ int sector_size;
+ int level = filp->f_flags & O_APPEND;
+ loff_t saved_ppos;
+ u64 bytes_added = 0;
+
+ osb = OCFS_SB(inode->i_sb);
+ sector_size = 1 << osb->s_sectsize_bits;
+
+ /* the target inode is different from the other inodes. in o_direct it
+ * doesn't get a data lock and when appending it gets a level 1 meta
+ * lock. we use target_binode to set its flags accordingly */
+ if (info->wl_target_binode == NULL) {
+ ret = ocfs2_setup_io_locks(inode->i_sb, inode, (char *) buf,
+ count, ctxt,
+ &info->wl_target_binode);
+ if (ret < 0) {
+ BUG_ON(ret == -EIOCBQUEUED);
+ LOG_ERROR_STATUS(ret);
+ goto bail;
}
+ }
- level = 0;
- if (ocfs2_buffer_lock_is_target(ctxt, inode))
- level = ctxt->b_lock_meta_write;
+ /* This will lock everyone in the context who's order puts
+ * them before us. */
+ if (!info->wl_have_before) {
+ info->wl_unlock_ctxt = 1;
+ ret = ocfs2_lock_buffer_inodes(ctxt, inode);
+ if (ret < 0) {
+ if (ret != -EINTR)
+ LOG_ERROR_STATUS(ret);
+ goto bail;
+ }
+ info->wl_have_before = 1;
+ /* we're writing so get an ex data cluster lock */
+ info->wl_target_binode->ba_lock_data_level = 1;
+ }
- ocfs2_meta_unlock(inode, level);
+ if (!info->wl_have_i_sem) {
+ down(&inode->i_sem);
+ info->wl_have_i_sem = 1;
+ }
-skip_unlock:
- tmp = node;
- node = rb_next(node);
+lock:
+ if (!info->wl_have_target_meta) {
+ status = ocfs2_meta_lock(inode, NULL, NULL, level);
+ if (status < 0) {
+ if (status != -EINTR)
+ LOG_ERROR_STATUS(status);
+ ret = status;
+ goto bail;
+ }
+ info->wl_have_target_meta = 1;
+ }
+ /* to handle extending writes, we do a bit of our own locking
+ * here, but we setup the ctxt do unlock for us (as well as
+ * handle locking everything else. */
+ if (level)
+ info->wl_target_binode->ba_lock_meta_level = 1;
- rb_erase(tmp, &ctxt->b_inodes);
- kfree(binode);
+ /* work on a copy of ppos until we're sure that we won't have
+ * to recalculate it due to relocking. */
+ saved_ppos = *ppos;
+
+ if (filp->f_flags & O_APPEND) {
+ saved_ppos = i_size_read(inode);
+ LOG_TRACE_ARGS("O_APPEND: inode->i_size=%llu\n", saved_ppos);
+
+ /* ugh, work around some applications which open
+ * everything O_DIRECT + O_APPEND and really don't
+ * mean to use O_DIRECT. */
+#warning this is wrong wrong wrong
+ filp->f_flags &= ~O_DIRECT;
}
- ctxt->b_target = ctxt->b_head = NULL;
+ if (filp->f_flags & O_DIRECT) {
+ /* anything special for o_direct? */
+ LOG_TRACE_STR ("O_DIRECT");
+ if ((saved_ppos & (sector_size - 1)) ||
+ (count & (sector_size - 1)) ||
+ ((unsigned long)buf & (sector_size - 1))) {
+ info->wl_do_direct_io = 0;
+ filp->f_flags |= O_SYNC;
+ } else {
+ info->wl_do_direct_io = 1;
+ }
+ }
+ info->wl_target_binode->ba_lock_data = info->wl_do_direct_io ? 0 : 1;
+
+ info->wl_newsize = count + saved_ppos;
+ if (filp->f_flags & O_APPEND)
+ info->wl_newsize = count + i_size_read(inode);
+
+ LOG_TRACE_ARGS ("ppos=%llu newsize=%llu cursize=%llu\n",
+ saved_ppos, info->wl_newsize, i_size_read(inode));
+
+ if (info->wl_newsize > i_size_read(inode)) {
+ if (!level) {
+ /* we want an extend, but need a higher
+ * level cluster lock. */
+ LOG_TRACE_ARGS("inode %llu, had a PR, looping back "
+ "for EX\n", OCFS_I(inode)->ip_blkno);
+ ocfs2_meta_unlock(inode, level);
+ info->wl_have_target_meta = 0;
+ level = 1;
+ goto lock;
+ }
+ info->wl_extended = 1;
+
+ LOG_TRACE_ARGS("Writing at EOF, will need more allocation: "
+ "i_size=%llu, need=%llu\n",
+ i_size_read(inode), info->wl_newsize);
+
+ /* If we extend AT ALL here then we update our state
+ * and continue the write call, regardless of error --
+ * this is basically a short write. */
+ status = ocfs_extend_file(osb, inode, info->wl_newsize,
+ &bytes_added);
+ if (status < 0 && (!bytes_added)) {
+ if (status != -EINTR && status != -ENOSPC) {
+ LOG_ERROR_STATUS (status);
+ LOG_ERROR_ARGS("Failed to extend inode %llu "
+ "from %llu to %llu",
+ OCFS_I(inode)->ip_blkno,
+ *ppos, info->wl_newsize);
+ }
+ ret = status;
+
+ info->wl_have_target_meta = 0;
+ ocfs2_meta_unlock(inode, level);
+ goto bail;
+ }
+
+ /* We need to recalulate newsize and count according
+ * to what extend could give us. If we got the whole
+ * extend then this doesn't wind up changing the
+ * values. */
+ info->wl_newsize = i_size_read(inode) + bytes_added;
+ count = info->wl_newsize - saved_ppos;
+
+ if (status < 0 && status != -ENOSPC && status != -EINTR)
+ LOG_ERROR_ARGS("status return of %d extending inode "
+ "%llu\n", status,
+ OCFS_I(inode)->ip_blkno);
+ status = 0;
+ }
+
+ /* we've got whatever cluster lock is appropriate now, so we
+ * can stuff *ppos back. */
+ *ppos = saved_ppos;
+
+ if (!info->wl_do_direct_io && !info->wl_have_data_lock) {
+ status = ocfs2_data_lock(inode, 1);
+ if (status < 0) {
+ if (status != -EINTR)
+ LOG_ERROR_STATUS(status);
+ ret = status;
+
+ info->wl_have_target_meta = 0;
+ ocfs2_meta_unlock(inode, level);
+ goto bail;
+ }
+ info->wl_have_data_lock = 1;
+ }
+
+ /* Alright, fool the io locking stuff into thinking it's
+ * handled our inode for us. We can now count on it to do the
+ * unlock for us. */
+ info->wl_target_binode->ba_locked = 1;
+
+ /* This will lock everyone who's order puts them *after* our inode. */
+ ret = ocfs2_lock_buffer_inodes(ctxt, NULL);
+ if (ret < 0) {
+ if (ret != -EINTR)
+ LOG_ERROR_STATUS(ret);
+ goto bail;
+ }
+
+bail:
+ LOG_EXIT_INT(ret);
+ return ret;
}
#if 0
Modified: trunk/fs/ocfs2/mmap.h
===================================================================
--- trunk/fs/ocfs2/mmap.h 2005-03-02 18:17:23 UTC (rev 1932)
+++ trunk/fs/ocfs2/mmap.h 2005-03-02 20:01:29 UTC (rev 1933)
@@ -42,7 +42,8 @@
ocfs_inode_private *oip = OCFS_I(inode);
spin_lock(&oip->ip_lock);
- list_del_init(&task->io_list);
+ if (!list_empty(&task->io_list))
+ list_del_init(&task->io_list);
spin_unlock(&oip->ip_lock);
}
@@ -70,46 +71,61 @@
typedef struct _ocfs2_backing_inode {
struct rb_node ba_node;
struct inode *ba_inode;
- int ba_locked;
+ unsigned ba_meta_locked:1, /* meta is locked */
+ ba_locked:1, /* both are locked */
+ ba_lock_data:1, /* should lock data */
+ ba_lock_meta_level:1,
+ ba_lock_data_level:1;
struct _ocfs2_io_marker ba_task;
} ocfs2_backing_inode;
/* Used to manage the locks taken during I/O. */
typedef struct _ocfs2_buffer_lock_ctxt {
- /* target flags */
- unsigned b_lock_direct:1,
- b_lock_meta_write:1,
- b_lock_data_write:1;
struct rb_root b_inodes;
- ocfs2_backing_inode *b_target;
- ocfs2_backing_inode *b_head;
+ ocfs2_backing_inode *b_next_unlocked;
+ ocfs2_lock_callback b_cb;
+ unsigned long b_cb_data;
} ocfs2_buffer_lock_ctxt;
-#define __BUFFERLOCK_INITIALIZER(name) { \
+#define __BUFFERLOCK_INITIALIZER { \
.b_inodes = RB_ROOT, \
- .b_target = NULL, \
- .b_head = NULL }
+ .b_next_unlocked = NULL, \
+ .b_cb = NULL, \
+ .b_cb_data = 0 }
#define DECLARE_BUFFER_LOCK_CTXT(name) \
- ocfs2_buffer_lock_ctxt name = __BUFFERLOCK_INITIALIZER(name)
+ ocfs2_buffer_lock_ctxt name = __BUFFERLOCK_INITIALIZER
+#define INIT_BUFFER_LOCK_CTXT(ctxt) \
+ *(ctxt) = (ocfs2_buffer_lock_ctxt) __BUFFERLOCK_INITIALIZER
+
int ocfs2_setup_io_locks(struct super_block *sb,
struct inode *target_inode,
char *buf,
size_t size,
- ocfs2_buffer_lock_ctxt *ctxt);
+ ocfs2_buffer_lock_ctxt *ctxt,
+ ocfs2_backing_inode **target_binode);
int ocfs2_lock_buffer_inodes(ocfs2_buffer_lock_ctxt *ctxt,
struct inode *last_inode);
void ocfs2_unlock_buffer_inodes(struct _ocfs2_buffer_lock_ctxt *ctxt);
-static inline int ocfs2_buffer_lock_is_target(ocfs2_buffer_lock_ctxt *ctxt,
- struct inode *inode)
-{
- if (!ctxt->b_target)
- return 0;
- return inode == ctxt->b_target->ba_inode;
-}
+struct ocfs2_write_lock_info {
+ u64 wl_newsize;
+ unsigned wl_extended:1,
+ wl_do_direct_io:1,
+ wl_have_i_sem:1,
+ wl_unlock_ctxt:1,
+ wl_have_before:1,
+ wl_have_target_meta:1,
+ wl_have_data_lock:1;
+ ocfs2_backing_inode *wl_target_binode;
+};
+ssize_t ocfs_write_lock_maybe_extend(struct file *filp, const char *buf,
+ size_t count, loff_t *ppos,
+ struct ocfs2_write_lock_info *info,
+ ocfs2_buffer_lock_ctxt *ctxt);
+
#endif /* OCFS2_MMAP_H */
Modified: trunk/fs/ocfs2/ocfs.h
===================================================================
--- trunk/fs/ocfs2/ocfs.h 2005-03-02 18:17:23 UTC (rev 1932)
+++ trunk/fs/ocfs2/ocfs.h 2005-03-02 20:01:29 UTC (rev 1933)
@@ -163,15 +163,30 @@
struct ocfs2_lock_res_ops;
+typedef void (*ocfs2_lock_callback)(int status, unsigned long data);
+
+struct ocfs2_lockres_flag_callback {
+ struct list_head fc_lockres_item;
+ unsigned fc_free_once_called:1;
+
+ unsigned long fc_flag_mask;
+ unsigned long fc_flag_goal;
+
+ ocfs2_lock_callback fc_cb;
+ unsigned long fc_data;
+};
+
+
typedef struct _ocfs2_lock_res {
void *l_priv;
struct ocfs2_lock_res_ops *l_ops;
spinlock_t l_lock;
struct list_head l_blocked_list;
+ struct list_head l_flag_cb_list;
enum ocfs2_lock_type l_type;
- int l_flags;
+ unsigned long l_flags;
char *l_name;
int l_level;
unsigned int l_ro_holders;
@@ -186,6 +201,7 @@
int l_blocking;
wait_queue_head_t l_event;
+
} ocfs2_lock_res;
/* OCFS2 Inode Private Data */
More information about the Ocfs2-commits
mailing list