[Ocfs2-commits] khackel commits r1781 - trunk/cluster

svn-commits at oss.oracle.com svn-commits at oss.oracle.com
Mon Jan 17 19:13:42 CST 2005


Author: khackel
Date: 2005-01-17 19:13:40 -0600 (Mon, 17 Jan 2005)
New Revision: 1781

Added:
   trunk/cluster/dlmast.c
   trunk/cluster/dlmconvert.c
   trunk/cluster/dlmlock.c
   trunk/cluster/dlmunlock.c
Modified:
   trunk/cluster/Makefile
   trunk/cluster/dlmmaster.c
   trunk/cluster/dlmmod.c
   trunk/cluster/dlmmod.h
Log:
* added dlmunlock.c, dlmlock.c, dlmast.c and dlmconvert.c to
  split out the functionality into something more obvious 
* cleanup/addition of many comments
* prettied some stuff up
* revised the locking in local and remote lock, unlock and convert
  cases.  almost every function now drops all locks taken in the
  function, and expects no locks to be held coming in.  exceptions
  should be commented.
* MORE WORK still needs to be done on ASTs and BASTs.  many paths were 
  cleaned up to make sure that no spinlocks (usually lockres and 
  sometimes both lockres and lock) are held when an ast is called.  
  more cleanup needs to be done, especially in dlmthread.c.  this will
  come in the next commit.



Modified: trunk/cluster/Makefile
===================================================================
--- trunk/cluster/Makefile	2005-01-18 00:55:48 UTC (rev 1780)
+++ trunk/cluster/Makefile	2005-01-18 01:13:40 UTC (rev 1781)
@@ -41,6 +41,10 @@
 	compat_libfs.c	\
 	dlmmaster.c	\
 	dlmmod.c	\
+	dlmast.c	\
+	dlmconvert.c	\
+	dlmlock.c	\
+	dlmunlock.c	\
 	dlmrecovery.c	\
 	dlmthread.c	\
 	heartbeat.c	\
@@ -144,7 +148,8 @@
 ocfs2_cluster_test.o: test.o util.o compat_libfs.o
 	$(CC) $(OPTS) -Wl,-r -o $@ $^ $(LDADD)
 
-ocfs2_dlm.o: dlmmod.o dlmthread.o dlmrecovery.o util.o compat_libfs.o dlmmaster.o
+ocfs2_dlm.o: dlmmod.o dlmthread.o dlmrecovery.o util.o compat_libfs.o dlmmaster.o \
+		dlmast.o dlmconvert.o dlmlock.o dlmunlock.o
 	$(CC) $(OPTS) -Wl,-r -o $@ $^ $(LDADD)
 
 ocfs2_nodemanager.o: nodemanager.o util.o compat_libfs.o
@@ -222,7 +227,8 @@
 
 # list of object files that are used to create our module
 ocfs2_cluster_test-objs := test.o util.o compat_libfs.o
-ocfs2_dlm-objs := dlmmod.o dlmthread.o dlmrecovery.o util.o compat_libfs.o dlmmaster.o
+ocfs2_dlm-objs := dlmmod.o dlmthread.o dlmrecovery.o util.o compat_libfs.o dlmmaster.o \
+		  dlmast.o dlmconvert.o dlmlock.o dlmunlock.o
 ocfs2_nodemanager-objs := nodemanager.o util.o compat_libfs.o
 ocfs2_heartbeat-objs := heartbeat.o util.o compat_libfs.o
 ocfs2_tcp-objs := tcp.o util.o compat_libfs.o

Added: trunk/cluster/dlmast.c
===================================================================
--- trunk/cluster/dlmast.c	2005-01-18 00:55:48 UTC (rev 1780)
+++ trunk/cluster/dlmast.c	2005-01-18 01:13:40 UTC (rev 1781)
@@ -0,0 +1,286 @@
+/* -*- mode: c; c-basic-offset: 8; -*-
+ * vim: noexpandtab sw=8 ts=8 sts=0:
+ *
+ * dlmast.c
+ *
+ * AST and BAST functionality for local and remote nodes
+ *
+ * Copyright (C) 2004 Oracle.  All rights reserved.
+ *
+ * This program is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU General Public
+ * License as published by the Free Software Foundation; either
+ * version 2 of the License, or (at your option) any later version.
+ * 
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
+ * General Public License for more details.
+ * 
+ * You should have received a copy of the GNU General Public
+ * License along with this program; if not, write to the
+ * Free Software Foundation, Inc., 59 Temple Place - Suite 330,
+ * Boston, MA 02111-1307, USA.
+ *
+ * Authors: Kurt Hackel
+ */
+
+#include "warning_hack.h"
+
+#include "dlm_compat.h"
+#include "util.h"
+#include "dlmcommon.h"
+
+#include <linux/module.h>
+#include <linux/fs.h>
+#include <linux/types.h>
+#include <linux/slab.h>
+#include <linux/highmem.h>
+#include <linux/utsname.h>
+#include <linux/init.h>
+#include <linux/sysctl.h>
+#include <linux/random.h>
+#if LINUX_VERSION_CODE >= KERNEL_VERSION(2,6,0)
+#include <linux/statfs.h>
+#include <linux/moduleparam.h>
+#endif
+#include <linux/blkdev.h>
+#include <linux/socket.h>
+#include <linux/inet.h>
+#include <linux/spinlock.h>
+
+
+#include "heartbeat.h"
+#include "nodemanager.h"
+#include "tcp.h"
+#include "dlmmod.h"
+
+
+int dlm_do_ast(dlm_ctxt *dlm, dlm_lock_resource *res, dlm_lock *lock)
+{
+	int ret;
+
+	dlm_astlockfunc_t *fn;
+	dlm_lockstatus *lksb;
+
+	dlmprintk0("\n");
+
+	DLM_ASSERT(lock);
+	DLM_ASSERT(res);
+	DLM_ASSERT(lock->lksb);
+
+	lksb = lock->lksb;
+	fn = lock->ast;
+
+	if (res->owner == dlm->group_index) {
+		/* this node is the lockres master */
+		if (lksb->flags & DLM_LKSB_GET_LVB) {
+			dlmprintk("getting lvb from lockres for %s node\n",
+				  lock->node == dlm->group_index ? "master" :
+				  "remote");
+			memcpy(lksb->lvb, res->lvb, DLM_LVB_LEN);
+		} else if (lksb->flags & DLM_LKSB_PUT_LVB) {
+			dlmprintk("setting lvb from lockres for %s node\n",
+				  lock->node == dlm->group_index ? "master" :
+				  "remote");
+			memcpy(res->lvb, lksb->lvb, DLM_LVB_LEN);
+		}
+	}
+
+	ret = 0;
+	if (lock->node != dlm->group_index) {
+		/* lock request came from another node
+		 * go do the ast over there */
+		ret = dlm_send_proxy_ast(dlm, res, lock, DLM_AST, 0);
+	} else {
+		DLM_ASSERT(fn);
+		(*fn)(lock->astdata);
+	}
+
+	/* reset any lvb flags on the lksb */
+	lksb->flags &= ~(DLM_LKSB_PUT_LVB|DLM_LKSB_GET_LVB);
+	return ret;
+}
+
+
+int dlm_do_bast(dlm_ctxt *dlm, dlm_lock_resource *res, dlm_lock *lock, int blocked_type)
+{
+	dlm_bastlockfunc_t *fn = lock->bast;
+	
+	dlmprintk0("\n");
+
+	if (lock->node != dlm->group_index) {
+		return dlm_send_proxy_ast(dlm, res, lock, DLM_BAST, blocked_type);
+	}
+
+	if (!fn) {
+		dlmprintk("eek! lock has no bast %*s!  cookie=%llu\n", 
+		       res->lockname.len, res->lockname.name, lock->cookie);
+		return -EINVAL;
+	}
+	(*fn)(lock->astdata, blocked_type);
+	return 0;
+}
+
+
+int dlm_send_proxy_ast(dlm_ctxt *dlm, dlm_lock_resource *res, dlm_lock *lock, int type, int blocked_type)
+{
+	int ret = 0;
+	dlm_proxy_ast past;
+	struct inode *inode = NULL;
+	struct iovec iov[2];
+	size_t iovlen = 1;
+
+	dlmprintk("res %*s, to=%u, type=%d, blocked_type=%d\n",res->lockname.len, res->lockname.name, lock->node, type, blocked_type);
+
+
+	memset(&past, 0, sizeof(dlm_proxy_ast));
+	past.node_idx = dlm->group_index;
+	past.type = type;
+	past.blocked_type = blocked_type;
+	past.namelen = res->lockname.len;
+	strncpy(past.name, res->lockname.name, past.namelen);
+	past.cookie = lock->cookie;
+
+	iov[0].iov_len = sizeof(dlm_proxy_ast);
+	iov[0].iov_base = &past;
+	if (lock->lksb->flags & DLM_LKSB_GET_LVB) {
+		past.flags |= LKM_GET_LVB;
+		iov[1].iov_len = DLM_LVB_LEN;
+		iov[1].iov_base = lock->lksb->lvb;
+		iovlen++;
+	}
+
+	ret = -EINVAL;
+	inode = nm_get_group_node_by_index(dlm->group, lock->node);
+	if (inode) {
+		dlm_proxy_ast_to_net(&past);
+		ret = net_send_message_iov(DLM_PROXY_AST_MSG, dlm->key, 
+					   iov, iovlen, inode, NULL);
+		iput(inode);
+	}
+	if (ret < 0) {
+		dlmprintk("(%d) dlm_send_proxy_ast: returning %d\n", current->pid, ret);
+	}
+	return ret;
+}
+
+int dlm_proxy_ast_handler(net_msg *msg, u32 len, void *data)
+{
+	int status;
+	dlm_ctxt *dlm = data;
+	dlm_lock_resource *res;
+	dlm_lock *lock = NULL;
+	dlm_proxy_ast *past = (dlm_proxy_ast *) msg->buf;
+	struct qstr lockname;
+	struct list_head *iter, *head=NULL;
+	u64 cookie;
+	u32 flags;
+
+	dlm_proxy_ast_to_host(past);
+	lockname.name = past->name;
+	lockname.len = past->namelen;
+	cookie = past->cookie;
+	flags = past->flags;
+
+	if ((flags & (LKM_PUT_LVB|LKM_GET_LVB)) ==
+	     (LKM_PUT_LVB|LKM_GET_LVB)) {
+		dlmprintk("both PUT and GET lvb specified\n");
+		return DLM_BADARGS;
+	}
+
+	dlmprintk("lvb: %s\n", flags & LKM_PUT_LVB ? "put lvb" : 
+		  (flags & LKM_GET_LVB ? "get lvb" : "none"));
+
+	lockname.hash = full_name_hash(lockname.name, lockname.len);
+	
+	dlmprintk("type=%d, blocked_type=%d\n", past->type, past->blocked_type);
+
+	if (past->type != DLM_AST && 
+	    past->type != DLM_BAST) {
+		dlmprintk("Eeeek unknown ast type! %d, cookie=%llu, name=%*s\n", 
+		       past->type, cookie, lockname.len, lockname.name);
+		return 0;
+	}
+
+	res = dlm_lookup_lock(dlm, &lockname);
+	if (!res) {
+		dlmprintk("eek! got %sast for unknown lockres!  cookie=%llu, name=%*s, namelen=%d\n", 
+		       past->type == DLM_AST ? "" : "b", cookie, lockname.len, lockname.name, lockname.len);
+		return 0;
+	}
+
+	dlmprintk("lockres %*s\n", res->lockname.len, res->lockname.name);
+	if (!dlm_is_recovery_lock(past->name, past->namelen))
+		down_read(&dlm->recovery_sem);
+	spin_lock(&res->spinlock);
+
+	/* try convert queue for both ast/bast */
+	head = &res->converting;
+	lock = NULL;
+	list_for_each(iter, head) {
+		lock = list_entry (iter, dlm_lock, list);
+		if (lock->cookie == cookie)
+			goto do_ast;
+	}
+
+	/* if not on convert, try blocked for ast, granted for bast */
+	if (past->type == DLM_AST)
+		head = &res->blocked;
+	else 
+		head = &res->granted;
+
+	list_for_each(iter, head) {
+		lock = list_entry (iter, dlm_lock, list);
+		if (lock->cookie == cookie)
+			goto do_ast;
+	}
+
+	dlmprintk("eek! got %sast for unknown lock!  cookie=%llu, name=%*s, namelen=%d\n", 
+	       past->type == DLM_AST ? "" : "b", cookie, lockname.len, lockname.name, lockname.len);
+	spin_unlock(&res->spinlock);
+	if (!dlm_is_recovery_lock(past->name, past->namelen))
+		up_read(&dlm->recovery_sem);
+	return 0;
+		
+do_ast:
+	if (past->type == DLM_AST) {
+		list_del(&lock->list);
+		list_add_tail(&lock->list, &res->granted);
+		dlmprintk("ast: adding to granted list... type=%d, convert_type=%d\n",
+			  lock->type, lock->convert_type);
+		if (lock->convert_type != LKM_IVMODE) {
+			lock->type = lock->convert_type;
+			lock->convert_type = LKM_IVMODE;
+		} else {
+			// should already be there....
+		}
+		
+		lock->lksb->status = DLM_NORMAL;
+
+		/* if we requested the lvb, fetch it into our lksb now */
+		if (flags & LKM_GET_LVB) {
+			DLM_ASSERT(lock->lksb->flags & DLM_LKSB_GET_LVB);
+			memcpy(lock->lksb->lvb, past->lvb, DLM_LVB_LEN);
+		}
+		status = dlm_do_ast(dlm, res, lock);
+		dlmprintk("ast done: now... type=%d, convert_type=%d\n",
+			  lock->type, lock->convert_type);
+	} else {
+		dlmprintk("bast: before... type=%d, convert_type=%d\n",
+			  lock->type, lock->convert_type);
+		status = dlm_do_bast(dlm, res, lock, past->blocked_type);
+		dlmprintk("bast: after... type=%d, convert_type=%d\n",
+			  lock->type, lock->convert_type);
+	}
+
+	if (status < 0)
+		dlmprintk("eeek: ast/bast returned %d\n", status);
+
+	spin_unlock(&res->spinlock);
+	if (!dlm_is_recovery_lock(past->name, past->namelen))
+		up_read(&dlm->recovery_sem);
+	return 0;
+}
+
+

