[Ocfs2-commits] khackel commits r1939 - in branches/dlm-reco-mig: . fs/ocfs2 fs/ocfs2/cluster fs/ocfs2/dlm fs/usysfs kapi-compat/include

svn-commits at oss.oracle.com svn-commits at oss.oracle.com
Fri Mar 4 16:21:30 CST 2005


Author: khackel
Date: 2005-03-04 16:21:29 -0600 (Fri, 04 Mar 2005)
New Revision: 1939

Added:
   branches/dlm-reco-mig/fs/ocfs2/aio.c
   branches/dlm-reco-mig/fs/ocfs2/aio.h
   branches/dlm-reco-mig/kapi-compat/include/journal_access.h
Modified:
   branches/dlm-reco-mig/Config.make.in
   branches/dlm-reco-mig/configure.in
   branches/dlm-reco-mig/fs/ocfs2/Cscope.make
   branches/dlm-reco-mig/fs/ocfs2/Makefile
   branches/dlm-reco-mig/fs/ocfs2/aops.c
   branches/dlm-reco-mig/fs/ocfs2/cluster/heartbeat.c
   branches/dlm-reco-mig/fs/ocfs2/cluster/tcp.c
   branches/dlm-reco-mig/fs/ocfs2/cluster/util.c
   branches/dlm-reco-mig/fs/ocfs2/cluster/util.h
   branches/dlm-reco-mig/fs/ocfs2/dlm/dlmmod.c
   branches/dlm-reco-mig/fs/ocfs2/dlm/dlmmod.h
   branches/dlm-reco-mig/fs/ocfs2/dlm/dlmthread.c
   branches/dlm-reco-mig/fs/ocfs2/dlm/userdlm.c
   branches/dlm-reco-mig/fs/ocfs2/dlm/util.c
   branches/dlm-reco-mig/fs/ocfs2/dlm/util.h
   branches/dlm-reco-mig/fs/ocfs2/dlmglue.c
   branches/dlm-reco-mig/fs/ocfs2/dlmglue.h
   branches/dlm-reco-mig/fs/ocfs2/file.c
   branches/dlm-reco-mig/fs/ocfs2/file.h
   branches/dlm-reco-mig/fs/ocfs2/heartbeat.c
   branches/dlm-reco-mig/fs/ocfs2/heartbeat.h
   branches/dlm-reco-mig/fs/ocfs2/journal.c
   branches/dlm-reco-mig/fs/ocfs2/mmap.c
   branches/dlm-reco-mig/fs/ocfs2/mmap.h
   branches/dlm-reco-mig/fs/ocfs2/ocfs.h
   branches/dlm-reco-mig/fs/ocfs2/proc.c
   branches/dlm-reco-mig/fs/ocfs2/suballoc.c
   branches/dlm-reco-mig/fs/ocfs2/super.c
   branches/dlm-reco-mig/fs/ocfs2/util.c
   branches/dlm-reco-mig/fs/ocfs2/util.h
   branches/dlm-reco-mig/fs/ocfs2/vote.c
   branches/dlm-reco-mig/fs/ocfs2/vote.h
   branches/dlm-reco-mig/fs/usysfs/dir.c
   branches/dlm-reco-mig/fs/usysfs/symlink.c
Log:
o Merged revision 1915:1938 from trunk
        - [1938] thread cleanup



Modified: branches/dlm-reco-mig/Config.make.in
===================================================================
--- branches/dlm-reco-mig/Config.make.in	2005-03-04 20:26:31 UTC (rev 1938)
+++ branches/dlm-reco-mig/Config.make.in	2005-03-04 22:21:29 UTC (rev 1939)
@@ -62,6 +62,7 @@
 EXTRA_CFLAGS += @KAPI_COMPAT_CFLAGS@
 
 MISSING_SOCK_CREATE_LITE = @MISSING_SOCK_CREATE_LITE@
+JOURNAL_ACCESS_WITH_CREDITS = @JOURNAL_ACCESS_WITH_CREDITS@
 
 COMPAT_SAFE_WRITE = @COMPAT_SAFE_WRITE@
 

Modified: branches/dlm-reco-mig/configure.in
===================================================================
--- branches/dlm-reco-mig/configure.in	2005-03-04 20:26:31 UTC (rev 1938)
+++ branches/dlm-reco-mig/configure.in	2005-03-04 22:21:29 UTC (rev 1939)
@@ -172,8 +172,10 @@
 ])
 AC_MSG_RESULT($kernelsourcedir)
 
-KERNELINC="$kernelsourcedir/include"
+KERNELSRC="$kernelsourcedir"
 
+KERNELINC="$KERNELSRC/include"
+
 if test ! -f "$KERNELINC/linux/autoconf.h"; then
   AC_MSG_ERROR(No configured kernel include tree found)
 fi
@@ -232,9 +234,9 @@
 AC_MSG_CHECKING([for safe write ordering])
 if test "x$KERNEL_26" = "xyes"; then
   have_safe_write=yes
-elif egrep "EXPORT_SYMBOL.*\(generic_file_write_nolock\);" "$kernelsrc/kernel/ksyms.c" >/dev/null 2>&1; then
+elif egrep "EXPORT_SYMBOL.*\(generic_file_write_nolock\);" "$KERNELSRC/kernel/ksyms.c" >/dev/null 2>&1; then
   have_safe_write=yes
-elif egrep "EXPORT_SYMBOL.*\(do_generic_file_write\);" "$kernelsrc/kernel/ksyms.c" >/dev/null 2>&1; then
+elif egrep "EXPORT_SYMBOL.*\(do_generic_file_write\);" "$KERNELSRC/kernel/ksyms.c" >/dev/null 2>&1; then
   have_safe_write=yes
   COMPAT_SAFE_WRITE=yes
 else
@@ -263,7 +265,7 @@
 fi
 AC_SUBST(MODVERSIONS)
 
-CPPFLAGS="-I$KERNELINC/include $saved_CPPFLAGS"
+CPPFLAGS="-I$KERNELINC $saved_CPPFLAGS"
 
 AC_MSG_CHECKING([for NPTL support])
 if test "x$KERNEL_26" = "xyes"; then
@@ -328,6 +330,17 @@
   AC_MSG_RESULT(no)
 fi
 
+AC_MSG_CHECKING([for journal access functions with a credits pointer])
+KAPI_COMPAT_HEADERS="$KAPI_COMPAT_HEADERS journal_access.h"
+  JOURNAL_ACCESS_WITH_CREDITS=
+if grep "\<int \*credits);" "$KERNELINC/linux/jbd.h" >/dev/null 2>&1 ; then
+  AC_MSG_RESULT(yes)
+  JOURNAL_ACCESS_WITH_CREDITS=yes
+else
+  AC_MSG_RESULT(no)
+fi
+AC_SUBST(JOURNAL_ACCESS_WITH_CREDITS)
+
 # using -include has two advantages:
 #  the source doesn't need to know to include compat headers
 #  the compat header file names don't go through the search path

Modified: branches/dlm-reco-mig/fs/ocfs2/Cscope.make
===================================================================
--- branches/dlm-reco-mig/fs/ocfs2/Cscope.make	2005-03-04 20:26:31 UTC (rev 1938)
+++ branches/dlm-reco-mig/fs/ocfs2/Cscope.make	2005-03-04 22:21:29 UTC (rev 1939)
@@ -1,6 +1,6 @@
-CSCOPEFILES=*.c
+CSCOPEFILES=*.c *.h
 CSCOPEFILES+=$(KERNELINC)/../fs/*.c $(KERNELINC)/../fs/jbd/*.c
-CSCOPEFILES+=cluster/*.c dlm/*.c
+CSCOPEFILES+=cluster/*.c dlm/*.c cluster/*.h dlm/*.h
 
 cscope:
 	rm -f cscope.*

Modified: branches/dlm-reco-mig/fs/ocfs2/Makefile
===================================================================
--- branches/dlm-reco-mig/fs/ocfs2/Makefile	2005-03-04 20:26:31 UTC (rev 1938)
+++ branches/dlm-reco-mig/fs/ocfs2/Makefile	2005-03-04 22:21:29 UTC (rev 1939)
@@ -30,6 +30,10 @@
 EXTRA_CFLAGS += -DOCFS_COMPAT_SAFE_WRITE
 endif
 
+ifdef JOURNAL_ACCESS_WITH_CREDITS
+EXTRA_CFLAGS += -DJOURNAL_ACCESS_WITH_CREDITS
+endif
+
 ifneq ($(QUIET),1)
 EXTRA_CFLAGS += -DVERBOSE_BH_JBD_TRACE
 EXTRA_CFLAGS += -DVERBOSE_LOCKING_TRACE
@@ -51,6 +55,7 @@
 
 SOURCES =			\
 	24io.c			\
+	aio.c 			\
 	alloc.c 		\
 	aops.c 			\
 	buffer_head_io.c	\
@@ -86,6 +91,7 @@
 	ocfs_compat.h		\
 	ocfs_journal.h		\
 	buffer_head_io.h	\
+	aio.h			\
 	alloc.h			\
 	dcache.h		\
 	dir.h			\

Copied: branches/dlm-reco-mig/fs/ocfs2/aio.c (from rev 1938, trunk/fs/ocfs2/aio.c)

Copied: branches/dlm-reco-mig/fs/ocfs2/aio.h (from rev 1938, trunk/fs/ocfs2/aio.h)

Modified: branches/dlm-reco-mig/fs/ocfs2/aops.c
===================================================================
--- branches/dlm-reco-mig/fs/ocfs2/aops.c	2005-03-04 20:26:31 UTC (rev 1938)
+++ branches/dlm-reco-mig/fs/ocfs2/aops.c	2005-03-04 22:21:29 UTC (rev 1939)
@@ -576,8 +576,10 @@
 	LOG_ENTRY ();
 
 	/* blockdev_direct_IO checks alignment for us, using */
-	ret = blockdev_direct_IO (rw, iocb, inode, inode->i_sb->s_bdev, iov, offset, nr_segs, ocfs_direct_IO_get_blocks, NULL);
-
+	ret = blockdev_direct_IO_no_locking(rw, iocb, inode,
+					    inode->i_sb->s_bdev, iov, offset,
+					    nr_segs, ocfs_direct_IO_get_blocks,
+					    NULL);
 	LOG_EXIT_INT (ret);
 
 	LOG_CLEAR_CONTEXT();

Modified: branches/dlm-reco-mig/fs/ocfs2/cluster/heartbeat.c
===================================================================
--- branches/dlm-reco-mig/fs/ocfs2/cluster/heartbeat.c	2005-03-04 20:26:31 UTC (rev 1938)
+++ branches/dlm-reco-mig/fs/ocfs2/cluster/heartbeat.c	2005-03-04 22:21:29 UTC (rev 1939)
@@ -48,7 +48,6 @@
 #include <linux/socket.h>
 #include <linux/inet.h>
 #include <linux/in.h>
-#include <linux/module.h>
 
 #include <linux/linkage.h>
 #include <linux/time.h>
@@ -61,6 +60,7 @@
 #include <linux/pagemap.h>
 #include <linux/file.h>
 #include <linux/bitops.h>
+#include <linux/kthread.h>
 
 #include <asm/uaccess.h>
 
@@ -104,8 +104,6 @@
 static LIST_HEAD(hb_net_groups);
 static LIST_HEAD(hb_disk_groups);
 static struct task_struct *hb_task = NULL;
-static struct completion hb_complete;
-static int hb_pid = -1;
 
 static struct hb_callback {
 	struct list_head list;
@@ -369,65 +367,64 @@
 	return 0;
 }
 