Added: trunk/cluster/dlmconvert.c
===================================================================
--- trunk/cluster/dlmconvert.c	2005-01-18 00:55:48 UTC (rev 1780)
+++ trunk/cluster/dlmconvert.c	2005-01-18 01:13:40 UTC (rev 1781)
@@ -0,0 +1,453 @@
+/* -*- mode: c; c-basic-offset: 8; -*-
+ * vim: noexpandtab sw=8 ts=8 sts=0:
+ *
+ * dlmconvert.c
+ *
+ * underlying calls for lock conversion
+ *
+ * Copyright (C) 2004 Oracle.  All rights reserved.
+ *
+ * This program is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU General Public
+ * License as published by the Free Software Foundation; either
+ * version 2 of the License, or (at your option) any later version.
+ * 
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
+ * General Public License for more details.
+ * 
+ * You should have received a copy of the GNU General Public
+ * License along with this program; if not, write to the
+ * Free Software Foundation, Inc., 59 Temple Place - Suite 330,
+ * Boston, MA 02111-1307, USA.
+ *
+ * Authors: Kurt Hackel
+ */
+
+#include "warning_hack.h"
+
+#include "dlm_compat.h"
+#include "util.h"
+#include "dlmcommon.h"
+
+#include <linux/module.h>
+#include <linux/fs.h>
+#include <linux/types.h>
+#include <linux/slab.h>
+#include <linux/highmem.h>
+#include <linux/utsname.h>
+#include <linux/init.h>
+#include <linux/sysctl.h>
+#include <linux/random.h>
+#if LINUX_VERSION_CODE >= KERNEL_VERSION(2,6,0)
+#include <linux/statfs.h>
+#include <linux/moduleparam.h>
+#endif
+#include <linux/blkdev.h>
+#include <linux/socket.h>
+#include <linux/inet.h>
+#include <linux/spinlock.h>
+
+
+#include "heartbeat.h"
+#include "nodemanager.h"
+#include "tcp.h"
+#include "dlmmod.h"
+
+/* NOTE: __dlmconvert_master is the only function in here that
+ * needs a spinlock held on entry (res->spinlock) and it is the
+ * only one that holds a lock on exit (res->spinlock).
+ * All other functions in here need no locks and drop all of 
+ * the locks that they acquire. */
+static dlm_status __dlmconvert_master(dlm_ctxt *dlm, dlm_lock_resource *res, 
+			       dlm_lock *lock, int flags, int type,
+			       int *call_ast, int *kick_thread);
+
+/* 
+ * locking:
+ *   caller needs:  none
+ *   taken:         takes and drops res->spinlock
+ *   held on exit:  none
+ * returns: see __dlmconvert_master
+ */
+dlm_status dlmconvert_master(dlm_ctxt *dlm, dlm_lock_resource *res, 
+			     dlm_lock *lock, int flags, int type)
+{
+	int call_ast = 0, kick_thread = 0;
+	dlm_status status;
+
+#warning i think i need some IN_PROGRESS work here
+	spin_lock(&res->spinlock);
+	status = __dlmconvert_master(dlm, res, lock, flags, type, 
+				     &call_ast, &kick_thread);
+	spin_unlock(&res->spinlock);
+
+#warning fix all ast calling!!!
+	if (call_ast)
+		if (dlm_do_ast(dlm, res, lock) < 0)
+			dlmprintk0("eek\n");
+	
+	if (kick_thread)
+		dlm_kick_thread(dlm, res);
+
+	return status;
+}
+
+/* performs lock conversion at the lockres master site
+ * locking:
+ *   caller needs:  res->spinlock
+ *   taken:         takes and drops lock->spinlock
+ *   held on exit:  res->spinlock
+ * returns: DLM_NORMAL, DLM_NOTQUEUED, DLM_DENIED
+ *   call_ast: whether ast should be called for this lock
+ *   kick_thread: whether dlm_kick_thread should be called
+ */
+static dlm_status __dlmconvert_master(dlm_ctxt *dlm, dlm_lock_resource *res, 
+			       dlm_lock *lock, int flags, int type,
+			       int *call_ast, int *kick_thread)
+{
+	dlm_status status = DLM_NORMAL;
+	struct list_head *iter;
+	dlm_lock *tmplock=NULL;
+
+	dlmprintk("type=%d, convert_type=%d, new convert_type=%d\n", lock->type,
+		  lock->convert_type, type);
+
+	spin_lock(&lock->spinlock);
+
+	/* already converting? */
+	if (lock->convert_type != LKM_IVMODE) {
+		dlmprintk0("attempted to convert a lock with a lock conversion "
+			   "pending\n");
+		status = DLM_DENIED;
+		goto unlock_exit;
+	}
+
+	/* must be on grant queue to convert */
+	if (!dlm_lock_on_list(&res->granted, lock)) {
+		dlmprintk0("attempted to convert a lock not on grant queue\n");
+		status = DLM_DENIED;
+		goto unlock_exit;
+	}
+
+	if (flags & LKM_VALBLK) {
+		switch (lock->type) {
+			case LKM_EXMODE:
+				/* EX + LKM_VALBLK + convert == set lvb */
+				dlmprintk("will set lvb: converting %s->%s\n",
+					dlm_lock_mode_name(lock->type), 
+					dlm_lock_mode_name(type));
+				lock->lksb->flags |= DLM_LKSB_PUT_LVB;
+				break;
+			case LKM_PRMODE:
+			case LKM_NLMODE:
+				/* refetch if new level is not NL */
+				if (type > LKM_NLMODE) {
+					dlmprintk("will fetch new value into "
+						  "lvb: converting %s->%s\n",
+						dlm_lock_mode_name(lock->type),
+					       	dlm_lock_mode_name(type));
+					lock->lksb->flags |= DLM_LKSB_GET_LVB;
+				} else {
+					dlmprintk("will NOT fetch new value "
+						  "into lvb: converting "
+						  "%s->%s\n",
+						dlm_lock_mode_name(lock->type),
+					       	dlm_lock_mode_name(type));
+					flags &= ~(LKM_VALBLK);
+				}
+				break;
+		}
+	}
+
+	
+	/* in-place downconvert? */
+	if (type <= lock->type)
+		goto grant;
+
+	/* upconvert from here on */
+	status = DLM_NORMAL;
+	list_for_each(iter, &res->granted) {
+		tmplock = list_entry(iter, dlm_lock, list);
+		if (tmplock == lock)
+			continue;
+		if (!dlm_lock_compatible(tmplock->type, type))
+			goto switch_queues;
+	}
+
+	list_for_each(iter, &res->converting) {
+		tmplock = list_entry(iter, dlm_lock, list);
+		if (!dlm_lock_compatible(tmplock->type, type))
+			goto switch_queues;
+		/* existing conversion requests take precedence */
+		if (!dlm_lock_compatible(tmplock->convert_type, type))
+			goto switch_queues;
+	}
+
+	/* fall thru to grant */
+
+grant:
+	dlmprintk("res %*s, granting %s lock\n", res->lockname.len,
+		  res->lockname.name, dlm_lock_mode_name(type));
+	/* immediately grant the new lock type */
+	lock->lksb->status = DLM_NORMAL;
+	if (lock->node == dlm->group_index)
+		dlmprintk0("doing in-place convert for nonlocal lock\n");
+	lock->type = type;
+	status = DLM_NORMAL;
+	*call_ast = 1;
+	goto unlock_exit;
+
+switch_queues:
+	if (flags & LKM_NOQUEUE) {
+		dlmprintk("failed to convert NOQUEUE lock %*s from "
+			  "%d to %d...\n", res->lockname.len, 
+			  res->lockname.name, lock->type, type);
+		status = DLM_NOTQUEUED;
+		goto unlock_exit;
+	}
+	dlmprintk("res %*s, queueing...\n", res->lockname.len,
+		  res->lockname.name);
+
+	lock->convert_type = type;
+	list_del(&lock->list);
+	list_add_tail(&lock->list, &res->converting);
+
+unlock_exit:	
+	spin_unlock(&lock->spinlock);
+	if (status == DLM_NORMAL)
+		*kick_thread = 1;
+	return status;
+}
+
+/* messages the master site to do lock conversion
+ * locking:
+ *   caller needs:  none
+ *   taken:         takes and drops res->spinlock, uses DLM_LOCK_RES_IN_PROGRESS
+ *   held on exit:  none
+ * returns: DLM_NORMAL, DLM_RECOVERING, status from remote node
+ */
+dlm_status dlmconvert_remote(dlm_ctxt *dlm, dlm_lock_resource *res, 
+			     dlm_lock *lock, int flags, int type)
+{
+	dlm_status status;
+	
+	dlmprintk("type=%d, convert_type=%d, busy=%d\n", lock->type, 
+		  lock->convert_type, res->state & DLM_LOCK_RES_IN_PROGRESS);
+	
+	spin_lock(&res->spinlock);
+	if (res->state & DLM_LOCK_RES_RECOVERING) {
+		status = DLM_RECOVERING;
+		goto bail;
+	}
+	/* will exit this call with spinlock held */
+	__dlm_wait_on_lockres(res);
+
+	res->state |= DLM_LOCK_RES_IN_PROGRESS;
+
+	/* move lock to local convert queue */
+	list_del(&lock->list);
+	list_add_tail(&lock->list, &res->converting);
+	if (lock->convert_type != LKM_IVMODE) {
+		dlmprintk0("error! converting a remote lock that is already "
+			   "converting!\n");
+		/* TODO: return correct error */
+		BUG();
+	}
+	lock->convert_type = type;
+
+	if (flags & LKM_VALBLK) {
+		if (lock->type == LKM_EXMODE) {
+			flags |= LKM_PUT_LVB;
+			lock->lksb->flags |= DLM_LKSB_PUT_LVB;
+		} else {
+			if (lock->convert_type == LKM_NLMODE) {
+				dlmprintk0("erm, no point in specifying "
+					   "LKM_VALBLK if converting to NL\n");
+				flags &= ~LKM_VALBLK;
+			} else {
+				flags |= LKM_GET_LVB;
+				lock->lksb->flags |= DLM_LKSB_GET_LVB;
+			}
+		}
+	}
+	spin_unlock(&res->spinlock);
+
+	/* no locks held here.
+	 * need to wait for a reply as to whether it got queued or not. */
+	status = dlm_send_remote_convert_request(dlm, res, lock, flags, type);
+	
+	spin_lock(&res->spinlock);
+	res->state &= ~DLM_LOCK_RES_IN_PROGRESS;
+
+	/* if it failed, move it back to granted queue */
+	if (status != DLM_NORMAL) {
+		list_del(&lock->list);
+		list_add_tail(&lock->list, &res->granted);
+		lock->convert_type = LKM_IVMODE;
+		lock->lksb->flags &= ~(DLM_LKSB_GET_LVB|DLM_LKSB_PUT_LVB);
+	}
+bail:
+	spin_unlock(&res->spinlock);
+
+	/* TODO: should this be a wake_one? */
+	/* wake up any IN_PROGRESS waiters */
+	wake_up(&res->wq);
+
+	return status;
+}
+
+/* sends DLM_CONVERT_LOCK_MSG to master site
+ * locking:
+ *   caller needs:  none
+ *   taken:         none
+ *   held on exit:  none
+ * returns: DLM_NOLOCKMGR, status from remote node
+ */
+dlm_status dlm_send_remote_convert_request(dlm_ctxt *dlm, 
+					   dlm_lock_resource *res, 
+					   dlm_lock *lock, int flags, int type)
+{
+	struct inode *inode = NULL;
+	dlm_convert_lock convert;
+	int tmpret;
+	dlm_status ret;
+	int status = 0;
+	struct iovec iov[2];
+	size_t iovlen = 1;
+
+	dlmprintk0("\n");
+
+	memset(&convert, 0, sizeof(dlm_convert_lock));
+	convert.node_idx = dlm->group_index;
+	convert.requested_type = type;
+	convert.cookie = lock->cookie;
+	convert.namelen = res->lockname.len;
+	convert.flags = flags;
+	strncpy(convert.name, res->lockname.name, convert.namelen);
+	
+	iov[0].iov_len = sizeof(dlm_convert_lock);
+	iov[0].iov_base = &convert;
+	
+	if (flags & LKM_PUT_LVB) {
+		/* extra data to send if we are updating lvb */
+		iov[1].iov_len = DLM_LVB_LEN;
+		iov[1].iov_base = lock->lksb->lvb;
+		iovlen++;
+	}
+
+	ret = DLM_NOLOCKMGR;
+	inode = nm_get_group_node_by_index(dlm->group, res->owner);
+	if (inode) {
+		dlm_convert_lock_to_net(&convert);
+		tmpret = net_send_message_iov(DLM_CONVERT_LOCK_MSG, dlm->key, 
+					      iov, iovlen, inode, &status);
+		if (tmpret >= 0) {
+			// successfully sent and received
+			ret = status;  // this is already a dlm_status
+		} else {
+			dlmprintk("error occurred in net_send_message: %d\n", 
+				  tmpret);
+			ret = dlm_err_to_dlm_status(tmpret);
+		}
+		iput(inode);
+	}
+
+	return ret;
+}
+
+
+/* handler for DLM_CONVERT_LOCK_MSG on master site
+ * locking:
+ *   caller needs:  none
+ *   taken:         takes and drop res->spinlock
+ *   held on exit:  none
+ * returns: DLM_NORMAL, DLM_IVLOCKID, DLM_BADARGS, 
+ *          status from __dlmconvert_master
+ */
+int dlm_convert_lock_handler(net_msg *msg, u32 len, void *data)
+{
+	dlm_ctxt *dlm = data;
+	dlm_convert_lock *cnv = (dlm_convert_lock *)msg->buf;
+	dlm_lock_resource *res = NULL;
+	struct list_head *iter;
+	dlm_lock *lock = NULL;
+	dlm_lockstatus *lksb;
+	dlm_status status = DLM_NORMAL;
+	struct qstr lockname;
+	u32 flags;
+	int call_ast = 0, kick_thread = 0;
+	int found = 0;
+
+	dlm_convert_lock_to_host(cnv);
+	lockname.name = cnv->name;
+	lockname.len = cnv->namelen;
+	flags = cnv->flags;
+
+	if ((flags & (LKM_PUT_LVB|LKM_GET_LVB)) ==
+	     (LKM_PUT_LVB|LKM_GET_LVB)) {
+		dlmprintk("both PUT and GET lvb specified\n");
+		status = DLM_BADARGS;
+		goto leave;
+	}
+
+	dlmprintk("lvb: %s\n", flags & LKM_PUT_LVB ? "put lvb" : 
+		  (flags & LKM_GET_LVB ? "get lvb" : "none"));
+	
+	lockname.hash = full_name_hash(lockname.name, lockname.len);
+
+	status = DLM_IVLOCKID;
+	res = dlm_lookup_lock(dlm, &lockname);
+	if (!res)
+		goto leave;
+
+	spin_lock(&res->spinlock);
+	list_for_each(iter, &res->granted) {
+		lock = list_entry(iter, dlm_lock, list);
+		if (lock->cookie == cnv->cookie &&
+		    lock->node == cnv->node_idx) {
+			found = 1;
+			break;
+		}
+	}
+	spin_unlock(&res->spinlock);
+	if (!found)
+		goto leave;
+
+	/* found the lock */
+	lksb = lock->lksb;
+
+	/* see if caller needed to get/put lvb */
+	if (flags & LKM_PUT_LVB) {
+		DLM_ASSERT(!(lksb->flags &
+			     (DLM_LKSB_PUT_LVB|DLM_LKSB_GET_LVB)));
+		lksb->flags |= DLM_LKSB_PUT_LVB;
+		memcpy(&lksb->lvb[0], &cnv->lvb[0], DLM_LVB_LEN);
+	} else if (flags & LKM_GET_LVB) {
+		DLM_ASSERT(!(lksb->flags &
+			     (DLM_LKSB_PUT_LVB|DLM_LKSB_GET_LVB)));
+		lksb->flags |= DLM_LKSB_GET_LVB;
+	}
+
+#warning i think we need some handling of IN_PROGRESS here!
+	status = __dlmconvert_master(dlm, res, lock, flags, cnv->requested_type,
+				     &call_ast, &kick_thread);
+
+	if (status != DLM_NORMAL)
+		lksb->flags &= ~(DLM_LKSB_GET_LVB|DLM_LKSB_PUT_LVB);
+
+leave:
+	if (!lock)
+		dlmprintk("did not find lock to convert on "
+			  "grant queue! cookie=%llu\n", cnv->cookie);
+
+#warning fix all ast calling!!!
+	if (call_ast)
+		if (dlm_do_ast(dlm, res, lock) < 0)
+			dlmprintk0("eek\n");
+	if (kick_thread)
+		dlm_kick_thread(dlm, res);
+
+	return status;
+}

Added: trunk/cluster/dlmlock.c
===================================================================
--- trunk/cluster/dlmlock.c	2005-01-18 00:55:48 UTC (rev 1780)
+++ trunk/cluster/dlmlock.c	2005-01-18 01:13:40 UTC (rev 1781)
@@ -0,0 +1,324 @@
+/* -*- mode: c; c-basic-offset: 8; -*-
+ * vim: noexpandtab sw=8 ts=8 sts=0:
+ *
+ * dlmlock.c
+ *
+ * underlying calls for lock creation
+ *
+ * Copyright (C) 2004 Oracle.  All rights reserved.
+ *
+ * This program is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU General Public
+ * License as published by the Free Software Foundation; either
+ * version 2 of the License, or (at your option) any later version.
+ * 
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
+ * General Public License for more details.
+ * 
+ * You should have received a copy of the GNU General Public
+ * License along with this program; if not, write to the
+ * Free Software Foundation, Inc., 59 Temple Place - Suite 330,
+ * Boston, MA 02111-1307, USA.
+ *
+ * Authors: Kurt Hackel
+ */
+
+#include "warning_hack.h"
+
+#include "dlm_compat.h"
+#include "util.h"
+#include "dlmcommon.h"
+
+#include <linux/module.h>
+#include <linux/fs.h>
+#include <linux/types.h>
+#include <linux/slab.h>
+#include <linux/highmem.h>
+#include <linux/utsname.h>
+#include <linux/init.h>
+#include <linux/sysctl.h>
+#include <linux/random.h>
+#if LINUX_VERSION_CODE >= KERNEL_VERSION(2,6,0)
+#include <linux/statfs.h>
+#include <linux/moduleparam.h>
+#endif
+#include <linux/blkdev.h>
+#include <linux/socket.h>
+#include <linux/inet.h>
+#include <linux/spinlock.h>
+
+
+#include "heartbeat.h"
+#include "nodemanager.h"
+#include "tcp.h"
+#include "dlmmod.h"
+
+/* performs lock creation at the lockres master site
+ * locking:
+ *   caller needs:  none
+ *   taken:         takes and drops res->spinlock
+ *   held on exit:  none
+ * returns: DLM_NORMAL, DLM_NOTQUEUED
+ */
+dlm_status dlmlock_master(dlm_ctxt *dlm, dlm_lock_resource *res, dlm_lock *lock, int flags)
+{
+	struct list_head *iter;
+	dlm_lock *tmplock;
+	int call_ast = 0;
+	dlm_status status = DLM_NORMAL;
+
+	DLM_ASSERT(lock);
+	DLM_ASSERT(res);
+	DLM_ASSERT(dlm);
+	DLM_ASSERT(lock->lksb);
+
+	dlmprintk("type=%d\n", lock->type);
+
+	/* this will effectively spin_lock(&res->spinlock) */
+	dlm_wait_on_lockres(res);
+	res->state |= DLM_LOCK_RES_IN_PROGRESS;
+
+	/* for NOQUEUE request, unless we get 
+	 * lock right away, return DLM_NOTQUEUED */
+	if (flags & LKM_NOQUEUE)
+		status = DLM_NOTQUEUED;
+
+	list_for_each(iter, &res->granted) {
+		tmplock = list_entry(iter, dlm_lock, list);
+		if (!dlm_lock_compatible(tmplock->type, lock->type)) {
+			list_add_tail(&lock->list, &res->blocked);
+			goto done;
+		}
+	}
+
+	list_for_each(iter, &res->converting) {
+		tmplock = list_entry(iter, dlm_lock, list);
+		if (!dlm_lock_compatible(tmplock->type, lock->type)) {
+			list_add_tail(&lock->list, &res->blocked);
+			goto done;
+		}
+	}
+
+	/* got it right away */
+	lock->lksb->status = DLM_NORMAL;
+	status = DLM_NORMAL;
+	list_add_tail(&lock->list, &res->granted);
+	call_ast = 1;
+
+done:
+	res->state &= ~DLM_LOCK_RES_IN_PROGRESS;
+	spin_unlock(&res->spinlock);
+	wake_up(&res->wq);
+
+	dlm_kick_thread(dlm, res);
+
+	if (call_ast) {
+#warning fix all ast calling!!!
+		if (dlm_do_ast(dlm, res, lock) < 0)
+			dlmprintk0("eek\n");
+	}
+
+	return status;
+}
+
+/* 
+ * locking:
+ *   caller needs:  none
+ *   taken:         takes and drops res->spinlock
+ *   held on exit:  none
+ * returns: DLM_DENIED, DLM_RECOVERING, or net status
+ */
+dlm_status dlmlock_remote(dlm_ctxt *dlm, dlm_lock_resource *res, 
+			  dlm_lock *lock, int flags)
+{
+	dlm_status status = DLM_DENIED;
+	
+	dlmprintk("type=%d\n", lock->type);
+
+	spin_lock(&res->spinlock);
+	if (res->state & DLM_LOCK_RES_RECOVERING) {
+		status = DLM_RECOVERING;
+		goto bail;
+	}
+
+	/* will exit this call with spinlock held */
+	__dlm_wait_on_lockres(res);
+	res->state |= DLM_LOCK_RES_IN_PROGRESS;
+	/* add lock to local (secondary) queue */
+	list_add_tail(&lock->list, &res->blocked);
+	spin_unlock(&res->spinlock);
+
+	/* spec seems to say that you will get DLM_NORMAL when the lock 
+	 * has been queued, meaning we need to wait for a reply here. */
+	status = dlm_send_remote_lock_request(dlm, res, lock, flags);
+	
+	spin_lock(&res->spinlock);
+	res->state &= ~DLM_LOCK_RES_IN_PROGRESS;
+	if (status != DLM_NORMAL) {
+		/* remove from local queue if it failed */
+		list_del(&lock->list);
+	}
+bail:
+	spin_unlock(&res->spinlock);
+	wake_up(&res->wq);
+	return status;
+}
+
+
+/* for remote lock creation.
+ * locking:
+ *   caller needs:  none, but need res->state & DLM_LOCK_RES_IN_PROGRESS
+ *   taken:         none
+ *   held on exit:  none
+ * returns: DLM_NOLOCKMGR, or net status
+ */
+dlm_status dlm_send_remote_lock_request(dlm_ctxt *dlm, dlm_lock_resource *res, dlm_lock *lock, int flags)
+{
+	struct inode *inode = NULL;
+	dlm_create_lock create;
+	int tmpret, status = 0;
+	dlm_status ret;
+
+	dlmprintk0("\n");
+
+	memset(&create, 0, sizeof(create));
+	create.node_idx = dlm->group_index;
+	create.requested_type = lock->type;
+	create.cookie = lock->cookie;
+	create.namelen = res->lockname.len;
+	create.flags = flags;
+	strncpy(create.name, res->lockname.name, create.namelen);
+
+	ret = DLM_NOLOCKMGR;
+	inode = nm_get_group_node_by_index(dlm->group, res->owner);
+	if (inode) {
+		dlm_create_lock_to_net(&create);
+		tmpret = net_send_message(DLM_CREATE_LOCK_MSG, dlm->key, &create, sizeof(create), inode, &status);
+		if (tmpret >= 0) {
+			// successfully sent and received
+			ret = status;  // this is already a dlm_status
+		} else {
+			dlmprintk("error occurred in net_send_message: %d\n", tmpret);
+			ret = dlm_err_to_dlm_status(tmpret);
+		}
+		iput(inode);
+	}
+
+	return ret;
+}
+
+/* handler for lock creation net message
+ * locking:
+ *   caller needs:  none
+ *   taken:         takes and drops res->spinlock
+ *   held on exit:  none
+ * returns: DLM_NORMAL, DLM_SYSERR, DLM_IVLOCKID, DLM_NOTQUEUED
+ */
+int dlm_create_lock_handler(net_msg *msg, u32 len, void *data)
+{
+	dlm_ctxt *dlm = data;
+	dlm_create_lock *create = (dlm_create_lock *)msg->buf;
+	dlm_lock_resource *res;
+	dlm_lock *newlock = NULL, *tmplock;
+	dlm_lockstatus *lksb = NULL;
+	dlm_status status = DLM_NORMAL;
+	struct qstr lockname;
+	struct list_head *iter;
+	int call_ast = 0;
+
+	DLM_ASSERT(dlm);
+
+	dlm_create_lock_to_host(create);
+	lockname.name = create->name;
+	lockname.len = create->namelen;
+	
+	dlmprintk0("\n");
+
+	lockname.hash = full_name_hash(lockname.name, lockname.len);
+
+	status = DLM_SYSERR;
+	newlock = kmalloc(sizeof(dlm_lock), GFP_KERNEL);
+	if (!newlock)
+		goto leave;
+	
+	lksb = kmalloc(sizeof(dlm_lockstatus), GFP_KERNEL);
+	if (!lksb)
+		goto leave;
+		
+	memset(newlock, 0, sizeof(dlm_lock));
+	INIT_LIST_HEAD(&newlock->list);
+	INIT_LIST_HEAD(&newlock->ast_list);
+	spin_lock_init(&newlock->spinlock);
+	newlock->type = create->requested_type;
+	newlock->convert_type = LKM_IVMODE;
+	newlock->highest_blocked = LKM_IVMODE;
+	newlock->node = create->node_idx;
+	newlock->ast = NULL;
+	newlock->bast = NULL;
+	newlock->astdata = NULL;
+	newlock->cookie = create->cookie;
+
+	memset(lksb, 0, sizeof(dlm_lockstatus));
+	newlock->lksb = lksb;
+	lksb->lockid = newlock;
+	lksb->flags |= DLM_LKSB_KERNEL_ALLOCATED;
+
+	status = DLM_IVLOCKID;
+	res = dlm_lookup_lock(dlm, &lockname);
+	if (!res)
+		goto leave;
+
+	/* found lock resource */
+	status = DLM_NORMAL;
+	spin_lock(&res->spinlock);
+	newlock->lockres = res;
+
+	/* for NOQUEUE request, unless we get 
+	 * lock right away, return DLM_NOTQUEUED */
+	if (create->flags & LKM_NOQUEUE)
+		status = DLM_NOTQUEUED;
+
+	/* see if any granted locks are blocking us */
+	list_for_each(iter, &res->granted) {
+		tmplock = list_entry(iter, dlm_lock, list);
+		if (!dlm_lock_compatible(tmplock->type, newlock->type)) {
+			list_add_tail(&newlock->list, &res->blocked);
+			goto blocked;
+		}
+	}
+	list_for_each(iter, &res->converting) {
+		tmplock = list_entry(iter, dlm_lock, list);
+		if (!dlm_lock_compatible(tmplock->type, newlock->type)){
+			list_add_tail(&newlock->list, &res->blocked);
+			goto blocked;
+		}
+	}
+
+	/* got it right away */
+	newlock->lksb->status = DLM_NORMAL;
+	status = DLM_NORMAL;
+	list_add_tail(&newlock->list, &res->granted);
+	call_ast = 1;
+
+blocked:
+	spin_unlock(&res->spinlock);
+
+#warning fix all ast calling!!!
+	if (call_ast)
+		if (dlm_do_ast(dlm, res, newlock) < 0)
+			dlmprintk0("eek\n");
+	
+	dlm_kick_thread(dlm, res);
+
+leave:
+	if (status != DLM_NORMAL) {
+		if (newlock)
+			kfree(newlock);
+		if (lksb)
+			kfree(lksb);
+	}
+
+	return status;
+}

Modified: trunk/cluster/dlmmaster.c
===================================================================
--- trunk/cluster/dlmmaster.c	2005-01-18 00:55:48 UTC (rev 1780)
+++ trunk/cluster/dlmmaster.c	2005-01-18 01:13:40 UTC (rev 1781)
@@ -55,12 +55,13 @@
 #include "tcp.h"
 #include "dlmmod.h"
 
-
-
 spinlock_t dlm_master_lock = SPIN_LOCK_UNLOCKED;
 LIST_HEAD(dlm_master_list);
 
+/* gives a really vague idea of the system load */
+atomic_t dlm_num_resources = ATOMIC_INIT(0);
 
+
 static int dlm_init_mle(dlm_master_list_entry *mle, int type, dlm_ctxt *dlm, 
 			 dlm_lock_resource *res, struct qstr *name, int locked);
 
@@ -95,9 +96,12 @@
 	clear_bit(dlm->group_index, mle->vote_map);
 	clear_bit(dlm->group_index, mle->node_map);
 
-#warning cannot do this here cuz this kmallocs and we are under a spinlock dammit
-	if (hb_register_callback(HB_NODE_DOWN_CB, dlm_mle_node_down, mle, DLM_HB_NODE_DOWN_PRI+1) ||
-	    hb_register_callback(HB_NODE_UP_CB, dlm_mle_node_up, mle, DLM_HB_NODE_UP_PRI+1)) {
+#warning cannot do this here cuz this kmallocs and we are under a spinlock 
+	if (hb_register_callback(HB_NODE_DOWN_CB, dlm_mle_node_down, mle, 
+				 DLM_HB_NODE_DOWN_PRI+1) 
+	    ||
+	    hb_register_callback(HB_NODE_UP_CB, dlm_mle_node_up, mle, 
+				 DLM_HB_NODE_UP_PRI+1)) {
 		ret = -EINVAL;
 	}
 
@@ -178,7 +182,8 @@
  * to assert_master (or die).
  *
  */
-dlm_lock_resource * dlm_get_lock_resource(dlm_ctxt *dlm, struct qstr *lockname, int flags)
+dlm_lock_resource * dlm_get_lock_resource(dlm_ctxt *dlm, 
+					  struct qstr *lockname, int flags)
 {
 	dlm_lock_resource *tmpres=NULL, *res=NULL;
 	struct list_head *bucket;
@@ -194,12 +199,13 @@
 	mle = kmalloc(sizeof(dlm_master_list_entry), GFP_KERNEL);
 	res = kmalloc(sizeof(dlm_lock_resource), GFP_KERNEL);
 	if (!mle || !res) {
-		dlmprintk0("could not allocate memory for new lock resource!\n");
+		dlmprintk0("could not allocate memory for new lock resource\n");
 		if (mle)
 			kfree(mle);
 		if (res)
 			kfree(res);
-		return NULL;
+		res = NULL;
+		goto leave;
 	}
 
 	/* check for pre-existing lock */
@@ -210,10 +216,8 @@
 		/* TODO: return error, or return the lockres ?!? */
 		kfree(res);
 		kfree(mle);
-		/* waits for any outstanding work to finish 
-		 * will hold tmpres->spinlock on exit */
-		dlm_wait_on_lockres(tmpres);
-		return tmpres;
+		res = tmpres;
+		goto leave;
 	}
 
 	dlm_init_lockres(res, lockname);
@@ -223,20 +227,13 @@
 		 * DONE!  return right away */
 		list_add_tail(&res->list, bucket);
 		res->owner = dlm->group_index;
-		res->state &= ~DLM_LOCK_RES_IN_PROGRESS;
-	
-		/* return with res->spinlock held */
-
-		/* lock ordering note: this lockres will not be 
-		 * visible until i release dlm->spinlock, so it 
-		 * is ok to release dlm->spinlock out of order here */
-		spin_lock(&res->spinlock);
-		
+		atomic_inc(&dlm_num_resources);
 		spin_unlock(&dlm->spinlock);
-		return res;
+		/* lockres still marked IN_PROGRESS */
+		goto wake_waiters;
 	}
 		
-	/* look in master list to see if another node has started mastering this */
+	/* check master list to see if another node has started mastering it */
 	spin_lock(&dlm_master_lock);
 	list_for_each(iter, &dlm_master_list) {
 		tmpmle = list_entry(iter, dlm_master_list_entry, list);
@@ -244,12 +241,12 @@
 			continue;
 
 		if (tmpmle->type == DLM_MLE_MASTER) {
-			dlmprintk0("impossible!  master entry for nonexistent lock!\n");
+			dlmprintk0("eek! master entry for nonexistent lock!\n");
 			BUG();
 		}
 		dlm_get_mle(tmpmle);
 		blocked = 1;
-		// found a block!  must wait for lock to be mastered by another node
+		// found a block, wait for lock to be mastered by another node
 		break;
 	}
 
@@ -265,15 +262,17 @@
 
 	/* at this point there is either a DLM_MLE_BLOCK or a DLM_MLE_MASTER 
 	 * on the master list, so it's safe to add the lockres to the hashtable.
-	 * anyone who finds the lock will still have to wait on the IN_PROGRESS. 
+	 * anyone who finds the lock will still have to wait on the IN_PROGRESS.
 	 * also, any new nodes that try to join at this point will have to wait
 	 * until my dlm_master_lock list is empty, so they cannot possibly 
 	 * do any master requests yet... TODO
 	 * ?? should i have a special type of mle just for joining nodes ?? 
-	 * ?? could allow them to come in and put their mle on the list and sleep ?? */
+	 * ?? could allow them to come in and put their mle 
+	 *    on the list and sleep ?? */
 
 	/* finally add the lockres to its hash bucket */
 	list_add_tail(&res->list, bucket);