-
 static int hb_thread(void *data)
 {
-	void *page;
-	
-	page = (void *) __get_free_page(GFP_KERNEL);
-	if (!page)
-		return -ENOMEM;
+	void *page = data;
 
-	util_daemonize ("hb_thread", strlen("hb_thread"), 1);
-	hb_task = current;
-
-	while (1) {
+	hbprintk("hb thread running\n");
+	
+	while (!kthread_should_stop()) {
 		hb_do_disk_heartbeat(page);
-		/* when we can really tear down this can wait on a wait
-		 * queue */
 		set_current_state(TASK_UNINTERRUPTIBLE);
 		schedule_timeout(msecs_to_jiffies(HB_THREAD_MS));
 	}
 
-	flush_scheduled_work();
-	complete (&hb_complete);
-	hbprintk("quitting hb thread!!!!!!\n");
+	hbprintk("hb thread exiting\n");
+	free_page((unsigned long)page);
 	return 0;
 }
 
 /* Launch the hb thread for the mounted volume */
 static int hb_launch_thread(void)
 {
-	hb_pid = -1;
-	hb_task = NULL;
-	init_completion (&hb_complete);
+	void *page;
+	int ret;
 
+	page = (void *)__get_free_page(GFP_KERNEL);
+	if (!page) {
+		ret = -ENOMEM;
+		goto out;
+	}
+
 	hbprintk("starting hb thread...\n");
-	hb_pid = kernel_thread (hb_thread, NULL, 
-				CLONE_FS | CLONE_FILES | CLONE_SIGHAND);
-	if (hb_pid < 0) {
-		hbprintk("unable to launch hb thread, error=%d", hb_pid);
-		return -EINVAL;
+	hb_task = kthread_run(hb_thread, page, "hb_thread");
+	if (IS_ERR(hb_task)) {
+		hb_task = NULL;
+		hbprintk("unable to launch hb thread, error=%ld",
+			 PTR_ERR(hb_task));
+		ret = -EINVAL;
+		goto out;
 	}
-	hbprintk("hb thread running...\n");
-	return 0;
+
+	/* hb_thread is responsible for freeing the page if it runs */
+	page = NULL;
+	ret = 0;
+
+out:
+	if (page)
+		free_page((unsigned long)page);
+	return ret;
 }
 
 static void hb_complete_thread(void)
 {
-	hbprintk ("waiting for hb thread to exit....");
-	send_sig (SIGINT, hb_task, 0);
-	wait_for_completion (&hb_complete);
-	hbprintk ("hb thread exited\n");
-	hb_task = NULL;
+	if (hb_task) {
+		hbprintk("waiting for hb thread to exit\n");
+		kthread_stop(hb_task);
+		hb_task = NULL;
+	}
 }
 
-
-
-
-
-
-
 static int hb_init_disk_hb_group(struct inode *group, dev_t dev, u32 bits, 
 				 u32 blocks, u64 start)
 {

Modified: branches/dlm-reco-mig/fs/ocfs2/cluster/tcp.c
===================================================================
--- branches/dlm-reco-mig/fs/ocfs2/cluster/tcp.c	2005-03-04 20:26:31 UTC (rev 1938)
+++ branches/dlm-reco-mig/fs/ocfs2/cluster/tcp.c	2005-03-04 22:21:29 UTC (rev 1939)
@@ -68,7 +68,6 @@
  * 	- find explicit stack call to drain rx queue
  * 	- add trivial version trading message at the start of a conn
  * 	- go nuts adding static
- * 	- move recv_sock into recv_thread
  * 	- nsc waiting is buggy, should be on socket.. wake w/err if socket dies
  * 	- compare socks in attach_sock so both size don't close
  * 	- implement net_remove_handlers
@@ -94,6 +93,7 @@
 #include <linux/blkdev.h>
 #include <linux/proc_fs.h>
 #include <linux/file.h>
+#include <linux/kthread.h>
 
 #include <asm/uaccess.h>
 
@@ -145,7 +145,6 @@
 #define sk_state_change		state_change
 #endif
 
-struct socket *recv_sock = NULL;
 static u16 ip_version, ip_port;
 static struct inode *net_inode = NULL;
 static u8 net_node_num;
@@ -160,9 +159,7 @@
 static spinlock_t net_active_lock = SPIN_LOCK_UNLOCKED;
 static LIST_HEAD(net_active_list);
 
-static int net_recv_pid = -1;
 static struct task_struct *net_recv_task = NULL;
-static struct completion net_recv_complete;
 
 static inline void net_abort_status_return(net_status_ctxt *nsc)
 {
@@ -180,11 +177,10 @@
 static void __exit net_driver_exit (void);
 static int net_add_handler(net_msg_handler *nmh);
 static void net_remove_handlers(void);
-static int net_init_tcp_recv_sock(void);
+static struct socket *net_init_tcp_recv_sock(void);
 static int net_receive_thread(void *data);
 static int net_receive(void);
-static void net_try_accept(void);
-static void net_release_tcp_sock(void);
+static void net_try_accept(struct socket *sock);
 static int net_process_message(struct socket *sock, net_msg *hdr);
 static int net_ioctl (struct inode *inode, struct file *filp, unsigned int cmd, unsigned long arg);
 
@@ -358,32 +354,47 @@
 
 static int net_startup(void)
 {
-	net_recv_pid = -1;
-	net_recv_task = NULL;
-	init_completion (&net_recv_complete);
+	struct socket *sock;
+	int ret = 0;
 
+	/* if the thread was setting up the rx socket we'd like to have it
+	 * communicate errors back to us here.  us setting up the socket
+	 * and passing it to the thread is easier */
+	sock = net_init_tcp_recv_sock();
+	if (IS_ERR(sock)) {
+		ret = PTR_ERR(sock);
+		goto out;
+	}
+
 	netprintk0("starting net receive thread...\n");
-	net_recv_pid = kernel_thread (net_receive_thread, NULL, CLONE_FS | CLONE_FILES | CLONE_SIGHAND);
-	if (net_recv_pid < 0) {
-		netprintk("unable to launch net receive thread, error=%d\n",
-			  net_recv_pid);
-		net_shutdown();
-		return -EINVAL;
+
+	net_recv_task = kthread_run(net_receive_thread, sock, "netrecv");
+	if (IS_ERR(net_recv_task)) {
+		ret = PTR_ERR(net_recv_task);
+		net_recv_task = NULL;
+		netprintk("unable to launch net receive thread, error=%ld\n",
+			  (long)ret);
+		goto out;
 	}
 
-	netprintk0("net thread running...\n");
+	/* once the thread is running it has ownership of the sock */
+	sock = NULL;
+
+out:
+	if (sock)
+		sock_release(sock);
 	return 0;
 }
 
 static void net_shutdown(void)
 {
-	netprintk ("waiting for net thread to exit....\n");
-	send_sig (SIGINT, net_recv_task, 0);
-	wait_for_completion (&net_recv_complete);
-	netprintk ("net thread exited\n");
+	if (net_recv_task) {
+		netprintk("waiting for net thread to exit....\n");
+		kthread_stop(net_recv_task);
+	}
 }
 
-static int net_rx_should_wake(void)
+static int net_rx_should_wake(struct socket *sock)
 {
 	int empty;
 
@@ -391,42 +402,26 @@
 	empty = list_empty(&net_active_list);
 	spin_unlock_bh(&net_active_lock);
 
-	return !empty || tcp_sk(recv_sock->sk)->accept_queue;
+	return !empty || tcp_sk(sock->sk)->accept_queue;
 }
 
 static int net_receive_thread(void *data)
 {
-	int status;
-	DECLARE_WAITQUEUE(main_wait, current);
+	struct socket *sock = data;
 
-	util_daemonize ("netrecv", strlen("netrecv"), 1);
-	net_recv_task = current;
+	netprintk0("net thread running...\n");
 
-	status = net_init_tcp_recv_sock();
-       	if (status >= 0 && recv_sock) {
-		add_wait_queue_exclusive(recv_sock->sk->sk_sleep, &main_wait);
-		while (1) {
-			net_try_accept();
-			net_receive();
+       	while(!kthread_should_stop()) {
+		net_try_accept(sock);
+		net_receive();
 
-			wait_event_interruptible(*recv_sock->sk->sk_sleep,
-						 net_rx_should_wake());
-
-			if (signal_pending(current)) {
-				netprintk0("net recv thread got signal!\n");
-				break;
-			}
-		}
-		remove_wait_queue(recv_sock->sk->sk_sleep, &main_wait);
-	} else {
-		netprintk0("failed to initialize net_thread!\n");
+		wait_event_interruptible(*sock->sk->sk_sleep,
+					 net_rx_should_wake(sock) ||
+					 kthread_should_stop());
 	}
 
-	/* Flush all scheduled tasks */
-	flush_scheduled_work();
-	net_release_tcp_sock();
-	net_recv_task = NULL;
-	complete (&net_recv_complete);
+	netprintk("net thread exiting\n");
+	sock_release(sock);
 	return 0;
 }
 
@@ -435,7 +430,7 @@
 int net_register_handler(u32 msg_type, u32 key, int flags, u32 max_len, 
 			 net_msg_handler_func *func, void *data)
 {
-	net_msg_handler *nmh, *found=NULL;
+	net_msg_handler *nmh;
 	int ret;
 
 	if (max_len > NET_MAX_PAYLOAD_BYTES) {
@@ -473,7 +468,6 @@
 
 	ret = net_add_handler(nmh);
 	if (ret) {
-		net_put_handler(found);
 		netprintk("message handler for type %u, key %u already exists!!!\n",
 		       msg_type, key);
 	}
@@ -817,8 +811,8 @@
 	if (list_empty(&net->active_item))
 		list_add_tail(&net->active_item, &net_active_list);
 
-	if (recv_sock != NULL)
-		wake_up(recv_sock->sk->sk_sleep);
+	if (net_recv_task)
+		wake_up_process(net_recv_task);
 }
 
 /* teardown can race with these guys and stop them in their read lock.. 
@@ -1452,30 +1446,31 @@
 	return ret;
 }
 
-static void net_try_accept(void)
+static void net_try_accept(struct socket *sock)
 {
 	int error, slen;
 	struct sockaddr_in sin;
-	struct socket *sock = NULL;
+	struct socket *new_sock = NULL;
 	struct inode *inode = NULL;
 	nm_node_inode_private *priv;
 
-	BUG_ON(recv_sock == NULL);
-	error = sock_create_lite(recv_sock->sk->sk_family,
-				 recv_sock->sk->sk_type,
-				 recv_sock->sk->sk_protocol,
-				 &sock);
+	BUG_ON(sock == NULL);
+	error = sock_create_lite(sock->sk->sk_family,
+				 sock->sk->sk_type,
+				 sock->sk->sk_protocol,
+				 &new_sock);
 	if (error)
 		goto out;
 
-	sock->type = recv_sock->type;
-	sock->ops = recv_sock->ops;
-	error = recv_sock->ops->accept(recv_sock, sock, O_NONBLOCK);
+	new_sock->type = sock->type;
+	new_sock->ops = sock->ops;
+	error = sock->ops->accept(sock, new_sock, O_NONBLOCK);
 	if (error < 0)
 		goto out;
 
 	slen = sizeof(sin);
-	error = sock->ops->getname(sock, (struct sockaddr *) &sin, &slen, 1);
+	error = new_sock->ops->getname(new_sock, (struct sockaddr *) &sin,
+				       &slen, 1);
 	if (error < 0)
 		goto out;
 	
@@ -1485,7 +1480,7 @@
 	inode = nm_get_node_by_ip(sin.sin_addr.s_addr);
 	if (inode == NULL) {
 		netprintk0("connect from unknown host...\n");
-		net_send_error(sock, NET_UNKNOWN_HOST);
+		net_send_error(new_sock, NET_UNKNOWN_HOST);
 		goto out;
 	}
 
@@ -1499,15 +1494,15 @@
 			  "%u.%u.%u.%u:%d\n", NIPQUAD(sin.sin_addr.s_addr),
 			  ntohs(sin.sin_port));
 
-	error = net_attach_sock(&priv->net, sock);
+	error = net_attach_sock(&priv->net, new_sock);
 	if (error == -EEXIST)
-		net_send_error(sock, NET_ALREADY_CONNECTED);
+		net_send_error(new_sock, NET_ALREADY_CONNECTED);
 
 out:
 	if (error) {
-		if (sock) {
-			net_sock_drain(sock);
-			sock_release(sock);
+		if (new_sock) {
+			net_sock_drain(new_sock);
+			sock_release(new_sock);
 		}
 		if (inode)
 			iput(inode);
@@ -1515,14 +1510,15 @@
 	return;
 }
 
-static int net_init_tcp_recv_sock(void)
+static struct socket *net_init_tcp_recv_sock(void)
 {
 	struct sockaddr_in sin;
+	struct socket *sock;
 	int error;
 
 	error = sock_create(net_ip_version_to_family(ip_version),
 			     SOCK_STREAM, IPPROTO_TCP,
-			     &recv_sock);
+			     &sock);
 	if (error < 0) {
 		netprintk("unable to create socket, error=%d\n", error);
 		goto bail;
@@ -1533,8 +1529,8 @@
 	sin.sin_addr.s_addr = htonl(INADDR_ANY);
 	sin.sin_port = ip_port;
 
-	error = recv_sock->ops->bind(recv_sock, (struct sockaddr *)&sin,
-				      sizeof(sin));
+	error = sock->ops->bind(sock, (struct sockaddr *)&sin,
+				sizeof(sin));
 	if (error < 0) {
 		netprintk ("unable to bind socket to port %d, error=%d\n", 
 			ntohs(ip_port), error);
@@ -1542,24 +1538,18 @@
 	}
 
 	/* !!! dunno about these... */
-	recv_sock->sk->sk_reuse = 1;
-	error = recv_sock->ops->listen(recv_sock, 64);
+	sock->sk->sk_reuse = 1;
+	error = sock->ops->listen(sock, 64);
 
 bail:
-	if (error && recv_sock) {
-		sock_release(recv_sock);
-		recv_sock = NULL;
+	if (error) {
+	       if (sock)
+			sock_release(sock);
+	       sock = ERR_PTR(error);
 	}
-	return error;
-}				/* net_init_tcp_recv_sock */
 
-
-static void net_release_tcp_sock(void)
-{
-	if (recv_sock) {
-		sock_release(recv_sock);
-		recv_sock = NULL;
-	}
+	BUG_ON(sock == NULL);
+	return sock;
 }
 
 MODULE_LICENSE("GPL");

Modified: branches/dlm-reco-mig/fs/ocfs2/cluster/util.c
===================================================================
--- branches/dlm-reco-mig/fs/ocfs2/cluster/util.c	2005-03-04 20:26:31 UTC (rev 1938)
+++ branches/dlm-reco-mig/fs/ocfs2/cluster/util.c	2005-03-04 22:21:29 UTC (rev 1939)
@@ -67,48 +67,6 @@
 #endif
 }
 
-/*
- * util_daemonize() 
- *
- */
-#if LINUX_VERSION_CODE >= KERNEL_VERSION(2,6,0)
-/* yes, len is unused but kept here for backwards compatibility. */
-void util_daemonize (char *name, int len, int shutdown_sigs)
-{
-	sigset_t tmpsig;
-
-	daemonize (name);
-
-	if (shutdown_sigs) {
-		/* Unblock SIGKILL, SIGSTOP, SIGHUP and SIGINT */
-		sigemptyset(&tmpsig);
-		sigaddsetmask(&tmpsig, SHUTDOWN_SIGS);
-		sigprocmask(SIG_UNBLOCK, &tmpsig, NULL);
-	}
-
-	return;
-}				/* util_daemonize */
-#else
-void util_daemonize (char *name, int len, int shutdown_sigs)
-{
-	daemonize ();
-	reparent_to_init ();
-
-	if (len > 0) {
-		if (len > 15)
-			BUG();
-		strncpy (current->comm, name, len);
-		current->comm[len] = '\0';
-	}
-
-	if (shutdown_sigs)
-		util_block_sigs(NULL, SHUTDOWN_SIGS);
-	else
-		util_block_sigs(NULL, 0);
-	return;
-}				/* util_daemonize */
-#endif
-
 /* prefetch has been declared to allow to build in debug mode */
 #ifdef DEBUG
 #ifndef ARCH_HAS_PREFETCH

Modified: branches/dlm-reco-mig/fs/ocfs2/cluster/util.h
===================================================================
--- branches/dlm-reco-mig/fs/ocfs2/cluster/util.h	2005-03-04 20:26:31 UTC (rev 1938)
+++ branches/dlm-reco-mig/fs/ocfs2/cluster/util.h	2005-03-04 22:21:29 UTC (rev 1939)
@@ -40,7 +40,6 @@
 } util_timeout;
 
 void util_clear_timeout(util_timeout *to);
-void util_daemonize(char *name, int len, int shutdown_sigs);
 void util_init_timeout(util_timeout *to);
 void util_set_timeout(util_timeout *to, __u32 timeout);
 void util_show_stack(unsigned long *esp);
@@ -75,25 +74,4 @@
 void * util_rarray_idx_to_slot(util_rarray *arr, int idx);
 int util_resize_rarray(util_rarray *arr, int newelem);
 
-#ifdef __KERNEL__
-typedef struct _util_thread_info
-{
-	wait_queue_head_t thread_wq;
-	atomic_t woken;
-	struct task_struct *task;
-	struct completion complete;
-	int pid;
-} util_thread_info;
-
-
-static inline void util_thread_info_init(util_thread_info *info)
-{
-	init_waitqueue_head(&info->thread_wq);
-	atomic_set(&info->woken, 0);
-	info->task = NULL;
-	info->pid = -1;
-	init_completion(&info->complete);
-}
-#endif /* __KERNEL__ */
-
 #endif /* CLUSTER_UTIL_H */

Modified: branches/dlm-reco-mig/fs/ocfs2/dlm/dlmmod.c
===================================================================
--- branches/dlm-reco-mig/fs/ocfs2/dlm/dlmmod.c	2005-03-04 20:26:31 UTC (rev 1938)
+++ branches/dlm-reco-mig/fs/ocfs2/dlm/dlmmod.c	2005-03-04 22:21:29 UTC (rev 1939)
@@ -576,8 +576,7 @@
 
 	/* if the network code had any unregister calls, they would be here. */
 
-	if (dlm->thread.task)
-		dlm_complete_thread(dlm);
+	dlm_complete_thread(dlm);
 
 	/* We've left the domain. Now we can take ourselves out of the
 	 * list and allow the kref stuff to help us free the
@@ -650,8 +649,8 @@
 	INIT_LIST_HEAD(&dlm->reco.received);
 	INIT_LIST_HEAD(&dlm->master_list);
 	INIT_LIST_HEAD(&dlm->mle_hb_events);
-	util_thread_info_init(&dlm->thread);
-	util_thread_info_init(&dlm->reco.thread);
+	dlm->dlm_thread_task = NULL;
+	init_waitqueue_head(&dlm->dlm_thread_wq);
 	init_rwsem(&dlm->recovery_sem);
 
 	/* this eats the reference we got above. */

Modified: branches/dlm-reco-mig/fs/ocfs2/dlm/dlmmod.h
===================================================================
--- branches/dlm-reco-mig/fs/ocfs2/dlm/dlmmod.h	2005-03-04 20:26:31 UTC (rev 1938)
+++ branches/dlm-reco-mig/fs/ocfs2/dlm/dlmmod.h	2005-03-04 22:21:29 UTC (rev 1939)
@@ -191,7 +191,6 @@
 	u8  dead_node;
 	u8  sending_node;
 	u32 next_seq;
-	util_thread_info thread;
 	unsigned long node_map[BITS_TO_LONGS(NM_MAX_NODES)];
 } dlm_recovery_ctxt;
 
@@ -209,7 +208,6 @@
 	spinlock_t spinlock;
 	struct rw_semaphore recovery_sem;
 	char *name;
-	util_thread_info thread;
 	struct inode *group;
 	u32 key;
 	u8  group_index;
@@ -231,6 +229,8 @@
 	unsigned int num_joins;
 	struct hb_callback_func dlm_hb_up;
 	struct hb_callback_func dlm_hb_down;
+	struct task_struct *dlm_thread_task;
+	wait_queue_head_t dlm_thread_wq;
 };
 
 #define DLM_LOCK_RES_UNINITED             0x00000001

Modified: branches/dlm-reco-mig/fs/ocfs2/dlm/dlmthread.c
===================================================================
--- branches/dlm-reco-mig/fs/ocfs2/dlm/dlmthread.c	2005-03-04 20:26:31 UTC (rev 1938)
+++ branches/dlm-reco-mig/fs/ocfs2/dlm/dlmthread.c	2005-03-04 22:21:29 UTC (rev 1939)
@@ -39,6 +39,7 @@
 #include <linux/socket.h>
 #include <linux/inet.h>
 #include <linux/timer.h>
+#include <linux/kthread.h>
 
 #include "util.h"
 
@@ -56,6 +57,7 @@
 extern u8 dlm_global_index;
 
 static int dlm_thread(void *data);
+struct task_struct *dlm_thread_task;
 
 #define dlm_lock_is_remote(dlm, lock)     ((lock)->ml.node != (dlm)->group_index)
 
@@ -245,50 +247,54 @@
 		spin_unlock(&dlm->spinlock);
 	}
 
-	/* wake the dlm thread */
-	atomic_set(&dlm->thread.woken, 1);
-	wake_up(&dlm->thread.thread_wq);
+	wake_up(&dlm->dlm_thread_wq);
 }
 
 /* Launch the NM thread for the mounted volume */
 int dlm_launch_thread(dlm_ctxt *dlm)
 {
 	dlmprintk0("starting dlm thread...\n");
-	dlm->thread.pid = kernel_thread (dlm_thread, dlm, 
-					 CLONE_FS | CLONE_FILES | 
-					 CLONE_SIGHAND);
-	if (dlm->thread.pid < 0) {
-		dlmprintk("unable to launch dlm thread, error=%d", 
-			  dlm->thread.pid);
+
+	dlm->dlm_thread_task = kthread_run(dlm_thread, dlm, "dlm_thread");
+	if (IS_ERR(dlm->dlm_thread_task)) {
+		dlm->dlm_thread_task = NULL;
+		dlmprintk("unable to launch dlm thread, error=%ld", 
+			  PTR_ERR(dlm->dlm_thread_task));
 		return -EINVAL;
 	}
-	dlmprintk("dlm thread running for %s...\n", dlm->name);
+
 	return 0;
 }
 
 void dlm_complete_thread(dlm_ctxt *dlm)
 {
-	dlmprintk0 ("waiting for dlm thread to exit....");
-	send_sig (SIGINT, dlm->thread.task, 0);
-	wait_for_completion (&dlm->thread.complete);
-	dlmprintk0 ("dlm thread exited\n");
-	dlm->thread.task = NULL;
+	if (dlm->dlm_thread_task) {
+		dlmprintk0("waiting for dlm thread to exit\n");
+		kthread_stop(dlm->dlm_thread_task);
+		dlm->dlm_thread_task = NULL;
+	}
 }
 
+static int dlm_dirty_list_empty(dlm_ctxt *dlm)
+{
+	int empty;
 
+	spin_lock(&dlm->spinlock);
+	empty = list_empty(&dlm->dirty_list);
+	spin_unlock(&dlm->spinlock);
+	
+	return empty;
+}
 
-
 static int dlm_thread(void *data)
 {
 	struct list_head *iter, *tmpiter;
 	dlm_lock_resource *res;
 	dlm_ctxt *dlm = data;
 
-	util_daemonize ("dlm_thread", strlen("dlm_thread"), 1);
-	dlm->thread.task = current;
+	dlmprintk("dlm thread running for %s...\n", dlm->name);
 
-	while (1) {
-		atomic_set(&dlm->thread.woken, 0);
+	while (!kthread_should_stop()) {
 
 		down_read(&dlm->recovery_sem);
 		spin_lock(&dlm->spinlock);
@@ -307,17 +313,11 @@
 		spin_unlock(&dlm->spinlock);
 		up_read(&dlm->recovery_sem);
 
-		wait_event_interruptible(dlm->thread.thread_wq,
-					 atomic_read(&dlm->thread.woken));
-
-		if (signal_pending(current)) {
-			dlmprintk("DLM thread got signal while waiting\n");
-			break;
-		}
+		wait_event_interruptible(dlm->dlm_thread_wq,
+					 !dlm_dirty_list_empty(dlm) ||
+					 kthread_should_stop());
 	}
 
-	flush_scheduled_work();
-	complete (&dlm->thread.complete);
-	dlmprintk0("quitting DLM thread!!!!!!\n");
+	dlmprintk0("quitting DLM thread\n");
 	return 0;
 }

Modified: branches/dlm-reco-mig/fs/ocfs2/dlm/userdlm.c
===================================================================
--- branches/dlm-reco-mig/fs/ocfs2/dlm/userdlm.c	2005-03-04 20:26:31 UTC (rev 1938)
+++ branches/dlm-reco-mig/fs/ocfs2/dlm/userdlm.c	2005-03-04 22:21:29 UTC (rev 1939)
@@ -170,9 +170,8 @@
 	if (!(lockres->l_flags & USER_LOCK_QUEUED)) {
 		user_dlm_grab_inode_ref(lockres);
 
-		PREPARE_WORK(&lockres->l_work,
-			     user_dlm_unblock_lock,
-			     lockres);
+		INIT_WORK(&lockres->l_work, user_dlm_unblock_lock,
+			  lockres);
 
 		queue_work(user_dlm_worker, &lockres->l_work);
 		lockres->l_flags |= USER_LOCK_QUEUED;

Modified: branches/dlm-reco-mig/fs/ocfs2/dlm/util.c
===================================================================
--- branches/dlm-reco-mig/fs/ocfs2/dlm/util.c	2005-03-04 20:26:31 UTC (rev 1938)
+++ branches/dlm-reco-mig/fs/ocfs2/dlm/util.c	2005-03-04 22:21:29 UTC (rev 1939)
@@ -65,48 +65,6 @@
 #endif
 }
 
-/*
- * util_daemonize() 
- *
- */
-#if LINUX_VERSION_CODE >= KERNEL_VERSION(2,6,0)
-/* yes, len is unused but kept here for backwards compatibility. */
-void util_daemonize (char *name, int len, int shutdown_sigs)
-{
-	sigset_t tmpsig;
-
-	daemonize (name);
-
-	if (shutdown_sigs) {
-		/* Unblock SIGKILL, SIGSTOP, SIGHUP and SIGINT */
-		sigemptyset(&tmpsig);
-		sigaddsetmask(&tmpsig, SHUTDOWN_SIGS);
-		sigprocmask(SIG_UNBLOCK, &tmpsig, NULL);
-	}
-
-	return;
-}				/* util_daemonize */
-#else
-void util_daemonize (char *name, int len, int shutdown_sigs)
-{
-	daemonize ();
-	reparent_to_init ();
-
-	if (len > 0) {
-		if (len > 15)
-			BUG();
-		strncpy (current->comm, name, len);
-		current->comm[len] = '\0';
-	}
-
-	if (shutdown_sigs)
-		util_block_sigs(NULL, SHUTDOWN_SIGS);
-	else
-		util_block_sigs(NULL, 0);
-	return;
-}				/* util_daemonize */
-#endif
-
 /* prefetch has been declared to allow to build in debug mode */
 #ifdef DEBUG
 #ifndef ARCH_HAS_PREFETCH

Modified: branches/dlm-reco-mig/fs/ocfs2/dlm/util.h
===================================================================
--- branches/dlm-reco-mig/fs/ocfs2/dlm/util.h	2005-03-04 20:26:31 UTC (rev 1938)
+++ branches/dlm-reco-mig/fs/ocfs2/dlm/util.h	2005-03-04 22:21:29 UTC (rev 1939)
@@ -43,7 +43,6 @@
 } util_timeout;
 
 void util_clear_timeout(util_timeout *to);
-void util_daemonize(char *name, int len, int shutdown_sigs);
 void util_init_timeout(util_timeout *to);
 void util_set_timeout(util_timeout *to, __u32 timeout);
 void util_show_stack(unsigned long *esp);
@@ -78,25 +77,4 @@
 void * util_rarray_idx_to_slot(util_rarray *arr, int idx);
 int util_resize_rarray(util_rarray *arr, int newelem);
 
-#ifdef __KERNEL__
-typedef struct _util_thread_info
-{
-	wait_queue_head_t thread_wq;
-	atomic_t woken;
-	struct task_struct *task;
-	struct completion complete;
-	int pid;
-} util_thread_info;
-
-
-static inline void util_thread_info_init(util_thread_info *info)
-{
-	init_waitqueue_head(&info->thread_wq);
-	atomic_set(&info->woken, 0);
-	info->task = NULL;
-	info->pid = -1;
-	init_completion(&info->complete);
-}
-#endif /* __KERNEL__ */
-
 #endif /* CLUSTER_UTIL_H */

Modified: branches/dlm-reco-mig/fs/ocfs2/dlmglue.c
===================================================================
--- branches/dlm-reco-mig/fs/ocfs2/dlmglue.c	2005-03-04 20:26:31 UTC (rev 1938)
+++ branches/dlm-reco-mig/fs/ocfs2/dlmglue.c	2005-03-04 22:21:29 UTC (rev 1939)
@@ -31,6 +31,7 @@
 #include <linux/mm.h>
 #include <linux/smp_lock.h>
 #include <linux/crc32.h>
+#include <linux/kthread.h>
 
 #include <cluster/util.h>
 #include <cluster/clcommon.h>
@@ -174,14 +175,16 @@
 static int ocfs2_cluster_lock(ocfs_super *osb,
 			      ocfs2_lock_res *lockres,
 			      int level,
-			      int lkm_flags);
+			      int lkm_flags,
+			      ocfs2_lock_callback cb,
+			      unsigned long cb_data);
 static void ocfs2_cluster_unlock(ocfs_super *osb,
 				 ocfs2_lock_res *lockres,
 				 int level);
 static inline void ocfs2_generic_handle_downconvert_action(ocfs2_lock_res *lockres);
 static inline void ocfs2_generic_handle_convert_action(ocfs2_lock_res *lockres);
 static inline void ocfs2_generic_handle_attach_action(ocfs2_lock_res *lockres);
-static void ocfs2_generic_handle_bast(ocfs2_lock_res *lockres, int level);
+static int ocfs2_generic_handle_bast(ocfs2_lock_res *lockres, int level);
 static inline void ocfs2_handle_meta_convert_action(struct inode *inode,
 						    ocfs2_lock_res *lockres);
 static void ocfs2_inc_inode_seq(ocfs_super *osb,
@@ -323,6 +326,7 @@
 	res->l_type = type;
 	res->l_level = LKM_IVMODE;
 	INIT_LIST_HEAD(&res->l_blocked_list);
+	INIT_LIST_HEAD(&res->l_flag_cb_list);
 	res->l_priv = priv;
 	LOG_EXIT();
 }
@@ -449,6 +453,37 @@
 	return new_level;
 }
 
+/* XXX must be called with lockres->l_lock held */
+static void lockres_set_flags(ocfs2_lock_res *lockres, unsigned long newflags)
+{
+	struct list_head *pos, *tmp;
+	struct ocfs2_lockres_flag_callback *fcb;
+
+	lockres->l_flags = newflags;
+
+	list_for_each_safe(pos, tmp, &lockres->l_flag_cb_list) {
+		fcb = list_entry(pos, struct ocfs2_lockres_flag_callback,
+				 fc_lockres_item);
+		if ((lockres->l_flags & fcb->fc_flag_mask) !=
+		    fcb->fc_flag_goal)
+			continue;
+
+		list_del_init(&fcb->fc_lockres_item);
+		fcb->fc_cb(0, fcb->fc_data);
+		if (fcb->fc_free_once_called)
+			kfree(fcb);
+	}
+}
+
+static void lockres_or_flags(ocfs2_lock_res *lockres, unsigned long or)
+{
+	lockres_set_flags(lockres, lockres->l_flags | or);
+}
+static void lockres_clear_flags(ocfs2_lock_res *lockres, unsigned long clear)
+{
+	lockres_set_flags(lockres, lockres->l_flags & ~clear);
+}
+
 static inline void ocfs2_generic_handle_downconvert_action(ocfs2_lock_res *lockres)
 {
 	LOG_ENTRY();
@@ -462,9 +497,9 @@
 	if (lockres->l_level <=
 	    ocfs2_highest_compat_lock_level(lockres->l_blocking)) {
 		lockres->l_blocking = LKM_NLMODE;
-		lockres->l_flags &= ~OCFS2_LOCK_BLOCKED;
+		lockres_clear_flags(lockres, OCFS2_LOCK_BLOCKED);
 	}
-	lockres->l_flags &= ~OCFS2_LOCK_BUSY;
+	lockres_clear_flags(lockres, OCFS2_LOCK_BUSY);
 
 	LOG_EXIT();
 }
@@ -511,10 +546,10 @@
 	 * *anything* however should mark ourselves as needing an
 	 * update */
 	if (lockres->l_level == LKM_NLMODE)
-		lockres->l_flags |= OCFS2_LOCK_NEEDS_REFRESH;
+		lockres_or_flags(lockres, OCFS2_LOCK_NEEDS_REFRESH);
 
 	lockres->l_level = lockres->l_requested;
-	lockres->l_flags &= ~OCFS2_LOCK_BUSY;
+	lockres_clear_flags(lockres, OCFS2_LOCK_BUSY);
 	LOG_EXIT();
 }
 
@@ -542,11 +577,11 @@
 
 	if (lockres->l_requested > LKM_NLMODE &&
 	    !(lockres->l_flags & OCFS2_LOCK_LOCAL))
-		lockres->l_flags |= OCFS2_LOCK_NEEDS_REFRESH;
+		lockres_or_flags(lockres, OCFS2_LOCK_NEEDS_REFRESH);
 
 	lockres->l_level = lockres->l_requested;
-	lockres->l_flags |= OCFS2_LOCK_ATTACHED;
-	lockres->l_flags &= ~OCFS2_LOCK_BUSY;
+	lockres_or_flags(lockres, OCFS2_LOCK_ATTACHED);
+	lockres_clear_flags(lockres, OCFS2_LOCK_BUSY);
 
 	LOG_EXIT();
 }
@@ -570,6 +605,7 @@
 	OCFS_ASSERT(ocfs2_is_inode_lock(lockres));
 
 	spin_lock(&lockres->l_lock);
+
 	lksb = &(lockres->l_lksb);
 	if (lksb->status != DLM_NORMAL) {
 		LOG_ERROR_ARGS("ocfs2_inode_ast_func: lksb status value of %u "
@@ -604,7 +640,7 @@
 
 	/* data locking ignores refresh flag for now. */
 	if (lockres->l_type == OCFS_TYPE_DATA)
-		lockres->l_flags &= ~OCFS2_LOCK_NEEDS_REFRESH;
+		lockres_clear_flags(lockres, OCFS2_LOCK_NEEDS_REFRESH);
 
 	/* set it to something invalid so if we get called again we
 	 * can catch it. */
@@ -615,17 +651,29 @@
 	LOG_EXIT();
 }
 
-static void ocfs2_generic_handle_bast(ocfs2_lock_res *lockres, int level)
+static int ocfs2_generic_handle_bast(ocfs2_lock_res *lockres, int level)
 {
+	int needs_downconvert = 0;
 	LOG_ENTRY();
 
-	spin_lock(&lockres->l_lock);
+	assert_spin_locked(&lockres->l_lock);
+
 	lockres->l_flags |= OCFS2_LOCK_BLOCKED;
-	if (level > lockres->l_blocking)
+
+	if (level > lockres->l_blocking) {
+		/* only schedule a downconvert if we haven't already scheduled
+		 * one that goes low enough to satisfy the level we're 
+		 * blocking.  this also catches the case where we get 
+		 * duplicate BASTs */
+		if (ocfs2_highest_compat_lock_level(level) <
+		    ocfs2_highest_compat_lock_level(lockres->l_blocking))
+			needs_downconvert = 1;
+
 		lockres->l_blocking = level;
-	spin_unlock(&lockres->l_lock);
+	}
 
-	LOG_EXIT();
+	LOG_EXIT_STATUS(needs_downconvert);
+	return needs_downconvert;
 }
 
 static void ocfs2_inode_bast_func(void *opaque, int level)
@@ -633,6 +681,7 @@
 	ocfs2_lock_res *lockres = opaque;
 	struct inode *inode;
 	ocfs_super *osb;
+	int needs_downconvert;
 
 	LOG_ENTRY();
 
@@ -646,13 +695,18 @@
 
 	OCFS_ASSERT(level > LKM_NLMODE);
 
-	ocfs2_generic_handle_bast(lockres, level);
+	spin_lock(&lockres->l_lock);
 
-	ocfs2_schedule_blocked_inode_lock(inode, lockres);
+	needs_downconvert = ocfs2_generic_handle_bast(lockres, level);
+	if (needs_downconvert)
+		ocfs2_schedule_blocked_inode_lock(inode, lockres);
+
+	spin_unlock(&lockres->l_lock);
+
 	ocfs2_kick_vote_thread(osb);
-
 	/* TODO: Is a wake_up call here really necessary? */
 	wake_up(&lockres->l_event);
+
 	LOG_EXIT();
 }
 
@@ -701,15 +755,19 @@
 {
 	ocfs2_lock_res *lockres = opaque;
 	ocfs_super *osb;
+	int needs_downconvert;
 
 	LOG_ENTRY();
        	osb = ocfs2_lock_res_super(lockres);
 
 	dprintk("Superblock BAST fired\n");
 
-	ocfs2_generic_handle_bast(lockres, level);
+	spin_lock(&lockres->l_lock);
+	needs_downconvert = ocfs2_generic_handle_bast(lockres, level);
+	if (needs_downconvert)
+		ocfs2_schedule_blocked_lock(osb, lockres);
+	spin_unlock(&lockres->l_lock);
 
-	ocfs2_schedule_blocked_lock(osb, lockres);
 	ocfs2_kick_vote_thread(osb);
 
 	wake_up(&lockres->l_event);
@@ -721,7 +779,7 @@
 {
 	LOG_ENTRY();
 	spin_lock(&lockres->l_lock);
-	lockres->l_flags &= ~OCFS2_LOCK_BUSY;
+	lockres_clear_flags(lockres, OCFS2_LOCK_BUSY);
 	if (convert)
 		lockres->l_action = OCFS2_AST_INVALID;
 	else
@@ -751,7 +809,7 @@
 
 	lockres->l_action = OCFS2_AST_ATTACH;
 	lockres->l_requested = level;
-	lockres->l_flags |= OCFS2_LOCK_BUSY;
+	lockres_or_flags(lockres, OCFS2_LOCK_BUSY);
 	spin_unlock(&lockres->l_lock);
 
 	status = dlmlock(osb->dlm,
@@ -814,6 +872,18 @@
 	LOG_EXIT();
 }
 
+static void lockres_add_flag_callback(ocfs2_lock_res *lockres,
+				      struct ocfs2_lockres_flag_callback *fcb,
+				      unsigned long mask, unsigned long goal)
+{
+	BUG_ON(!list_empty(&fcb->fc_lockres_item));
+	BUG_ON(fcb->fc_cb == NULL);
+
+	list_add_tail(&fcb->fc_lockres_item, &lockres->l_flag_cb_list);
+	fcb->fc_flag_mask = mask;
+	fcb->fc_flag_goal = goal;
+}
+
 /* predict what lock level we'll be dropping down to on behalf
  * of another node, and return true if the currently wanted
  * level will be compatible with it. */
@@ -825,21 +895,58 @@
 	return wanted <= ocfs2_highest_compat_lock_level(lockres->l_blocking);
 }
 
+/* these are generic and could be used elsewhere */
+struct ocfs2_status_completion {
+	int			sc_status;
+	struct completion	sc_complete;
+};
+
+static void ocfs2_status_completion_cb(int rc, unsigned long data)
+{
+	struct ocfs2_status_completion *sc;
+
+	sc = (struct ocfs2_status_completion *)data;
+	sc->sc_status = rc;
+	complete(&sc->sc_complete);
+}
+
 static int ocfs2_cluster_lock(ocfs_super *osb,
 			      ocfs2_lock_res *lockres,
 			      int level,
-			      int lkm_flags)
+			      int lkm_flags,
+			      ocfs2_lock_callback cb,
+			      unsigned long cb_data)
 {
+	struct ocfs2_lockres_flag_callback _fcb, *fcb = &_fcb;
+	struct ocfs2_status_completion sc;
+	dlm_status status;
 	int ret;
 	int catch_signals = 1;
-	dlm_status status;
 
 	LOG_ENTRY();
 
+	if (cb != NULL) {
+		fcb = kmalloc(sizeof(*fcb), GFP_NOFS);
+		if (fcb == NULL) {
+			ret = -ENOMEM;
+			goto out;
+		}
+		fcb->fc_cb = cb;
+		fcb->fc_data = cb_data;
+		fcb->fc_free_once_called = 1;
+	} else {
+		init_completion(&sc.sc_complete);
+		fcb->fc_cb = ocfs2_status_completion_cb;
+		fcb->fc_data = (unsigned long)&sc;
+		fcb->fc_free_once_called = 0;
+	}
+
+	INIT_LIST_HEAD(&fcb->fc_lockres_item);
+
 again:
 	if (catch_signals && signal_pending(current)) {
 		ret = -EINTR;
-		goto bail;
+		goto out;
 	}
 
 	spin_lock(&lockres->l_lock);
@@ -851,10 +958,9 @@
 	    level > lockres->l_level) {
 		/* is someone sitting in dlm_lock? If so, wait on
 		 * them. */
-		spin_unlock(&lockres->l_lock);
-
-		ocfs2_wait_on_busy_lock(lockres);
-		goto again;
+		lockres_add_flag_callback(lockres, fcb, OCFS2_LOCK_BUSY, 0);
+		ret = -EIOCBQUEUED;
+		goto unlock;
 	}
 
 	if (!(lockres->l_flags & OCFS2_LOCK_ATTACHED)) {
@@ -863,7 +969,7 @@
 		ret = ocfs2_lock_create(osb, lockres, LKM_NLMODE, 0);
 		if (ret < 0) {
 			LOG_ERROR_STATUS(ret);
-			goto bail;
+			goto out;
 		}
 		goto again;
 	}