+	atomic_inc(&dlm_num_resources);
 	spin_unlock(&dlm->spinlock);
 
 	if (blocked) {
@@ -295,7 +294,7 @@
 		ret = dlm_do_master_request(mle, bit);
 		if (ret < 0) {
 			// TODO
-			//dlmprintk("dlm_do_master_request returned %d!\n", ret);
+			//dlmprintk("dlm_do_master_request returned %d\n", ret);
 		}
 		if (mle->master != NM_MAX_NODES) {
 			// found a master!
@@ -326,64 +325,86 @@
 			break;
 		}
 		restart = 0;
-		map_changed = (memcmp(mle->vote_map, mle->node_map, sizeof(mle->vote_map)) != 0);
-		if (memcmp(mle->vote_map, mle->response_map, sizeof(mle->vote_map)) == 0) {
+		map_changed = (memcmp(mle->vote_map, mle->node_map, 
+				      sizeof(mle->vote_map)) != 0);
+		if (memcmp(mle->vote_map, mle->response_map, 
+			   sizeof(mle->vote_map)) == 0) {
 			// dlmprintk("every node has responded...\n");
 			if (map_changed) {
-				dlmprintk0("eek! got all original nodes, but nodemap changed while collecting responses\n");
+				dlmprintk0("eek! got all original nodes, but "
+					   "nodemap changed while collecting "
+					   "responses\n");
 				restart = 1;
 			}
 
 			if (mle->error) {
-				dlmprintk0("ugh.  some node hit an error (-ENOMEM).  try the whole thing again\n"); 
+				dlmprintk0("ugh.  some node hit an error "
+					   "(-ENOMEM).  try the whole thing "
+					   "again\n"); 
 				mle->error = 0;
-				/* TODO: treat this just like the dead node case below,
-				 * cleanup and start over, but keep the error node around */
+				/* TODO: treat this just like the dead node 
+				 * case below, cleanup and start over, but 
+				 * keep the error node around */
 				restart = 1;
 			}
 
-			if ((bit = find_next_bit (mle->maybe_map, NM_MAX_NODES, 0)) >= NM_MAX_NODES) {
-				/* no other nodes are in-progress */
-				/* those nodes should all be locking out this lockid until I assert */
-				/* they should have put a dummy entry on dlm_master_list */
-				/* need to assert myself as the master */
-				
-				// dlmprintk0("I am the only node in-progress!  asserting myself as master\n");
+			bit = find_next_bit(mle->maybe_map, NM_MAX_NODES, 0);
+			if (bit >= NM_MAX_NODES) {
+				/* No other nodes are in-progress. Those nodes 
+				 * should all be locking out this lockid until 
+				 * I assert. They should have put a dummy entry
+				 * on dlm_master_list. Need to assert myself as
+				 * the master. */ 
+				// dlmprintk0("I am the only node in-progress!"
+				// "  asserting myself as master\n");
 				assert = 1;
 			} else {
 				/* other nodes are in-progress */
-				if (map_changed && !test_bit(bit, mle->node_map)) {
-					/* TODO: need to copy the node_map into the vote_map, zero 
-					 * everything out and start over */
-					dlmprintk("need to handle this case!  winning node %u just died!\n", bit);
+				if (map_changed && 
+				    !test_bit(bit, mle->node_map)) {
+					/* TODO: need to copy the node_map into
+					 * the vote_map, zero everything out 
+					 * and start over */
+					dlmprintk("need to handle this case.  "
+						  "winning node %u just died\n",
+						  bit);
 					restart = 1;
 				}
 
 				if (bit > dlm->group_index) {
-					// dlmprintk("next in-progress node (%u) is higher than me (%u)\n",
+					// dlmprintk("next in-progress node "
+					// "(%u) is higher than me (%u)\n",
 					//        bit, dlm->group_index);
 
-					/* nodes not in-progress should be locking out this lockid until I assert */
-					/* in-progress nodes should match me up with their lowest maybe_map bit */
-					/* need to assert myself as the master */
-
-					// dlmprintk("I am the lowest node!  asserting myself as master\n");
+					/* Nodes not in-progress should be 
+					 * locking out this lockid until I 
+					 * assert. In-progress nodes should 
+					 * match me up with their lowest 
+					 * maybe_map bit. Need to assert myself
+					 * as the master */
+					// dlmprintk("I am the lowest node!  "
+					// "asserting myself as master\n");
 					assert = 1;
 				} else {
-					/* need to sit around and wait for assert */
-					/* my lowest maybe_map bit should be the one to assert */
-					/* just fall through and sleep. should be woken by the handler */
-
-					// dlmprintk("sleeping while waiting for %u to assert himself as master\n", bit);
+					/* Need to sit around and wait for 
+					 * assert. My lowest maybe_map bit 
+					 * should be the one to assert. Just 
+					 * fall through and sleep. Should be 
+					 * woken by the handler. */
+					// dlmprintk("sleeping while waiting "
+					// "for %u to assert himself as "
+					// "master\n", bit);
 				}
 			}
 		} else {
 			if (map_changed) {
 				/* TODO: need to handle this */
-				dlmprintk0("eek! nodemap changed while collecting responses\n");
+				dlmprintk0("eek! nodemap changed while "
+					   "collecting responses\n");
 				restart = 1;
 			}
-			// dlmprintk0("still waiting for all nodes to respond...\n");
+			// dlmprintk0("still waiting for all nodes to "
+			// "respond...\n");
 		}
 
 		if (restart && assert)
@@ -408,7 +429,8 @@
 				restart = 1;
 		}
 		if (restart) {
-			dlmprintk0("something happened such that the master process needs to be restarted!\n");
+			dlmprintk0("something happened such that the master "
+				   "process needs to be restarted!\n");
 			/* TODO: clear it all out and start over */
 		}
 
@@ -427,11 +449,13 @@
 	}
 	dlm_put_mle(mle);
 
+wake_waiters:
 	spin_lock(&res->spinlock);
 	res->state &= ~DLM_LOCK_RES_IN_PROGRESS;
+	spin_unlock(&res->spinlock);
 	wake_up(&res->wq);
 
-	/* exits holding res->spinlock */
+leave:
 	return res;
 }
 	
@@ -491,7 +515,8 @@
 		 * being blocked, or it is actively trying to
 		 * master this lock. */
 		if (!(res->state & DLM_LOCK_RES_IN_PROGRESS)) {
-			dlmprintk0("bug! lock with no owner should be in-progress!\n");
+			dlmprintk0("bug! lock with no owner should be "
+				   "in-progress!\n");
 			BUG();
 		}
 
@@ -506,10 +531,12 @@
 			dlm_get_mle(tmpmle);
 			spin_lock(&tmpmle->spinlock);
 			if (tmpmle->type == DLM_MLE_BLOCK) {
-				// dlmprintk0("this node is waiting for lockres to be mastered\n");
+				// dlmprintk0("this node is waiting for "
+				// "lockres to be mastered\n");
 				response = DLM_MASTER_RESP_NO;
 			} else {
-				// dlmprintk0("this node is attempting to master lockres\n");
+				// dlmprintk0("this node is attempting to "
+				// "master lockres\n");
 				response = DLM_MASTER_RESP_MAYBE;
 			}
 			set_bit(request->node_idx, tmpmle->maybe_map);
@@ -553,13 +580,15 @@
 			spin_unlock(&dlm_master_lock);
 			spin_unlock(&dlm->spinlock);
 	
-			mle = kmalloc(sizeof(dlm_master_list_entry) + lockname.len, GFP_KERNEL);
+			mle = kmalloc(sizeof(dlm_master_list_entry) + 
+				      lockname.len, GFP_KERNEL);
 			if (!mle) {
 				// bad bad bad... this sucks.
 				response = DLM_MASTER_RESP_ERROR;
 				goto send_response;
 			}
-			if (dlm_init_mle(mle, DLM_MLE_BLOCK, dlm, NULL, &lockname, 0)) {
+			if (dlm_init_mle(mle, DLM_MLE_BLOCK, dlm, NULL, 
+					 &lockname, 0)) {
 				dlmprintk0("eeek!\n");
 				response = DLM_MASTER_RESP_ERROR;
 				dlm_put_mle(mle);
@@ -568,7 +597,8 @@
 			goto way_up_top;
 		}
 
-		// dlmprintk0("this is second time thru, already allocated, add the block.\n");
+		// dlmprintk0("this is second time thru, already allocated, "
+		// "add the block.\n");
 		set_bit(request->node_idx, mle->maybe_map);
 		list_add(&mle->list, &dlm_master_list);
 		response = DLM_MASTER_RESP_NO;
@@ -587,15 +617,16 @@
 	spin_unlock(&dlm->spinlock);
 
 send_response:
-	//ret = dlm_do_master_request_resp(dlm, &lockname, response, request->node_idx);
+	//ret = dlm_do_master_request_resp(dlm, &lockname, response, 
+	//				   request->node_idx);
 	//dlmprintk("response returned %d\n", ret);
-	
-	// dlmprintk("sending response %d to other node\n", response);
+	//dlmprintk("sending response %d to other node\n", response);
 	return response;
 }
 
-/* NOTE: when doing node recovery, run the dlm_master_list looking for the dead node in 
- * any maybe_map... clear that bit, and if now empty, clear the whole thing */
+/* NOTE: when doing node recovery, run the dlm_master_list looking for the 
+ * dead node in any maybe_map... clear that bit, and if now empty, clear the 
+ * whole thing */
 
 /*
  * locks that can be taken here:
@@ -636,25 +667,32 @@
 		switch (resp->response) {
 			case DLM_MASTER_RESP_YES:
 				set_bit(resp->node_idx, mle->response_map);
-				// dlmprintk("woot!  node %u is the master!\n", resp->node_idx);
+				// dlmprintk("woot!  node %u is the master!\n",
+				// resp->node_idx);
 				mle->master = resp->node_idx;
 				wake = 1;
 				break;
 			case DLM_MASTER_RESP_NO:
-				// dlmprintk("node %u is not the master, not in-progress\n", resp->node_idx);
+				// dlmprintk("node %u is not the master, not "
+				// "in-progress\n", resp->node_idx);
 				set_bit(resp->node_idx, mle->response_map);
-				if (memcmp(mle->response_map, mle->vote_map, sizeof(mle->vote_map))==0)
+				if (memcmp(mle->response_map, mle->vote_map, 
+					   sizeof(mle->vote_map))==0)
 					wake = 1;
 				break;
 			case DLM_MASTER_RESP_MAYBE:
-				// dlmprintk("node %u is not the master, but IS in-progress\n", resp->node_idx);
+				// dlmprintk("node %u is not the master, but IS"
+				// " in-progress\n", resp->node_idx);
 				set_bit(resp->node_idx, mle->response_map);
 				set_bit(resp->node_idx, mle->maybe_map);
-				if (memcmp(mle->response_map, mle->vote_map, sizeof(mle->vote_map))==0)
+				if (memcmp(mle->response_map, mle->vote_map, 
+					   sizeof(mle->vote_map))==0)
 					wake = 1;
 				break;
 			case DLM_MASTER_RESP_ERROR:
-				dlmprintk("node %u hit an -ENOMEM!  try this whole thing again\n", resp->node_idx);
+				dlmprintk("node %u hit an -ENOMEM!  try this "
+					  "whole thing again\n", 
+					  resp->node_idx);
 				mle->error = 1;
 				wake = 1;
 				break;
@@ -674,7 +712,8 @@
 	if (found)
 		dlm_put_mle(mle);
 	else
-		dlmprintk0("hrrm... got a master resp but found no matching request\n");
+		dlmprintk0("hrrm... got a master resp but found no matching "
+			   "request\n");
 	return 0;
 }
 
@@ -715,19 +754,21 @@
 		mle = NULL;
 	}
 	if (!mle) {
-		dlmprintk("EEEEEEK!  just got an assert_master from %u, but no MLE for it!\n",
+		dlmprintk("EEEEEEK!  just got an assert_master from %u, but no "
+			  "MLE for it!\n",
 		       assert->node_idx);
 		spin_unlock(&dlm_master_lock);
 		goto check_lockres;
 	}
-	if ((bit = find_next_bit (mle->maybe_map, NM_MAX_NODES, 0)) >= NM_MAX_NODES) {
-		dlmprintk("EEK! no bits set in the maybe_map, but %u is asserting!\n",
-		       assert->node_idx);
+	bit = find_next_bit (mle->maybe_map, NM_MAX_NODES, 0);
+	if (bit >= NM_MAX_NODES) {
+		dlmprintk("EEK! no bits set in the maybe_map, but %u is "
+			  "asserting!\n", assert->node_idx);
 		BUG();
 	} else if (bit != assert->node_idx) {
 		/* TODO: is this ok?  */
-		dlmprintk("EEK! expected %u to be the master, but %u is asserting!\n", 
-		       bit, assert->node_idx);
+		dlmprintk("EEK! expected %u to be the master, but %u is "
+			  "asserting!\n", bit, assert->node_idx);
 		BUG();
 	}
 	spin_unlock(&dlm_master_lock);
@@ -740,20 +781,24 @@
 		spin_lock(&res->spinlock);
 		if (!mle) {
 			if (res->owner != assert->node_idx) {
-				dlmprintk("EEEEeeEEeeEEEK!  assert_master from %u, but current owner is %u!\n",
+				dlmprintk("EEEEeeEEeeEEEK!  assert_master from "
+					  "%u, but current owner is %u!\n",
 				       assert->node_idx, res->owner);
 				BUG();
 			}
 		} else {
 			if (res->owner != DLM_LOCK_RES_OWNER_UNKNOWN) {
-				dlmprintk("EEEEEEEEEEEEEEEEEK!!! got assert_master from node %u, but %u is the owner!\n",
-			       		assert->node_idx, res->owner);
+				dlmprintk("EEEEEEEEEEEEEEEEEK!!! got "
+					  "assert_master from node %u, but %u "
+					  "is the owner!\n", assert->node_idx, 
+					  res->owner);
 				dlmprintk0("goodnite!\n");
 				BUG();
 			}
 			if (!(res->state & DLM_LOCK_RES_IN_PROGRESS)) {
-				dlmprintk("bug! got assert from %u, but lock with no owner should be in-progress!\n",
-			       		assert->node_idx);
+				dlmprintk("bug! got assert from %u, but lock "
+					  "with no owner should be "
+					  "in-progress!\n", assert->node_idx);
 				BUG();
 			}
 		}
@@ -761,7 +806,8 @@
 	}
 	spin_unlock(&dlm->spinlock);
 
-	// dlmprintk("woo!  got an assert_master from node %u!\n", assert->node_idx);
+	// dlmprintk("woo!  got an assert_master from node %u!\n", 
+	// 	     assert->node_idx);
 	if (mle) {
 		spin_lock(&mle->spinlock);
 		mle->master = assert->node_idx;
@@ -790,38 +836,46 @@
 		strncpy(request.name, mle->u.name.name, request.namelen);
 	} else {
 		request.namelen = mle->u.res->lockname.len;
-		strncpy(request.name, mle->u.res->lockname.name, request.namelen);
+		strncpy(request.name, mle->u.res->lockname.name, 
+			request.namelen);
 	}
 
 	ret = -EINVAL;
 	inode = nm_get_group_node_by_index(dlm->group, to);
 	if (inode) {
 		dlm_master_request_to_net(&request);
-		ret = net_send_message(DLM_MASTER_REQUEST_MSG, dlm->key, &request, sizeof(request), inode, &response);
+		ret = net_send_message(DLM_MASTER_REQUEST_MSG, dlm->key, 
+				       &request, sizeof(request), 
+				       inode, &response);
 		iput(inode);
 		if (ret >= 0) {
 			spin_lock(&mle->spinlock);
 			switch (response) {
 				case DLM_MASTER_RESP_YES:
 					set_bit(to, mle->response_map);
-					// dlmprintk("woot!  node %u is the master!\n", to);
+					// dlmprintk("woot!  node %u is the "
+					// "master!\n", to);
 					mle->master = to;
 					break;
 				case DLM_MASTER_RESP_NO:
-					// dlmprintk("node %u is not the master, not in-progress\n", to);
+					// dlmprintk("node %u is not the "
+					// "master, not in-progress\n", to);
 					set_bit(to, mle->response_map);
 					break;
 				case DLM_MASTER_RESP_MAYBE:
-					// dlmprintk("node %u is not the master, but IS in-progress\n", to);
+					// dlmprintk("node %u is not the "
+					// "master, but IS in-progress\n", to);
 					set_bit(to, mle->response_map);
 					set_bit(to, mle->maybe_map);
 					break;
 				case DLM_MASTER_RESP_ERROR:
-					dlmprintk("node %u hit an -ENOMEM!  try this whole thing again\n", to);
+					dlmprintk("node %u hit an -ENOMEM!  "
+						  "try everything again\n", to);
 					mle->error = 1;
 					break;
 				default:
-					dlmprintk("bad response! %u\n", response);
+					dlmprintk("bad response! %u\n", 
+						  response);
 					ret = -EINVAL;
 					break;
 			}
@@ -830,12 +884,14 @@
 			dlmprintk("net_send_message returned %d!\n", ret);
 		}
 	} else {
-		dlmprintk("nm_get_group_node_by_index failed to find inode for node %d!\n", to);
+		dlmprintk("nm_get_group_node_by_index failed to find inode "
+			  "for node %d!\n", to);
 	}	
 	return ret;
 }
 
-int dlm_do_master_request_resp(dlm_ctxt *dlm, struct qstr *name, int response, int to)
+int dlm_do_master_request_resp(dlm_ctxt *dlm, struct qstr *name, 
+			       int response, int to)
 {
 	struct inode *inode = NULL;
 	dlm_master_request_resp resp;
@@ -852,7 +908,8 @@
 		return -EINVAL;
 
 	dlm_master_request_resp_to_net(&resp);
-	ret = net_send_message(DLM_MASTER_REQUEST_RESP_MSG, dlm->key, &resp, sizeof(resp), inode, NULL);
+	ret = net_send_message(DLM_MASTER_REQUEST_RESP_MSG, dlm->key, 
+			       &resp, sizeof(resp), inode, NULL);
 	iput(inode);
 	return ret;
 }
@@ -884,19 +941,22 @@
 			strncpy(assert.name, mle->u.name.name, assert.namelen);
 		} else {
 			assert.namelen = mle->u.res->lockname.len;
-			strncpy(assert.name, mle->u.res->lockname.name, assert.namelen);
+			strncpy(assert.name, mle->u.res->lockname.name, 
+				assert.namelen);
 		}
 
 		inode = nm_get_group_node_by_index(dlm->group, to);
 		if (!inode) {
 			tmpret = -EINVAL;
-			dlmprintk("could not get nm info for node %d!  need to retry this whole thing\n", to);
+			dlmprintk("could not get nm info for node %d!  "
+				  "need to retry this whole thing\n", to);
 			ret = tmpret;
 			break;
 		}
 
 		dlm_assert_master_to_net(&assert);
-		tmpret = net_send_message(DLM_MASTER_REQUEST_MSG, dlm->key, &assert, sizeof(assert), inode, NULL);
+		tmpret = net_send_message(DLM_MASTER_REQUEST_MSG, dlm->key, 
+					  &assert, sizeof(assert), inode, NULL);
 		iput(inode);
 
 		if (tmpret < 0) {
@@ -916,7 +976,8 @@
 
 
 
-void dlm_mle_node_down(struct inode *group, struct inode *node, int idx, void *data)
+void dlm_mle_node_down(struct inode *group, struct inode *node, 
+		       int idx, void *data)
 {
 	//int ret;
 	//struct inode *node = ptr2;
@@ -953,7 +1014,8 @@
 	spin_unlock(&mle->spinlock);
 }
 
-void dlm_mle_node_up(struct inode *group, struct inode *node, int idx, void *data)
+void dlm_mle_node_up(struct inode *group, struct inode *node, 
+		     int idx, void *data)
 {
 	//struct inode *node = ptr2;
 	dlm_master_list_entry *mle;
@@ -976,7 +1038,8 @@
 
 #if 0	
 	if (test_bit(idx, mle->recovery_map))
-		dlmprintk("BUG!!! node up message on node in recovery (%u)!!!\n", idx);
+		dlmprintk("BUG!!! node up message on node "
+			  "in recovery (%u)!!!\n", idx);
 	else 
 #endif
 	{

Modified: trunk/cluster/dlmmod.c
===================================================================
--- trunk/cluster/dlmmod.c	2005-01-18 00:55:48 UTC (rev 1780)
+++ trunk/cluster/dlmmod.c	2005-01-18 01:13:40 UTC (rev 1781)
@@ -80,37 +80,9 @@
 static spinlock_t dlm_cookie_lock = SPIN_LOCK_UNLOCKED;
 static u64 dlm_next_cookie = 1;
 
-dlm_status dlm_send_remote_convert_request(dlm_ctxt *dlm, dlm_lock_resource *res, dlm_lock *lock, int flags, int type);
-dlm_status dlm_send_remote_lock_request(dlm_ctxt *dlm, dlm_lock_resource *res, dlm_lock *lock, int flags);
-int dlm_lock_owner_broadcast(dlm_ctxt *dlm, dlm_lock_resource *res);
-int dlm_lock_owner_broadcast(dlm_ctxt *dlm, dlm_lock_resource *res);
-static dlm_ctxt * __dlm_lookup_domain(char *domain);
-int dlm_send_proxy_ast(dlm_ctxt *dlm, dlm_lock_resource *res, dlm_lock *lock, int type, int blocked_type);
 
-void dlm_wait_on_lockres(dlm_lock_resource *res);
-void __dlm_wait_on_lockres(dlm_lock_resource *res);
 
 
-
-/* fetch next node-local (u8 nodenum + u56 cookie) into u64 */
-static inline void dlm_get_next_cookie(u16 node_num, u64 *cookie)
-{
-	/* why did I make node_num 16 bit to begin with? */
-	u64 tmpnode = (u8)(node_num & (u16)0x00ff);
-
-	/* shift single byte of node num into top 8 bits */
-	tmpnode <<= 56;
-
-	spin_lock(&dlm_cookie_lock);
-	*cookie = (dlm_next_cookie | tmpnode);
-	if (++dlm_next_cookie & 0xff00000000000000ull) {
-		dlmprintk0("eek! this node's cookie will now wrap!\n");
-		dlm_next_cookie = 1;
-	}
-	spin_unlock(&dlm_cookie_lock);
-}
-
-
 /* ----------------------------------------------------------------- */
 
 extern spinlock_t dlm_master_lock;
@@ -165,6 +137,26 @@
 	return;
 }				/* dlm_driver_exit */
 
+
+/* fetch next node-local (u8 nodenum + u56 cookie) into u64 */
+static inline void dlm_get_next_cookie(u16 node_num, u64 *cookie)
+{
+	/* why did I make node_num 16 bit to begin with? */
+	u64 tmpnode = (u8)(node_num & (u16)0x00ff);
+
+	/* shift single byte of node num into top 8 bits */
+	tmpnode <<= 56;
+
+	spin_lock(&dlm_cookie_lock);
+	*cookie = (dlm_next_cookie | tmpnode);
+	if (++dlm_next_cookie & 0xff00000000000000ull) {
+		dlmprintk0("eek! this node's cookie will now wrap!\n");
+		dlm_next_cookie = 1;
+	}
+	spin_unlock(&dlm_cookie_lock);
+}
+ 
+
 dlm_status dlmlock(dlm_ctxt *dlm, int mode, dlm_lockstatus *lksb, int flags, char *name, 
 		   dlm_astlockfunc_t *ast, void *data, dlm_bastlockfunc_t *bast)
 {
@@ -192,17 +184,22 @@
 		 convert) ) {
 		goto error_status;
 	}
+	if (convert && (flags & LKM_LOCAL)) {
+		dlmprintk0("strange LOCAL convert request!\n");
+		goto error_status;
+	}
 
-
 	if (convert) {
+		/* CONVERT request */
+
 		/* if converting, must pass in a valid dlm_lock */
 		if (!lksb->lockid || !lksb->lockid->lockres)
 			goto error_status;
 		lock = lksb->lockid;
 	
-		/* XXX: for ocfs2 purposes, the ast/bast/astdata/lksb are essentially
-	 	 * static after the original lock call.  convert requests will check
-	 	 * to ensure that everything is the same and pass DLM_BADARGS if not.
+		/* XXX: for ocfs2 purposes, the ast/bast/astdata/lksb are 
+	 	 * static after the original lock call.  convert requests will 
+		 * ensure that everything is the same, or return DLM_BADARGS.
 	 	 * this means that DLM_DENIED_NOASTS will never be returned.
 	 	 */
 #warning differs from spec here!
@@ -210,28 +207,23 @@
 		if (lock->lksb != lksb || lock->ast != ast ||
 		    lock->bast != bast || lock->astdata != data) {
 			status = DLM_BADARGS;
-			dlmprintk("ERROR new args:  lksb=%p, ast=%p, bast=%p, astdata=%p\n", 
-			       lksb, ast, bast, data);
-			dlmprintk("      orig args: lksb=%p, ast=%p, bast=%p, astdata=%p\n", 
-			       lock->lksb, lock->ast, lock->bast, lock->astdata);
+			dlmprintk("ERROR new args:  lksb=%p, ast=%p, bast=%p, "
+				  "astdata=%p\n", lksb, ast, bast, data);
+			dlmprintk("      orig args: lksb=%p, ast=%p, bast=%p, "
+				  "astdata=%p\n", lock->lksb, lock->ast, 
+				  lock->bast, lock->astdata);
 			goto error_status;
 		}
 		res = lock->lockres;
-
 		down_read(&dlm->recovery_sem);
-		spin_lock(&res->spinlock);
-		if (flags & LKM_LOCAL) {
-			dlmprintk0("strange LOCAL convert request!\n");
-			if (res->owner != DLM_LOCK_RES_OWNER_UNKNOWN) {
-				spin_unlock(&res->spinlock);
-				status = DLM_BADPARAM;
-				goto up_error;
-			}
-			res->owner = dlm->group_index;
-			dlmprintk0("set owner to this node.  you SURE thats what you want!?\n");
-		}
-		status = do_dlmconvert(dlm, res, lock, flags, mode);
+
+		if (res->owner == dlm->group_index)
+			status = dlmconvert_master(dlm, res, lock, flags, mode);
+		else 
+			status = dlmconvert_remote(dlm, res, lock, flags, mode);
+
 	} else {
+		/* LOCK request */
 		status = DLM_BADARGS;
 		if (!name)
 			goto error;
@@ -243,34 +235,59 @@
 
 		/* take care of all allocs before any locking */
 		status = DLM_SYSERR;
-		buf = kmalloc(q.len+1, GFP_KERNEL);
+		buf = kmalloc(q.len+1, GFP_KERNEL);  /* lockres name */
 		if (!buf)
 			goto error;
 
-		lock = kmalloc(sizeof(dlm_lock), GFP_KERNEL);
-		if (!lock)
-			goto error;
-
-		lksb->lockid = lock;
-
 		memcpy(buf, name, q.len);
 		buf[q.len] = 0;
 		q.name = buf;
 		q.hash = full_name_hash(q.name, q.len);
 
+		lock = kmalloc(sizeof(dlm_lock), GFP_KERNEL);  /* dlm_lock */
+		if (!lock)
+			goto error;
+
+		lksb->lockid = lock;
+
 		if (!recovery)		
 			down_read(&dlm->recovery_sem);
+
+		/* find or create the lock resource */
 		res = dlm_get_lock_resource(dlm, &q, flags);
 		if (!res) {
 			status = DLM_IVLOCKID;
 			goto up_error;
 		}
-		status = do_dlmlock(dlm, res, lksb, flags, mode, ast, bast, data);
+
+		dlmprintk("type=%d\n", mode);
+		dlmprintk("creating lock: lock=%p res=%p\n", lock, res);
+
+		memset(lock, 0, sizeof(dlm_lock));
+		INIT_LIST_HEAD(&lock->list);
+		INIT_LIST_HEAD(&lock->ast_list);
+		spin_lock_init(&lock->spinlock);
+		lock->lockres = res;
+		lock->type = mode;
+		lock->convert_type = LKM_IVMODE;
+		lock->highest_blocked = LKM_IVMODE;
+		lock->node = dlm->group_index;
+		lock->ast = ast;
+		lock->bast = bast;
+		lock->astdata = data;
+		lock->lksb = lksb;
+	
+		dlm_get_next_cookie(lock->node, &lock->cookie);
+	
+		if (res->owner == dlm->group_index)
+			status = dlmlock_master(dlm, res, lock, flags);
+		else 
+			status = dlmlock_remote(dlm, res, lock, flags);
+
 		if (status != DLM_NORMAL)
 			goto up_error;
 	}
 
-	/* TODO: lvb */
 	if (!recovery)
 		up_read(&dlm->recovery_sem);
 	return status;
@@ -293,352 +310,13 @@
 }
 EXPORT_SYMBOL(dlmlock);
 