@@ -872,10 +978,9 @@
 	    !ocfs2_may_continue_on_blocked_lock(lockres, level)) {
 		/* is the lock is currently blocked on behalf of
 		 * another node */
-		spin_unlock(&lockres->l_lock);
-
-		ocfs2_wait_on_blocked_lock(lockres);
-		goto again;
+		lockres_add_flag_callback(lockres, fcb, OCFS2_LOCK_BLOCKED, 0);
+		ret = -EIOCBQUEUED;
+		goto unlock;
 	}
 
 	if (level > lockres->l_level) {
@@ -908,27 +1013,37 @@
 				ret = -ENOENT;
 			}
 			ocfs2_recover_from_dlm_error(lockres, 1);
-			goto bail;
+			goto out;
 		}
 
 		dprintk("lock %s, successfull return from dlmlock\n",
 			lockres->l_name);
 
-		ocfs2_wait_on_busy_lock(lockres);
-
 		/* At this point we've gone inside the dlm and need to
 		 * complete our work regardless. */
 		catch_signals = 0;
+
+		/* wait for busy to clear and carry on */
 		goto again;
 	}
 
 	/* Ok, if we get here then we're good to go. */
 	ocfs2_inc_holders(lockres, level);
 
+	ret = 0;
+unlock:
 	spin_unlock(&lockres->l_lock);
+out:
+	if (ret == -EIOCBQUEUED && fcb->fc_cb == ocfs2_status_completion_cb) {
+		wait_for_completion(&sc.sc_complete);
+		ret = sc.sc_status;
+		if (ret == 0)
+			goto again;
+	}
 
-	ret = 0;
-bail:
+	if (ret && fcb != NULL && fcb != &_fcb)
+		kfree(fcb);
+
 	LOG_EXIT_STATUS(ret);
 	return ret;
 }
@@ -972,10 +1087,10 @@
 
 	lockres = &OCFS_I(inode)->ip_meta_lockres;
 	OCFS_ASSERT(!(lockres->l_flags & OCFS2_LOCK_ATTACHED));
-	lockres->l_flags |= OCFS2_LOCK_LOCAL;
+	lockres_or_flags(lockres, OCFS2_LOCK_LOCAL);
 
 	status = ocfs2_lock_create(osb, lockres, LKM_EXMODE, LKM_LOCAL);
-	lockres->l_flags &= ~OCFS2_LOCK_LOCAL;
+	lockres_clear_flags(lockres, OCFS2_LOCK_LOCAL);
 	if (status < 0) {
 		LOG_ERROR_STATUS(status);
 		goto bail;
@@ -983,10 +1098,10 @@
 
 	lockres = &OCFS_I(inode)->ip_data_lockres;
 	OCFS_ASSERT(!(lockres->l_flags & OCFS2_LOCK_ATTACHED));
-	lockres->l_flags |= OCFS2_LOCK_LOCAL;
+	lockres_or_flags(lockres, OCFS2_LOCK_LOCAL);
 
 	status = ocfs2_lock_create(osb, lockres, LKM_EXMODE, LKM_LOCAL);
-	lockres->l_flags &= ~OCFS2_LOCK_LOCAL;
+	lockres_clear_flags(lockres, OCFS2_LOCK_LOCAL);
 	if (status < 0) {
 		LOG_ERROR_STATUS(status);
 		goto bail;
@@ -1015,7 +1130,8 @@
 
 	level = write ? LKM_EXMODE : LKM_PRMODE;
 
-	status = ocfs2_cluster_lock(OCFS2_SB(inode->i_sb), lockres, level, 0);
+	status = ocfs2_cluster_lock(OCFS2_SB(inode->i_sb), lockres, level, 0,
+				    NULL, 0);
 	if (status < 0 && status != -EINTR)
 		LOG_ERROR_STATUS(status);
 
@@ -1209,7 +1325,7 @@
 	}
 
 	/* Ok, I'll be the one to refresh this lock. */
-	lockres->l_flags |= OCFS2_LOCK_REFRESHING;
+	lockres_or_flags(lockres, OCFS2_LOCK_REFRESHING);
 	spin_unlock(&lockres->l_lock);
 
 	status = 1;
@@ -1226,9 +1342,9 @@
 	LOG_ENTRY();
 
 	spin_lock(&lockres->l_lock);
-	lockres->l_flags &= ~OCFS2_LOCK_REFRESHING;
+	lockres_clear_flags(lockres, OCFS2_LOCK_REFRESHING);
 	if (!status)
-		lockres->l_flags &= ~OCFS2_LOCK_NEEDS_REFRESH;
+		lockres_clear_flags(lockres, OCFS2_LOCK_NEEDS_REFRESH);
 	spin_unlock(&lockres->l_lock);
 
 	wake_up(&lockres->l_event);
@@ -1298,22 +1414,24 @@
 	return status;
 }
 
-int ocfs2_meta_lock_flags(struct inode *inode,
-			  ocfs_journal_handle *handle,
-			  struct buffer_head **ret_bh,
-			  int ex,
-			  int flags)
+/* 
+ * returns < 0 error if the callback will never be called, otherwise
+ * the result of the lock will be communicated via the callback.
+ */
+int ocfs2_meta_lock_flags_async(struct inode *inode,
+			        struct buffer_head **ret_bh,
+				int ex,
+				int flags,
+				ocfs2_lock_callback cb,
+				unsigned long cb_data)
 {
 	int status, level, dlm_flags;
 	ocfs2_lock_res *lockres;
 	ocfs_super *osb = OCFS2_SB(inode->i_sb);
-	struct buffer_head *bh = NULL;
+	struct buffer_head *local_bh = NULL;
 
 	OCFS_ASSERT(inode);
 
-	if (handle && !ex)
-		BUG();
-
 	LOG_ENTRY();
 
 	dprintk("inode %llu, take %s META lock\n", OCFS_I(inode)->ip_blkno,
@@ -1333,7 +1451,8 @@
 	if (flags & OCFS2_META_LOCK_NOQUEUE)
 		dlm_flags |= LKM_NOQUEUE;
 
-	status = ocfs2_cluster_lock(osb, lockres, level, dlm_flags);
+	status = ocfs2_cluster_lock(osb, lockres, level, dlm_flags, cb,
+				    cb_data);
 	if (status < 0) {
 		if (status != -EINTR && status != -EAGAIN)
 			LOG_ERROR_STATUS(status);
@@ -1349,15 +1468,46 @@
 			   ocfs_node_map_is_empty(osb,
 						  &osb->recovery_map));
 
-	status = ocfs2_meta_lock_update(inode, &bh);
+	/* it's pretty weak to do this possibly sync read here, but until
+	 * we have a real async version of it it's as good a place as any */
+	if (ret_bh == NULL)
+		ret_bh = &local_bh;
+	status = ocfs2_meta_lock_update(inode, ret_bh);
 	if (status < 0) {
 		LOG_ERROR_STATUS(status);
 		goto bail;
 	}
 
+bail:
+	if (local_bh)
+		brelse(local_bh);
+	LOG_EXIT_STATUS(status);
+	return status;
+}
+
+/* grabs the meta lock synchronusly.  */
+int ocfs2_meta_lock_flags(struct inode *inode,
+			  ocfs_journal_handle *handle,
+			  struct buffer_head **ret_bh,
+			  int ex,
+			  int flags)
+{
+	struct buffer_head *bh = NULL;
+	int status;
+
+	LOG_ENTRY();
+
+	BUG_ON(handle && !ex);
+
+	status = ocfs2_meta_lock_flags_async(inode, ret_bh, ex, flags,
+					     NULL, 0);
+	if (status)
+		goto bail;
+
 	if (ret_bh && !bh) {
 		/* caller wants a buffer head but we haven't read it yet. */
-		status = ocfs_read_block(osb, OCFS_I(inode)->ip_blkno, &bh,
+		status = ocfs_read_block(OCFS2_SB(inode->i_sb),
+					 OCFS_I(inode)->ip_blkno, &bh,
 					 OCFS_BH_CACHED, inode);
 		if (status < 0) {
 			LOG_ERROR_STATUS(status);
@@ -1407,7 +1557,7 @@
 
 	LOG_ENTRY();
 
-	status = ocfs2_cluster_lock(osb, lockres, level, 0);
+	status = ocfs2_cluster_lock(osb, lockres, level, 0, NULL, 0);
 	if (status < 0) {
 		if (status != -EINTR)
 			LOG_ERROR_STATUS(status);
@@ -1449,22 +1599,21 @@
 
 int ocfs2_dlm_init(ocfs_super *osb)
 {
-	int status, pid;
+	int status;
 	u32 dlm_key;
 	dlm_ctxt *dlm = NULL;
 
 	LOG_ENTRY();
 
 	/* launch vote thread */
-	init_completion (&osb->vote_event_init);
-	pid = kernel_thread(ocfs2_vote_thread, osb,
-			    CLONE_FS | CLONE_FILES | CLONE_SIGHAND);
-	if (pid < 0) {
-		status = pid;
+	osb->vote_task = kthread_run(ocfs2_vote_thread, osb, "ocfs2vote-%d",
+				     osb->osb_id);
+	if (IS_ERR(osb->vote_task)) {
+		status = PTR_ERR(osb->vote_task);
+		osb->vote_task = NULL;
 		LOG_ERROR_STATUS(status);
 		goto bail;
 	}
-	wait_for_completion(&osb->vote_event_init);
 
 	/* used by the dlm code to make message headers unique, each
 	 * node in this domain must agree on this. */
@@ -1492,24 +1641,14 @@
 
 void ocfs2_dlm_shutdown(ocfs_super *osb)
 {
-	int wait_on_vote_task = 0;
-
 	LOG_ENTRY();
 	ocfs2_drop_super_lock(osb);
 
-	/* needs to be able to deal with the dlm being in many
-	 * different states. */
-	spin_lock(&osb->vote_task_lock);
 	if (osb->vote_task) {
-		osb->vote_exit = 1;
-		ocfs2_kick_vote_thread(osb);
-		wait_on_vote_task = 1;
+		kthread_stop(osb->vote_task);
+		osb->vote_task = NULL;
 	}
-	spin_unlock(&osb->vote_task_lock);
 
-	if (wait_on_vote_task)
-		wait_for_completion(&osb->vote_event_complete);
-
 	ocfs2_lock_res_free(&osb->super_lockres);
 	dlm_unregister_domain(osb->dlm);
 	LOG_EXIT();
@@ -1539,7 +1678,7 @@
 		BUG();
 	}
 	lockres->l_unlock_action = OCFS2_UNLOCK_INVALID;
-	lockres->l_flags &= ~OCFS2_LOCK_BUSY;
+	lockres_clear_flags(lockres, OCFS2_LOCK_BUSY);
 	spin_unlock(&lockres->l_lock);
 
 	wake_up(&lockres->l_event);
@@ -1566,14 +1705,14 @@
 		goto bail;
 	}
 
-	lockres->l_flags &= ~OCFS2_LOCK_ATTACHED;
+	lockres_clear_flags(lockres, OCFS2_LOCK_ATTACHED);
 
 	/* make sure we never get here while waiting for an ast to
 	 * fire. */
 	OCFS_ASSERT(lockres->l_action == OCFS2_AST_INVALID);
 
 	/* is this necessary? */
-	lockres->l_flags |= OCFS2_LOCK_BUSY;
+	lockres_or_flags(lockres, OCFS2_LOCK_BUSY);
 	lockres->l_unlock_action = OCFS2_UNLOCK_DROP_LOCK;
 	spin_unlock(&lockres->l_lock);
 
@@ -1676,7 +1815,7 @@
 
 	lockres->l_action = OCFS2_AST_DOWNCONVERT;
 	lockres->l_requested = new_level;
-	lockres->l_flags |= OCFS2_LOCK_BUSY;
+	lockres_or_flags(lockres, OCFS2_LOCK_BUSY);
 	spin_unlock(&lockres->l_lock);
 
 	if (lvb)

Modified: branches/dlm-reco-mig/fs/ocfs2/dlmglue.h
===================================================================
--- branches/dlm-reco-mig/fs/ocfs2/dlmglue.h	2005-03-04 20:26:31 UTC (rev 1938)
+++ branches/dlm-reco-mig/fs/ocfs2/dlmglue.h	2005-03-04 22:21:29 UTC (rev 1939)
@@ -103,6 +103,12 @@
 			  struct buffer_head **ret_bh,
 			  int ex,
 			  int flags);
+int ocfs2_meta_lock_flags_async(struct inode *inode,
+				struct buffer_head **ret_bh,
+				int ex,
+				int flags,
+				ocfs2_lock_callback cb,
+				unsigned long cb_data);
 void ocfs2_meta_unlock(struct inode *inode,
 		       int ex);
 int ocfs2_super_lock(ocfs_super *osb,

Modified: branches/dlm-reco-mig/fs/ocfs2/file.c
===================================================================
--- branches/dlm-reco-mig/fs/ocfs2/file.c	2005-03-04 20:26:31 UTC (rev 1938)
+++ branches/dlm-reco-mig/fs/ocfs2/file.c	2005-03-04 22:21:29 UTC (rev 1939)
@@ -38,6 +38,7 @@
 #include "ocfs.h"
 #include "ocfs2.h"
 
+#include "aio.h"
 #include "alloc.h"
 #include "dir.h"
 #include "dlmglue.h"
@@ -174,6 +175,39 @@
 	return (err < 0) ? -EIO : 0;
 }				/* ocfs_sync_file */
 
+static void ocfs2_update_inode_size(struct inode *inode,
+				    u64 new_size)
+{
+	struct super_block *sb = inode->i_sb;
+
+	i_size_write(inode, new_size);
+	inode->i_blocks = (new_size + sb->s_blocksize - 1) >> 
+		sb->s_blocksize_bits;
+}
+
+void ocfs2_file_finish_extension(struct inode *inode, loff_t newsize,
+				 unsigned should_zero)
+{
+	LOG_TRACE_STR("Generic_file_write ok, asking for OIN update now");
+	ocfs2_update_inode_size(inode, newsize);
+
+	if (!should_zero) {
+		/*
+		 * This leaves dirty data in holes.
+		 * Caveat Emptor.
+		 */
+		OCFS_I(inode)->ip_mmu_private = newsize;
+	} else {
+		int status = ocfs2_zero_extend(inode);
+		/*
+		 * Don't overwrite the result of
+		 * generic_file_write
+		 */
+		if (status)
+			LOG_ERROR_ARGS("Unable to pre-zero extension of inode (%d)", status);
+	}
+}
+
 /*
  * ocfs_file_write()
  * Linux 2.6 TODO: Remove all O_DIRECT conditionals here, they are no longer
@@ -183,18 +217,11 @@
 		size_t count, loff_t *ppos)
 {
 	int ret = 0;
-	int extended = 0;
 	ocfs_super *osb = NULL;
 	struct dentry *dentry = filp->f_dentry;
 	struct inode *inode = dentry->d_inode;
-	int status;
-	u64 newsize;
-	struct super_block *sb = inode->i_sb;
-	int do_direct_io = 0;
 	int sector_size;
-	int have_i_sem = 0;
-	int level = filp->f_flags & O_APPEND;
-	loff_t saved_ppos;
+	struct ocfs2_write_lock_info info = {0, };
 	DECLARE_BUFFER_LOCK_CTXT(ctxt);
 
 	LOG_SET_CONTEXT(WRITE);
@@ -219,137 +246,11 @@
 	osb = OCFS_SB(inode->i_sb);
 	sector_size = 1 << osb->s_sectsize_bits;
 
-	down(&inode->i_sem);
-	have_i_sem = 1;
-
-	ret = ocfs2_setup_io_locks(inode->i_sb, inode, (char *) buf,
-				   count, &ctxt);
-	if (ret < 0) {
-		LOG_ERROR_STATUS(ret);
+	ret = ocfs_write_lock_maybe_extend(filp, buf, count, ppos, &info,
+					   &ctxt);
+	if (ret)
 		goto bail;
-	}
 
-	/* This will lock everyone in the context who's order puts
-	 * them before us. */
-	ret = ocfs2_lock_buffer_inodes(&ctxt, inode);
-	if (ret < 0) {
-		if (ret != -EINTR)
-			LOG_ERROR_STATUS(ret);
-		goto bail;
-	}
-
-	ctxt.b_lock_data_write = 1;
-lock:
-	status = ocfs2_meta_lock(inode, NULL, NULL, level);
-	if (status < 0) {
-		if (status != -EINTR)
-			LOG_ERROR_STATUS(status);
-		ret = status;
-		goto bail;
-	}
-	/* to handle extending writes, we do a bit of our own locking
-	 * here, but we setup the ctxt do unlock for us (as well as
-	 * handle locking everything else. */
-	if (level)
-		ctxt.b_lock_meta_write = 1;
-
-	/* work on a copy of ppos until we're sure that we won't have
-	 * to recalculate it due to relocking. */
-	saved_ppos = *ppos;
-
-	if (filp->f_flags & O_APPEND) {
-		saved_ppos = i_size_read(inode);
-		LOG_TRACE_ARGS("O_APPEND: inode->i_size=%llu\n", saved_ppos);
-
-		/* ugh, work around some applications which open
-		 * everything O_DIRECT + O_APPEND and really don't
-		 * mean to use O_DIRECT. */
-#warning this is wrong wrong wrong
-		filp->f_flags &= ~O_DIRECT;
-	}
-
-	if (filp->f_flags & O_DIRECT) {
-		/* anything special for o_direct? */
-		LOG_TRACE_STR ("O_DIRECT");
-		if ((saved_ppos & (sector_size - 1)) || 
-		    (count & (sector_size - 1)) || 
-		    ((unsigned long)buf & (sector_size - 1))) {
-			do_direct_io = 0;
-			filp->f_flags |= O_SYNC;
-		} else {
-			do_direct_io = 1;
-		}
-	}
-	ctxt.b_lock_direct = do_direct_io;
-
-	newsize = count + saved_ppos;
-	if (filp->f_flags & O_APPEND)
-		newsize = count + i_size_read(inode);
-
-	LOG_TRACE_ARGS ("ppos=%llu newsize=%llu cursize=%llu\n",
-			saved_ppos, newsize, i_size_read(inode));
-
-	if (newsize > i_size_read(inode)) {
-		if (!level) {
-			/* we want an extend, but need a higher
-			 * level cluster lock. */
-			LOG_TRACE_ARGS("inode %llu, had a PR, looping back "
-				       "for EX\n", OCFS_I(inode)->ip_blkno);
-			ocfs2_meta_unlock(inode, level);
-			level = 1;
-			goto lock;
-		}
-		extended = 1;
-
-		LOG_TRACE_ARGS("Writing at EOF, will need more allocation: "
-			       "i_size=%llu, need=%llu\n",
-			       i_size_read(inode), newsize);
-
-		status = ocfs_extend_file(osb, inode, newsize);
-		if (status < 0) {
-			if (status != -EINTR && status != -ENOSPC) {
-				LOG_ERROR_STATUS (status);
-				LOG_ERROR_ARGS ("Failed to extend file from "
-						"%llu to %llu",
-			     		*ppos, newsize);
-				ret = -ENOSPC;
-			} else
-				ret = status;
-
-			ocfs2_meta_unlock(inode, level);
-			goto bail_unlock;
-		}
-	}
-
-	/* we've got whatever cluster lock is appropriate now, so we
-	 * can stuff *ppos back. */
-	*ppos = saved_ppos;
-
-	if (!do_direct_io) {
-		status = ocfs2_data_lock(inode, 1);
-		if (status < 0) {
-			if (status != -EINTR)
-				LOG_ERROR_STATUS(status);
-			ret = status;
-
-			ocfs2_meta_unlock(inode, level);
-			goto bail_unlock;
-		}
-	}
-
-	/* Alright, fool the io locking stuff into thinking it's
-	 * handled our inode for us. We can now count on it to do the
-	 * unlock for us. */
-	ctxt.b_target->ba_locked = 1;
-
-	/* This will lock everyone who's order puts them *after* our inode. */
-	ret = ocfs2_lock_buffer_inodes(&ctxt, NULL);
-	if (ret < 0) {
-		if (ret != -EINTR)
-			LOG_ERROR_STATUS(ret);
-		goto bail_unlock;
-	}
-
 	down_read(&OCFS_I(inode)->ip_alloc_sem);
 #if LINUX_VERSION_CODE >= KERNEL_VERSION(2,6,0)
 	/* 
@@ -363,7 +264,7 @@
 		/* ick.  seems to be our only way of toggling 
 		 * directio for 2.6 */
 		unsigned int savedflags = filp->f_flags;
-		if (do_direct_io) 
+		if (info.wl_do_direct_io) 
 			filp->f_flags |= O_DIRECT;
 		else
 			filp->f_flags &= ~O_DIRECT;
@@ -371,46 +272,28 @@
 		filp->f_flags = savedflags;
 	}
 #else
-	if (do_direct_io)
+	if (info.wl_do_direct_io)
 		ret = ocfs_rw_direct (WRITE, filp, (char *) buf, count, ppos);
 	else
 		ret = generic_file_write_nolock (filp, buf, count, ppos);
 #endif
 	up_read(&OCFS_I(inode)->ip_alloc_sem);
 
-	if (extended) {
-		LOG_TRACE_STR
-		    ("Generic_file_write ok, asking for OIN update now");
-		i_size_write(inode, newsize);
-		inode->i_blocks = (newsize + sb->s_blocksize - 1) >> sb->s_blocksize_bits;
-		if (do_direct_io) {
-			/*
-			 * This leaves dirty data in holes.
-			 * Caveat Emptor.
-			 */
-			OCFS_I(inode)->ip_mmu_private = i_size_read(inode);
-		} else {
-			status = ocfs2_zero_extend(inode);
-			/*
-			 * Don't overwrite the result of
-			 * generic_file_write
-			 */
-			if (status)
-				LOG_ERROR_ARGS("Unable to pre-zero extension of inode (%d)", status);
-		}
-	}
-
-bail_unlock:
-	ocfs2_unlock_buffer_inodes(&ctxt);
-
 bail:
-	if (have_i_sem)
+	/* we might have to finish up extentions that were performed before
+	 * an error was returned by, say, data locking */
+	if (info.wl_extended)
+		ocfs2_file_finish_extension(inode, info.wl_newsize,
+					    !info.wl_do_direct_io);
+	if (info.wl_unlock_ctxt)
+		ocfs2_unlock_buffer_inodes(&ctxt);
+	if (info.wl_have_i_sem)
 		up(&inode->i_sem);
 	LOG_EXIT_INT (ret);
 
 	LOG_CLEAR_CONTEXT();
 	return ret;
-}				/* ocfs_file_write */
+}
 
 /*
  * ocfs_file_read()
@@ -424,6 +307,7 @@
 	ocfs_super *osb = NULL;
 	struct dentry *dentry = filp->f_dentry;
 	struct inode *inode = dentry->d_inode;
+	ocfs2_backing_inode *target_binode;
 	int status = 0;
 	int do_direct_io = 0;
 	int sector_size;
@@ -454,14 +338,16 @@
 			do_direct_io = 1;
 		}
 	}
-	ctxt.b_lock_direct = do_direct_io;
 
-	ret = ocfs2_setup_io_locks(inode->i_sb, inode, buf, count, &ctxt);
+	ret = ocfs2_setup_io_locks(inode->i_sb, inode, buf, count, &ctxt,
+				   &target_binode);
 	if (ret < 0) {
 		LOG_ERROR_STATUS(status);
 		goto bail;
 	}
 
+	target_binode->ba_lock_data = do_direct_io ? 0 : 1;
+
 	ret = ocfs2_lock_buffer_inodes(&ctxt, NULL);
 	if (ret < 0) {
 		if (ret != -EINTR)
@@ -516,8 +402,8 @@
 	.open = ocfs_file_open,
 	.ioctl = ocfs_ioctl,
 #if LINUX_VERSION_CODE >= KERNEL_VERSION(2,6,0)
-	.aio_read = generic_file_aio_read,
-	.aio_write = generic_file_aio_write,
+	.aio_read = ocfs2_file_aio_read,
+	.aio_write = ocfs2_file_aio_write,
 #else
 	.aio_read = ocfs_aio_read,
 	.aio_write = ocfs_aio_write,
@@ -867,10 +753,15 @@
  *
  * Ok, this function is heavy on the goto's - we need to clean it up a
  * bit.
+ *
+ * *bytes_extended is a measure of how much was added to
+ * dinode->i_size, NOT how much allocated was actually added to the
+ * file. It will always be correct, even when we return an error.
  */
 int ocfs_extend_file(ocfs_super *osb, 
-		     struct inode *inode,
-		     u64 new_i_size)
+		    struct inode *inode,
+		    u64 new_i_size,
+		    u64 *bytes_extended)
 {
 	int status = 0;
 	int restart_func = 0;
@@ -879,6 +770,7 @@
 	int credits, num_free_extents;
 	unsigned int overalloc_bits = 0;
 	u32 clusters_to_add;
+	u64 new_fe_size;
 	struct buffer_head *bh = NULL;
 	ocfs2_dinode *fe;
 	ocfs_journal_handle *handle = NULL;
@@ -888,6 +780,8 @@
 
 	LOG_ENTRY_ARGS("(new_i_size=%llu)\n", new_i_size);
 
+	*bytes_extended = 0;
+
 	/* setattr sometimes calls us like this. */
 	if (new_i_size == 0)
 		goto leave;
@@ -908,7 +802,7 @@
 
 	fe = (ocfs2_dinode *) bh->b_data;
 	OCFS_ASSERT(IS_VALID_FILE_ENTRY(fe));
-	OCFS_ASSERT(i_size_read(inode) == fe->i_size);
+	OCFS_ASSERT(i_size_read(inode) == (fe->i_size - *bytes_extended));
 	OCFS_ASSERT(new_i_size >= i_size_read(inode));
 
 	if (i_size_read(inode) == new_i_size)
@@ -1019,11 +913,13 @@
 		} else {
 			OCFS_ASSERT(why == RESTART_TRANS);
 
+			new_fe_size = ocfs2_clusters_to_bytes(osb->sb,
+							      fe->i_clusters);
+			*bytes_extended += new_fe_size - fe->i_size;
 			/* update i_size in case we crash after the
 			 * extend_trans */
-			fe->i_size =
-				ocfs2_clusters_to_bytes(osb->sb,
-							fe->i_clusters);
+			fe->i_size = new_fe_size;
+
 			fe->i_mtime = OCFS_CURRENT_TIME;
 
 			status = ocfs_journal_dirty(handle, bh);
@@ -1056,11 +952,13 @@
 no_alloc:
 	/* this may not be the end of our allocation so only update
 	 * i_size to what's appropriate. */
-	if (new_i_size > ocfs2_clusters_to_bytes(osb->sb, fe->i_clusters))
-		fe->i_size = ocfs2_clusters_to_bytes(osb->sb, fe->i_clusters);
-	else
-		fe->i_size = new_i_size;
+	new_fe_size = ocfs2_clusters_to_bytes(osb->sb, fe->i_clusters);
+	if (new_i_size < new_fe_size)
+		new_fe_size = new_i_size;
 
+	*bytes_extended += new_fe_size - fe->i_size;
+	fe->i_size = new_fe_size;
+
 	LOG_TRACE_ARGS("fe: i_clusters = %u, i_size=%llu\n", 
 		       fe->i_clusters, fe->i_size);
 
@@ -1100,6 +998,7 @@
 		restart_func = 0;
 		goto restart_all;
 	}
+
 	LOG_EXIT_STATUS (status);
 	return status;
 }				/* ocfs_extend_file */
@@ -1112,7 +1011,7 @@
 {
 	int status = 0;
 	int unlock = 0;
-	u64 newsize;
+	u64 newsize, bytes_added;
 	struct inode *inode = dentry->d_inode;
 	struct super_block *sb = inode->i_sb;
 	ocfs_super *osb = OCFS2_SB(sb);
@@ -1156,19 +1055,32 @@
 
 	if (attr->ia_valid & ATTR_SIZE &&
 	    newsize != i_size_read(inode)) {
+		bytes_added = 0;
+
 		if (i_size_read(inode) > newsize)
 			status = ocfs_truncate_file(osb, newsize, inode);
 		else
-			status = ocfs_extend_file(osb, inode, newsize);
-		if (status < 0) {
+			status = ocfs_extend_file(osb, inode, newsize,
+						  &bytes_added);
+		if (status < 0 && (!bytes_added)) {
 			if (status != -EINTR && status != -ENOSPC)
 				LOG_ERROR_STATUS(status);
 			status = -ENOSPC;
 			goto bail;
 		}
+
+		/* partial extend, we continue with what we've got. */
+		if (status < 0 && status != -ENOSPC && status != -EINTR)
+			LOG_ERROR_ARGS("status return of %d extending inode "
+				       "%llu\n", status,
+				       OCFS_I(inode)->ip_blkno);
+		status = 0;
+
+		newsize = bytes_added + i_size_read(inode);
+		if (bytes_added)
+			ocfs2_update_inode_size(inode, newsize);
+
 		spin_lock(&OCFS_I(inode)->ip_lock);
-		i_size_write(inode, newsize);
-		inode->i_blocks = (newsize + sb->s_blocksize - 1) >> sb->s_blocksize_bits;
 		if (OCFS_I(inode)->ip_flags & OCFS_INODE_OPEN_DIRECT) {
 			/* This is a total broken hack for O_DIRECT crack */
 			OCFS_I(inode)->ip_mmu_private = i_size_read(inode);

Modified: branches/dlm-reco-mig/fs/ocfs2/file.h
===================================================================
--- branches/dlm-reco-mig/fs/ocfs2/file.h	2005-03-04 20:26:31 UTC (rev 1938)
+++ branches/dlm-reco-mig/fs/ocfs2/file.h	2005-03-04 22:21:29 UTC (rev 1939)
@@ -43,11 +43,12 @@
 			   struct _ocfs2_alloc_context *data_ac,
 			   struct _ocfs2_alloc_context *meta_ac,
 			   enum ocfs2_alloc_restarted *reason);
-int ocfs_extend_file(ocfs_super *osb, 
-		     struct inode *inode,
-		     u64 new_i_size);
 int ocfs_setattr(struct dentry *dentry, struct iattr *attr);
 int ocfs_sync_inode(struct inode *inode);
+int ocfs_extend_file(ocfs_super *osb, 
+		    struct inode *inode,
+		    u64 new_i_size,
+		    u64 *bytes_extended);
 
 #if LINUX_VERSION_CODE >= KERNEL_VERSION(2,6,0)
 int ocfs_getattr(struct vfsmount *mnt, struct dentry *dentry,
@@ -59,4 +60,7 @@
 			struct buffer_head *fe_bh,
 			u64 new_i_size);
 
+void ocfs2_file_finish_extension(struct inode *inode, loff_t newsize,
+				 unsigned should_zero);
+
 #endif /* OCFS2_FILE_H */

Modified: branches/dlm-reco-mig/fs/ocfs2/heartbeat.c
===================================================================
--- branches/dlm-reco-mig/fs/ocfs2/heartbeat.c	2005-03-04 20:26:31 UTC (rev 1938)
+++ branches/dlm-reco-mig/fs/ocfs2/heartbeat.c	2005-03-04 22:21:29 UTC (rev 1939)
@@ -62,25 +62,21 @@
 				int node_num,
 				void *data);
 
-static void __ocfs_node_map_dup(ocfs_super *osb,
-				ocfs_node_map *target,
-				ocfs_node_map *from);
 static inline void __ocfs_node_map_set_bit(ocfs_node_map *map,
 					   int bit);
 static inline void __ocfs_node_map_clear_bit(ocfs_node_map *map,
 					     int bit);
 static inline int __ocfs_node_map_is_empty(ocfs_node_map *map);
-static void __ocfs_node_map_dup(ocfs_super *osb,
-				ocfs_node_map *target,
+static void __ocfs_node_map_dup(ocfs_node_map *target,
 				ocfs_node_map *from);
 static void __ocfs_node_map_set(ocfs_node_map *target, ocfs_node_map *from);
 
 void ocfs2_init_node_maps(ocfs_super *osb)
 {
 	spin_lock_init(&osb->node_map_lock);
-	ocfs_node_map_init(osb, &osb->mounted_map);
-	ocfs_node_map_init(osb, &osb->recovery_map);
-	ocfs_node_map_init(osb, &osb->umount_map);
+	ocfs_node_map_init(&osb->mounted_map);
+	ocfs_node_map_init(&osb->recovery_map);
+	ocfs_node_map_init(&osb->umount_map);
 }
 
 static void ocfs2_hb_node_down_cb(struct inode *group,
@@ -163,10 +159,9 @@
 
 /* special case -1 for now
  * TODO: should *really* make sure the calling func never passes -1!!  */
-void ocfs_node_map_init(ocfs_super *osb,
-			ocfs_node_map *map)
+void ocfs_node_map_init(ocfs_node_map *map)
 {
-	map->num_nodes = osb->max_nodes;
+	map->num_nodes = OCFS_NODE_MAP_MAX_NODES;
 	memset(map->map, 0, BITS_TO_LONGS(OCFS_NODE_MAP_MAX_NODES) * 
 	       sizeof(unsigned long));
 }
@@ -242,12 +237,11 @@
 	return ret;
 }
 
-static void __ocfs_node_map_dup(ocfs_super *osb,
-				ocfs_node_map *target,
+static void __ocfs_node_map_dup(ocfs_node_map *target,
 				ocfs_node_map *from)
 {
 	OCFS_ASSERT(from->num_nodes > 0);
-	ocfs_node_map_init(osb, target);
+	ocfs_node_map_init(target);
 	__ocfs_node_map_set(target, from);
 }
 
@@ -260,7 +254,7 @@
 	int ret;
 
 	spin_lock(&osb->node_map_lock);
-	__ocfs_node_map_dup(osb, &temp, target);
+	__ocfs_node_map_dup(&temp, target);
 	__ocfs_node_map_clear_bit(&temp, bit);
 	ret = __ocfs_node_map_is_empty(&temp);
 	spin_unlock(&osb->node_map_lock);

Modified: branches/dlm-reco-mig/fs/ocfs2/heartbeat.h
===================================================================
--- branches/dlm-reco-mig/fs/ocfs2/heartbeat.h	2005-03-04 20:26:31 UTC (rev 1938)
+++ branches/dlm-reco-mig/fs/ocfs2/heartbeat.h	2005-03-04 22:21:29 UTC (rev 1939)
@@ -33,7 +33,7 @@
 
 /* node map functions - used to keep track of mounted and in-recovery
  * nodes. */
-void ocfs_node_map_init(ocfs_super *osb, ocfs_node_map *map);
+void ocfs_node_map_init(ocfs_node_map *map);
 int ocfs_node_map_is_empty(ocfs_super *osb,
 			   ocfs_node_map *map);
 void ocfs_node_map_set_bit(ocfs_super *osb,

Modified: branches/dlm-reco-mig/fs/ocfs2/journal.c
===================================================================
--- branches/dlm-reco-mig/fs/ocfs2/journal.c	2005-03-04 20:26:31 UTC (rev 1938)
+++ branches/dlm-reco-mig/fs/ocfs2/journal.c	2005-03-04 22:21:29 UTC (rev 1939)
@@ -29,6 +29,7 @@
 #include <linux/types.h>
 #include <linux/slab.h>
 #include <linux/highmem.h>
+#include <linux/kthread.h>
 
 #include "ocfs_log.h"
 #include "ocfs.h"
@@ -60,7 +61,6 @@
 static int __ocfs_recovery_thread(void *arg);
 static int ocfs_commit_cache (ocfs_super * osb);
 static int ocfs_wait_on_mount(ocfs_super *osb);
-static int ocfs2_commit_thread_sleep(ocfs_super *osb);
 static void ocfs_handle_cleanup_locks(ocfs_journal *journal, 
 				      ocfs_journal_handle *handle,
 				      int set_id);
@@ -502,11 +502,11 @@
 	switch (type) {
 	case OCFS_JOURNAL_ACCESS_CREATE:
 	case OCFS_JOURNAL_ACCESS_WRITE:
-		status = journal_get_write_access(handle->k_handle, bh, NULL);
+		status = kapi_journal_get_write_access(handle->k_handle, bh);
 		break;
 
 	case OCFS_JOURNAL_ACCESS_UNDO:
-		status = journal_get_undo_access(handle->k_handle, bh, NULL);
+		status = kapi_journal_get_undo_access(handle->k_handle, bh);
 		break;
 
 	default:
@@ -778,14 +778,11 @@
 	/* The OCFS_JOURNAL_IN_SHUTDOWN will signal to commit_cache to not
 	 * drop the trans_lock (which we want to hold until we
 	 * completely destroy the journal. */
-	if (osb->commit && osb->commit->c_task) {
+	if (osb->commit_task) {
 		/* Wait for the commit thread */
 		LOG_TRACE_STR ("Waiting for ocfs2commit to exit....");
-		atomic_set (&osb->needs_checkpoint, 1);
-		wake_up (&osb->checkpoint_event);
-		wait_for_completion(&osb->commit->c_complete);
-		osb->commit->c_task = NULL;
-		kfree(osb->commit);
+		kthread_stop(osb->commit_task);
+		osb->commit_task = NULL;
 	}
 
 	OCFS_ASSERT(atomic_read(&(osb->journal->num_trans)) == 0);
@@ -818,7 +815,6 @@
 {
 	int status = 0;
 	int olderr = 0;
-	int child_pid;
 	ocfs_super *osb;
 
 	LOG_ENTRY();
@@ -848,22 +844,15 @@
 	}
 
 	/* Launch the commit thread */
-	osb->commit = kmalloc(sizeof(ocfs_commit_task), GFP_KERNEL);
-	if (osb->commit == NULL) {
-		LOG_ERROR_STATUS(status = -ENOMEM);
+	osb->commit_task = kthread_run(ocfs_commit_thread, osb, "ocfs2cmt-%d",
+				       osb->osb_id);
+	if (IS_ERR(osb->commit_task)) {
+		status = PTR_ERR(osb->commit_task);
+		osb->commit_task = NULL;
+		LOG_ERROR_ARGS ("unable to launch ocfs2commit thread, "
+				"error=%d", status);
 		goto done;
-	}
-	memset(osb->commit, 0, sizeof(ocfs_commit_task));
-	child_pid = kernel_thread (ocfs_commit_thread, osb,
-				   CLONE_FS | CLONE_FILES | CLONE_SIGHAND);
-	if (child_pid < 0) {
-		LOG_ERROR_ARGS ("unable to launch ocfs2commit thread, error=%d",
-				child_pid);
-		status = child_pid;
-		goto done;
-	} else {
-		init_completion (&osb->commit->c_complete);
-	}
+	} 
 
 done:
 	LOG_EXIT_STATUS(status);
@@ -969,13 +958,9 @@
 	ocfs_super *osb = arg;
 	int status = 0;
 	int node_num;
-	char proc[16];
 
 	LOG_ENTRY();
 
-	sprintf (proc, "ocfs2rec-%d", osb->osb_id);
-	ocfs_daemonize (proc, strlen(proc), 0);
-
 	status = ocfs_wait_on_mount(osb);
 	if (status < 0) {
 		if (status == -EBUSY)
@@ -1020,12 +1005,17 @@
 		goto restart;
 	}
 
-	osb->recovery_launched = 0;
+	osb->recovery_thread_task = NULL;
+	mb(); /* sync with ocfs2_recovery_thread_running */
 	wake_up(&osb->recovery_event);
 
 	up(&osb->recovery_lock);
 
 	LOG_EXIT_STATUS(status);
+	/* no one is callint kthread_stop() for us so the kthread() api
+	 * requires that we call do_exit().  And it isn't exported, but
+	 * complete_and_exit() seems to be a minimal wrapper around it. */
+	complete_and_exit(NULL, status);
 	return status;
 }
 
@@ -1035,19 +1025,25 @@
 		       node_num, osb->node_num);
 
 	down(&osb->recovery_lock);
-	if (!osb->disable_recovery) {
-		/* People waiting on recovery will wait on
-		 * the recovery map to empty. */
-		ocfs_recovery_map_set(osb, node_num);
+	if (osb->disable_recovery)
+		goto out;
+	/* People waiting on recovery will wait on
+	 * the recovery map to empty. */
+	ocfs_recovery_map_set(osb, node_num);
 
-		LOG_TRACE_STR("starting recovery thread...");
+	LOG_TRACE_STR("starting recovery thread...");
 
-		if (!osb->recovery_launched) {
-			kernel_thread(__ocfs_recovery_thread, osb,
-				      CLONE_VM | CLONE_FS | CLONE_FILES);
-			osb->recovery_launched = 1;
-		}
+	if (osb->recovery_thread_task)
+		goto out;
+
+	osb->recovery_thread_task =  kthread_run(__ocfs_recovery_thread, osb,
+						 "ocfs2rec-%d", osb->osb_id);
+	if (IS_ERR(osb->recovery_thread_task)) {
+		LOG_ERROR_STATUS((int)PTR_ERR(osb->recovery_thread_task));
+		osb->recovery_thread_task = NULL;
 	}
+
+out:
 	up(&osb->recovery_lock);
 	wake_up(&osb->recovery_event);
 
@@ -1455,104 +1451,42 @@
 	goto retry;
 }
 
-static int ocfs2_commit_thread_sleep(ocfs_super *osb)
-{
-	int status;
-	signed long timeout = OCFS_CHECKPOINT_INTERVAL;
-	DECLARE_WAITQUEUE(wait, current);
-
-	if (atomic_read(&osb->needs_checkpoint))
-		return 0;
-
-	status = 0;
-	add_wait_queue(&osb->checkpoint_event, &wait);
-	while (1) {
-		set_current_state(TASK_INTERRUPTIBLE);
-
-		if (atomic_read(&osb->needs_checkpoint))
-			break;
-
-		if (!signal_pending(current)) {
-			timeout = schedule_timeout(timeout);
-			if (!timeout) {
-				status = -ETIMEDOUT;
-				break;
-			}
-			continue;
-		}
-		status = -EINTR;
-		break;
-	}
-	set_current_state(TASK_RUNNING);
-	remove_wait_queue(&osb->checkpoint_event, &wait);
-
-	return status;
-}
-
 int ocfs_commit_thread(void *arg)
 {
-	int status = 0, misses = 0;
+	int status;
 	ocfs_super *osb = arg;
-	ocfs_commit_task *commit = osb->commit;
-	char name[16];
 	ocfs_journal *journal = osb->journal;
-	siginfo_t info;
 
-	sprintf (name, "ocfs2cmt-%d", osb->osb_id);
-	ocfs_daemonize (name, strlen(name), 0);
+	/* we can trust num_trans here because _should_stop() is only set in
+	 * shutdown and nobody other than ourselves should be able to start
+	 * transactions.  committing on shutdown might take a few iterations
+	 * as final transactions put deleted inodes on the list */
+	while (!(kthread_should_stop() && 
+		 atomic_read(&journal->num_trans) == 0)) {
 
-	commit->c_task = current;
+		status = wait_event_timeout(osb->checkpoint_event,
+					    atomic_read(&journal->num_trans) ||
+						kthread_should_stop(),
+					    OCFS_CHECKPOINT_INTERVAL);
+		if (status == 0)
+			LOG_TRACE_STR("timed out");
+		else if (status == -ERESTARTSYS)
+			LOG_TRACE_STR("signal pending\n");
+		else
+			LOG_TRACE_STR("woken");
 
-	misses = 0;
-	while (1) {
-		status = ocfs2_commit_thread_sleep(osb);
-		atomic_set (&osb->needs_checkpoint, 0);
-
-		switch (status) {
-			case -ETIMEDOUT:
-				LOG_TRACE_STR("timed out");
-				break;
-			case -EINTR:
-				LOG_ERROR_STR("Commit thread got a signal!");
-				/* ignore the actual signal */
-				if (signal_pending(current)) {
-					dequeue_signal_lock(current, 
-							    &current->blocked, 
-							    &info);
-				}
-				break;
-			case 0:
-				LOG_TRACE_STR("woken\n");
-				break;
-			default:
-				LOG_ERROR_STR("invalid status!\n");
-				break;
-		}
-
-skip_sleep:
 		status = ocfs_commit_cache(osb);
 		if (status < 0)
 			LOG_ERROR_STATUS(status);
 
-		/* journal shutdown has asked me to sync up and then
-		 * exit. We might still have transactions pending as
-		 * our last commit might've put deleted inodes on the
-		 * list so we loop back around. */
-		if (journal->state == OCFS_JOURNAL_IN_SHUTDOWN) {
-			/* we can trust num_trans here because we're
-			 * in shutdown and nobody other than ourselves
-			 * should be able to start more. */
-			if (atomic_read(&journal->num_trans) == 0)
-				break;
 #ifdef VERBOSE_COMMIT_THREAD
+		if (kthread_should_stop() && atomic_read(&journal->num_trans)){
 			printk("(%u) commit_thread: %u transactions pending "
-			       "on shutdown\n", 
-			       current->pid, atomic_read(&journal->num_trans));
-#endif
-			goto skip_sleep;
+			       "on shutdown\n", current->pid,
+			       atomic_read(&journal->num_trans));
 		}
+#endif
 	}
 
-	complete (&(commit->c_complete));
 	return 0;
 }

Modified: branches/dlm-reco-mig/fs/ocfs2/mmap.c
===================================================================
--- branches/dlm-reco-mig/fs/ocfs2/mmap.c	2005-03-04 20:26:31 UTC (rev 1938)
+++ branches/dlm-reco-mig/fs/ocfs2/mmap.c	2005-03-04 22:21:29 UTC (rev 1939)
@@ -49,7 +49,8 @@
 static inline struct rb_node * __ocfs2_buffer_lock_ctxt_root(
 	ocfs2_buffer_lock_ctxt *ctxt);
 static int ocfs2_buffer_lock_ctxt_insert(ocfs2_buffer_lock_ctxt *ctxt,
-					 struct inode *inode);
+					 struct inode *inode,
+					 ocfs2_backing_inode **binode_ret);
 static int ocfs2_fill_ctxt_from_buf(struct super_block *sb,
 				    struct inode *target_inode,
 				    char *buf,
@@ -163,7 +164,8 @@
 }
 
 static int ocfs2_buffer_lock_ctxt_insert(ocfs2_buffer_lock_ctxt *ctxt,
-					 struct inode *inode)
+					 struct inode *inode,
+					 ocfs2_backing_inode **binode_ret)
 {
 	u64 blkno;
 	ocfs2_backing_inode *tmp, *binode;
@@ -194,14 +196,15 @@
 			return 0; /* Don't insert duplicates */
 	}
 
-	binode = kmalloc(sizeof(ocfs2_backing_inode), GFP_KERNEL);
+	binode = kcalloc(1, sizeof(ocfs2_backing_inode), GFP_KERNEL);
 	if (!binode)
 		return -ENOMEM;
-	memset(binode, 0, sizeof(ocfs2_backing_inode));
 	binode->ba_inode = inode;
-	binode->ba_locked = 0;
 	ocfs2_init_io_marker(&binode->ba_task);
 
+	if (binode_ret)
+		*binode_ret = binode;
+
 	rb_link_node(&binode->ba_node, parent, p);
 	rb_insert_color(&binode->ba_node, &ctxt->b_inodes);
 
@@ -231,7 +234,8 @@
 			if (inode->i_sb == sb &&
 			    inode != target_inode) {
 				status = ocfs2_buffer_lock_ctxt_insert(ctxt,
-								       inode);
+								       inode,
+								       NULL);
 				if (status < 0)
 					goto bail;
 			}
@@ -246,12 +250,12 @@
 			 struct inode *target_inode,
 			 char *buf,
 			 size_t size,
-			 ocfs2_buffer_lock_ctxt *ctxt)
+			 ocfs2_buffer_lock_ctxt *ctxt,
+			 ocfs2_backing_inode **target_binode)
 {
 	int skip_sem = current->flags & PF_DUMPCORE;
 	int status;
 	struct mm_struct *mm = current->mm;
-	struct rb_node *first;
 
 	OCFS_ASSERT(mm);
 
@@ -260,15 +264,12 @@
 
 	OCFS_ASSERT(!__ocfs2_buffer_lock_ctxt_root(ctxt));
 
-	/* We always insert target because it might not be backing
-	   part of the buffer - but it needs to be in there so that
-	   it's lock gets ordered with everything else */
-	status = ocfs2_buffer_lock_ctxt_insert(ctxt, target_inode);
+	/* We always insert target because it might not be backing part of the
+	 * buffer - but it needs to be in there so that it's lock gets ordered
+	 * with everything else */
+	status = ocfs2_buffer_lock_ctxt_insert(ctxt, target_inode,
+					       target_binode);
 	if (!status) {
-		/* The assert above guarantees that this will work. */
-		ctxt->b_target = rb_entry(__ocfs2_buffer_lock_ctxt_root(ctxt),
-					  ocfs2_backing_inode, ba_node);
-
 		/* Now fill the tree with any inodes that back this
 		 * buffer. If target inode is in there, it will be
 		 * skipped over. */
@@ -285,14 +286,46 @@
 		goto bail;
 	}
 
-	first = rb_first(&ctxt->b_inodes);
-	ctxt->b_head = rb_entry(first, ocfs2_backing_inode, ba_node);
-
 	status = 0;
 bail:
 	return status;
 }
 
+/* starting from pos, which can be null for the first call, give the
+ * next buffer that needs unlocking.  we return null when there are none
+ * left or we see last_inode */
+static ocfs2_backing_inode *ocfs2_next_unlocked(ocfs2_buffer_lock_ctxt *ctxt,
+						struct inode *last_inode,
+						ocfs2_backing_inode *pos)
+{
+	ocfs2_backing_inode *binode = NULL;
+	struct rb_node *node = NULL;
+
+	if (pos == NULL) {
+		if (ctxt->b_next_unlocked)
+			binode = ctxt->b_next_unlocked;
+		else
+			node = rb_first(&ctxt->b_inodes);
+	} else 
+		node = rb_next(&pos->ba_node);
+
+	if (node)
+		binode = rb_entry(node, ocfs2_backing_inode, ba_node);
+
+	if (binode && last_inode && binode->ba_inode == last_inode)
+		binode = NULL;
+
+	/* this is just an optimization to skip nodes in the tree
+	 * that we've already seen.  If we're moving from one we've locked
+	 * to one we haven't then we mark this node in the ctxt so that
+	 * we'll return to it in a future after, say, hitting last_inode
+	 * or EIOCBQUEUED in lock_buffer_inodes */
+	if (pos && pos->ba_locked && binode)
+		ctxt->b_next_unlocked = binode;
+
+	return binode;
+}
+
 /* Will take locks on all inodes in the ctxt up until 'last_inode'. If
  * last_inode is NULL, then we take locks on everything. We mark lock
  * status on the context so we skip any that have already been
@@ -303,43 +336,40 @@
 int ocfs2_lock_buffer_inodes(ocfs2_buffer_lock_ctxt *ctxt,
 			     struct inode *last_inode)
 {
-	int status, meta_level, data_level;
-	ocfs2_backing_inode *binode;
+	int status, data_level;
+	ocfs2_backing_inode *binode = NULL;
 	struct inode *inode;
-	struct rb_node *node;
 
-	binode = ctxt->b_head;
-
-	while(binode) {
+	while((binode = ocfs2_next_unlocked(ctxt, last_inode, binode))) {
+		/* the tricksy caller might have locked inodes themselves
+		 * between calls. */ 
+		if (binode->ba_locked)
+			continue;
 		inode = binode->ba_inode;
-		if (inode == last_inode)
-			break;
 
-		if (binode->ba_locked)
-			goto skip_locking;
-
-		meta_level = 0;
-		if (ocfs2_buffer_lock_is_target(ctxt, inode))
-			meta_level = ctxt->b_lock_meta_write;
-
-		status = ocfs2_meta_lock(inode, NULL, NULL, meta_level);
-		if (status < 0) {
-			if (status != -EINTR)
-				LOG_ERROR_STATUS(status);
-			goto bail;
+		if (!binode->ba_meta_locked) {
+			status = ocfs2_meta_lock_flags_async(inode, NULL,
+						binode->ba_lock_meta_level,
+						0,
+						ctxt->b_cb,
+						ctxt->b_cb_data);
+			if (status < 0) {
+				if (status != -EINTR)
+					LOG_ERROR_STATUS(status);
+				goto bail;
+			}
+			binode->ba_meta_locked = 1;
 		}
 
-		/* If we're doing direct IO, then skip data locking on
-		 * the target. */
-		if (!ocfs2_buffer_lock_is_target(ctxt, inode) ||
-		    !ctxt->b_lock_direct) {
-			data_level = 0;
-			if (ocfs2_buffer_lock_is_target(ctxt, inode))
-				data_level = ctxt->b_lock_data_write;
-
+		/* ba_lock_data isn't set for direct io */
+		if (binode->ba_lock_data) {
+			data_level = binode->ba_lock_data_level;
 			status = ocfs2_data_lock(inode, data_level);
 			if (status < 0) {
-				ocfs2_meta_unlock(inode, meta_level);
+				if (status == -EIOCBQUEUED)
+					goto bail;
+				ocfs2_meta_unlock(inode,
+						  binode->ba_lock_meta_level);
 
 				if (status != -EINTR)
 					LOG_ERROR_STATUS(status);
@@ -347,17 +377,9 @@
 			}
 		}
 		ocfs2_add_io_marker(inode, &binode->ba_task);
-
 		binode->ba_locked = 1;
-skip_locking:
-		node = rb_next(&binode->ba_node);
-		binode = NULL;
-		if (node)
-			binode = rb_entry(node, ocfs2_backing_inode, ba_node);
 	}
 
-	ctxt->b_head = binode;
-
 	status = 0;
 bail:
 	return status;
@@ -365,44 +387,233 @@
 
 void ocfs2_unlock_buffer_inodes(ocfs2_buffer_lock_ctxt *ctxt)
 {
-	int level;
 	ocfs2_backing_inode *binode;
-	struct inode *inode;
-	struct rb_node *node, *tmp;
+	struct rb_node *node;
 
-	node = rb_first(&ctxt->b_inodes);
-	while(node) {
+	/* dlm locks don't mask ints.. this should be lower down */
+	BUG_ON(in_interrupt());
+
+	/* unlock in reverse order to minimize waking forward lockers */
+	while ((node = rb_last(&ctxt->b_inodes)) != NULL) {
 		binode = rb_entry(node, ocfs2_backing_inode, ba_node);
-		if (!binode->ba_locked)
-			goto skip_unlock;
-		inode = binode->ba_inode;
 
-		ocfs2_del_io_marker(inode, &binode->ba_task);
+		ocfs2_del_io_marker(binode->ba_inode, &binode->ba_task);
 
-		if (!ocfs2_buffer_lock_is_target(ctxt, inode) ||
-		    !ctxt->b_lock_direct) {
-			level = 0;
-			if (ocfs2_buffer_lock_is_target(ctxt, inode))
-				level = ctxt->b_lock_data_write;
+		if (binode->ba_locked && binode->ba_lock_data)
+			ocfs2_data_unlock(binode->ba_inode,
+					  binode->ba_lock_data_level);
 
-			ocfs2_data_unlock(inode, level);
+		if (binode->ba_locked || binode->ba_meta_locked)
+			ocfs2_meta_unlock(binode->ba_inode,
+					  binode->ba_lock_meta_level);
+
+		rb_erase(node, &ctxt->b_inodes);
+		kfree(binode);
+	}
+
+	ctxt->b_next_unlocked = NULL;
+}
+
+/*
+ * This builds up the locking state that will be used by a write.  both normal
+ * file writes and AIO writes come in through here.  This function does no
+ * teardown on its own.  The caller must examine the info struct to see if it
+ * needs to release locks or i_sem, etc.  This function is also restartable in
+ * that it can return EIOCBQUEUED if it would have blocked in the dlm.  It
+ * stores its partial progress in the info struct so the caller can call back
+ * in when it thinks the dlm won't block any more.  Thus, the caller must zero
+ * the info struct before calling in the first time.
+ */
+ssize_t ocfs_write_lock_maybe_extend(struct file *filp, const char *buf,
+				     size_t count, loff_t *ppos,
+				     struct ocfs2_write_lock_info *info,
+				     ocfs2_buffer_lock_ctxt *ctxt)
+{
+	int ret = 0;
+	ocfs_super *osb = NULL;
+	struct dentry *dentry = filp->f_dentry;
+	struct inode *inode = dentry->d_inode;
+	int status;
+	int sector_size;
+	int level = filp->f_flags & O_APPEND;
+	loff_t saved_ppos;
+	u64 bytes_added = 0;
+
+	osb = OCFS_SB(inode->i_sb);
+	sector_size = 1 << osb->s_sectsize_bits;
+
+	/* the target inode is different from the other inodes.  in o_direct it
+	 * doesn't get a data lock and when appending it gets a level 1 meta
+	 * lock.  we use target_binode to set its flags accordingly */ 
+	if (info->wl_target_binode == NULL) {
+		ret = ocfs2_setup_io_locks(inode->i_sb, inode, (char *) buf,
+					   count, ctxt, 
+					   &info->wl_target_binode);
+		if (ret < 0) {
+			BUG_ON(ret == -EIOCBQUEUED);
+			LOG_ERROR_STATUS(ret);
+			goto bail;
 		}
+	}
 
-		level = 0;
-		if (ocfs2_buffer_lock_is_target(ctxt, inode))
-			level = ctxt->b_lock_meta_write;
+	/* This will lock everyone in the context who's order puts
+	 * them before us. */
+	if (!info->wl_have_before) {
+		info->wl_unlock_ctxt = 1;
+		ret = ocfs2_lock_buffer_inodes(ctxt, inode);
+		if (ret < 0) {
+			if (ret != -EINTR)
+				LOG_ERROR_STATUS(ret);
+			goto bail;
+		}
+		info->wl_have_before = 1;
+		/* we're writing so get an ex data cluster lock */
+		info->wl_target_binode->ba_lock_data_level = 1;
+	}
 
-		ocfs2_meta_unlock(inode, level);
+	if (!info->wl_have_i_sem) {
+		down(&inode->i_sem);
+		info->wl_have_i_sem = 1;
+	}
 
-skip_unlock:
-		tmp = node;
-		node = rb_next(node);
+lock:
+	if (!info->wl_have_target_meta) {
+		status = ocfs2_meta_lock(inode, NULL, NULL, level);
+		if (status < 0) {
+			if (status != -EINTR)
+				LOG_ERROR_STATUS(status);
+			ret = status;
+			goto bail;
+		}
+		info->wl_have_target_meta = 1;
+	}
+	/* to handle extending writes, we do a bit of our own locking
+	 * here, but we setup the ctxt do unlock for us (as well as
+	 * handle locking everything else. */
+	if (level)
+		info->wl_target_binode->ba_lock_meta_level = 1;
 
-		rb_erase(tmp, &ctxt->b_inodes);
-		kfree(binode);
+	/* work on a copy of ppos until we're sure that we won't have
+	 * to recalculate it due to relocking. */
+	saved_ppos = *ppos;
+
+	if (filp->f_flags & O_APPEND) {
+		saved_ppos = i_size_read(inode);
+		LOG_TRACE_ARGS("O_APPEND: inode->i_size=%llu\n", saved_ppos);
+
+		/* ugh, work around some applications which open
+		 * everything O_DIRECT + O_APPEND and really don't
+		 * mean to use O_DIRECT. */
+#warning this is wrong wrong wrong
+		filp->f_flags &= ~O_DIRECT;
 	}
 
-	ctxt->b_target = ctxt->b_head = NULL;
+	if (filp->f_flags & O_DIRECT) {
+		/* anything special for o_direct? */
+		LOG_TRACE_STR ("O_DIRECT");
+		if ((saved_ppos & (sector_size - 1)) || 
+		    (count & (sector_size - 1)) || 
+		    ((unsigned long)buf & (sector_size - 1))) {
+			info->wl_do_direct_io = 0;
+			filp->f_flags |= O_SYNC;
+		} else {
+			info->wl_do_direct_io = 1;
+		}
+	}
+	info->wl_target_binode->ba_lock_data = info->wl_do_direct_io ? 0 : 1;
+
+	info->wl_newsize = count + saved_ppos;
+	if (filp->f_flags & O_APPEND)
+		info->wl_newsize = count + i_size_read(inode);
+
+	LOG_TRACE_ARGS ("ppos=%llu newsize=%llu cursize=%llu\n",
+			saved_ppos, info->wl_newsize, i_size_read(inode));
+
+	if (info->wl_newsize > i_size_read(inode)) {
+		if (!level) {
+			/* we want an extend, but need a higher
+			 * level cluster lock. */
+			LOG_TRACE_ARGS("inode %llu, had a PR, looping back "
+				       "for EX\n", OCFS_I(inode)->ip_blkno);
+			ocfs2_meta_unlock(inode, level);
+			info->wl_have_target_meta = 0;
+			level = 1;
+			goto lock;
+		}
+		info->wl_extended = 1;
+
+		LOG_TRACE_ARGS("Writing at EOF, will need more allocation: "
+			       "i_size=%llu, need=%llu\n",
+			       i_size_read(inode), info->wl_newsize);
+
+		/* If we extend AT ALL here then we update our state
+		 * and continue the write call, regardless of error --
+		 * this is basically a short write. */
+		status = ocfs_extend_file(osb, inode, info->wl_newsize,
+					  &bytes_added);
+		if (status < 0 && (!bytes_added)) {
+			if (status != -EINTR && status != -ENOSPC) {
+				LOG_ERROR_STATUS (status);
+				LOG_ERROR_ARGS("Failed to extend inode %llu "
+					       "from %llu to %llu",
+						OCFS_I(inode)->ip_blkno,
+					       *ppos, info->wl_newsize);
+			}
+			ret = status;
+
+			info->wl_have_target_meta = 0;
+			ocfs2_meta_unlock(inode, level);
+			goto bail;
+		}
+
+		/* We need to recalulate newsize and count according
+		 * to what extend could give us. If we got the whole
+		 * extend then this doesn't wind up changing the
+		 * values. */
+		info->wl_newsize = i_size_read(inode) + bytes_added;
+		count = info->wl_newsize - saved_ppos;
+
+		if (status < 0 && status != -ENOSPC && status != -EINTR)
+			LOG_ERROR_ARGS("status return of %d extending inode "
+				       "%llu\n", status,
+				       OCFS_I(inode)->ip_blkno);
+		status = 0;
+	}
+
+	/* we've got whatever cluster lock is appropriate now, so we
+	 * can stuff *ppos back. */
+	*ppos = saved_ppos;
+
+	if (!info->wl_do_direct_io && !info->wl_have_data_lock) {
+		status = ocfs2_data_lock(inode, 1);
+		if (status < 0) {
+			if (status != -EINTR)
+				LOG_ERROR_STATUS(status);
+			ret = status;
+
+			info->wl_have_target_meta = 0;
+			ocfs2_meta_unlock(inode, level);
+			goto bail;
+		}
+		info->wl_have_data_lock = 1;
+	}
+
+	/* Alright, fool the io locking stuff into thinking it's
+	 * handled our inode for us. We can now count on it to do the
+	 * unlock for us. */
+	info->wl_target_binode->ba_locked = 1;
+
+	/* This will lock everyone who's order puts them *after* our inode. */
+	ret = ocfs2_lock_buffer_inodes(ctxt, NULL);
+	if (ret < 0) {
+		if (ret != -EINTR)
+			LOG_ERROR_STATUS(ret);
+		goto bail;
+	}
+
+bail:
+	LOG_EXIT_INT(ret);
+	return ret;
 }
 
 #if 0

Modified: branches/dlm-reco-mig/fs/ocfs2/mmap.h
===================================================================
--- branches/dlm-reco-mig/fs/ocfs2/mmap.h	2005-03-04 20:26:31 UTC (rev 1938)
+++ branches/dlm-reco-mig/fs/ocfs2/mmap.h	2005-03-04 22:21:29 UTC (rev 1939)
@@ -42,7 +42,8 @@
 	ocfs_inode_private *oip = OCFS_I(inode);
 
 	spin_lock(&oip->ip_lock);
-	list_del_init(&task->io_list);
+	if (!list_empty(&task->io_list))
+		list_del_init(&task->io_list);
 	spin_unlock(&oip->ip_lock);
 }
 
@@ -70,46 +71,61 @@
 typedef struct _ocfs2_backing_inode {
 	struct rb_node           ba_node;
 	struct inode            *ba_inode;
-	int                      ba_locked;
+	unsigned		 ba_meta_locked:1, 	/* meta is locked */
+				 ba_locked:1,		/* both are locked */
+				 ba_lock_data:1,	/* should lock data */
+				 ba_lock_meta_level:1, 
+				 ba_lock_data_level:1;
 	struct _ocfs2_io_marker  ba_task;
 } ocfs2_backing_inode;
 
 /* Used to manage the locks taken during I/O. */
 typedef struct _ocfs2_buffer_lock_ctxt {
-	/* target flags */
-	unsigned                b_lock_direct:1,
-				b_lock_meta_write:1,
-				b_lock_data_write:1;
 	struct rb_root          b_inodes;
-	ocfs2_backing_inode    *b_target;
-	ocfs2_backing_inode    *b_head;
+	ocfs2_backing_inode	*b_next_unlocked;
+	ocfs2_lock_callback	b_cb;
+	unsigned long		b_cb_data;
 } ocfs2_buffer_lock_ctxt;
 
-#define __BUFFERLOCK_INITIALIZER(name) {				\
+#define __BUFFERLOCK_INITIALIZER {					\
 	.b_inodes               = RB_ROOT,				\
-	.b_target               = NULL,					\
-	.b_head			= NULL }
+	.b_next_unlocked	= NULL,					\
+	.b_cb			= NULL,					\
+	.b_cb_data		= 0 }
 
 #define DECLARE_BUFFER_LOCK_CTXT(name)					\
-	ocfs2_buffer_lock_ctxt name = __BUFFERLOCK_INITIALIZER(name)
+	ocfs2_buffer_lock_ctxt name = __BUFFERLOCK_INITIALIZER
 
+#define INIT_BUFFER_LOCK_CTXT(ctxt)	\
+	*(ctxt) = (ocfs2_buffer_lock_ctxt) __BUFFERLOCK_INITIALIZER
+
 int ocfs2_setup_io_locks(struct super_block *sb,
 			 struct inode *target_inode,
 			 char *buf,
 			 size_t size,
-			 ocfs2_buffer_lock_ctxt *ctxt);
+			 ocfs2_buffer_lock_ctxt *ctxt,
+			 ocfs2_backing_inode **target_binode);
 
 int ocfs2_lock_buffer_inodes(ocfs2_buffer_lock_ctxt *ctxt,
 			     struct inode *last_inode);
 
 void ocfs2_unlock_buffer_inodes(struct _ocfs2_buffer_lock_ctxt *ctxt);
 
-static inline int ocfs2_buffer_lock_is_target(ocfs2_buffer_lock_ctxt *ctxt,
-					      struct inode *inode)
-{
-	if (!ctxt->b_target)
-		return 0;
-	return inode == ctxt->b_target->ba_inode;
-}
+struct ocfs2_write_lock_info {
+	u64			wl_newsize;
+	unsigned		wl_extended:1,
+				wl_do_direct_io:1,
+				wl_have_i_sem:1,
+				wl_unlock_ctxt:1,
+				wl_have_before:1,
+				wl_have_target_meta:1,
+				wl_have_data_lock:1;
+	ocfs2_backing_inode	*wl_target_binode;
+};
 
+ssize_t ocfs_write_lock_maybe_extend(struct file *filp, const char *buf,
+				     size_t count, loff_t *ppos,
+				     struct ocfs2_write_lock_info *info,
+				     ocfs2_buffer_lock_ctxt *ctxt);
+
 #endif  /* OCFS2_MMAP_H */

Modified: branches/dlm-reco-mig/fs/ocfs2/ocfs.h
===================================================================
--- branches/dlm-reco-mig/fs/ocfs2/ocfs.h	2005-03-04 20:26:31 UTC (rev 1938)
+++ branches/dlm-reco-mig/fs/ocfs2/ocfs.h	2005-03-04 22:21:29 UTC (rev 1939)
@@ -163,15 +163,30 @@
 
 struct ocfs2_lock_res_ops;
 
+typedef void (*ocfs2_lock_callback)(int status, unsigned long data);
+
+struct ocfs2_lockres_flag_callback {
+	struct list_head	fc_lockres_item;
+	unsigned		fc_free_once_called:1;
+
+	unsigned long		fc_flag_mask;
+	unsigned long		fc_flag_goal;
+
+	ocfs2_lock_callback	fc_cb;
+	unsigned long		fc_data;
+};
+
+
 typedef struct _ocfs2_lock_res {
 	void                    *l_priv;
 	struct ocfs2_lock_res_ops *l_ops;
 	spinlock_t               l_lock;
 
 	struct list_head         l_blocked_list;
+	struct list_head         l_flag_cb_list;
 
 	enum ocfs2_lock_type     l_type;
-	int                      l_flags;
+	unsigned long		 l_flags;
 	char                    *l_name;
 	int                      l_level;
 	unsigned int             l_ro_holders;
@@ -186,6 +201,7 @@
 	int                      l_blocking;
 
 	wait_queue_head_t l_event;
+
 } ocfs2_lock_res;
 
 /* OCFS2 Inode Private Data */
@@ -271,12 +287,6 @@
 }
 ocfs_vol_state;
 