-dlm_status do_dlmlock(dlm_ctxt *dlm, dlm_lock_resource *res, dlm_lockstatus *lksb, int flags,
-		      int type, dlm_astlockfunc_t *ast, dlm_bastlockfunc_t *bast, void *data)
-{
-	dlm_lock *tmplock;
 
-	dlmprintk("type=%d\n", type);
-
-	tmplock = lksb->lockid;
-
-	DLM_ASSERT(tmplock);
-
-	memset(tmplock, 0, sizeof(dlm_lock));
-	INIT_LIST_HEAD(&tmplock->list);
-	INIT_LIST_HEAD(&tmplock->ast_list);
-	spin_lock_init(&tmplock->spinlock);
-	tmplock->lockres = res;
-dlmprintk("creating lock: lock=%p res=%p\n", tmplock, res);
-	tmplock->type = type;
-	tmplock->convert_type = LKM_IVMODE;
-	tmplock->highest_blocked = LKM_IVMODE;
-	tmplock->node = dlm->group_index;
-	tmplock->ast = ast;
-	tmplock->bast = bast;
-	tmplock->astdata = data;
-	tmplock->lksb = lksb;
-
-	dlm_get_next_cookie(tmplock->node, &tmplock->cookie);
-
-	if (res->owner == dlm->group_index)
-		return dlmlock_local(dlm, res, tmplock, flags);
-	else 
-		return dlmlock_remote(dlm, res, tmplock, flags);
-}
-
-
-
-
-/* caller must be already holding lockres->spinlock 
- * will return with lockres->spinlock UNLOCKED */
-dlm_status dlmlock_local(dlm_ctxt *dlm, dlm_lock_resource *res, dlm_lock *lock, int flags)
-{
-	struct list_head *iter;
-	dlm_lock *tmplock;
-	int got_it = 0;
-
-	DLM_ASSERT(lock);
-	DLM_ASSERT(res);
-	DLM_ASSERT(dlm);
-	DLM_ASSERT(lock->lksb);
-
-	dlmprintk("type=%d\n", lock->type);
-
-	list_for_each(iter, &res->granted) {
-		tmplock = list_entry(iter, dlm_lock, list);
-		if (!dlm_lock_compatible(tmplock->type, lock->type)) {
-			list_add_tail(&lock->list, &res->blocked);
-			goto done;
-		}
-	}
-
-	list_for_each(iter, &res->converting) {
-		tmplock = list_entry(iter, dlm_lock, list);
-		if (!dlm_lock_compatible(tmplock->type, lock->type)) {
-			list_add_tail(&lock->list, &res->blocked);
-			goto done;
-		}
-	}
-
-	/* got it right away */
-
-	lock->lksb->status = DLM_NORMAL;
-
-	list_add_tail(&lock->list, &res->granted);
-
-	if (dlm_do_ast(dlm, res, lock) < 0)
-		dlmprintk0("eek\n");
-	got_it = 1;
-
-done:
-	spin_unlock(&res->spinlock);
-	dlm_kick_thread(dlm, res);
-	if (!got_it && (flags & LKM_NOQUEUE)) {
-		dlmprintk("did not get NOQUEUE lock %*s at level %d\n", 
-			  res->lockname.len, res->lockname.name, lock->type);
-		return DLM_NOTQUEUED;
-	}
-	return DLM_NORMAL;
-}
-
-/* must be already holding lockres->spinlock */
-dlm_status dlmlock_remote(dlm_ctxt *dlm, dlm_lock_resource *res, dlm_lock *lock, int flags)
-{
-	dlm_status status = DLM_DENIED;
-	
-	dlmprintk("type=%d\n", lock->type);
-
-	if (res->state & DLM_LOCK_RES_RECOVERING) {
-		status = DLM_RECOVERING;
-		goto bail;
-	}
-
-	/* will exit this call with spinlock held */
-	__dlm_wait_on_lockres(res);
-	res->state |= DLM_LOCK_RES_IN_PROGRESS;
-	/* add lock to local (secondary) queue */
-	list_add_tail(&lock->list, &res->blocked);
-	spin_unlock(&res->spinlock);
-
-	/* spec seems to say that you will get DLM_NORMAL when the lock 
-	 * has been queued, meaning we need to wait for a reply here. */
-	status = dlm_send_remote_lock_request(dlm, res, lock, flags);
-	
-	spin_lock(&res->spinlock);
-	res->state &= ~DLM_LOCK_RES_IN_PROGRESS;
-	if (status != DLM_NORMAL) {
-		/* remove from local queue if it failed */
-		list_del(&lock->list);
-	}
-bail:
-	spin_unlock(&res->spinlock);
-	wake_up(&res->wq);
-	return status;
-}
-
-
-/* must be already holding lockres->spinlock */
-dlm_status do_dlmconvert(dlm_ctxt *dlm, dlm_lock_resource *res, dlm_lock *lock, int flags, int type)
-{
-	dlm_status status;
-
-	if (res->owner == dlm->group_index)
-		status = dlmconvert_local(dlm, res, lock, flags, type);
-	else 
-		status = dlmconvert_remote(dlm, res, lock, flags, type);
-
-	return status;
-}
-
-static inline const char * dlm_lock_mode_name(int mode)
-{
-	switch (mode) {
-		case LKM_EXMODE:
-			return "EX";
-		case LKM_PRMODE:
-			return "PR";
-		case LKM_NLMODE:
-			return "NL";
-	}
-	return "UNKNOWN";
-}
-
-
-/* must be already holding lockres->spinlock */
-dlm_status dlmconvert_local(dlm_ctxt *dlm, dlm_lock_resource *res, dlm_lock *lock, int flags, int type)
-{
-	dlm_status status = DLM_NORMAL;
-	struct list_head *iter;
-	dlm_lock *tmplock=NULL;
-
-	dlmprintk("type=%d, convert_type=%d, new convert_type=%d\n", lock->type, lock->convert_type, type);
-
-	spin_lock(&lock->spinlock);
-
-	/* already converting? */
-	if (lock->convert_type != LKM_IVMODE) {
-		dlmprintk0("attempted to convert a lock with a lock conversion pending\n");
-		spin_unlock(&lock->spinlock);
-		spin_unlock(&res->spinlock);
-		return DLM_DENIED;
-	}
-
-	/* must be on grant queue to convert */
-	if (!dlm_lock_on_list(&res->granted, lock)) {
-		dlmprintk0("attempted to convert a lock not on grant queue\n");
-		spin_unlock(&lock->spinlock);
-		spin_unlock(&res->spinlock);
-		return DLM_DENIED;
-	}
-
-	if (flags & LKM_VALBLK) {
-		switch (lock->type) {
-			case LKM_EXMODE:
-				/* EX + LKM_VALBLK + convert == set lvb */
-				dlmprintk("will set lvb: converting %s->%s\n",
-					dlm_lock_mode_name(lock->type), dlm_lock_mode_name(type));
-				lock->lksb->flags |= DLM_LKSB_PUT_LVB;
-				break;
-			case LKM_PRMODE:
-			case LKM_NLMODE:
-				/* refetch if new level is not NL */
-				if (type > LKM_NLMODE) {
-					dlmprintk("will fetch new value into lvb: converting %s->%s\n",
-						dlm_lock_mode_name(lock->type), dlm_lock_mode_name(type));
-					lock->lksb->flags |= DLM_LKSB_GET_LVB;
-				} else {
-					dlmprintk("will NOT fetch new value into lvb: converting %s->%s\n",
-						dlm_lock_mode_name(lock->type), dlm_lock_mode_name(type));
-					flags &= ~(LKM_VALBLK);
-				}
-				break;
-		}
-	}
-
-	
-	/* in-place downconvert? */
-	if (type <= lock->type)
-		goto grant;
-
-	/* upconvert from here on */
-	status = DLM_NORMAL;
-	list_for_each(iter, &res->granted) {
-		tmplock = list_entry(iter, dlm_lock, list);
-		if (tmplock == lock)
-			continue;
-		if (!dlm_lock_compatible(tmplock->type, type))
-			goto switch_queues;
-	}
-
-	list_for_each(iter, &res->converting) {
-		tmplock = list_entry(iter, dlm_lock, list);
-		if (!dlm_lock_compatible(tmplock->type, type))
-			goto switch_queues;
-		/* existing conversion requests take precedence */
-		if (!dlm_lock_compatible(tmplock->convert_type, type))
-			goto switch_queues;
-	}
-
-	/* fall thru to grant */
-
-grant:
-	dlmprintk("res %*s, granting %s lock\n", res->lockname.len,
-		  res->lockname.name, dlm_lock_mode_name(type));
-	/* immediately grant the new lock type */
-
-	lock->lksb->status = DLM_NORMAL;
-	if (lock->node == dlm->group_index)
-		dlmprintk0("doing in-place convert for nonlocal lock\n");
-
-
-	lock->type = type;
-	status = DLM_NORMAL;
-
-	if (dlm_do_ast(dlm, res, lock) < 0)
-		dlmprintk0("eek\n");
-
-	spin_unlock(&lock->spinlock);
-	spin_unlock(&res->spinlock);
-
-	/* if successful, kick the queue runner */
-	if (status == DLM_NORMAL) {
-		dlm_kick_thread(dlm, res);
-	}
-
-	return status;
-
-switch_queues:
-	if (flags & LKM_NOQUEUE) {
-		dlmprintk("failed to convert NOQUEUE lock %*s from %d to %d...\n",
-			  res->lockname.len, res->lockname.name, lock->type, type);
-		spin_unlock(&lock->spinlock);
-		spin_unlock(&res->spinlock);
-		return DLM_NOTQUEUED;
-	}
-	dlmprintk("res %*s, queueing...\n", res->lockname.len,
-		  res->lockname.name);
-
-	lock->convert_type = type;
-	list_del(&lock->list);
-	list_add_tail(&lock->list, &res->converting);
-	
-	spin_unlock(&lock->spinlock);
-	spin_unlock(&res->spinlock);
-	
-	dlm_kick_thread(dlm, res);
-	return status;
-}
-
-/* must be already holding lockres->spinlock */
-dlm_status dlmconvert_remote(dlm_ctxt *dlm, dlm_lock_resource *res, dlm_lock *lock, int flags, int type)
-{
-	dlm_status status = DLM_DENIED;
-	
-	dlmprintk("type=%d, convert_type=%d, busy=%d\n", lock->type, lock->convert_type, res->state & DLM_LOCK_RES_IN_PROGRESS);
-	
-	if (res->state & DLM_LOCK_RES_RECOVERING) {
-		status = DLM_RECOVERING;
-		goto bail;
-	}
-	/* will exit this call with spinlock held */
-	__dlm_wait_on_lockres(res);
-
-	res->state |= DLM_LOCK_RES_IN_PROGRESS;
-
-	/* move lock to local convert queue */
-	list_del(&lock->list);
-	list_add_tail(&lock->list, &res->converting);
-	if (lock->convert_type != LKM_IVMODE) {
-		dlmprintk0("error! converting a remote lock that is already converting!\n");
-		/* TODO: return correct error */
-		BUG();
-	}
-	lock->convert_type = type;
-
-	if (flags & LKM_VALBLK) {
-		if (lock->type == LKM_EXMODE) {
-			flags |= LKM_PUT_LVB;
-			lock->lksb->flags |= DLM_LKSB_PUT_LVB;
-		} else {
-			if (lock->convert_type == LKM_NLMODE) {
-				dlmprintk0("erm, no point in specifying LKM_VALBLK if converting to NL\n");
-				flags &= ~LKM_VALBLK;
-			} else {
-				flags |= LKM_GET_LVB;
-				lock->lksb->flags |= DLM_LKSB_GET_LVB;
-			}
-		}
-	}
-	spin_unlock(&res->spinlock);
-
-	/* spec seems to say that you will get DLM_NORMAL when the lock 
-	 * has been queued, meaning we need to wait for a reply here. */
-	status = dlm_send_remote_convert_request(dlm, res, lock, flags, type);
-	
-	spin_lock(&res->spinlock);
-	res->state &= ~DLM_LOCK_RES_IN_PROGRESS;
-
-	/* if it failed, move it back to granted queue */
-	if (status != DLM_NORMAL) {
-		list_del(&lock->list);
-		list_add_tail(&lock->list, &res->granted);
-		lock->convert_type = LKM_IVMODE;
-		lock->lksb->flags &= ~(DLM_LKSB_GET_LVB|DLM_LKSB_PUT_LVB);
-	}
-bail:
-	spin_unlock(&res->spinlock);
-	wake_up(&res->wq);
-	return status;
-}
-
-
-
 /* there seems to be no point in doing this async
  * since (even for the remote case) there is really
  * no work to queue up... so just do it and fire the
  * unlockast by hand when done... */