-typedef struct _ocfs_commit_task
-{
-	struct completion c_complete;
-	struct task_struct *c_task;
-} ocfs_commit_task;
-
 typedef struct _ocfs_alloc_stats
 {
 	atomic_t moves;
@@ -301,7 +311,7 @@
 {
 	struct list_head osb_next;	/* list of ocfs_super(s) */
 	u32 osb_id;		/* id used by the proc interface */
-	ocfs_commit_task *commit;
+	struct task_struct *commit_task;
 	struct super_block *sb;
 	struct inode *root_inode;
 	struct inode *sys_root_inode;
@@ -344,7 +354,7 @@
 
 	atomic_t vol_state;
 	struct semaphore recovery_lock;
-	int recovery_launched;
+	struct task_struct *recovery_thread_task;
 	int disable_recovery;
 	wait_queue_head_t checkpoint_event;
 	atomic_t needs_checkpoint;
@@ -373,8 +383,8 @@
 	spinlock_t vote_task_lock;
 	struct task_struct *vote_task;
 	wait_queue_head_t vote_event;
-	atomic_t wake_vote_task;
-	int vote_exit;
+	unsigned long vote_wake_sequence;
+	unsigned long vote_work_sequence;
 
 	struct list_head blocked_lock_list;
 	unsigned long blocked_lock_count;
@@ -382,9 +392,6 @@
 	struct list_head vote_list;
 	int vote_count;
 
-	struct completion vote_event_complete;
-	struct completion vote_event_init;
-
 	u32 net_key;
 	spinlock_t net_response_lock;
 	unsigned int net_response_ids;

Modified: branches/dlm-reco-mig/fs/ocfs2/proc.c
===================================================================
--- branches/dlm-reco-mig/fs/ocfs2/proc.c	2005-03-04 20:26:31 UTC (rev 1938)
+++ branches/dlm-reco-mig/fs/ocfs2/proc.c	2005-03-04 22:21:29 UTC (rev 1939)
@@ -432,7 +432,7 @@
 	osb = data;
 
 	if (osb) {
-		for (i = 0; i < osb->max_nodes; i++) {
+		for (i = 0; i < OCFS_NODE_MAP_MAX_NODES; i++) {
 			mount = ocfs_node_map_test_bit(osb, &osb->mounted_map, i) ? 'M' : ' ';
 			len += sprintf(page + len, "%2d %c\n", i, mount);
 		}

Modified: branches/dlm-reco-mig/fs/ocfs2/suballoc.c
===================================================================
--- branches/dlm-reco-mig/fs/ocfs2/suballoc.c	2005-03-04 20:26:31 UTC (rev 1938)
+++ branches/dlm-reco-mig/fs/ocfs2/suballoc.c	2005-03-04 22:21:29 UTC (rev 1939)
@@ -321,7 +321,8 @@
 				     &bit_off,
 				     &num_bits);
 	if (status < 0) {
-		LOG_ERROR_STATUS(status);
+		if (status != -ENOSPC)
+			LOG_ERROR_STATUS(status);
 		goto bail;
 	}
 
@@ -456,7 +457,8 @@
 
 		status = ocfs_block_group_alloc(osb, alloc_inode, bh);
 		if (status < 0) {
-			LOG_ERROR_STATUS(status);
+			if (status != -ENOSPC)
+				LOG_ERROR_STATUS(status);
 			goto bail;
 		}
 		/* You should never ask for this much metadata */
@@ -558,7 +560,8 @@
 
 	status = ocfs_reserve_suballoc_bits(osb, *ac);
 	if (status < 0) {
-		LOG_ERROR_STATUS(status);
+		if (status != -ENOSPC)
+			LOG_ERROR_STATUS(status);
 		goto bail;
 	}
 

Modified: branches/dlm-reco-mig/fs/ocfs2/super.c
===================================================================
--- branches/dlm-reco-mig/fs/ocfs2/super.c	2005-03-04 20:26:31 UTC (rev 1938)
+++ branches/dlm-reco-mig/fs/ocfs2/super.c	2005-03-04 22:21:29 UTC (rev 1939)
@@ -1010,6 +1010,15 @@
 }				/* ocfs_mount_volume */
 
 
+/* we can't grab the goofy sem lock from inside wait_event, so we use
+ * memory barriers to make sure that we'll see the null task before
+ * being woken up */
+static int ocfs2_recovery_thread_running(ocfs_super *osb)
+{
+	mb();
+	return osb->recovery_thread_task != NULL;
+}
+
 /*
  * ocfs_dismount_volume()
  *
@@ -1031,13 +1040,8 @@
 	 * running ones to exit. Do this before setting the vol_state. */
 	down(&osb->recovery_lock);
 	osb->disable_recovery = 1;
-	while (osb->recovery_launched) {
-		up(&osb->recovery_lock);
-		LOG_TRACE_STR("Waiting on a recovery thread to complete.");
-		schedule();
-		down(&osb->recovery_lock);
-	}
 	up(&osb->recovery_lock);
+	wait_event(osb->recovery_event, !ocfs2_recovery_thread_running(osb));
 
 	ocfs_journal_shutdown(osb);
 
@@ -1135,9 +1139,10 @@
 	}
 
 	init_waitqueue_head(&osb->recovery_event);
-	atomic_set(&osb->wake_vote_task, 0);
 	spin_lock_init(&osb->vote_task_lock);
 	init_waitqueue_head(&osb->vote_event);
+	osb->vote_work_sequence = 0;
+	osb->vote_wake_sequence = 0;
 	INIT_LIST_HEAD(&osb->blocked_lock_list);
 	osb->blocked_lock_count = 0;
 	INIT_LIST_HEAD(&osb->vote_list);
@@ -1171,7 +1176,7 @@
 	init_MUTEX (&(osb->recovery_lock));
 
 	osb->disable_recovery = 0;
-	osb->recovery_launched = 0;
+	osb->recovery_thread_task = NULL;
 
 	init_waitqueue_head (&osb->checkpoint_event);
 	atomic_set (&osb->needs_checkpoint, 0);

Modified: branches/dlm-reco-mig/fs/ocfs2/util.c
===================================================================
--- branches/dlm-reco-mig/fs/ocfs2/util.c	2005-03-04 20:26:31 UTC (rev 1938)
+++ branches/dlm-reco-mig/fs/ocfs2/util.c	2005-03-04 22:21:29 UTC (rev 1939)
@@ -69,45 +69,6 @@
 #endif
 }
 