-dlm_status dlmunlock(dlm_ctxt *dlm, dlm_lockstatus *lksb, int flags, dlm_astunlockfunc_t *unlockast, void *data)
+dlm_status dlmunlock(dlm_ctxt *dlm, dlm_lockstatus *lksb, int flags, 
+		     dlm_astunlockfunc_t *unlockast, void *data)
 {
 	dlm_status status;
 	dlm_lock_resource *res;
@@ -666,325 +344,31 @@
 	
 	DLM_ASSERT(lock);
 	DLM_ASSERT(res);
-dlmprintk("lock=%p res=%p\n", lock, res);
+	dlmprintk("lock=%p res=%p\n", lock, res);
 
-	spin_lock(&res->spinlock);
-	spin_lock(&lock->spinlock);
-
-	status = dlmunlock_local(dlm, res, lock, lksb, flags, &call_ast);
-	if (call_ast)
-		(*unlockast)(data, lksb->status);
-	return status;
-}
-EXPORT_SYMBOL(dlmunlock);
-
-
-/* called holding both res->spinlock and lock->spinlock */
-dlm_status dlmunlock_local(dlm_ctxt *dlm, dlm_lock_resource *res, dlm_lock *lock, dlm_lockstatus *lksb, int flags, int *call_ast)
-{
-	dlm_status status;
-	int free_lock = 0, remote_ready = 0;
-	int local = 0, remove = 0, regrant = 0;
-	
-	dlmprintk0("\n");
-
-	/* according to spec and opendlm code
-	 *  flags & LKM_CANCEL != 0: must be converting or blocked
-	 *  flags & LKM_CANCEL == 0: must be granted
-	 * iow, to unlock a converting lock, you must first LKM_CANCEL
-	 * the convert, then call the unlock again with no LKM_CANCEL
-	 */
-	*call_ast = 0;
-
-recheck:
-
-	local = (res->owner == dlm->group_index);
-
-	dlmprintk0("checking flags...\n");
-
-	if (flags & LKM_CANCEL) {
-		dlmprintk0("cancel request\n");
-		/* cancel request */
-		if (dlm_lock_on_list(&res->blocked, lock)) {
-			dlmprintk0("on blocked list\n");
-			/* cancel this outright */
-			lksb->status = DLM_NORMAL;
-			status = DLM_NORMAL;
-			free_lock = 1;
-			*call_ast = 1;
-			remove = 1;
-			regrant = 0;
-		} else if (dlm_lock_on_list(&res->converting, lock)) {
-			dlmprintk0("on converting list\n");
-			/* cancel the request, put back on granted */
-			lksb->status = DLM_NORMAL;
-			status = DLM_NORMAL;
-			free_lock = 0;
-			*call_ast = 1;
-			remove = 1;
-			regrant = 1;
-		} else if (dlm_lock_on_list(&res->granted, lock)) {
-			dlmprintk0("on granted list\n");
-			/* too late, already granted.  DLM_CANCELGRANT */
-			lksb->status = DLM_CANCELGRANT;
-			status = DLM_NORMAL;
-			free_lock = 0;
-			*call_ast = 1;
-			remove = 0;
-			regrant = 0;
-		} else {
-			/* err. um. eek! */
-			dlmprintk0("on NO list!\n");
-			dlmprintk0("lock to cancel is not on any list!  bug!\n");
-			lksb->status = DLM_IVLOCKID;
-			status = DLM_IVLOCKID;
-			free_lock = 0;
-			*call_ast = 0;
-			remove = 0;
-			regrant = 0;
-		}
+	if (res->owner == dlm->group_index) {
+		status = dlmunlock_master(dlm, res, lock, lksb, flags, 
+					  &call_ast);
+		dlmprintk("done calling dlmunlock_master: returned %d, "
+			  "call_ast is %d\n", status, call_ast);
 	} else {
-		dlmprintk0("unlock request\n");
-
-		/* unlock request */
-		if (!dlm_lock_on_list(&res->granted, lock)) {
-			dlmprintk0("not on granted list\n");
-			lksb->status = DLM_DENIED;
-			status = DLM_DENIED;
-			free_lock = 0;
-			*call_ast = 0;
-			remove = 0;
-			regrant = 0;
-		} else {
-			dlmprintk0("on granted list\n");
-			/* unlock granted lock */
-			lksb->status = DLM_NORMAL;
-			status = DLM_NORMAL;
-			free_lock = 1;
-			*call_ast = 1;
-			remove = 1;
-			regrant = 0;
-			
-			/* make the final update to the lvb */
-			if (local && lksb->flags & DLM_LKSB_PUT_LVB)
-				memcpy(res->lvb, lksb->lvb, DLM_LVB_LEN);
-		}
+		status = dlmunlock_remote(dlm, res, lock, lksb, flags, 
+					  &call_ast);
+		dlmprintk("done calling dlmunlock_remote: returned %d, "
+			  "call_ast is %d\n", status, call_ast);
 	}
 
-	dlmprintk0("checking local/remote\n");
-
-	/* lockres mastered locally or remote? */
-	if (!local) {
-		dlmprintk0("nonlocal\n");
-		/* safe since nothing can change on this 
-		 * secondary queue without lockres lock */
-		spin_unlock(&lock->spinlock);
-		dlmprintk0("unlocked lock spinlock\n");
-
-		/* if there was an outstanding change on the
-		 * lockres, conditions could have changed */
-		if (!remote_ready &&
-		    res->state & DLM_LOCK_RES_IN_PROGRESS) {
-			__dlm_wait_on_lockres(res);
-			res->state |= DLM_LOCK_RES_IN_PROGRESS;
-			remote_ready = 1;
-			dlmprintk0("unlocked lockres spinlock, not ready, rechecking!\n");
-			spin_lock(&lock->spinlock);
-			goto recheck;
-		}
-
-		if (res->state & DLM_LOCK_RES_RECOVERING) {
-			/* !!!!! */
-			dlmprintk0("unlocking lock spinlock\n");
-			spin_unlock(&res->spinlock);
-			dlmprintk0("lockres is recovering!\n");
-			return DLM_RECOVERING;
-		} else {
-			dlmprintk0("unlocking lockres spinlock\n");
-			spin_unlock(&res->spinlock);
-			status = dlm_send_remote_unlock_request(dlm, res, lock, lksb, flags);
-			dlmprintk0("locking lockres spinlock\n");
-			spin_lock(&res->spinlock);
-			res->state &= ~DLM_LOCK_RES_IN_PROGRESS;
-		}
-		dlmprintk0("locking lock spinlock\n");
-		spin_lock(&lock->spinlock);
-		wake_up(&res->wq);
+	if (call_ast) {
+		dlmprintk("calling unlockast(%p, %d)\n",
+			  data, lksb->status);
+		(*unlockast)(data, lksb->status);
 	}
-
-
-	if (remove) {
-		dlmprintk0("removing lock from list\n");
-		list_del(&lock->list);
-	}
-	if (regrant) {
-		dlmprintk0("moving lock to granted list\n");
-		list_add_tail(&lock->list, &res->granted);
-	}
-
-	dlmprintk0("unlocking lock spinlock\n");
-	spin_unlock(&lock->spinlock);
-	dlmprintk0("unlocking lockres spinlock\n");
-	spin_unlock(&res->spinlock);
-	
-	dlmprintk0("done with locks\n");
-
-	if (free_lock) {
-		dlmprintk0("need to free lock\n");
-
-#warning this must change to proper refcounting
-		/* TODO: refcounting... tho for now this will work because 
-		 * the middle layer is keeping track of everything */
-		kfree(lock);
-		lksb->lockid = NULL;
-		dlmprintk0("done freeing\n");
-	}
-
-	/* if cancel or unlock succeeded, lvb work is done */
-	if (status == DLM_NORMAL)
-		lksb->flags &= ~(DLM_LKSB_PUT_LVB|DLM_LKSB_GET_LVB);
-
-	dlmprintk("aha done with everything, returning %d\n", status);
+	dlmprintk("returning status=%d!\n", status);
 	return status;
 }
-	
+EXPORT_SYMBOL(dlmunlock);
 
-dlm_status dlm_send_remote_unlock_request(dlm_ctxt *dlm, dlm_lock_resource *res, dlm_lock *lock, dlm_lockstatus *lksb, int flags)
-{
-	struct inode *inode = NULL;
-	dlm_unlock_lock unlock;
-	int tmpret;
-	dlm_status ret;
-	int status = 0;
-	struct iovec iov[2];
-	size_t iovlen = 1;
 
-	dlmprintk0("\n");
-
-	memset(&unlock, 0, sizeof(unlock));
-	unlock.node_idx = dlm->group_index;
-	unlock.flags = flags;
-	unlock.cookie = lock->cookie;
-	unlock.namelen = res->lockname.len;
-	strncpy(unlock.name, res->lockname.name, unlock.namelen);
-
-	iov[0].iov_len = sizeof(dlm_unlock_lock);
-	iov[0].iov_base = &unlock;
-
-	if (flags & LKM_PUT_LVB) {
-		/* extra data to send if we are updating lvb */
-		iov[1].iov_len = DLM_LVB_LEN;
-		iov[1].iov_base = lock->lksb->lvb;
-		iovlen++;
-	}
-
-	ret = DLM_NOLOCKMGR;
-	lksb->status = DLM_NOLOCKMGR;
-	inode = nm_get_group_node_by_index(dlm->group, res->owner);
-	if (inode) {
-		dlm_unlock_lock_to_net(&unlock);
-		tmpret = net_send_message_iov(DLM_UNLOCK_LOCK_MSG, dlm->key,
-					      iov, iovlen, inode, &status);
-		if (tmpret >= 0) {
-			// successfully sent and received
-			if (status == DLM_CANCELGRANT)
-				ret = DLM_NORMAL;
-			else
-				ret = status;
-			lksb->status = status;
-		} else {
-			dlmprintk("error occurred in net_send_message: %d\n", tmpret);
-			ret = dlm_err_to_dlm_status(tmpret);
-			lksb->status = ret;
-		}
-		iput(inode);
-	}
-
-	return ret;
-}
-
-int dlm_unlock_lock_handler(net_msg *msg, u32 len, void *data)
-{
-	dlm_ctxt *dlm = data;
-	dlm_unlock_lock *unlock = (dlm_unlock_lock *)msg->buf;
-	dlm_lock_resource *res;
-	struct list_head *iter, *queue;
-	dlm_lock *lock;
-	dlm_status status = DLM_NORMAL;
-	int found = 0;
-	dlm_lockstatus *lksb = NULL;
-	int ignore;
-	struct qstr lockname;
-	u32 flags;
-
-	dlm_unlock_lock_to_host(unlock);
-	lockname.name = unlock->name;
-	lockname.len = unlock->namelen;
-	flags = unlock->flags;
-
-	if (flags & LKM_GET_LVB) {
-		dlmprintk0("bad args!  GET_LVB specified on unlock!\n");
-		return DLM_BADARGS;
-	}
-
-	if ((flags & (LKM_PUT_LVB|LKM_CANCEL)) == 
-	        (LKM_PUT_LVB|LKM_CANCEL)) {
-		dlmprintk0("bad args!  cannot modify lvb on a CANCEL request!\n");
-		return DLM_BADARGS;
-	}
-
-
-	dlmprintk("lvb: %s\n", flags & LKM_PUT_LVB ? "put lvb" : "none");
-
-	lockname.hash = full_name_hash(lockname.name, lockname.len);
-
-	status = DLM_IVLOCKID;
-	res = dlm_lookup_lock(dlm, &lockname);
-	if (res) {
-		queue = &res->granted;
-again:
-		found = 0;
-		spin_lock(&res->spinlock);
-		list_for_each(iter, queue) {
-			lock = list_entry(iter, dlm_lock, list);
-			if (lock->cookie == unlock->cookie &&
-			    lock->node == unlock->node_idx) {
-				found = 1;
-				spin_lock(&lock->spinlock);
-				lksb = lock->lksb;
-				/* unlockast only called on originating node */
-				if (flags & LKM_PUT_LVB) {
-					lksb->flags |= DLM_LKSB_PUT_LVB;
-					memcpy(&lksb->lvb[0], &unlock->lvb[0], DLM_LVB_LEN);
-				}
-				status = dlmunlock_local(dlm, res, lock, lksb, flags, &ignore);
-				if (flags & LKM_PUT_LVB)
-					lksb->flags &= ~DLM_LKSB_PUT_LVB;
-				break;
-			}
-		}
-		if (!found)
-			spin_unlock(&res->spinlock);
-		if (queue == &res->granted) {
-			queue = &res->converting;
-			goto again;
-		} else if (queue == &res->converting) {
-			queue = &res->blocked;
-			goto again;
-		}
-	}
-	if (!found)
-		dlmprintk("failed to find lock to unlock!  cookie=%llu\n", unlock->cookie);
-	else
-		status = lksb->status;
-
-	return status;
-}
-
-
-
-
-
 static dlm_ctxt * __dlm_lookup_domain(char *domain)
 {
 	dlm_ctxt *tmp = NULL;
@@ -1298,6 +682,7 @@
 	add_wait_queue(&res->wq, &wait);
 repeat:
 	set_current_state(TASK_UNINTERRUPTIBLE);
+		
 	spin_lock(&res->spinlock);
 	if (res->state & DLM_LOCK_RES_IN_PROGRESS) {
 		spin_unlock(&res->spinlock);
@@ -1326,465 +711,10 @@
 	current->state = TASK_RUNNING;
 }
 
-  
 
-int dlm_do_ast(dlm_ctxt *dlm, dlm_lock_resource *res, dlm_lock *lock)
-{
-	int ret;
 
-	dlm_astlockfunc_t *fn;
-	dlm_lockstatus *lksb;
 
-	dlmprintk0("\n");
 
-	DLM_ASSERT(lock);
-	DLM_ASSERT(res);
-	DLM_ASSERT(lock->lksb);
-
-	lksb = lock->lksb;
-	fn = lock->ast;
-
-	if (res->owner == dlm->group_index) {
-		/* this node is the lockres master */
-		if (lksb->flags & DLM_LKSB_GET_LVB) {
-			dlmprintk("getting lvb from lockres for %s node\n",
-				  lock->node == dlm->group_index ? "master" :
-				  "remote");
-			memcpy(lksb->lvb, res->lvb, DLM_LVB_LEN);
-		} else if (lksb->flags & DLM_LKSB_PUT_LVB) {
-			dlmprintk("setting lvb from lockres for %s node\n",
-				  lock->node == dlm->group_index ? "master" :
-				  "remote");
-			memcpy(res->lvb, lksb->lvb, DLM_LVB_LEN);
-		}
-	}
-
-	ret = 0;
-	if (lock->node != dlm->group_index) {
-		/* lock request came from another node
-		 * go do the ast over there */
-		ret = dlm_send_proxy_ast(dlm, res, lock, DLM_AST, 0);
-	} else {
-		DLM_ASSERT(fn);
-		(*fn)(lock->astdata);
-	}
-
-	/* reset any lvb flags on the lksb */
-	lksb->flags &= ~(DLM_LKSB_PUT_LVB|DLM_LKSB_GET_LVB);
-	return ret;
-}
-
-
-int dlm_do_bast(dlm_ctxt *dlm, dlm_lock_resource *res, dlm_lock *lock, int blocked_type)
-{
-	dlm_bastlockfunc_t *fn = lock->bast;
-	
-	dlmprintk("res %*s\n", res->lockname.len, res->lockname.name);
-
-	if (lock->node != dlm->group_index) {
-		return dlm_send_proxy_ast(dlm, res, lock, DLM_BAST, blocked_type);
-	}
-
-	if (!fn) {
-		dlmprintk("eek! lock has no bast %*s!  cookie=%llu\n", 
-		       res->lockname.len, res->lockname.name, lock->cookie);
-		return -EINVAL;
-	}
-	(*fn)(lock->astdata, blocked_type);
-	return 0;
-}
-
-
-int dlm_send_proxy_ast(dlm_ctxt *dlm, dlm_lock_resource *res, dlm_lock *lock, int type, int blocked_type)
-{
-	int ret = 0;
-	dlm_proxy_ast past;
-	struct inode *inode = NULL;
-	struct iovec iov[2];
-	size_t iovlen = 1;
-	
-	dlmprintk("res %*s, to=%u, type=%d, blocked_type=%d\n",res->lockname.len, res->lockname.name, lock->node, type, blocked_type);
-
-	memset(&past, 0, sizeof(dlm_proxy_ast));
-	past.node_idx = dlm->group_index;
-	past.type = type;
-	past.blocked_type = blocked_type;
-	past.namelen = res->lockname.len;
-	strncpy(past.name, res->lockname.name, past.namelen);
-	past.cookie = lock->cookie;
-
-	iov[0].iov_len = sizeof(dlm_proxy_ast);
-	iov[0].iov_base = &past;
-	if (lock->lksb->flags & DLM_LKSB_GET_LVB) {
-		past.flags |= LKM_GET_LVB;
-		iov[1].iov_len = DLM_LVB_LEN;
-		iov[1].iov_base = lock->lksb->lvb;
-		iovlen++;
-	}
-
-	ret = -EINVAL;
-	inode = nm_get_group_node_by_index(dlm->group, lock->node);
-	if (inode) {
-		dlm_proxy_ast_to_net(&past);
-		ret = net_send_message_iov(DLM_PROXY_AST_MSG, dlm->key,
-					   iov, iovlen, inode, NULL);
-		iput(inode);
-	}
-	if (ret < 0) {
-		dlmprintk("(%d) dlm_send_proxy_ast: returning %d\n", current->pid, ret);
-	}
-	return ret;
-}
-
-int dlm_proxy_ast_handler(net_msg *msg, u32 len, void *data)
-{
-	int status;
-	dlm_ctxt *dlm = data;
-	dlm_lock_resource *res;
-	dlm_lock *lock = NULL;
-	dlm_proxy_ast *past = (dlm_proxy_ast *) msg->buf;
-	struct qstr lockname;
-	struct list_head *iter, *head=NULL;
-	u64 cookie;
-	u32 flags;
-
-	dlm_proxy_ast_to_host(past);
-	lockname.name = past->name;
-	lockname.len = past->namelen;
-	cookie = past->cookie;
-	flags = past->flags;
-
-	if ((flags & (LKM_PUT_LVB|LKM_GET_LVB)) ==
-	     (LKM_PUT_LVB|LKM_GET_LVB)) {
-		dlmprintk("both PUT and GET lvb specified\n");
-		return DLM_BADARGS;
-	}
-
-	dlmprintk("lvb: %s\n", flags & LKM_PUT_LVB ? "put lvb" : 
-		  (flags & LKM_GET_LVB ? "get lvb" : "none"));
-
-	lockname.hash = full_name_hash(lockname.name, lockname.len);
-	
-	dlmprintk("type=%d, blocked_type=%d\n", past->type, past->blocked_type);
-
-	if (past->type != DLM_AST && 
-	    past->type != DLM_BAST) {
-		dlmprintk("Eeeek unknown ast type! %d, cookie=%llu, name=%*s\n", 
-		       past->type, cookie, lockname.len, lockname.name);
-		return 0;
-	}
-
-	res = dlm_lookup_lock(dlm, &lockname);
-	if (!res) {
-		dlmprintk("eek! got %sast for unknown lockres!  cookie=%llu, name=%*s, namelen=%d\n", 
-		       past->type == DLM_AST ? "" : "b", cookie, lockname.len, lockname.name, lockname.len);
-		return 0;
-	}
-
-	dlmprintk("lockres %*s\n", res->lockname.len, res->lockname.name);
-
-	if (!dlm_is_recovery_lock(past->name, past->namelen))
-		down_read(&dlm->recovery_sem);
-	spin_lock(&res->spinlock);
-
-	/* try convert queue for both ast/bast */
-	head = &res->converting;
-	lock = NULL;
-	list_for_each(iter, head) {
-		lock = list_entry (iter, dlm_lock, list);
-		if (lock->cookie == cookie)
-			goto do_ast;
-	}
-
-	/* if not on convert, try blocked for ast, granted for bast */
-	if (past->type == DLM_AST)
-		head = &res->blocked;
-	else 
-		head = &res->granted;
-
-	list_for_each(iter, head) {
-		lock = list_entry (iter, dlm_lock, list);
-		if (lock->cookie == cookie)
-			goto do_ast;
-	}
-
-	dlmprintk("eek! got %sast for unknown lock!  cookie=%llu, name=%*s, namelen=%d\n", 
-	       past->type == DLM_AST ? "" : "b", cookie, lockname.len, lockname.name, lockname.len);
-	spin_unlock(&res->spinlock);
-	if (!dlm_is_recovery_lock(past->name, past->namelen))
-		up_read(&dlm->recovery_sem);
-	return 0;
-		
-do_ast:
-	if (past->type == DLM_AST) {
-		list_del(&lock->list);
-		list_add_tail(&lock->list, &res->granted);
-		dlmprintk("ast: adding to granted list... type=%d, convert_type=%d\n",
-			  lock->type, lock->convert_type);
-		if (lock->convert_type != LKM_IVMODE) {
-			lock->type = lock->convert_type;
-			lock->convert_type = LKM_IVMODE;
-		} else {
-			// should already be there....
-		}
-		
-		lock->lksb->status = DLM_NORMAL;
-
-		/* if we requested the lvb, fetch it into our lksb now */
-		if (flags & LKM_GET_LVB) {
-			DLM_ASSERT(lock->lksb->flags & DLM_LKSB_GET_LVB);
-			memcpy(lock->lksb->lvb, past->lvb, DLM_LVB_LEN);
-		}
-		status = dlm_do_ast(dlm, res, lock);
-		dlmprintk("ast done: now... type=%d, convert_type=%d\n",
-			  lock->type, lock->convert_type);
-	} else {
-		dlmprintk("bast: before... type=%d, convert_type=%d\n",
-			  lock->type, lock->convert_type);
-		status = dlm_do_bast(dlm, res, lock, past->blocked_type);
-		dlmprintk("bast: after... type=%d, convert_type=%d\n",
-			  lock->type, lock->convert_type);
-	}
-
-	if (status < 0)
-		dlmprintk("eeek: ast/bast returned %d\n", status);
-
-	spin_unlock(&res->spinlock);
-	if (!dlm_is_recovery_lock(past->name, past->namelen))
-		up_read(&dlm->recovery_sem);
-	return 0;
-}
-
-
-
-
-
-
-
-/*
- * message handlers should just return status.
- * this will get send back to the calling node if it
- * requested a status return.
- */
-
-
-/* remote lock creation */
-dlm_status dlm_send_remote_lock_request(dlm_ctxt *dlm, dlm_lock_resource *res, dlm_lock *lock, int flags)
-{
-	struct inode *inode = NULL;
-	dlm_create_lock create;
-	int tmpret, status = 0;
-	dlm_status ret;
-
-	dlmprintk0("\n");
-
-	memset(&create, 0, sizeof(create));
-	create.node_idx = dlm->group_index;
-	create.requested_type = lock->type;
-	create.cookie = lock->cookie;
-	create.namelen = res->lockname.len;
-	create.flags = flags;
-	strncpy(create.name, res->lockname.name, create.namelen);
-
-	ret = DLM_NOLOCKMGR;
-	inode = nm_get_group_node_by_index(dlm->group, res->owner);
-	if (inode) {
-		dlm_create_lock_to_net(&create);
-		tmpret = net_send_message(DLM_CREATE_LOCK_MSG, dlm->key, &create, sizeof(create), inode, &status);
-		if (tmpret >= 0) {
-			// successfully sent and received
-			ret = status;  // this is already a dlm_status
-		} else {
-			dlmprintk("error occurred in net_send_message: %d\n", tmpret);
-			ret = dlm_err_to_dlm_status(tmpret);
-		}
-		iput(inode);
-	}
-
-	return ret;
-}
-
-int dlm_create_lock_handler(net_msg *msg, u32 len, void *data)
-{
-	dlm_ctxt *dlm = data;
-	dlm_create_lock *create = (dlm_create_lock *)msg->buf;
-	dlm_lock_resource *res;
-	dlm_lock *newlock;
-	dlm_lockstatus *lksb;
-	dlm_status status = DLM_NORMAL;
-	struct qstr lockname;
-
-	dlm_create_lock_to_host(create);
-	lockname.name = create->name;
-	lockname.len = create->namelen;
-	
-	dlmprintk0("\n");
-
-	lockname.hash = full_name_hash(lockname.name, lockname.len);
-
-	newlock = kmalloc(sizeof(dlm_lock), GFP_KERNEL);
-	if (!newlock)
-		return DLM_SYSERR;
-	
-	lksb = kmalloc(sizeof(dlm_lockstatus), GFP_KERNEL);
-	if (!lksb) {
-		kfree(newlock);
-		return DLM_SYSERR;
-	}
-		
-	memset(newlock, 0, sizeof(dlm_lock));
-	INIT_LIST_HEAD(&newlock->list);
-	INIT_LIST_HEAD(&newlock->ast_list);
-	spin_lock_init(&newlock->spinlock);
-	newlock->type = create->requested_type;
-	newlock->convert_type = LKM_IVMODE;
-	newlock->highest_blocked = LKM_IVMODE;
-	newlock->node = create->node_idx;
-	newlock->ast = NULL;
-	newlock->bast = NULL;
-	newlock->astdata = NULL;
-	newlock->cookie = create->cookie;
-
-	memset(lksb, 0, sizeof(dlm_lockstatus));
-	newlock->lksb = lksb;
-	lksb->lockid = newlock;
-	lksb->flags |= DLM_LKSB_KERNEL_ALLOCATED;
-
-	status = DLM_IVLOCKID;
-	res = dlm_lookup_lock(dlm, &lockname);
-	if (res) {
-		spin_lock(&res->spinlock);
-		newlock->lockres = res;
-		status = dlmlock_local(dlm, res, newlock, create->flags);
-
-		if (create->flags & LKM_NOQUEUE &&
-		    status == DLM_NOTQUEUED) {
-			dlmprintk("failed to get NOQUEUE lock %*s at level %d...\n",
-				  res->lockname.len, res->lockname.name, newlock->type);
-			/* never added to blocked queue, just delete */
-			kfree(newlock);
-		}
-	}
-
-	return status;
-}
-
-/* remote lock conversion */
-dlm_status dlm_send_remote_convert_request(dlm_ctxt *dlm, dlm_lock_resource *res, dlm_lock *lock, int flags, int type)
-{
-	struct inode *inode = NULL;
-	dlm_convert_lock convert;
-	int tmpret;
-	dlm_status ret;
-	int status = 0;
-	struct iovec iov[2];
-	size_t iovlen = 1;
-
-	dlmprintk0("\n");
-
-	memset(&convert, 0, sizeof(dlm_convert_lock));
-	convert.node_idx = dlm->group_index;
-	convert.requested_type = type;
-	convert.cookie = lock->cookie;
-	convert.namelen = res->lockname.len;
-	convert.flags = flags;
-	strncpy(convert.name, res->lockname.name, convert.namelen);
-	
-	iov[0].iov_len = sizeof(dlm_convert_lock);
-	iov[0].iov_base = &convert;
-
-	if (flags & LKM_PUT_LVB) {
-		/* extra data to send if we are updating lvb */
-		iov[1].iov_len = DLM_LVB_LEN;
-		iov[1].iov_base = lock->lksb->lvb;
-		iovlen++;
-	}
-
-	ret = DLM_NOLOCKMGR;
-	inode = nm_get_group_node_by_index(dlm->group, res->owner);
-	if (inode) {
-		dlm_convert_lock_to_net(&convert);
-		tmpret = net_send_message_iov(DLM_CONVERT_LOCK_MSG, dlm->key,
-					      iov, iovlen, inode, &status);
-		if (tmpret >= 0) {
-			// successfully sent and received
-			ret = status;  // this is already a dlm_status
-		} else {
-			dlmprintk("error occurred in net_send_message: %d\n", tmpret);
-			ret = dlm_err_to_dlm_status(tmpret);
-		}
-		iput(inode);
-	}
-
-	return ret;
-}
-
-
-
-int dlm_convert_lock_handler(net_msg *msg, u32 len, void *data)
-{
-	dlm_ctxt *dlm = data;
-	dlm_convert_lock *convert = (dlm_convert_lock *)msg->buf;
-	dlm_lock_resource *res;
-	struct list_head *iter;
-	dlm_lock *lock;
-	dlm_lockstatus *lksb;
-	dlm_status status = DLM_NORMAL;
-	int found = 0;
-	struct qstr lockname;
-	u32 flags;
-
-	dlm_convert_lock_to_host(convert);
-	lockname.name = convert->name;
-	lockname.len = convert->namelen;
-	flags = convert->flags;
-
-	if ((flags & (LKM_PUT_LVB|LKM_GET_LVB)) ==
-	     (LKM_PUT_LVB|LKM_GET_LVB)) {
-		dlmprintk("both PUT and GET lvb specified\n");
-		return DLM_BADARGS;
-	}
-
-	dlmprintk("lvb: %s\n", flags & LKM_PUT_LVB ? "put lvb" : 
-		  (flags & LKM_GET_LVB ? "get lvb" : "none"));
-	
-	lockname.hash = full_name_hash(lockname.name, lockname.len);
-
-	status = DLM_IVLOCKID;
-	res = dlm_lookup_lock(dlm, &lockname);
-	if (res) {
-		spin_lock(&res->spinlock);
-		list_for_each(iter, &res->granted) {
-			lock = list_entry(iter, dlm_lock, list);
-			if (lock->cookie == convert->cookie &&
-			    lock->node == convert->node_idx) {
-				found = 1;
-				lksb = lock->lksb;
-				if (flags & LKM_PUT_LVB) {
-					DLM_ASSERT(!(lksb->flags & (DLM_LKSB_PUT_LVB|DLM_LKSB_GET_LVB)));
-					lksb->flags |= DLM_LKSB_PUT_LVB;
-					memcpy(&lksb->lvb[0], &convert->lvb[0], DLM_LVB_LEN);
-				} else if (flags & LKM_GET_LVB) {
-					DLM_ASSERT(!(lksb->flags & (DLM_LKSB_PUT_LVB|DLM_LKSB_GET_LVB)));
-					lksb->flags |= DLM_LKSB_GET_LVB;
-				}
-				status = dlmconvert_local(dlm, res, lock, flags, convert->requested_type);
-				if (status != DLM_NORMAL) {
-					lksb->flags &= ~(DLM_LKSB_GET_LVB|DLM_LKSB_PUT_LVB);
-				}
-				break;
-			}
-		}
-		/* dlmconvert_lock released res->spinlock! */
-		if (!found)
-			spin_unlock(&res->spinlock);
-	}
-	if (!found)
-		dlmprintk("failed to find lock to convert on grant queue!  cookie=%llu\n", convert->cookie);
-
-	return status;
-}
-
 void dlm_dump_everything(void)
 {
 	dlm_ctxt *dlm;

Modified: trunk/cluster/dlmmod.h
===================================================================
--- trunk/cluster/dlmmod.h	2005-01-18 00:55:48 UTC (rev 1780)
+++ trunk/cluster/dlmmod.h	2005-01-18 01:13:40 UTC (rev 1781)
@@ -212,9 +212,13 @@
 typedef struct _dlm_lock_resource
 {
 	struct list_head list;
+	
+	/* please keep these next 3 in this order 
+	 * some funcs want to iterate over all lists */
 	struct list_head granted;
 	struct list_head converting; 
 	struct list_head blocked;
+
 	struct list_head dirty;
 	struct list_head recovering; // dlm_recovery_ctxt.resources list
 	spinlock_t spinlock;
@@ -508,7 +512,6 @@
 int dlm_proxy_ast_handler(net_msg *msg, u32 len, void *data);
 
 int dlm_unlock_lock_handler(net_msg *msg, u32 len, void *data);
-dlm_status dlm_send_remote_unlock_request(dlm_ctxt *dlm, dlm_lock_resource *res, dlm_lock *lock, dlm_lockstatus *lksb, int flags);
 
 
 
@@ -523,20 +526,34 @@
 dlm_status dlmlock(dlm_ctxt *dlm, int mode, dlm_lockstatus *lksb, int flags, char *name, 
 		   dlm_astlockfunc_t *ast, void *data, dlm_bastlockfunc_t *bast);
 		   
+dlm_status dlmlock_master(dlm_ctxt *dlm, dlm_lock_resource *res, 
+			  dlm_lock *lock, int flags);
+dlm_status dlmlock_remote(dlm_ctxt *dlm, dlm_lock_resource *res, 
+			  dlm_lock *lock, int flags);
 
-dlm_status do_dlmlock(dlm_ctxt *dlm, dlm_lock_resource *res, dlm_lockstatus *lksb,
-		      int flags, int type, dlm_astlockfunc_t *ast, 
-		      dlm_bastlockfunc_t *bast, void *data);
-dlm_status dlmlock_local(dlm_ctxt *dlm, dlm_lock_resource *res, dlm_lock *lock, int flags);
-dlm_status dlmlock_remote(dlm_ctxt *dlm, dlm_lock_resource *res, dlm_lock *lock, int flags);
+dlm_status dlmconvert_master(dlm_ctxt *dlm, dlm_lock_resource *res, 
+			     dlm_lock *lock, int flags, int type);
+dlm_status dlmconvert_remote(dlm_ctxt *dlm, dlm_lock_resource *res, 
+			     dlm_lock *lock, int flags, int type);
 
-dlm_status do_dlmconvert(dlm_ctxt *dlm, dlm_lock_resource *res, dlm_lock *lock, int flags, int type);
-dlm_status dlmconvert_local(dlm_ctxt *dlm, dlm_lock_resource *res, dlm_lock *lock, int flags, int type);
-dlm_status dlmconvert_remote(dlm_ctxt *dlm, dlm_lock_resource *res, dlm_lock *lock, int flags, int type);
+dlm_status dlmunlock_common(dlm_ctxt *dlm, dlm_lock_resource *res, 
+			    dlm_lock *lock, dlm_lockstatus *lksb, 
+			    int flags, int *call_ast, int master_node);
+/* unlock/cancel a lock on a lockres mastered by this node:
+ * dlmunlock_common with master_node=1 (no network round-trip) */
+static inline dlm_status dlmunlock_master(dlm_ctxt *dlm, dlm_lock_resource *res,
+			    dlm_lock *lock, dlm_lockstatus *lksb, 
+			    int flags, int *call_ast)
+{
+	return dlmunlock_common(dlm, res, lock, lksb, flags, call_ast, 1);
+}
 
-dlm_status dlmunlock(dlm_ctxt *dlm, dlm_lockstatus *lksb, int flags, dlm_astunlockfunc_t *unlockast, void *data);
-dlm_status dlmunlock_local(dlm_ctxt *dlm, dlm_lock_resource *res, dlm_lock *lock, dlm_lockstatus *lksb, int flags, int *call_ast);
+/* unlock/cancel a lock whose lockres is mastered elsewhere:
+ * dlmunlock_common with master_node=0 sends the request to the owner */
+static inline dlm_status dlmunlock_remote(dlm_ctxt *dlm, dlm_lock_resource *res,
+			    dlm_lock *lock, dlm_lockstatus *lksb, 
+			    int flags, int *call_ast)
+{
+	return dlmunlock_common(dlm, res, lock, lksb, flags, call_ast, 0);
+}
 
+
 dlm_ctxt * dlm_register_domain(char *domain, char *group_name, u32 key);
 void dlm_unregister_domain(dlm_ctxt *dlm);
 dlm_lock_resource * dlm_get_lock_resource(dlm_ctxt *dlm, struct qstr *lockname, int flags);
@@ -575,6 +592,32 @@
 void dlm_dump_everything(void);
 void dlm_dump_dlm(dlm_ctxt *dlm);
 
+dlm_status dlm_send_remote_convert_request(dlm_ctxt *dlm, dlm_lock_resource *res, dlm_lock *lock, int flags, int type);
+dlm_status dlm_send_remote_lock_request(dlm_ctxt *dlm, dlm_lock_resource *res, dlm_lock *lock, int flags);
+int dlm_lock_owner_broadcast(dlm_ctxt *dlm, dlm_lock_resource *res);
+/* NOTE(review): removed a duplicate declaration of dlm_lock_owner_broadcast */
+int dlm_send_proxy_ast(dlm_ctxt *dlm, dlm_lock_resource *res, dlm_lock *lock, int type, int blocked_type);
+
+void dlm_wait_on_lockres(dlm_lock_resource *res);
+void __dlm_wait_on_lockres(dlm_lock_resource *res);
+
+
+
+
+/* debug helper: printable name for an LKM_* lock mode */
+static inline const char * dlm_lock_mode_name(int mode)
+{
+	switch (mode) {
+		case LKM_EXMODE:
+			return "EX";
+		case LKM_PRMODE:
+			return "PR";
+		case LKM_NLMODE:
+			return "NL";
+	}
+	return "UNKNOWN";
+}
+
+
 static inline int dlm_lock_compatible(int existing, int request)
 {
 	/* NO_LOCK compatible with all */

Added: trunk/cluster/dlmunlock.c
===================================================================
--- trunk/cluster/dlmunlock.c	2005-01-18 00:55:48 UTC (rev 1780)
+++ trunk/cluster/dlmunlock.c	2005-01-18 01:13:40 UTC (rev 1781)
@@ -0,0 +1,404 @@
+/* -*- mode: c; c-basic-offset: 8; -*-
+ * vim: noexpandtab sw=8 ts=8 sts=0:
+ *
+ * dlmunlock.c
+ *
+ * underlying calls for unlocking locks
+ *
+ * Copyright (C) 2004 Oracle.  All rights reserved.
+ *
+ * This program is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU General Public
+ * License as published by the Free Software Foundation; either
+ * version 2 of the License, or (at your option) any later version.
+ * 
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
+ * General Public License for more details.
+ * 
+ * You should have received a copy of the GNU General Public
+ * License along with this program; if not, write to the
+ * Free Software Foundation, Inc., 59 Temple Place - Suite 330,
+ * Boston, MA 02111-1307, USA.
+ *
+ * Authors: Kurt Hackel
+ */
+
+#include "warning_hack.h"
+
+#include "dlm_compat.h"
+#include "util.h"
+#include "dlmcommon.h"
+
+#include <linux/module.h>
+#include <linux/fs.h>
+#include <linux/types.h>
+#include <linux/slab.h>
+#include <linux/highmem.h>
+#include <linux/utsname.h>
+#include <linux/init.h>
+#include <linux/sysctl.h>
+#include <linux/random.h>
+#if LINUX_VERSION_CODE >= KERNEL_VERSION(2,6,0)
+#include <linux/statfs.h>
+#include <linux/moduleparam.h>
+#endif
+#include <linux/blkdev.h>
+#include <linux/socket.h>
+#include <linux/inet.h>
+#include <linux/spinlock.h>
+
+
+#include "heartbeat.h"
+#include "nodemanager.h"
+#include "tcp.h"
+#include "dlmmod.h"
+
+
+#define DLM_UNLOCK_FREE_LOCK    0x00000001
+#define DLM_UNLOCK_CALL_AST     0x00000002
+#define DLM_UNLOCK_REMOVE_LOCK  0x00000004
+#define DLM_UNLOCK_REGRANT_LOCK 0x00000008
+
+
+static dlm_status dlm_get_cancel_actions(dlm_ctxt *dlm, dlm_lock_resource *res, 
+					 dlm_lock *lock, dlm_lockstatus *lksb, 
+					 int *actions);
+static dlm_status dlm_get_unlock_actions(dlm_ctxt *dlm, dlm_lock_resource *res,
+					 dlm_lock *lock, dlm_lockstatus *lksb, 
+					 int *actions);
+
+static dlm_status dlm_send_remote_unlock_request(dlm_ctxt *dlm, 
+						 dlm_lock_resource *res, 
+						 dlm_lock *lock, 
+						 dlm_lockstatus *lksb, 
+						 int flags);
+
+
+
+/* 
+ * locking:
+ *   caller needs:  none
+ *   taken:         res->spinlock and lock->spinlock taken and dropped
+ *   held on exit:  none
+ * returns: DLM_NORMAL, DLM_NOLOCKMGR, status from network
+ *
+ * Common unlock/cancel path for both the master (master_node=1) and
+ * non-master (master_node=0) cases.  Chooses the queue/free/ast actions
+ * via dlm_get_cancel_actions()/dlm_get_unlock_actions(), optionally
+ * sends the request to the remote owner, then applies the actions.
+ */
+dlm_status dlmunlock_common(dlm_ctxt *dlm, dlm_lock_resource *res, 
+			    dlm_lock *lock, dlm_lockstatus *lksb, 
+			    int flags, int *call_ast, int master_node)
+{
+	dlm_status status;
+	int actions = 0;
+	
+	dlmprintk0("\n");
+
+	/* master_node says whether this node owns the lockres */
+	if (master_node)
+		DLM_ASSERT(res->owner == dlm->group_index);
+	else
+		DLM_ASSERT(res->owner != dlm->group_index);
+
+	spin_lock(&res->spinlock);
+	if (res->state & DLM_LOCK_RES_IN_PROGRESS) {
+		if (!master_node) {
+		/* TODO: should we return -EAGAIN or something here? */
+			dlmprintk0("lockres in progress! eek!\n");
+		}
+#warning THIS CAN SLEEP!!!
+		/* NOTE(review): __dlm_wait_on_lockres() is called with
+		 * res->spinlock held and can sleep (see #warning above);
+		 * this path needs to be reworked or dispatched */
+		__dlm_wait_on_lockres(res);
+		res->state |= DLM_LOCK_RES_IN_PROGRESS;
+	}
+	spin_lock(&lock->spinlock);
+
+	if (res->state & DLM_LOCK_RES_RECOVERING) {
+		/* !!!!! */
+		status = DLM_RECOVERING;
+		goto leave;
+	}
+	
+	/* according to spec and opendlm code
+	 *  flags & LKM_CANCEL != 0: must be converting or blocked
+	 *  flags & LKM_CANCEL == 0: must be granted
+	 * iow, to unlock a converting lock, you must first LKM_CANCEL
+	 * the convert, then call the unlock again with no LKM_CANCEL
+	 */
+
+	if (flags & LKM_CANCEL) {
+		status = dlm_get_cancel_actions(dlm, res, lock, lksb, &actions);
+	} else {
+		status = dlm_get_unlock_actions(dlm, res, lock, lksb, &actions);
+		if (master_node && status == DLM_NORMAL) {
+			/* make the final update to the lvb */
+			if (lksb->flags & DLM_LKSB_PUT_LVB)
+				memcpy(res->lvb, lksb->lvb, DLM_LVB_LEN);
+		}
+	}
+
+	if (status != DLM_NORMAL)
+		goto leave;
+
+	if (!master_node) {
+		/* drop locks and send message */
+		spin_unlock(&lock->spinlock);
+		spin_unlock(&res->spinlock);
+		status = dlm_send_remote_unlock_request(dlm, res, lock, 
+							lksb, flags);
+		spin_lock(&res->spinlock);
+		spin_lock(&lock->spinlock);
+	}
+	
+	/* NOTE(review): the list operations below run even when the remote
+	 * request above failed (status != DLM_NORMAL) -- confirm that is
+	 * intended, or add a status check here */
+	if (actions & DLM_UNLOCK_REMOVE_LOCK)
+		list_del(&lock->list);
+	if (actions & DLM_UNLOCK_REGRANT_LOCK)
+		list_add_tail(&lock->list, &res->granted);
+
+
+leave:
+	/* NOTE(review): IN_PROGRESS is cleared here even on paths that
+	 * never set it above -- harmless only if no other thread set it
+	 * meanwhile; worth confirming */
+	res->state &= ~DLM_LOCK_RES_IN_PROGRESS;
+	spin_unlock(&lock->spinlock);
+	spin_unlock(&res->spinlock);
+	wake_up(&res->wq);
+	
+
+	if (actions & DLM_UNLOCK_FREE_LOCK) {
+#warning this must change to proper refcounting
+		/* TODO: refcounting... tho for now this will work because 
+		 * the middle layer is keeping track of everything */
+		kfree(lock);
+		lksb->lockid = NULL;
+	}
+	if (actions & DLM_UNLOCK_CALL_AST)
+		*call_ast = 1;
+
+	/* if cancel or unlock succeeded, lvb work is done */
+	if (status == DLM_NORMAL)
+		lksb->flags &= ~(DLM_LKSB_PUT_LVB|DLM_LKSB_GET_LVB);
+
+	return status;
+}
+
+
+	
+
+	
+/* 
+ * locking:
+ *   caller needs:  none
+ *   taken:         none
+ *   held on exit:  none
+ * returns: DLM_NORMAL, DLM_NOLOCKMGR, status from network
+ *
+ * Packs a dlm_unlock_lock message (plus the lvb as a second iovec when
+ * LKM_PUT_LVB is set) and sends it to the lockres owner.  The raw reply
+ * status is mirrored into lksb->status for the caller.
+ */
+static dlm_status dlm_send_remote_unlock_request(dlm_ctxt *dlm, 
+						 dlm_lock_resource *res, 
+						 dlm_lock *lock, 
+						 dlm_lockstatus *lksb, 
+						 int flags)
+{
+	struct inode *inode = NULL;
+	dlm_unlock_lock unlock;
+	int tmpret;
+	dlm_status ret;
+	int status = 0;
+	struct iovec iov[2];
+	size_t iovlen = 1;
+
+
+	dlmprintk0("\n");
+
+	memset(&unlock, 0, sizeof(unlock));
+	unlock.node_idx = dlm->group_index;
+	unlock.flags = flags;
+	unlock.cookie = lock->cookie;
+	unlock.namelen = res->lockname.len;
+	/* name is not NUL-terminated on the wire; the receiving handler
+	 * uses namelen to reconstruct the lockname */
+	strncpy(unlock.name, res->lockname.name, unlock.namelen);
+
+	iov[0].iov_len = sizeof(dlm_unlock_lock);
+	iov[0].iov_base = &unlock;
+
+	if (flags & LKM_PUT_LVB) {
+		/* extra data to send if we are updating lvb */
+		iov[1].iov_len = DLM_LVB_LEN;
+		iov[1].iov_base = lock->lksb->lvb;
+		iovlen++;
+	}
+
+	/* default result if the owner node cannot be resolved below */
+	ret = DLM_NOLOCKMGR;
+	lksb->status = DLM_NOLOCKMGR;
+	inode = nm_get_group_node_by_index(dlm->group, res->owner);
+	if (inode) {
+		/* convert the message to wire byte order before sending */
+		dlm_unlock_lock_to_net(&unlock);
+		tmpret = net_send_message_iov(DLM_UNLOCK_LOCK_MSG, dlm->key, 
+					      iov, iovlen, inode, &status);
+		if (tmpret >= 0) {
+			// successfully sent and received
+			/* DLM_CANCELGRANT is reported as success to the
+			 * caller; the raw status still lands in lksb */
+			if (status == DLM_CANCELGRANT)
+				ret = DLM_NORMAL;
+			else
+				ret = status;
+			lksb->status = status;
+		} else {
+			dlmprintk("error occurred in net_send_message: %d\n",
+				  tmpret);
+			ret = dlm_err_to_dlm_status(tmpret);
+			lksb->status = ret;
+		}
+		iput(inode);
+	}
+
+	return ret;
+}
+
+/* 
+ * locking:
+ *   caller needs:  none
+ *   taken:         takes and drops res->spinlock
+ *   held on exit:  none
+ * returns: DLM_NORMAL, DLM_BADARGS, DLM_IVLOCKID, 
+ *          return value from dlmunlock_master
+ *
+ * Net handler for DLM_UNLOCK_LOCK_MSG: runs on the lockres master on
+ * behalf of a remote node.  Finds the remote node's lock on one of the
+ * three queues and performs the unlock/cancel via dlmunlock_master().
+ */
+int dlm_unlock_lock_handler(net_msg *msg, u32 len, void *data)
+{
+	dlm_ctxt *dlm = data;
+	dlm_unlock_lock *unlock = (dlm_unlock_lock *)msg->buf;
+	dlm_lock_resource *res;
+	struct list_head *iter;
+	dlm_lock *lock = NULL;
+	dlm_status status = DLM_NORMAL;
+	int found = 0, i;
+	dlm_lockstatus *lksb = NULL;
+	int ignore;
+	struct qstr lockname;
+	u32 flags;
+	struct list_head *queue;
+ 
+	dlm_unlock_lock_to_host(unlock);
+	lockname.name = unlock->name;
+	lockname.len = unlock->namelen;
+	flags = unlock->flags;
+
+	/* GET_LVB is only meaningful on convert, never on unlock */
+	if (flags & LKM_GET_LVB) {
+		dlmprintk0("bad args!  GET_LVB specified on unlock!\n");
+		return DLM_BADARGS;
+	}
+
+	if ((flags & (LKM_PUT_LVB|LKM_CANCEL)) == (LKM_PUT_LVB|LKM_CANCEL)) {
+		dlmprintk0("bad args!  cannot modify lvb on a CANCEL "
+			   "request!\n");
+		return DLM_BADARGS;
+	}
+
+	dlmprintk("lvb: %s\n", flags & LKM_PUT_LVB ? "put lvb" : "none");
+
+	lockname.hash = full_name_hash(lockname.name, lockname.len);
+
+	status = DLM_IVLOCKID;
+	res = dlm_lookup_lock(dlm, &lockname);
+	if (!res)
+		goto not_found;
+
+	/* queue++ below walks granted -> converting -> blocked; this
+	 * relies on those three list_heads being adjacent and in this
+	 * order inside dlm_lock_resource (see the struct definition) */
+	queue=&res->granted;
+	found = 0;
+	spin_lock(&res->spinlock);
+	for (i=0; i<3; i++) {
+		list_for_each(iter, queue) {
+			lock = list_entry(iter, dlm_lock, list);
+			if (lock->cookie == unlock->cookie &&
+		    	    lock->node == unlock->node_idx) {
+				found = 1;
+				break;
+			}
+		}
+		if (found)
+			break;
+		/* scan granted -> converting -> blocked queues */
+		queue++;
+	}
+	/* NOTE(review): res->spinlock is dropped here, so 'lock' could
+	 * in principle be moved or freed before dlmunlock_master runs
+	 * below -- confirm this window is safe */
+	spin_unlock(&res->spinlock);
+	if (!found)
+		goto not_found;
+	
+	/* lock was found on queue */
+	lksb = lock->lksb;
+	/* unlockast only called on originating node */
+	if (flags & LKM_PUT_LVB) {
+		lksb->flags |= DLM_LKSB_PUT_LVB;
+		memcpy(&lksb->lvb[0], &unlock->lvb[0], DLM_LVB_LEN);
+	}
+#warning BUG! THIS CAN SLEEP!!!  
+	/* so either we should respond with EAGAIN in dlmunlock_master 
+	 * and skip the __dlm_wait_on_lockres, or this message type 
+	 * should be dispatched */
+	status = dlmunlock_master(dlm, res, lock, lksb, flags, &ignore);
+	if (flags & LKM_PUT_LVB)
+		lksb->flags &= ~DLM_LKSB_PUT_LVB;
+
+not_found:
+	if (!found)
+		dlmprintk("failed to find lock to unlock!  cookie=%llu\n", 
+			  unlock->cookie);
+	else {
+		/* send the lksb->status back to the other node */
+		status = lksb->status;
+	}
+
+	return status;
+}
+
+
+/* Decide the outcome of an LKM_CANCEL based on which queue the lock
+ * currently sits on, filling *actions with DLM_UNLOCK_* bits and
+ * mirroring the result into lksb->status. */
+static dlm_status dlm_get_cancel_actions(dlm_ctxt *dlm, dlm_lock_resource *res, 
+					 dlm_lock *lock, dlm_lockstatus *lksb, 
+					 int *actions)
+{
+	dlm_status status;
+
+	if (dlm_lock_on_list(&res->blocked, lock)) {
+		/* cancel this outright */
+		lksb->status = DLM_NORMAL;
+		status = DLM_NORMAL;
+		*actions = (DLM_UNLOCK_FREE_LOCK |
+			    DLM_UNLOCK_CALL_AST |
+			    DLM_UNLOCK_REMOVE_LOCK);
+	} else if (dlm_lock_on_list(&res->converting, lock)) {
+		/* cancel the request, put back on granted */
+		lksb->status = DLM_NORMAL;
+		status = DLM_NORMAL;
+		*actions = (DLM_UNLOCK_CALL_AST |
+			    DLM_UNLOCK_REMOVE_LOCK |
+			    DLM_UNLOCK_REGRANT_LOCK);
+	} else if (dlm_lock_on_list(&res->granted, lock)) {
+		/* too late, already granted.  DLM_CANCELGRANT */
+		lksb->status = DLM_CANCELGRANT;
+		status = DLM_NORMAL;
+		*actions = DLM_UNLOCK_CALL_AST;
+	} else {
+		/* err. um. eek! */
+		dlmprintk0("lock to cancel is not on any list! bug!\n");
+		lksb->status = DLM_IVLOCKID;
+		status = DLM_IVLOCKID;
+		*actions = 0;
+	}
+	return status;
+}
+
+/* Decide the outcome of a plain (non-cancel) unlock: only a lock on
+ * the granted queue may be unlocked; anything else is DLM_DENIED. */
+static dlm_status dlm_get_unlock_actions(dlm_ctxt *dlm, dlm_lock_resource *res,
+					 dlm_lock *lock, dlm_lockstatus *lksb, 
+					 int *actions)
+{
+	dlm_status status;
+
+	/* unlock request */
+	if (!dlm_lock_on_list(&res->granted, lock)) {
+		lksb->status = DLM_DENIED;
+		status = DLM_DENIED;
+		*actions = 0;
+	} else {
+		/* unlock granted lock */
+		lksb->status = DLM_NORMAL;
+		status = DLM_NORMAL;
+		*actions = (DLM_UNLOCK_FREE_LOCK |
+			    DLM_UNLOCK_CALL_AST |
+			    DLM_UNLOCK_REMOVE_LOCK);
+	}
+	return status;
+}
+



More information about the Ocfs2-commits mailing list