-/*
- * ocfs_daemonize() 
- *
- */
-#if LINUX_VERSION_CODE >= KERNEL_VERSION(2,6,0)
-/* yes, len is unused but kept here for backwards compatibility. */
-void ocfs_daemonize (char *name, int len, int shutdown_sigs)
-{
-	sigset_t tmpsig;
-
-	daemonize (name);
-
-	if (shutdown_sigs) {
-		/* Unblock SIGKILL, SIGSTOP, SIGHUP and SIGINT */
-		sigemptyset(&tmpsig);
-		sigaddsetmask(&tmpsig, SHUTDOWN_SIGS);
-		sigprocmask(SIG_UNBLOCK, &tmpsig, NULL);
-	}
-}				/* ocfs_daemonize */
-#else
-void ocfs_daemonize (char *name, int len, int shutdown_sigs)
-{
-	daemonize ();
-	reparent_to_init ();
-
-	if (len > 0) {
-		if (len > 15)
-			BUG();
-		strncpy (current->comm, name, len);
-		current->comm[len] = '\0';
-	}
-
-	if (shutdown_sigs)
-		ocfs_block_sigs(NULL, SHUTDOWN_SIGS);
-	else
-		ocfs_block_sigs(NULL, 0);
-}				/* ocfs_daemonize */
-#endif
-
 /* prefetch has been declared to allow to build in debug mode */
 #ifdef DEBUG
 #ifndef ARCH_HAS_PREFETCH

Modified: branches/dlm-reco-mig/fs/ocfs2/util.h
===================================================================
--- branches/dlm-reco-mig/fs/ocfs2/util.h	2005-03-04 20:26:31 UTC (rev 1938)
+++ branches/dlm-reco-mig/fs/ocfs2/util.h	2005-03-04 22:21:29 UTC (rev 1939)
@@ -26,7 +26,6 @@
 #ifndef OCFS2_UTIL_H
 #define OCFS2_UTIL_H
 
-void ocfs_daemonize(char *name, int len, int shutdown_sigs);
 void ocfs_show_stack(unsigned long *esp);
 void ocfs_show_trace(unsigned long *stack);
 void ocfs_block_sigs(sigset_t *oldsigs, unsigned long mask);

Modified: branches/dlm-reco-mig/fs/ocfs2/vote.c
===================================================================
--- branches/dlm-reco-mig/fs/ocfs2/vote.c	2005-03-04 20:26:31 UTC (rev 1938)
+++ branches/dlm-reco-mig/fs/ocfs2/vote.c	2005-03-04 22:21:29 UTC (rev 1939)
@@ -29,6 +29,7 @@
 #include <linux/slab.h>
 #include <linux/highmem.h>
 #include <linux/smp_lock.h>
+#include <linux/kthread.h>
 
 #include <cluster/util.h>
 #include <cluster/clcommon.h>
@@ -327,6 +328,10 @@
 	LOG_ENTRY();
 
 	spin_lock(&osb->vote_task_lock);
+	/* grab this early so we know to try again if a state change and
+	 * wake happens part-way through our work  */
+	osb->vote_work_sequence = osb->vote_wake_sequence;
+
 	processed = osb->blocked_lock_count;
 	while (processed) {
 		OCFS_ASSERT(!list_empty(&osb->blocked_lock_list));
@@ -363,60 +368,56 @@
 	LOG_EXIT();
 }
 
-static inline int ocfs2_vote_thread_has_work(ocfs_super *osb)
+
+static int ocfs2_vote_thread_lists_empty(ocfs_super *osb)
 {
-	if (list_empty(&osb->blocked_lock_list) &&
-	    list_empty(&osb->vote_list))
-		return 0;
+	int empty = 0;
 
-	return 1;
+	spin_lock(&osb->vote_task_lock);
+       if (list_empty(&osb->blocked_lock_list) &&
+	   list_empty(&osb->vote_list))
+		empty = 1;
+
+	spin_unlock(&osb->vote_task_lock);
+	return empty;
 }
 
+static int ocfs2_vote_thread_should_wake(ocfs_super *osb)
+{
+	int should_wake = 0;
+
+	spin_lock(&osb->vote_task_lock);
+	if (osb->vote_work_sequence != osb->vote_wake_sequence)
+		should_wake = 1;
+	spin_unlock(&osb->vote_task_lock);
+
+	return should_wake;
+}
+
 int ocfs2_vote_thread(void *arg)
 {
 	int status = 0;
 	ocfs_super *osb = arg;
-	char proc[16];
 
-	sprintf (proc, "ocfs2vote-%d", osb->osb_id);
-	ocfs_daemonize (proc, strlen(proc), 0);
+	/* only quit once we've been asked to stop and there is no more
+	 * work available */
+	while (!(kthread_should_stop() &&
+		 ocfs2_vote_thread_lists_empty(osb))) {
 
-	spin_lock(&osb->vote_task_lock);
-	osb->vote_task = current;
-	init_completion (&osb->vote_event_complete);
-
-	complete(&osb->vote_event_init);
-
-	while (1) {
-		if (osb->vote_exit) {
-			if (!ocfs2_vote_thread_has_work(osb))
-				break;
-			/* don't want to sleep if we're supposed to quit. */
-			atomic_set(&osb->wake_vote_task, 1);
-		}
-		spin_unlock(&osb->vote_task_lock);
-
 		wait_event_interruptible(osb->vote_event,
-					 atomic_read(&osb->wake_vote_task));
+					 ocfs2_vote_thread_should_wake(osb) ||
+					 kthread_should_stop());
 
-		atomic_set(&osb->wake_vote_task, 0);
-
 		LOG_TRACE_STR("vote_thread: awoken");
 
 		ocfs2_vote_thread_do_work(osb);
-		spin_lock(&osb->vote_task_lock);
 	}
 
 	osb->vote_task = NULL;
-	spin_unlock(&osb->vote_task_lock);
-
-	complete(&osb->vote_event_complete);
-
 	return status;
 }
 
-static ocfs2_net_wait_ctxt *ocfs2_new_net_wait_ctxt(ocfs_super *osb,
-						    unsigned int response_id)
+static ocfs2_net_wait_ctxt *ocfs2_new_net_wait_ctxt(unsigned int response_id)
 {
 	ocfs2_net_wait_ctxt *w;
 
@@ -429,7 +430,7 @@
 
 	INIT_LIST_HEAD(&w->n_list);
 	init_waitqueue_head(&w->n_event);
-	ocfs_node_map_init(osb, &w->n_node_map);
+	ocfs_node_map_init(&w->n_node_map);
 	w->n_response_id = response_id;
 bail:
 	return w;
@@ -509,7 +510,7 @@
 
 	LOG_ENTRY();
 
-	w = ocfs2_new_net_wait_ctxt(osb, response_id);
+	w = ocfs2_new_net_wait_ctxt(response_id);
 	if (!w) {
 		status = -ENOMEM;
 		LOG_ERROR_STATUS(status);

Modified: branches/dlm-reco-mig/fs/ocfs2/vote.h
===================================================================
--- branches/dlm-reco-mig/fs/ocfs2/vote.h	2005-03-04 20:26:31 UTC (rev 1938)
+++ branches/dlm-reco-mig/fs/ocfs2/vote.h	2005-03-04 22:21:29 UTC (rev 1939)
@@ -30,7 +30,11 @@
 int ocfs2_vote_thread(void *arg);
 static inline void ocfs2_kick_vote_thread(ocfs_super *osb)
 {
-	atomic_set(&osb->wake_vote_task, 1);
+	spin_lock(&osb->vote_task_lock);
+	/* make sure the voting thread gets a swipe at whatever changes
+	 * the caller may have made to the voting state */
+	osb->vote_wake_sequence++;
+	spin_unlock(&osb->vote_task_lock);
 	wake_up(&osb->vote_event);
 }
 

Modified: branches/dlm-reco-mig/fs/usysfs/dir.c
===================================================================
--- branches/dlm-reco-mig/fs/usysfs/dir.c	2005-03-04 20:26:31 UTC (rev 1938)
+++ branches/dlm-reco-mig/fs/usysfs/dir.c	2005-03-04 22:21:29 UTC (rev 1939)
@@ -673,29 +673,27 @@
 	if (!dentry)
 		goto out_release;
 	
+	d_add(dentry, NULL);
+
 	err = usysfs_create_dir(&k->kobj, dentry);
-	if (err)
-		goto out_dput;
+	if (!err) {
+		err = populate_dir(&k->kobj);
+		if (!err)
+			dentry = NULL;
+		else {
+			usysfs_remove_dir(&k->kobj);
+			d_delete(dentry);
+		}
+	}
 
-	d_rehash(dentry);
 	up(&usysfs_sb->s_root->d_inode->i_sem);
 
-	err = populate_dir(&k->kobj);
-	if (err) {
-		down(&usysfs_sb->s_root->d_inode->i_sem);
-		usysfs_remove_dir(&k->kobj);
-		up(&usysfs_sb->s_root->d_inode->i_sem);
-		d_delete(dentry);
-		goto out_dput;
+	if (dentry) {
+	    dput(dentry);
+out_release:
+	    usysfs_release_fs();
 	}
 
-	return 0;
-
-out_dput:
-	dput(dentry);
-out_release:
-	usysfs_release_fs();
-
 	return err;
 }
 

Modified: branches/dlm-reco-mig/fs/usysfs/symlink.c
===================================================================
--- branches/dlm-reco-mig/fs/usysfs/symlink.c	2005-03-04 20:26:31 UTC (rev 1938)
+++ branches/dlm-reco-mig/fs/usysfs/symlink.c	2005-03-04 22:21:29 UTC (rev 1939)
@@ -1,4 +1,6 @@
-/*
+/* -*- mode: c; c-basic-offset: 8; -*-
+ * vim: noexpandtab sw=8 ts=8 sts=0:
+ *
  * symlink.c - operations for usysfs symlinks.
  */
 
@@ -95,6 +97,53 @@
 }
 
 
+int usysfs_symlink(struct inode *dir, struct dentry *dentry, const char *symname)
+{
+	int ret;
+	struct nameidata nd;
+	struct kobject *parent_kobj;
+	struct kobject *target_kobj;
+	struct ukobj_type *uktype;
+
+	if (dentry->d_parent == usysfs_sb->s_root)
+		return -EPERM;
+
+	parent_kobj = usysfs_get_kobject(dentry->d_parent);
+	uktype = to_uktype(parent_kobj->ktype);
+
+	if (!uktype || !uktype->allow_link) {
+		kobject_put(parent_kobj);
+		return -EPERM;  /* What lack-of-symlink returns */
+	}
+
+	ret = path_lookup(symname, LOOKUP_FOLLOW|LOOKUP_DIRECTORY, &nd);
+	if (ret) {
+		kobject_put(parent_kobj);
+		return ret;
+	}
+
+	ret = -ENOENT;
+	target_kobj = usysfs_get_kobject(nd.dentry);
+	if (!target_kobj)
+		goto out_release;
+
+	ret = uktype->allow_link(parent_kobj, target_kobj);
+	if (ret)
+		goto out_target;
+
+	return 0;
+
+out_target:
+	kobject_put(target_kobj);
+
+out_release:
+	path_release(&nd);
+
+	kobject_put(parent_kobj);
+
+	return ret;
+}
+
 /**
  *	usysfs_remove_link - remove symlink in object's directory.
  *	@kobj:	object we're acting for.

Copied: branches/dlm-reco-mig/kapi-compat/include/journal_access.h (from rev 1938, trunk/kapi-compat/include/journal_access.h)



More information about the Ocfs2-commits mailing list