[Ocfs2-commits] mfasheh commits r2038 - in trunk/fs/ocfs2: . dlm

svn-commits at oss.oracle.com svn-commits at oss.oracle.com
Tue Mar 22 19:21:07 CST 2005


Author: mfasheh
Signed-off-by: khackel
Signed-off-by: jlbec
Date: 2005-03-22 19:21:05 -0600 (Tue, 22 Mar 2005)
New Revision: 2038

Added:
   trunk/fs/ocfs2/dlm/dlmapi.h
   trunk/fs/ocfs2/dlm/dlmdebug.c
   trunk/fs/ocfs2/dlm/dlmdebug.h
   trunk/fs/ocfs2/dlm/dlmdomain.c
   trunk/fs/ocfs2/dlm/dlmdomain.h
Modified:
   trunk/fs/ocfs2/dlm/Makefile
   trunk/fs/ocfs2/dlm/dlmast.c
   trunk/fs/ocfs2/dlm/dlmcommon.h
   trunk/fs/ocfs2/dlm/dlmconvert.c
   trunk/fs/ocfs2/dlm/dlmfs.c
   trunk/fs/ocfs2/dlm/dlmfs_compat.c
   trunk/fs/ocfs2/dlm/dlmlock.c
   trunk/fs/ocfs2/dlm/dlmmaster.c
   trunk/fs/ocfs2/dlm/dlmrecovery.c
   trunk/fs/ocfs2/dlm/dlmthread.c
   trunk/fs/ocfs2/dlm/dlmunlock.c
   trunk/fs/ocfs2/dlm/userdlm.c
   trunk/fs/ocfs2/dlmglue.c
   trunk/fs/ocfs2/heartbeat.c
   trunk/fs/ocfs2/ocfs.h
   trunk/fs/ocfs2/super.c
   trunk/fs/ocfs2/vote.c
Log:
* cleanup some of the layout of stuff in dlm/

Signed-off-by: khackel
Signed-off-by: jlbec



Modified: trunk/fs/ocfs2/dlm/Makefile
===================================================================
--- trunk/fs/ocfs2/dlm/Makefile	2005-03-22 23:03:52 UTC (rev 2037)
+++ trunk/fs/ocfs2/dlm/Makefile	2005-03-23 01:21:05 UTC (rev 2038)
@@ -31,7 +31,7 @@
 
 obj-m := ocfs2_dlm.o ocfs2_dlmfs.o
 
-ocfs2_dlm-objs := dlmmod.o dlmthread.o dlmrecovery.o \
+ocfs2_dlm-objs := dlmdomain.o dlmdebug.o dlmthread.o dlmrecovery.o \
 	dlmmaster.o dlmast.o dlmconvert.o dlmlock.o dlmunlock.o
 
 ocfs2_dlmfs-objs := userdlm.o dlmfs.o $(DLMFS_COMPAT)
@@ -43,8 +43,9 @@
 #
 
 SOURCES =			\
+	dlmdebug.c		\
+	dlmdomain.c		\
 	dlmmaster.c		\
-	dlmmod.c		\
 	dlmast.c		\
 	dlmconvert.c		\
 	dlmlock.c		\
@@ -56,8 +57,10 @@
 	dlmfs_compat.c
 
 HEADERS = 			\
+	dlmdebug.h		\
+	dlmdomain.h		\
 	dlmcommon.h		\
-	dlmmod.h		\
+	dlmapi.h		\
 	userdlm.h		\
 	dlmfs_compat.h
 

Added: trunk/fs/ocfs2/dlm/dlmapi.h
===================================================================
--- trunk/fs/ocfs2/dlm/dlmapi.h	2005-03-22 23:03:52 UTC (rev 2037)
+++ trunk/fs/ocfs2/dlm/dlmapi.h	2005-03-23 01:21:05 UTC (rev 2038)
@@ -0,0 +1,188 @@
+/* -*- mode: c; c-basic-offset: 8; -*-
+ * vim: noexpandtab sw=8 ts=8 sts=0:
+ *
+ * dlmapi.h
+ *
+ * externally exported dlm interfaces
+ *
+ * Copyright (C) 2004 Oracle.  All rights reserved.
+ *
+ * This program is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU General Public
+ * License as published by the Free Software Foundation; either
+ * version 2 of the License, or (at your option) any later version.
+ * 
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
+ * General Public License for more details.
+ * 
+ * You should have received a copy of the GNU General Public
+ * License along with this program; if not, write to the
+ * Free Software Foundation, Inc., 59 Temple Place - Suite 330,
+ * Boston, MA 02111-1307, USA.
+ *
+ */
+
+#ifndef DLMAPI_H
+#define DLMAPI_H
+
+struct _dlm_lock;
+typedef struct _dlm_lock dlm_lock;
+
+struct _dlm_ctxt;
+typedef struct _dlm_ctxt dlm_ctxt;
+
+typedef enum _dlm_status {
+	DLM_NORMAL = 0,           /*  0: request in progress */
+	DLM_GRANTED,              /*  1: request granted */
+	DLM_DENIED,               /*  2: request denied */
+	DLM_DENIED_NOLOCKS,       /*  3: request denied, out of system resources */
+	DLM_WORKING,              /*  4: async request in progress */
+	DLM_BLOCKED,              /*  5: lock request blocked */
+	DLM_BLOCKED_ORPHAN,       /*  6: lock request blocked by an orphan lock */
+	DLM_DENIED_GRACE_PERIOD,  /*  7: topological change in progress */
+	DLM_SYSERR,               /*  8: system error */
+	DLM_NOSUPPORT,            /*  9: unsupported */
+	DLM_CANCELGRANT,          /* 10: can't cancel convert: already granted */
+	DLM_IVLOCKID,             /* 11: bad lockid */
+	DLM_SYNC,                 /* 12: synchronous request granted */
+	DLM_BADTYPE,              /* 13: bad resource type */
+	DLM_BADRESOURCE,          /* 14: bad resource handle */
+	DLM_MAXHANDLES,           /* 15: no more resource handles */
+	DLM_NOCLINFO,             /* 16: can't contact cluster manager */
+	DLM_NOLOCKMGR,            /* 17: can't contact lock manager */
+	DLM_NOPURGED,             /* 18: can't contact purge daemon */
+	DLM_BADARGS,              /* 19: bad api args */
+	DLM_VOID,                 /* 20: no status */
+	DLM_NOTQUEUED,            /* 21: NOQUEUE was specified and request failed */
+	DLM_IVBUFLEN,             /* 22: invalid resource name length */
+	DLM_CVTUNGRANT,           /* 23: attempted to convert ungranted lock */
+	DLM_BADPARAM,             /* 24: invalid lock mode specified */
+	DLM_VALNOTVALID,          /* 25: value block has been invalidated */
+	DLM_REJECTED,             /* 26: request rejected, unrecognized client */
+	DLM_ABORT,                /* 27: blocked lock request cancelled */
+	DLM_CANCEL,               /* 28: conversion request cancelled */
+	DLM_IVRESHANDLE,          /* 29: invalid resource handle */
+	DLM_DEADLOCK,             /* 30: deadlock recovery refused this request */
+	DLM_DENIED_NOASTS,        /* 31: failed to allocate AST */
+	DLM_FORWARD,              /* 32: request must wait for primary's response */
+	DLM_TIMEOUT,              /* 33: timeout value for lock has expired */
+	DLM_IVGROUPID,            /* 34: invalid group specification */
+	DLM_VERS_CONFLICT,        /* 35: version conflicts prevent request handling */
+	DLM_BAD_DEVICE_PATH,      /* 36: Locks device does not exist or path wrong */
+	DLM_NO_DEVICE_PERMISSION, /* 37: Client has insufficient pers for device */
+	DLM_NO_CONTROL_DEVICE,    /* 38: Cannot set options on opened device */
+	DLM_MAXSTATS,             /* 39: upper limit for return code validation */
+	
+	DLM_RECOVERING,           /* 40: our lame addition to allow caller to fail a lock 
+				     request if it is being recovered */
+	DLM_MIGRATING,            /* 41: our lame addition to allow caller to fail a lock 
+				     request if it is being migrated */
+} dlm_status;
+
+#define DLM_LKSB_KERNEL_ALLOCATED  0x01  // allocated on master node on behalf of remote node
+#define DLM_LKSB_PUT_LVB           0x02
+#define DLM_LKSB_GET_LVB           0x04
+#define DLM_LKSB_UNUSED2           0x08
+#define DLM_LKSB_UNUSED3           0x10
+#define DLM_LKSB_UNUSED4           0x20
+#define DLM_LKSB_UNUSED5           0x40
+#define DLM_LKSB_UNUSED6           0x80
+
+#define DLM_LVB_LEN  64
+
+/* Callers are only allowed access to the lvb and status members of
+ * this struct. */
+struct _dlm_lockstatus {
+	dlm_status status;
+	u32 flags;           
+	dlm_lock *lockid;
+	char lvb[DLM_LVB_LEN];
+};
+
+typedef struct _dlm_lockstatus dlm_lockstatus;
+
+/* Valid lock modes. */
+#define LKM_IVMODE      (-1)            /* invalid mode */
+#define LKM_NLMODE      0               /* null lock */
+#define LKM_CRMODE      1               /* concurrent read */    /* unsupported */
+#define LKM_CWMODE      2               /* concurrent write */    /* unsupported */
+#define LKM_PRMODE      3               /* protected read */
+#define LKM_PWMODE      4               /* protected write */    /* unsupported */
+#define LKM_EXMODE      5               /* exclusive */
+#define LKM_MAXMODE     5
+#define LKM_MODEMASK    0xff
+
+/* Flags passed to dlmlock and dlmunlock:
+ * reserved: flags used by the "real" dlm
+ * only a few are supported by this dlm 
+ * (U) = unsupported by ocfs2 dlm */
+#define LKM_ORPHAN       0x00000010  /* this lock is orphanable (U) */
+#define LKM_PARENTABLE   0x00000020  /* this lock was orphaned (U) */
+#define LKM_BLOCK        0x00000040  /* blocking lock request (U) */
+#define LKM_LOCAL        0x00000080  /* local lock request */    
+#define LKM_VALBLK       0x00000100  /* lock value block request */
+#define LKM_NOQUEUE      0x00000200  /* non blocking request */
+#define LKM_CONVERT      0x00000400  /* conversion request */
+#define LKM_NODLCKWT     0x00000800  /* this lock wont deadlock (U) */
+#define LKM_UNLOCK       0x00001000  /* deallocate this lock */
+#define LKM_CANCEL       0x00002000  /* cancel conversion request */
+#define LKM_DEQALL       0x00004000  /* remove all locks held by proc (U) */
+#define LKM_INVVALBLK    0x00008000  /* invalidate lock value block */
+#define LKM_SYNCSTS      0x00010000  /* return synchronous status if poss (U) */
+#define LKM_TIMEOUT      0x00020000  /* lock request contains timeout (U) */
+#define LKM_SNGLDLCK     0x00040000  /* request can self-deadlock (U) */
+#define LKM_FINDLOCAL    0x00080000  /* find local lock request (U) */
+#define LKM_PROC_OWNED   0x00100000  /* owned by process, not group (U) */
+#define LKM_XID          0x00200000  /* use transaction id for deadlock (U) */
+#define LKM_XID_CONFLICT 0x00400000  /* do not allow lock inheritance (U) */
+#define LKM_FORCE        0x00800000  /* force unlock flag */
+#define LKM_REVVALBLK    0x01000000  /* temporary solution: re-validate 
+					lock value block (U) */
+/* unused */
+#define LKM_UNUSED1      0x00000001  /* unused */
+#define LKM_UNUSED2      0x00000002  /* unused */
+#define LKM_UNUSED3      0x00000004  /* unused */
+#define LKM_UNUSED4      0x00000008  /* unused */
+#define LKM_UNUSED5      0x02000000  /* unused */
+#define LKM_UNUSED6      0x04000000  /* unused */
+#define LKM_UNUSED7      0x08000000  /* unused */
+
+/* ocfs2 extensions: internal only
+ * should never be used by caller */
+#define LKM_MIGRATION    0x10000000  /* extension: lockres is to be migrated
+					to another node */
+#define LKM_PUT_LVB      0x20000000  /* extension: lvb is being passed
+					should be applied to lockres */
+#define LKM_GET_LVB      0x40000000  /* extension: lvb should be copied 
+					from lockres when lock is granted */
+#define LKM_RECOVERY     0x80000000  /* extension: flag for recovery lock
+					used to avoid recovery rwsem */
+
+
+typedef void (dlm_astlockfunc_t)(void *);
+typedef void (dlm_bastlockfunc_t)(void *, int);
+typedef void (dlm_astunlockfunc_t)(void *, dlm_status);
+
+dlm_status dlmlock(dlm_ctxt *dlm,
+		   int mode,
+		   dlm_lockstatus *lksb,
+		   int flags,
+		   const char *name,
+		   dlm_astlockfunc_t *ast,
+		   void *data,
+		   dlm_bastlockfunc_t *bast);
+
+dlm_status dlmunlock(dlm_ctxt *dlm,
+		     dlm_lockstatus *lksb,
+		     int flags,
+		     dlm_astunlockfunc_t *unlockast,
+		     void *data);
+
+dlm_ctxt * dlm_register_domain(const char *domain,
+			       u32 key);
+
+void dlm_unregister_domain(dlm_ctxt *dlm);
+
+#endif /* DLMAPI_H */

Modified: trunk/fs/ocfs2/dlm/dlmast.c
===================================================================
--- trunk/fs/ocfs2/dlm/dlmast.c	2005-03-22 23:03:52 UTC (rev 2037)
+++ trunk/fs/ocfs2/dlm/dlmast.c	2005-03-23 01:21:05 UTC (rev 2038)
@@ -44,8 +44,8 @@
 #include "cluster/nodemanager.h"
 #include "cluster/tcp.h"
 
+#include "dlmapi.h"
 #include "dlmcommon.h"
-#include "dlmmod.h"
 
 
 static void dlm_update_lvb(dlm_ctxt *dlm, dlm_lock_resource *res,

Modified: trunk/fs/ocfs2/dlm/dlmcommon.h
===================================================================
--- trunk/fs/ocfs2/dlm/dlmcommon.h	2005-03-22 23:03:52 UTC (rev 2037)
+++ trunk/fs/ocfs2/dlm/dlmcommon.h	2005-03-23 01:21:05 UTC (rev 2038)
@@ -3,8 +3,6 @@
  *
  * dlmcommon.h
  *
- * Common stuff
- *
  * Copyright (C) 2004 Oracle.  All rights reserved.
  *
  * This program is free software; you can redistribute it and/or
@@ -24,11 +22,1038 @@
  *
  */
 
-#ifndef CLUSTER_DLMCOMMON_H
-#define CLUSTER_DLMCOMMON_H
+#ifndef DLMCOMMON_H
+#define DLMCOMMON_H
 
+#include <linux/kref.h>
+
+#ifndef ENABLE_DLMPRINTK
+#define dlmprintk(x, arg...)
+#define dlmprintk0(x)
+#else
+#define dlmprintk(x, arg...)    printk("(dlm:%d)(%s:%d) " x, current->pid, __FUNCTION__, __LINE__, ##arg)
+#define dlmprintk0(x)           printk("(dlm:%d)(%s:%d) " x, current->pid, __FUNCTION__, __LINE__)
+#endif
+
+#define dlmerror(x, arg...)    printk(KERN_ERR "(dlm:%d)(%s:%d) " x, current->pid, __FUNCTION__, __LINE__, ##arg)
+#define dlmerror0(x)           printk(KERN_ERR "(dlm:%d)(%s:%d) " x, current->pid, __FUNCTION__, __LINE__)
+
 #define DLM_ASSERT(x)       ({  if (!(x)) { printk("assert failed! %s:%d\n", __FILE__, __LINE__); BUG(); } })
 
-typedef struct _dlm_ctxt dlm_ctxt;
 
-#endif /* CLUSTER_DLMCOMMON_H */
+#define DLM_HB_NODE_DOWN_PRI     (0xf000000)
+#define DLM_HB_NODE_UP_PRI       (0x8000000)  
+
+#define DLM_LOCKID_NAME_MAX    32
+
+#define DLM_DOMAIN_NAME_MAX_LEN    255
+#define DLM_LOCK_RES_OWNER_UNKNOWN     NM_MAX_NODES
+#define DLM_THREAD_SHUFFLE_INTERVAL    5     // flush everything every 5 passes
+#define DLM_THREAD_MS                  200   // flush at least every 200 ms
+
+#define DLM_HASH_BITS     7
+#define DLM_HASH_SIZE     (1 << DLM_HASH_BITS)
+#define DLM_HASH_MASK     (DLM_HASH_SIZE - 1)
+
+typedef enum _dlm_ast_type {
+	DLM_AST = 0,
+	DLM_BAST,
+	DLM_ASTUNLOCK
+} dlm_ast_type;
+
+
+#define LKM_VALID_FLAGS (LKM_VALBLK | LKM_CONVERT | LKM_UNLOCK | \
+			 LKM_CANCEL | LKM_INVVALBLK | LKM_FORCE | \
+			 LKM_RECOVERY | LKM_LOCAL | LKM_NOQUEUE)
+
+#define DLM_RECOVERY_LOCK_NAME       "$RECOVERY"
+#define DLM_RECOVERY_LOCK_NAME_LEN   9
+
+static inline int dlm_is_recovery_lock(const char *lock_name, int name_len)
+{
+	if (name_len == DLM_RECOVERY_LOCK_NAME_LEN &&
+	    strncmp(lock_name, DLM_RECOVERY_LOCK_NAME, DLM_RECOVERY_LOCK_NAME_LEN)==0)
+		return 1;
+	return 0;
+}
+
+
+typedef struct _dlm_recovery_ctxt
+{
+	struct list_head resources;
+	struct list_head received;   // list of dlm_reco_lock_infos received from other nodes during recovery
+	struct list_head node_data;
+	u8  new_master;
+	u8  dead_node;
+	unsigned long node_map[BITS_TO_LONGS(NM_MAX_NODES)];
+} dlm_recovery_ctxt;
+
+typedef enum _dlm_ctxt_state {
+	DLM_CTXT_NEW = 0,
+	DLM_CTXT_JOINED,
+	DLM_CTXT_IN_SHUTDOWN,
+	DLM_CTXT_LEAVING,
+} dlm_ctxt_state;
+
+struct _dlm_ctxt
+{
+	struct list_head list;
+	struct list_head *resources;
+	struct list_head dirty_list;
+	struct list_head purge_list;
+	struct list_head pending_asts;
+	struct list_head pending_basts;
+	unsigned int purge_count;
+	spinlock_t spinlock;
+	struct rw_semaphore recovery_sem;
+	char *name;
+	u8 node_num;
+	u32 key;
+	u8  joining_node;
+	wait_queue_head_t dlm_join_events;
+	unsigned long live_nodes_map[BITS_TO_LONGS(NM_MAX_NODES)];
+	unsigned long domain_map[BITS_TO_LONGS(NM_MAX_NODES)];
+	unsigned long recovery_map[BITS_TO_LONGS(NM_MAX_NODES)];
+	dlm_recovery_ctxt reco;
+	spinlock_t master_lock;
+	struct list_head master_list;
+	struct list_head mle_hb_events;
+
+	/* these give a really vague idea of the system load */
+	atomic_t local_resources;
+	atomic_t remote_resources;
+	atomic_t unknown_resources;
+
+	/* NOTE: Next three are protected by dlm_domain_lock */
+	struct kref dlm_refs;
+	dlm_ctxt_state dlm_state;
+	unsigned int num_joins;
+
+	struct hb_callback_func dlm_hb_up;
+	struct hb_callback_func dlm_hb_down;
+	struct task_struct *dlm_thread_task;
+	struct task_struct *dlm_reco_thread_task;
+	wait_queue_head_t dlm_thread_wq;
+	wait_queue_head_t dlm_reco_thread_wq;
+
+	struct work_struct dispatched_work;
+	struct list_head work_list;
+	spinlock_t work_lock;
+	struct list_head	dlm_domain_handlers;
+};
+
+/* these keventd work queue items are for less-frequently 
+ * called functions that cannot be directly called from the
+ * net message handlers for some reason, usually because
+ * they need to send net messages of their own. */
+void dlm_dispatch_work(void *data);
+
+typedef struct _dlm_lock_resource dlm_lock_resource;
+typedef struct _dlm_work_item dlm_work_item;
+
+typedef void (dlm_workfunc_t)(dlm_work_item *, void *);
+
+typedef struct _dlm_request_all_locks_priv
+{
+	u8 reco_master;
+	u8 dead_node;
+} dlm_request_all_locks_priv;
+
+typedef struct _dlm_mig_lockres_priv
+{
+	dlm_lock_resource *lockres;
+	u8 real_master;
+} dlm_mig_lockres_priv;
+
+struct _dlm_work_item 
+{
+	struct list_head list;
+	dlm_workfunc_t *func;
+	dlm_ctxt *dlm;
+	void *data;
+	union {
+		dlm_request_all_locks_priv ral;
+		dlm_mig_lockres_priv ml;
+	} u;
+};
+
+static inline void dlm_init_work_item(dlm_ctxt *dlm, dlm_work_item *i, 
+				      dlm_workfunc_t *f, void *data)
+{
+	DLM_ASSERT(i);
+	DLM_ASSERT(f);
+	memset(i, 0, sizeof(dlm_work_item));
+	i->func = f;
+	INIT_LIST_HEAD(&i->list);
+	i->data = data;
+	i->dlm = dlm;  /* must have already done a dlm_grab on this! */
+}
+
+
+
+static inline void __dlm_set_joining_node(struct _dlm_ctxt *dlm,
+					  u8 node)
+{
+	assert_spin_locked(&dlm->spinlock);
+
+	dlm->joining_node = node;
+	wake_up(&dlm->dlm_join_events);
+}
+
+#define DLM_LOCK_RES_UNINITED             0x00000001
+#define DLM_LOCK_RES_RECOVERING           0x00000002
+#define DLM_LOCK_RES_READY                0x00000004
+#define DLM_LOCK_RES_DIRTY                0x00000008
+#define DLM_LOCK_RES_IN_PROGRESS          0x00000010 
+#define DLM_LOCK_RES_MIGRATING            0x00000020
+
+#define DLM_PURGE_INTERVAL_MS   (8 * 1000)
+
+struct _dlm_lock_resource
+{
+	/* WARNING: Please see the comment in dlm_init_lockres before
+	 * adding fields here. */
+	struct list_head list;
+	struct kref      refs;
+
+	/* please keep these next 3 in this order 
+	 * some funcs want to iterate over all lists */
+	struct list_head granted;
+	struct list_head converting;
+	struct list_head blocked;
+
+	struct list_head dirty;
+	struct list_head recovering; // dlm_recovery_ctxt.resources list
+
+	/* unused lock resources have their last_used stamped and are
+	 * put on a list for the dlm thread to run. */
+	struct list_head purge;
+	unsigned long    last_used;
+
+	spinlock_t spinlock;
+	wait_queue_head_t wq;
+	u8  owner;              //node which owns the lock resource, or unknown
+	u16 state;
+	struct qstr lockname;
+	char lvb[DLM_LVB_LEN];
+};
+
+typedef struct _dlm_migratable_lock
+{
+	u64 cookie;
+
+	/* these 3 are just padding for the in-memory structure, but 
+	 * list and flags are actually used when sent over the wire */ 
+	u16 pad1;
+	u8 list;  // 0=granted, 1=converting, 2=blocked
+	u8 flags; 
+
+	s8 type;
+	s8 convert_type;
+	s8 highest_blocked;
+	u8 node;
+} dlm_migratable_lock;  // 16 bytes
+
+struct _dlm_lock
+{
+	dlm_migratable_lock ml;
+
+	struct list_head list;
+	struct list_head ast_list;
+	struct list_head bast_list;
+	dlm_lock_resource *lockres;
+	spinlock_t spinlock;
+
+	// ast and bast must be callable while holding a spinlock!
+	dlm_astlockfunc_t *ast;     
+	dlm_bastlockfunc_t *bast;
+	void *astdata;
+	dlm_lockstatus *lksb;
+	unsigned ast_pending:1,
+		 bast_pending:1;
+};
+
+
+#define DLM_LKSB_KERNEL_ALLOCATED  0x01  // allocated on master node on behalf of remote node
+#define DLM_LKSB_PUT_LVB           0x02
+#define DLM_LKSB_GET_LVB           0x04
+#define DLM_LKSB_UNUSED2           0x08
+#define DLM_LKSB_UNUSED3           0x10
+#define DLM_LKSB_UNUSED4           0x20
+#define DLM_LKSB_UNUSED5           0x40
+#define DLM_LKSB_UNUSED6           0x80
+
+enum dlm_mle_type {
+	DLM_MLE_BLOCK,
+	DLM_MLE_MASTER,
+	DLM_MLE_MIGRATION
+};
+
+typedef struct _dlm_lock_name
+{
+	u8 len;
+	u8 name[0];   // [DLM_LOCKID_NAME_MAX]
+} dlm_lock_name;
+
+/* good god this needs to be trimmed down */
+typedef struct _dlm_master_list_entry
+{
+	struct list_head list;
+	struct list_head hb_events;
+	dlm_ctxt *dlm;
+	spinlock_t spinlock;
+	wait_queue_head_t wq;
+	atomic_t woken;
+	struct kref mle_refs;
+	unsigned long maybe_map[BITS_TO_LONGS(NM_MAX_NODES)];
+	unsigned long vote_map[BITS_TO_LONGS(NM_MAX_NODES)];
+	unsigned long response_map[BITS_TO_LONGS(NM_MAX_NODES)];
+	unsigned long node_map[BITS_TO_LONGS(NM_MAX_NODES)];
+	u8 master;
+	u8 new_master;
+	u8 error;
+	enum dlm_mle_type type;    // BLOCK, MASTER, or MIGRATION
+	union {
+		dlm_lock_resource *res;
+		dlm_lock_name name;
+	} u;
+	struct hb_callback_func mle_hb_up;
+	struct hb_callback_func mle_hb_down;
+} dlm_master_list_entry;
+
+typedef struct _dlm_node_iter
+{
+	unsigned long node_map[BITS_TO_LONGS(NM_MAX_NODES)];
+	int curnode;
+} dlm_node_iter;
+
+
+#define DLM_MASTER_REQUEST_MSG  	500
+#define DLM_UNUSED_MSG1            	501
+#define DLM_ASSERT_MASTER_MSG		502
+#define DLM_CREATE_LOCK_MSG		503
+#define DLM_CONVERT_LOCK_MSG		504
+#define DLM_PROXY_AST_MSG		505
+#define DLM_UNLOCK_LOCK_MSG		506
+#define DLM_UNUSED_MSG2			507
+#define DLM_MIGRATE_REQUEST_MSG		508
+#define DLM_MIG_LOCKRES_MSG 		509
+#define DLM_QUERY_JOIN_MSG		510
+#define DLM_ASSERT_JOINED_MSG		511
+#define DLM_CANCEL_JOIN_MSG		512
+#define DLM_EXIT_DOMAIN_MSG		513
+#define DLM_MASTER_REQUERY_MSG		514
+#define DLM_LOCK_REQUEST_MSG		515
+#define DLM_RECO_DATA_DONE_MSG		516
+#define DLM_BEGIN_RECO_MSG		517
+#define DLM_FINALIZE_RECO_MSG		518
+
+
+typedef struct _dlm_reco_node_data
+{
+	int state;
+	u8 node_num;
+	struct list_head list;
+	struct list_head granted;
+	struct list_head converting;
+	struct list_head blocked;
+} dlm_reco_node_data;
+
+enum {
+	DLM_RECO_NODE_DATA_DEAD = -1,
+	DLM_RECO_NODE_DATA_INIT = 0,
+	DLM_RECO_NODE_DATA_REQUESTING,
+	DLM_RECO_NODE_DATA_REQUESTED,
+	DLM_RECO_NODE_DATA_RECEIVING,
+	DLM_RECO_NODE_DATA_DONE,
+	DLM_RECO_NODE_DATA_FINALIZE_SENT,
+};
+
+
+enum {
+	DLM_MASTER_RESP_NO,
+	DLM_MASTER_RESP_YES,
+	DLM_MASTER_RESP_MAYBE,
+	DLM_MASTER_RESP_ERROR
+};
+
+
+typedef struct _dlm_master_request
+{
+	u8 node_idx;
+	u8 namelen;
+	u16 pad1;
+	u32 flags;
+
+	u8 name[NM_MAX_NAME_LEN];
+} dlm_master_request;
+
+typedef struct _dlm_assert_master
+{
+	u8 node_idx;
+	u8 namelen;
+	u16 pad1;
+	u32 flags;
+
+	u8 name[NM_MAX_NAME_LEN];
+} dlm_assert_master;
+
+typedef struct _dlm_migrate_request
+{
+	u8 master;
+	u8 new_master;
+	u8 namelen;
+	u8 pad1;
+	u32 pad2;
+	u8 name[NM_MAX_NAME_LEN];
+} dlm_migrate_request;
+
+typedef struct _dlm_master_requery
+{
+	u8 pad1;
+	u8 pad2;
+	u8 node_idx;
+	u8 namelen;
+	u32 pad3;
+	u8 name[NM_MAX_NAME_LEN];
+} dlm_master_requery;
+
+#define DLM_MRES_RECOVERY   0x01
+#define DLM_MRES_MIGRATION  0x02
+#define DLM_MRES_ALL_DONE   0x04
+
+// NET_MAX_PAYLOAD_BYTES is roughly 4080
+// 240 * 16 = 3840 
+// 3840 + 112 = 3952 bytes
+// leaves us about 128 bytes
+#define DLM_MAX_MIGRATABLE_LOCKS   240 
+
+typedef struct _dlm_migratable_lockres
+{
+	u8 master;
+	u8 lockname_len;
+	u8 num_locks;    // locks sent in this structure
+	u8 flags;
+	u32 total_locks; // locks to be sent for this migration cookie
+	u64 mig_cookie;  // cookie for this lockres migration
+			 // or zero if not needed
+	// 16 bytes
+	u8 lockname[DLM_LOCKID_NAME_MAX];   
+	// 48 bytes
+	u8 lvb[DLM_LVB_LEN];                
+	// 112 bytes
+	dlm_migratable_lock ml[0];  // 16 bytes each, begins at byte 112
+} dlm_migratable_lockres;
+#define DLM_MIG_LOCKRES_MAX_LEN  (sizeof(dlm_migratable_lockres) + \
+				(sizeof(dlm_migratable_lock) * \
+				 DLM_MAX_MIGRATABLE_LOCKS) )
+
+typedef struct _dlm_create_lock
+{
+	u64 cookie;
+
+	u32 flags;
+	u8 pad1;
+	u8 node_idx;
+	s8 requested_type;
+	u8 namelen;
+
+	u8 name[NM_MAX_NAME_LEN];
+} dlm_create_lock;
+
+typedef struct _dlm_convert_lock
+{
+	u64 cookie;
+
+	u32 flags;
+	u8 pad1;
+	u8 node_idx;
+	s8 requested_type;
+	u8 namelen;
+
+	u8 name[NM_MAX_NAME_LEN];
+
+	s8 lvb[0];
+} dlm_convert_lock;
+#define DLM_CONVERT_LOCK_MAX_LEN  (sizeof(dlm_convert_lock) + DLM_LVB_LEN)
+
+typedef struct _dlm_unlock_lock
+{
+	u64 cookie;
+
+	u32 flags;
+	u16 pad1;
+	u8 node_idx;
+	u8 namelen;
+
+	u8 name[NM_MAX_NAME_LEN];
+
+	s8 lvb[0];
+} dlm_unlock_lock;
+#define DLM_UNLOCK_LOCK_MAX_LEN  (sizeof(dlm_unlock_lock) + DLM_LVB_LEN)
+
+typedef struct _dlm_proxy_ast
+{
+	u64 cookie;
+
+	u32 flags;
+	u8 node_idx;
+	u8 type;
+	u8 blocked_type;
+	u8 namelen;
+
+	u8 name[NM_MAX_NAME_LEN];
+
+	s8 lvb[0];
+} dlm_proxy_ast;
+#define DLM_PROXY_AST_MAX_LEN  (sizeof(dlm_proxy_ast) + DLM_LVB_LEN)
+
+#define DLM_MOD_KEY (0x666c6172)
+enum dlm_query_join_response {
+	JOIN_DISALLOW = 0,
+	JOIN_OK,
+	JOIN_OK_NO_MAP,
+};
+
+typedef struct _dlm_lock_request
+{
+	u8 node_idx;
+	u8 dead_node;
+	u16 pad1;
+	u32 pad2;
+} dlm_lock_request;
+
+typedef struct _dlm_reco_data_done
+{
+	u8 node_idx;
+	u8 dead_node;
+	u16 pad1;
+	u32 pad2;
+
+	/* unused for now */
+	/* eventually we can use this to attempt 
+	 * lvb recovery based on each node's info */
+	u8 reco_lvb[DLM_LVB_LEN];
+} dlm_reco_data_done;
+
+typedef struct _dlm_begin_reco
+{
+	u8 node_idx;
+	u8 dead_node;
+	u16 pad1;
+	u32 pad2;
+} dlm_begin_reco;
+
+
+typedef struct _dlm_query_join_request
+{
+	u8 node_idx;
+	u8 pad1[2];
+	u8 name_len;
+	u8 domain[NM_MAX_NAME_LEN];
+} dlm_query_join_request;
+
+typedef struct _dlm_assert_joined
+{
+	u8 node_idx;
+	u8 pad1[2];
+	u8 name_len;
+	u8 domain[NM_MAX_NAME_LEN];
+} dlm_assert_joined;
+
+typedef struct _dlm_cancel_join
+{
+	u8 node_idx;
+	u8 pad1[2];
+	u8 name_len;
+	u8 domain[NM_MAX_NAME_LEN];
+} dlm_cancel_join;
+
+typedef struct _dlm_exit_domain
+{
+	u8 node_idx;
+	u8 pad1[3];
+} dlm_exit_domain;
+
+typedef struct _dlm_finalize_reco
+{
+	u8 node_idx;
+	u8 dead_node;
+	u16 pad1;
+	u32 pad2;
+} dlm_finalize_reco;
+
+
+static inline void dlm_query_join_request_to_net(dlm_query_join_request *m)
+{
+	/* do nothing */
+}
+static inline void dlm_query_join_request_to_host(dlm_query_join_request *m)
+{
+	/* do nothing */
+}
+static inline void dlm_assert_joined_to_net(dlm_assert_joined *m)
+{
+	/* do nothing */
+}
+static inline void dlm_assert_joined_to_host(dlm_assert_joined *m)
+{
+	/* do nothing */
+}
+static inline void dlm_cancel_join_to_net(dlm_cancel_join *m)
+{
+	/* do nothing */
+}
+static inline void dlm_cancel_join_to_host(dlm_cancel_join *m)
+{
+	/* do nothing */
+}
+static inline void dlm_exit_domin_to_net(dlm_exit_domain *m)
+{
+	/* do nothing */
+}
+static inline void dlm_exit_domain_to_host(dlm_exit_domain *m)
+{
+	/* do nothing */
+}
+static inline void dlm_master_request_to_net(dlm_master_request *m)
+{
+	m->flags = htonl(m->flags);
+}
+static inline void dlm_master_request_to_host(dlm_master_request *m)
+{
+	m->flags = ntohl(m->flags);
+}
+
+static inline void dlm_assert_master_to_net(dlm_assert_master *m)
+{
+	m->flags = htonl(m->flags);
+}
+static inline void dlm_assert_master_to_host(dlm_assert_master *m)
+{
+	m->flags = ntohl(m->flags);
+}
+
+static inline void dlm_migrate_request_to_net(dlm_migrate_request *m)
+{
+	/* do nothing */
+}
+static inline void dlm_migrate_request_to_host(dlm_migrate_request *m)
+{
+	/* do nothing */
+}
+
+static inline void dlm_master_requery_to_net(dlm_master_requery *m)
+{
+	/* do nothing */
+}
+static inline void dlm_master_requery_to_host(dlm_master_requery *m)
+{
+	/* do nothing */
+}
+
+static inline void dlm_create_lock_to_net(dlm_create_lock *c)
+{
+	c->cookie = cpu_to_be64(c->cookie);
+	c->flags = htonl(c->flags);
+}
+static inline void dlm_create_lock_to_host(dlm_create_lock *c)
+{
+	c->cookie = be64_to_cpu(c->cookie);
+	c->flags = ntohl(c->flags);
+}
+
+static inline void dlm_convert_lock_to_net(dlm_convert_lock *c)
+{
+	c->cookie = cpu_to_be64(c->cookie);
+	c->flags = htonl(c->flags);
+}
+static inline void dlm_convert_lock_to_host(dlm_convert_lock *c)
+{
+	c->cookie = be64_to_cpu(c->cookie);
+	c->flags = ntohl(c->flags);
+}
+
+static inline void dlm_unlock_lock_to_net(dlm_unlock_lock *u)
+{
+	u->cookie = cpu_to_be64(u->cookie);
+	u->flags = htonl(u->flags);
+}
+static inline void dlm_unlock_lock_to_host(dlm_unlock_lock *u)
+{
+	u->cookie = be64_to_cpu(u->cookie);
+	u->flags = ntohl(u->flags);
+}
+
+static inline void dlm_proxy_ast_to_net(dlm_proxy_ast *a)
+{
+	a->cookie = cpu_to_be64(a->cookie);
+	a->flags = htonl(a->flags);
+}
+static inline void dlm_proxy_ast_to_host(dlm_proxy_ast *a)
+{
+	a->cookie = be64_to_cpu(a->cookie);
+	a->flags = ntohl(a->flags);
+}
+static inline void dlm_migratable_lock_to_net(dlm_migratable_lock *ml)
+{
+	ml->cookie = cpu_to_be64(ml->cookie);
+}
+static inline void dlm_migratable_lock_to_host(dlm_migratable_lock *ml)
+{
+	ml->cookie = be64_to_cpu(ml->cookie);
+}
+static inline void dlm_lock_request_to_net(dlm_lock_request *r)
+{
+	/* do nothing */
+}
+static inline void dlm_lock_request_to_host(dlm_lock_request *r)
+{
+	/* do nothing */
+}
+static inline void dlm_reco_data_done_to_net(dlm_reco_data_done *r)
+{
+	/* do nothing */
+}
+static inline void dlm_reco_data_done_to_host(dlm_reco_data_done *r)
+{
+	/* do nothing */
+}
+
+static inline void dlm_begin_reco_to_net(dlm_begin_reco *r)
+{
+	/* do nothing */
+}
+static inline void dlm_begin_reco_to_host(dlm_begin_reco *r)
+{
+	/* do nothing */
+}
+static inline void dlm_finalize_reco_to_net(dlm_finalize_reco *f)
+{
+	/* do nothing */
+}
+static inline void dlm_finalize_reco_to_host(dlm_finalize_reco *f)
+{
+	/* do nothing */
+}
+
+static inline void dlm_migratable_lockres_to_net(dlm_migratable_lockres *mr)
+{
+	int i, nr = mr->total_locks;
+	
+	DLM_ASSERT(nr >= 0);
+	DLM_ASSERT(nr <= DLM_MAX_MIGRATABLE_LOCKS);
+	
+	mr->total_locks = htonl(mr->total_locks);
+	mr->mig_cookie = cpu_to_be64(mr->mig_cookie);
+	
+	for (i=0; i<nr; i++)
+		dlm_migratable_lock_to_net(&(mr->ml[i]));
+}
+
+static inline void dlm_migratable_lockres_to_host(dlm_migratable_lockres *mr)
+{
+	int i, nr;
+
+	mr->total_locks = ntohl(mr->total_locks);
+	mr->mig_cookie = be64_to_cpu(mr->mig_cookie);
+
+	nr = mr->total_locks;
+	DLM_ASSERT(nr >= 0);
+	DLM_ASSERT(nr <= DLM_MAX_MIGRATABLE_LOCKS);
+
+	for (i=0; i<nr; i++)
+		dlm_migratable_lock_to_host(&(mr->ml[i]));
+}
+
+
+int dlm_create_lock_handler(net_msg *msg, u32 len, void *data);
+int dlm_convert_lock_handler(net_msg *msg, u32 len, void *data);
+int dlm_proxy_ast_handler(net_msg *msg, u32 len, void *data);
+
+int dlm_unlock_lock_handler(net_msg *msg, u32 len, void *data);
+
+
+
+
+
+void dlm_shuffle_lists(dlm_ctxt *dlm, dlm_lock_resource *res);
+void dlm_thread_run_lock_resources(dlm_ctxt *dlm);
+int dlm_launch_thread(dlm_ctxt *dlm);
+void dlm_complete_thread(dlm_ctxt *dlm);
+int dlm_launch_recovery_thread(dlm_ctxt *dlm);
+void dlm_complete_recovery_thread(dlm_ctxt *dlm);
+void dlm_flush_asts(dlm_ctxt *dlm);
+int dlm_flush_lockres_asts(dlm_ctxt *dlm, dlm_lock_resource *res);
+		   
+dlm_status dlmlock_master(dlm_ctxt *dlm, dlm_lock_resource *res, 
+			  dlm_lock *lock, int flags);
+dlm_status dlmlock_remote(dlm_ctxt *dlm, dlm_lock_resource *res, 
+			  dlm_lock *lock, int flags);
+
+dlm_status dlmconvert_master(dlm_ctxt *dlm, dlm_lock_resource *res, 
+			     dlm_lock *lock, int flags, int type);
+dlm_status dlmconvert_remote(dlm_ctxt *dlm, dlm_lock_resource *res, 
+			     dlm_lock *lock, int flags, int type);
+
+dlm_status dlmunlock_common(dlm_ctxt *dlm, dlm_lock_resource *res,
+			    dlm_lock *lock, dlm_lockstatus *lksb,
+			    int flags, int *call_ast, int master_node);
+static inline dlm_status dlmunlock_master(dlm_ctxt *dlm,
+					  dlm_lock_resource *res,
+					  dlm_lock *lock,
+					  dlm_lockstatus *lksb,
+					  int flags,
+					  int *call_ast)
+{
+	return dlmunlock_common(dlm, res, lock, lksb, flags, call_ast, 1);
+}
+
+static inline dlm_status dlmunlock_remote(dlm_ctxt *dlm,
+					  dlm_lock_resource *res,
+					  dlm_lock *lock,
+					  dlm_lockstatus *lksb,
+					  int flags, int *call_ast)
+{
+	return dlmunlock_common(dlm, res, lock, lksb, flags, call_ast, 0);
+}
+
+void dlm_get(dlm_ctxt *dlm);
+void dlm_put(dlm_ctxt *dlm);
+dlm_ctxt *dlm_grab(dlm_ctxt *dlm);
+int dlm_domain_fully_joined(dlm_ctxt *dlm);
+
+int __dlm_lockres_unused(dlm_lock_resource *res);
+void __dlm_lockres_calc_usage(dlm_ctxt *dlm, dlm_lock_resource *res);
+void dlm_lockres_calc_usage(dlm_ctxt *dlm,
+			    dlm_lock_resource *res);
+void dlm_purge_lockres(dlm_ctxt *dlm, dlm_lock_resource *lockres);
+void __dlm_lockres_get(dlm_lock_resource *res);
+
+dlm_lock_resource *dlm_lockres_grab(dlm_ctxt *dlm,
+				    dlm_lock_resource *res);
+void __dlm_lockres_put(dlm_ctxt *dlm,
+		       dlm_lock_resource *res);
+void dlm_lockres_put(dlm_ctxt *dlm,
+		     dlm_lock_resource *res);
+void __dlm_unhash_lock(dlm_ctxt *dlm,
+		       dlm_lock_resource *res);
+void __dlm_insert_lock(dlm_ctxt *dlm,
+		       dlm_lock_resource *res);
+dlm_lock_resource * __dlm_lookup_lock(dlm_ctxt *dlm,
+				      const char *name,
+				      unsigned int len);
+dlm_lock_resource * dlm_lookup_lock(dlm_ctxt *dlm,
+				    const char *name,
+				    unsigned int len);
+
+void dlm_change_lockres_owner(dlm_ctxt *dlm, dlm_lock_resource *res, u8 owner);
+void dlm_set_lockres_owner(dlm_ctxt *dlm, dlm_lock_resource *res, u8 owner);
+dlm_lock_resource * dlm_get_lock_resource(dlm_ctxt *dlm, 
+					  const char *lockid,
+					  int flags);
+dlm_lock_resource *dlm_new_lockres(dlm_ctxt *dlm, 
+				   const char *name, 
+				   unsigned int namelen);
+
+int dlm_lock_owner_broadcast(dlm_ctxt *dlm, dlm_lock_resource *res);
+int dlm_refresh_lock_resource(dlm_ctxt *dlm, dlm_lock_resource *res);
+
+
+void __dlm_queue_ast(dlm_ctxt *dlm, dlm_lock *lock);
+void dlm_queue_ast(dlm_ctxt *dlm, dlm_lock *lock);
+void __dlm_queue_bast(dlm_ctxt *dlm, dlm_lock *lock);
+void dlm_do_local_ast(dlm_ctxt *dlm, dlm_lock_resource *res, dlm_lock *lock);
+int dlm_do_remote_ast(dlm_ctxt *dlm, dlm_lock_resource *res, dlm_lock *lock);
+void dlm_do_local_bast(dlm_ctxt *dlm, dlm_lock_resource *res, 
+		       dlm_lock *lock, int blocked_type);
+int dlm_send_proxy_ast_msg(dlm_ctxt *dlm, dlm_lock_resource *res, 
+			   dlm_lock *lock, int msg_type, 
+			   int blocked_type, int flags);
+/* Send a proxy BAST for lock: DLM_BAST message with the given
+ * blocked_type and no extra flags. */
+static inline int dlm_send_proxy_bast(dlm_ctxt *dlm, dlm_lock_resource *res, 
+				      dlm_lock *lock, int blocked_type)
+{
+	return dlm_send_proxy_ast_msg(dlm, res, lock, DLM_BAST,
+				      blocked_type, 0);
+}
+
+/* Send a proxy AST for lock: DLM_AST message with blocked_type=0 and
+ * the caller's flags. */
+static inline int dlm_send_proxy_ast(dlm_ctxt *dlm, dlm_lock_resource *res, 
+				     dlm_lock *lock, int flags)
+{
+	return dlm_send_proxy_ast_msg(dlm, res, lock, DLM_AST,
+				      0, flags);
+}
+
+u8 dlm_nm_this_node(dlm_ctxt *dlm);
+void dlm_kick_thread(dlm_ctxt *dlm, dlm_lock_resource *res);
+
+int dlm_nm_init(dlm_ctxt *dlm);
+int dlm_heartbeat_init(dlm_ctxt *dlm);
+void dlm_hb_node_down_cb(struct nm_node *node, int idx, void *data);
+void dlm_hb_node_up_cb(struct nm_node *node, int idx, void *data);
+int dlm_hb_node_dead(dlm_ctxt *dlm, int node);
+int __dlm_hb_node_dead(dlm_ctxt *dlm, int node);
+
+int dlm_migrate_lockres(dlm_ctxt *dlm, dlm_lock_resource *res, u8 target);
+int dlm_finish_migration(dlm_ctxt *dlm, dlm_lock_resource *res, u8 old_master);
+
+int dlm_lock_owner_broadcast(dlm_ctxt *dlm, dlm_lock_resource *res);
+int dlm_master_request_handler(net_msg *msg, u32 len, void *data);
+int dlm_assert_master_handler(net_msg *msg, u32 len, void *data);
+int dlm_migrate_request_handler(net_msg *msg, u32 len, void *data);
+int dlm_mig_lockres_handler(net_msg *msg, u32 len, void *data);
+int dlm_master_requery_handler(net_msg *msg, u32 len, void *data);
+int dlm_request_all_locks_handler(net_msg *msg, u32 len, void *data);
+int dlm_reco_data_done_handler(net_msg *msg, u32 len, void *data);
+int dlm_begin_reco_handler(net_msg *msg, u32 len, void *data);
+int dlm_finalize_reco_handler(net_msg *msg, u32 len, void *data);
+
+	
+int dlm_send_one_lockres(dlm_ctxt *dlm, dlm_lock_resource *res, 
+			 dlm_migratable_lockres *mres, 
+			 u8 send_to, u8 flags);
+void dlm_move_lockres_to_recovery_list(dlm_ctxt *dlm, dlm_lock_resource *res);
+
+void dlm_init_lockres(dlm_ctxt *dlm, dlm_lock_resource *res, 
+		      const char *name, unsigned int namelen);
+
+/* will exit holding res->spinlock, but may drop in function */
+void __dlm_wait_on_lockres_flags(dlm_lock_resource *res, int flags);
+
+/* will exit holding res->spinlock, but may drop in function */
+/* Wait until the lockres is neither in-progress, recovering, nor
+ * migrating.  Thin wrapper over __dlm_wait_on_lockres_flags() with the
+ * three busy flags OR'd together. */
+static inline void __dlm_wait_on_lockres(dlm_lock_resource *res)
+{
+	__dlm_wait_on_lockres_flags(res, (DLM_LOCK_RES_IN_PROGRESS|
+				    	  DLM_LOCK_RES_RECOVERING|
+					  DLM_LOCK_RES_MIGRATING));
+}
+
+void dlm_init_lock(dlm_lock *newlock, int type, u8 node, u64 cookie);
+
+void dlm_mle_node_down(dlm_ctxt *dlm, dlm_master_list_entry *mle,
+		       struct nm_node *node, int idx);
+void dlm_mle_node_up(dlm_ctxt *dlm, dlm_master_list_entry *mle,
+		       struct nm_node *node, int idx);
+int dlm_do_assert_master(dlm_ctxt *dlm, const char *lockname, 
+			 unsigned int namelen, void *nodemap);
+int dlm_do_migrate_request(dlm_ctxt *dlm, dlm_lock_resource *res, 
+			   u8 master, u8 new_master, dlm_node_iter *iter);
+void dlm_clean_master_list(dlm_ctxt *dlm, u8 dead_node);
+
+
+#define DLM_MLE_DEBUG 1
+
+#ifdef DLM_MLE_DEBUG
+void dlm_dump_all_mles(char *data, int len);
+#endif
+
+
+/* Human-readable name for a lock mode; "UNKNOWN" for any mode other
+ * than EX/PR/NL. */
+static inline const char * dlm_lock_mode_name(int mode)
+{
+	switch (mode) {
+		case LKM_EXMODE:
+			return "EX";
+		case LKM_PRMODE:
+			return "PR";
+		case LKM_NLMODE:
+			return "NL";
+	}
+	return "UNKNOWN";
+}
+
+
+/* Classic DLM compatibility matrix for the three supported modes:
+ * NL is compatible with everything, EX with nothing but NL, and
+ * PR only with PR (and NL).  Returns 1 if compatible, 0 otherwise. */
+static inline int dlm_lock_compatible(int existing, int request)
+{
+	/* NO_LOCK compatible with all */
+	if (request == LKM_NLMODE ||
+	    existing == LKM_NLMODE)
+		return 1;
+
+	/* EX incompatible with all non-NO_LOCK */
+	if (request == LKM_EXMODE)
+		return 0;
+
+	/* request must be PR, which is compatible with PR */
+	if (existing == LKM_PRMODE)
+		return 1;
+
+	return 0;
+}
+
+/* Linear scan: returns 1 if this exact dlm_lock (pointer identity) is
+ * on the given queue, 0 otherwise.  Caller is responsible for holding
+ * whatever lock protects the list. */
+static inline int dlm_lock_on_list(struct list_head *head, dlm_lock *lock)
+{
+	struct list_head *iter;
+	dlm_lock *tmplock;
+
+	list_for_each(iter, head) {
+		tmplock = list_entry(iter, dlm_lock, list);
+		if (tmplock == lock)
+			return 1;
+	}
+	return 0;
+}
+
+/* Does this master list entry refer to (dlm, name[0..namelen))?
+ * BLOCK and MIGRATION mles carry the name inline (u.name); all other
+ * types point at an attached lock resource (u.res) and compare against
+ * its lockname.  Returns 1 on match, 0 otherwise. */
+static inline int dlm_mle_equal(dlm_ctxt *dlm,
+				dlm_master_list_entry *mle,
+				const char *name,
+				unsigned int namelen)
+{
+	dlm_lock_resource *res;
+
+	if (dlm != mle->dlm)
+		return 0;
+
+	if (mle->type == DLM_MLE_BLOCK ||
+	    mle->type == DLM_MLE_MIGRATION) {
+		if (namelen != mle->u.name.len ||
+    	    	    strncmp(name, mle->u.name.name, namelen)!=0)
+			return 0;
+	} else {
+		res = mle->u.res;
+		if (namelen != res->lockname.len ||
+		    strncmp(res->lockname.name, name, namelen) != 0)
+			return 0;
+	}
+	return 1;
+}
+
+/* Map a negative kernel errno (typically from net_send_message) onto
+ * the closest dlm_status.  Anything unrecognized becomes DLM_BADARGS. */
+static inline dlm_status dlm_err_to_dlm_status(int err)
+{
+	dlm_status ret;
+	if (err == -ENOMEM)
+		ret = DLM_SYSERR;
+	else if (err == -ETIMEDOUT || net_link_down(err, NULL)) 
+		ret = DLM_NOLOCKMGR;
+	else if (err == -EINVAL)
+		ret = DLM_BADPARAM;
+	else if (err == -ENAMETOOLONG)
+		ret = DLM_IVBUFLEN;
+	else
+		ret = DLM_BADARGS;
+	return ret;
+}
+
+
+/* Initialize a node-map iterator: snapshot the caller's bitmap and
+ * position before the first node (curnode = -1). */
+static inline void dlm_node_iter_init(unsigned long *map, dlm_node_iter *iter)
+{
+	DLM_ASSERT(iter);
+	memcpy(iter->node_map, map, sizeof(iter->node_map));
+	iter->curnode = -1;
+}
+
+/* Advance to the next set node bit.  Returns the node number, or
+ * -ENOENT once the map is exhausted (curnode is parked at
+ * NM_MAX_NODES so subsequent calls also return -ENOENT). */
+static inline int dlm_node_iter_next(dlm_node_iter *iter)
+{
+	int bit;
+	DLM_ASSERT(iter);
+	bit = find_next_bit(iter->node_map, NM_MAX_NODES, iter->curnode+1);
+	if (bit >= NM_MAX_NODES) {
+		iter->curnode = NM_MAX_NODES;
+		return -ENOENT;
+	}
+	iter->curnode = bit;
+	return bit;
+}
+
+
+
+#endif /* DLMCOMMON_H */

Modified: trunk/fs/ocfs2/dlm/dlmconvert.c
===================================================================
--- trunk/fs/ocfs2/dlm/dlmconvert.c	2005-03-22 23:03:52 UTC (rev 2037)
+++ trunk/fs/ocfs2/dlm/dlmconvert.c	2005-03-23 01:21:05 UTC (rev 2038)
@@ -44,8 +44,8 @@
 #include "cluster/nodemanager.h"
 #include "cluster/tcp.h"
 
+#include "dlmapi.h"
 #include "dlmcommon.h"
-#include "dlmmod.h"
 
 
 /* NOTE: __dlmconvert_master is the only function in here that

Added: trunk/fs/ocfs2/dlm/dlmdebug.c
===================================================================
--- trunk/fs/ocfs2/dlm/dlmdebug.c	2005-03-22 23:03:52 UTC (rev 2037)
+++ trunk/fs/ocfs2/dlm/dlmdebug.c	2005-03-23 01:21:05 UTC (rev 2038)
@@ -0,0 +1,310 @@
+/* -*- mode: c; c-basic-offset: 8; -*-
+ * vim: noexpandtab sw=8 ts=8 sts=0:
+ *
+ * dlmdebug.c
+ *
+ * debug functionality for the dlm
+ *
+ * Copyright (C) 2004 Oracle.  All rights reserved.
+ *
+ * This program is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU General Public
+ * License as published by the Free Software Foundation; either
+ * version 2 of the License, or (at your option) any later version.
+ * 
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
+ * General Public License for more details.
+ * 
+ * You should have received a copy of the GNU General Public
+ * License along with this program; if not, write to the
+ * Free Software Foundation, Inc., 59 Temple Place - Suite 330,
+ * Boston, MA 021110-1307, USA.
+ *
+ */
+
+#include <linux/types.h>
+#include <linux/slab.h>
+#include <linux/highmem.h>
+#include <linux/utsname.h>
+#include <linux/sysctl.h>
+#include <linux/spinlock.h>
+#include <linux/proc_fs.h>
+
+#include "cluster/heartbeat.h"
+#include "cluster/nodemanager.h"
+#include "cluster/tcp.h"
+
+#include "dlmapi.h"
+#include "dlmcommon.h"
+
+#include "dlmdomain.h"
+
+static void dlm_dump_all_lock_resources(char *data, int len);
+static void dlm_dump_lock_resources(dlm_ctxt *dlm);
+static void dlm_dump_purge_list(dlm_ctxt *dlm);
+static void dlm_dump_all_purge_lists(char *data, int len);
+static void dlm_trigger_migration(char *data, int len);
+
+typedef void (dlm_debug_func_t)(char *data, int len);
+
+typedef struct _dlm_debug_funcs
+{
+	char key;
+	dlm_debug_func_t *func;
+} dlm_debug_funcs;
+
+static dlm_debug_funcs dlm_debug_map[] = {
+	{ 'r', dlm_dump_all_lock_resources },
+#ifdef DLM_MLE_DEBUG
+	{ 'm', dlm_dump_all_mles },
+#endif
+	{ 'p', dlm_dump_all_purge_lists  },
+	{ 'M', dlm_trigger_migration },
+};
+static int dlm_debug_map_sz = (sizeof(dlm_debug_map) / 
+			       sizeof(dlm_debug_funcs));
+
+/* /proc write handler for the dlm-debug entry.  The first byte of the
+ * user's write selects a debug action from dlm_debug_map ('r', 'm',
+ * 'p', 'M'); the whole buffer is then handed to that action.
+ * NOTE(review): buf is a __user pointer cast to (char *) before being
+ * passed to the handler -- each handler must still treat it as user
+ * memory (dlm_trigger_migration uses strncpy_from_user; verify the
+ * others never dereference it directly). */
+static ssize_t write_dlm_debug(struct file *file, const char __user *buf,
+                                   size_t count, loff_t *ppos)
+{
+	int i;
+	char c;
+	dlm_debug_func_t *fn;
+
+	printk("(%p, %p, %u, %lld)\n",
+		  file, buf, (unsigned int)count, (long long)*ppos);
+	if (!count)
+		return 0;
+
+	/* only the selector byte is copied here */
+	if (get_user(c, buf))
+		return -EFAULT;
+
+	for (i=0; i < dlm_debug_map_sz; i++) {
+		dlm_debug_funcs *d = &dlm_debug_map[i];
+		if (c == d->key) {
+			fn = d->func;
+			if (fn)
+				(fn)((char *)buf, count);
+			break;
+		}
+	}
+	return count;
+}
+
+static struct file_operations dlm_debug_operations = {
+	.write          = write_dlm_debug,
+};
+
+/* Create the write-only /proc/dlm-debug entry.  Failure to create it
+ * is silently ignored -- debugging is best-effort. */
+void dlm_create_dlm_debug_proc_entry(void)
+{
+	struct proc_dir_entry *entry;
+	entry = create_proc_entry("dlm-debug", S_IWUSR, NULL);
+	if (entry)
+		entry->proc_fops = &dlm_debug_operations;
+}
+
+/* Debug action 'r': dump the lock resources of every registered dlm
+ * domain.  Arguments are unused; the signature matches
+ * dlm_debug_func_t. */
+static void dlm_dump_all_lock_resources(char *data, int len)
+{
+	dlm_ctxt *dlm;
+	struct list_head *iter;
+
+	printk("dumping ALL dlm state for node %s\n", 
+		  system_utsname.nodename);
+	spin_lock(&dlm_domain_lock);
+	list_for_each(iter, &dlm_domains) {
+		dlm = list_entry (iter, dlm_ctxt, list);
+		dlm_dump_lock_resources(dlm);
+	}
+	spin_unlock(&dlm_domain_lock);
+}
+
+/* Dump every lock resource in one domain: walk the whole hash table
+ * and print each resource plus its granted/converting/blocked queues.
+ * Fix: the NULL check now runs BEFORE the first dereference of
+ * dlm/dlm->name -- the original printed dlm->name first and only then
+ * tested it for NULL (its own "some bug here" comment flagged this). */
+static void dlm_dump_lock_resources(dlm_ctxt *dlm)
+{
+	dlm_lock_resource *res;
+	dlm_lock *lock;
+	struct list_head *iter, *iter2;
+	struct list_head *bucket;
+	int i;
+
+	if (!dlm || !dlm->name) {
+		printk("wtf... dlm=%p\n", dlm);
+		return;
+	}
+
+	printk("dlm_ctxt: %s, node=%u, key=%u\n", 
+		  dlm->name, dlm->node_num, dlm->key);
+		
+	spin_lock(&dlm->spinlock);
+	for (i=0; i<DLM_HASH_SIZE; i++) {
+		bucket = &(dlm->resources[i]);
+		list_for_each(iter, bucket) {
+			res = list_entry(iter, dlm_lock_resource, list);
+			printk("lockres: %.*s, owner=%u, state=%u\n", 
+			       res->lockname.len, res->lockname.name, 
+			       res->owner, res->state);
+			spin_lock(&res->spinlock);
+			printk("  granted queue: \n");
+			list_for_each(iter2, &res->granted) {
+				lock = list_entry(iter2, dlm_lock, list);
+				spin_lock(&lock->spinlock);
+				printk("    type=%d, conv=%d, node=%u, " 
+				       "cookie=%llu\n", lock->ml.type, 
+				       lock->ml.convert_type, lock->ml.node, 
+				       lock->ml.cookie);
+				spin_unlock(&lock->spinlock);
+			}
+			printk("  converting queue: \n");
+			list_for_each(iter2, &res->converting) {
+				lock = list_entry(iter2, dlm_lock, list);
+				spin_lock(&lock->spinlock);
+				printk("    type=%d, conv=%d, node=%u, " 
+				       "cookie=%llu\n", lock->ml.type, 
+				       lock->ml.convert_type, lock->ml.node, 
+				       lock->ml.cookie);
+				spin_unlock(&lock->spinlock);
+			}
+			printk("  blocked queue: \n");
+			list_for_each(iter2, &res->blocked) {
+				lock = list_entry(iter2, dlm_lock, list);
+				spin_lock(&lock->spinlock);
+				printk("    type=%d, conv=%d, node=%u, " 
+				       "cookie=%llu\n", lock->ml.type, 
+				       lock->ml.convert_type, lock->ml.node, 
+				       lock->ml.cookie);
+				spin_unlock(&lock->spinlock);
+			}
+			spin_unlock(&res->spinlock);
+		}
+	}
+	spin_unlock(&dlm->spinlock);
+}
+
+/* Print the purge list of one domain: last-used jiffies and name of
+ * each unused lockres awaiting purge. */
+static void dlm_dump_purge_list(dlm_ctxt *dlm)
+{
+	struct list_head *iter;
+	dlm_lock_resource *lockres;
+
+	printk("Purge list for DLM Domain \"%s\"\n", dlm->name);
+	printk("Last_used\tName\n");
+
+	spin_lock(&dlm->spinlock);
+	list_for_each(iter, &dlm->purge_list) {
+		lockres = list_entry(iter, dlm_lock_resource, purge);
+
+		spin_lock(&lockres->spinlock);
+		printk("%lu\t%.*s\n", lockres->last_used,
+		       lockres->lockname.len, lockres->lockname.name);
+		spin_unlock(&lockres->spinlock);
+	}
+	spin_unlock(&dlm->spinlock);
+}
+
+/* Debug action 'p': dump the purge list of every registered domain.
+ * Arguments are unused; the signature matches dlm_debug_func_t. */
+static void dlm_dump_all_purge_lists(char *data, int len)
+{
+	dlm_ctxt *dlm;
+	struct list_head *iter;
+
+	spin_lock(&dlm_domain_lock);
+	list_for_each(iter, &dlm_domains) {
+		dlm = list_entry (iter, dlm_ctxt, list);
+		dlm_dump_purge_list(dlm);
+	}
+	spin_unlock(&dlm_domain_lock);
+}
+
+/* Debug action 'M': parse "M <domain> <lockres>" from the user buffer
+ * and trigger migration of that lock resource.  `data` is really the
+ * __user buffer forwarded by write_dlm_debug(), hence the
+ * strncpy_from_user below.
+ * NOTE(review): target NM_MAX_NODES passed to dlm_migrate_lockres()
+ * presumably means "pick any node" -- confirm against its
+ * implementation. */
+static void dlm_trigger_migration(char *data, int len)
+{
+	dlm_lock_resource *res;
+	dlm_ctxt *dlm;
+	char *resname;
+	char *domainname;
+	char *tmp, *buf = NULL;
+
+	/* bound the copy to one page; "M d r" needs at least 5 bytes */
+	if (len >= PAGE_SIZE) {
+		printk("user passed too much data: %d bytes\n", len);
+		return;
+	}
+	if (len < 5) {
+		printk("user passed too little data: %d bytes\n", len);
+		return;
+	}
+	buf = kmalloc(len+1, GFP_KERNEL);
+	if (!buf) {
+		printk("could not alloc %d bytes\n", len);
+		return;
+	}
+	if (strncpy_from_user(buf, data, len) < len) {
+		printk("failed to get all user data.  done.\n");
+		goto leave;
+	}
+	buf[len]='\0';
+	dlmprintk("got this data from user: %s\n", buf);
+
+	/* expect the literal "M " prefix */
+	tmp = buf;
+	if (*tmp != 'M') {
+		printk("bad data\n");
+		goto leave;
+	}
+	tmp++;
+	if (*tmp != ' ') {
+		printk("bad data\n");
+		goto leave;
+	}
+	tmp++;
+	domainname = tmp;
+	
+	/* domain name runs to the next space */
+	while (*tmp) {
+		if (*tmp == ' ')
+			break;
+		tmp++;
+	}
+	if (!*tmp || !*(tmp+1)) {
+		printk("bad data\n");
+		goto leave;
+	}
+
+	*tmp = '\0';  // null term the domainname	
+	tmp++;
+	resname = tmp;
+	/* resource name runs to whitespace or end of buffer */
+	while (*tmp) {
+		if (*tmp == '\n' ||
+		    *tmp == ' ' ||
+		    *tmp == '\r') {
+			*tmp = '\0';
+			break;
+		}
+		tmp++;
+	}
+
+	printk("now looking up domain %s, lockres %s\n",
+	       domainname, resname);
+	spin_lock(&dlm_domain_lock);
+	dlm = __dlm_lookup_domain(domainname);
+	spin_unlock(&dlm_domain_lock);
+
+	/* dlm_grab() tolerates a NULL/stale pointer and returns NULL */
+	if (!dlm_grab(dlm)) {
+		printk("bad dlm!\n");
+		goto leave;
+	}
+
+       	res = dlm_lookup_lock(dlm, resname, strlen(resname));
+	if (!res) {
+		printk("bad lockres!\n");
+		dlm_put(dlm);
+		goto leave;
+	}
+
+	printk("woo! found dlm=%p, lockres=%p\n", dlm, res);
+	{
+		int ret;
+		ret = dlm_migrate_lockres(dlm, res, NM_MAX_NODES);
+		printk("dlm_migrate_lockres returned %d\n", ret);
+	}
+	dlm_lockres_put(dlm, res);
+	dlm_put(dlm);
+
+leave:
+	kfree(buf);
+}

Added: trunk/fs/ocfs2/dlm/dlmdebug.h
===================================================================
--- trunk/fs/ocfs2/dlm/dlmdebug.h	2005-03-22 23:03:52 UTC (rev 2037)
+++ trunk/fs/ocfs2/dlm/dlmdebug.h	2005-03-23 01:21:05 UTC (rev 2038)
@@ -0,0 +1,30 @@
+/* -*- mode: c; c-basic-offset: 8; -*-
+ * vim: noexpandtab sw=8 ts=8 sts=0:
+ *
+ * dlmdebug.h
+ *
+ * Copyright (C) 2004 Oracle.  All rights reserved.
+ *
+ * This program is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU General Public
+ * License as published by the Free Software Foundation; either
+ * version 2 of the License, or (at your option) any later version.
+ * 
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
+ * General Public License for more details.
+ * 
+ * You should have received a copy of the GNU General Public
+ * License along with this program; if not, write to the
+ * Free Software Foundation, Inc., 59 Temple Place - Suite 330,
+ * Boston, MA 021110-1307, USA.
+ *
+ */
+
+#ifndef DLMDEBUG_H
+#define DLMDEBUG_H
+
+void dlm_create_dlm_debug_proc_entry(void);
+
+#endif

Added: trunk/fs/ocfs2/dlm/dlmdomain.c
===================================================================
--- trunk/fs/ocfs2/dlm/dlmdomain.c	2005-03-22 23:03:52 UTC (rev 2037)
+++ trunk/fs/ocfs2/dlm/dlmdomain.c	2005-03-23 01:21:05 UTC (rev 2038)
@@ -0,0 +1,1350 @@
+/* -*- mode: c; c-basic-offset: 8; -*-
+ * vim: noexpandtab sw=8 ts=8 sts=0:
+ *
+ * dlmdomain.c
+ *
+ * defines domain join / leave apis
+ *
+ * Copyright (C) 2004 Oracle.  All rights reserved.
+ *
+ * This program is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU General Public
+ * License as published by the Free Software Foundation; either
+ * version 2 of the License, or (at your option) any later version.
+ * 
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
+ * General Public License for more details.
+ * 
+ * You should have received a copy of the GNU General Public
+ * License along with this program; if not, write to the
+ * Free Software Foundation, Inc., 59 Temple Place - Suite 330,
+ * Boston, MA 021110-1307, USA.
+ *
+ */
+
+#include <linux/module.h>
+#include <linux/types.h>
+#include <linux/slab.h>
+#include <linux/highmem.h>
+#include <linux/utsname.h>
+#include <linux/init.h>
+#include <linux/spinlock.h>
+
+
+#include "cluster/heartbeat.h"
+#include "cluster/nodemanager.h"
+#include "cluster/tcp.h"
+
+#include "dlmapi.h"
+#include "dlmcommon.h"
+
+#include "dlmdebug.h"
+
+/*
+ *
+ * spinlock lock ordering: if multiple locks are needed, obey this ordering:
+ *    dlm_domain_lock -> dlm_ctxt -> dlm_lock_resource -> dlm_lock
+ *
+ */
+
+spinlock_t dlm_domain_lock = SPIN_LOCK_UNLOCKED;
+LIST_HEAD(dlm_domains);
+DECLARE_WAIT_QUEUE_HEAD(dlm_domain_events);
+
+static int dlm_query_join_handler(net_msg *msg, u32 len, void *data);
+static int dlm_assert_joined_handler(net_msg *msg, u32 len, void *data);
+static int dlm_cancel_join_handler(net_msg *msg, u32 len, void *data);
+static int dlm_exit_domain_handler(net_msg *msg, u32 len, void *data);
+
+static void dlm_unregister_domain_handlers(dlm_ctxt *dlm);
+
+/* Remove a lockres from the domain's hash and drop the hashtable's
+ * reference (taken in __dlm_insert_lock).
+ * NOTE(review): no assert_spin_locked here, but presumably the caller
+ * must hold dlm->spinlock as the insert/lookup paths do -- confirm. */
+void __dlm_unhash_lock(dlm_ctxt *dlm,
+		       dlm_lock_resource *lockres)
+{
+	list_del_init(&lockres->list);
+	__dlm_lockres_put(dlm, lockres);
+}
+
+/* Hash a lockres into the domain's resource table.  Computes and
+ * caches the name hash in the embedded qstr, takes a reference on
+ * behalf of the hashtable, and appends to the bucket.  Caller must
+ * hold dlm->spinlock. */
+void __dlm_insert_lock(dlm_ctxt *dlm,
+		       dlm_lock_resource *res)
+{
+	struct list_head *bucket;
+	struct qstr *q;
+
+	assert_spin_locked(&dlm->spinlock);
+
+	q = &res->lockname;
+	q->hash = full_name_hash(q->name, q->len);
+	bucket = &(dlm->resources[q->hash & DLM_HASH_MASK]);
+
+	/* get a reference for our hashtable */
+	__dlm_lockres_get(res);
+
+	list_add_tail(&res->list, bucket);
+}
+
+/* Find a lockres by name in the domain's hash table.  On a hit the
+ * caller receives a new reference (via __dlm_lockres_get); returns
+ * NULL if not present.  Caller must hold dlm->spinlock. */
+dlm_lock_resource * __dlm_lookup_lock(dlm_ctxt *dlm,
+				      const char *name,
+				      unsigned int len)
+{
+	unsigned int hash;
+	struct list_head *iter;
+	dlm_lock_resource *tmpres=NULL;
+	struct list_head *bucket;
+
+	BUG_ON(!name);
+
+	dlmprintk0("\n");
+
+	assert_spin_locked(&dlm->spinlock);
+
+	hash = full_name_hash(name, len);
+
+	bucket = &(dlm->resources[hash & DLM_HASH_MASK]);
+
+	/* check for pre-existing lock */
+	list_for_each(iter, bucket) {
+		tmpres = list_entry(iter, dlm_lock_resource, list);
+		if (tmpres->lockname.len == len &&
+		    strncmp(tmpres->lockname.name, name, len) == 0) {
+			__dlm_lockres_get(tmpres);
+			break;
+		}
+
+		tmpres = NULL;
+	}
+	return tmpres;
+}
+
+/* Locked wrapper around __dlm_lookup_lock(): takes dlm->spinlock for
+ * the duration of the lookup.  Returned lockres (if any) carries a
+ * reference the caller must drop. */
+dlm_lock_resource * dlm_lookup_lock(dlm_ctxt *dlm, 
+				    const char *name,
+				    unsigned int len)
+{
+	dlm_lock_resource *res;
+
+	BUG_ON(!dlm);
+
+	spin_lock(&dlm->spinlock);
+	res = __dlm_lookup_lock(dlm, name, len);
+	spin_unlock(&dlm->spinlock);
+	return res;
+}
+
+/* Find a registered domain whose name is exactly domain[0..len).
+ * Returns the dlm_ctxt (no reference taken) or NULL.  Caller must
+ * hold dlm_domain_lock.
+ * Fix: compare the stored name's length too -- strncmp() alone would
+ * report a match whenever the lookup string is a prefix of a longer
+ * registered domain name (e.g. "ocfs" matching "ocfs2"). */
+static dlm_ctxt * __dlm_lookup_domain_full(const char *domain, int len)
+{
+	dlm_ctxt *tmp = NULL;
+	struct list_head *iter;
+
+	assert_spin_locked(&dlm_domain_lock);
+
+	list_for_each(iter, &dlm_domains) {
+		tmp = list_entry (iter, dlm_ctxt, list);
+		if (strlen(tmp->name) == len &&
+		    strncmp(tmp->name, domain, len)==0)
+			break;
+		tmp = NULL;
+	}
+
+	return tmp;
+}
+
+/* For null terminated domain strings ONLY */
+/* Convenience wrapper: look up a domain by its NUL-terminated name.
+ * Caller must hold dlm_domain_lock. */
+dlm_ctxt * __dlm_lookup_domain(const char *domain)
+{
+	assert_spin_locked(&dlm_domain_lock);
+
+	return __dlm_lookup_domain_full(domain, strlen(domain));
+}
+
+
+/* returns true on one of two conditions:
+ * 1) the domain does not exist
+ * 2) the domain exists and its state is "joined"
+ * Used as the wait_event() condition while a registration waits for a
+ * concurrent join/teardown of the same domain name to settle. */
+static int dlm_wait_on_domain_helper(const char *domain)
+{
+	int ret = 0;
+	dlm_ctxt *tmp = NULL;
+
+	spin_lock(&dlm_domain_lock);
+
+	tmp = __dlm_lookup_domain(domain);
+	if (!tmp)
+		ret = 1;
+	else if (tmp->dlm_state == DLM_CTXT_JOINED)
+		ret = 1;
+
+	spin_unlock(&dlm_domain_lock);
+	return ret;
+}
+
+/* Free all memory owned by a dlm_ctxt: the resource hash page, the
+ * name string, then the ctxt itself.  kfree(NULL) and free_page(0)
+ * are both no-ops, so the original NULL guards were redundant and
+ * have been dropped. */
+static void dlm_free_ctxt_mem(dlm_ctxt *dlm)
+{
+	BUG_ON(!dlm);
+
+	free_page((unsigned long) dlm->resources);
+	kfree(dlm->name);
+	kfree(dlm);
+}
+
+/* A little strange - this function will be called while holding
+ * dlm_domain_lock and is expected to be holding it on the way out. We
+ * will however drop and reacquire it multiple times */
+/* kref release callback for a dlm_ctxt: unlinks it from dlm_domains,
+ * wakes waiters on dlm_domain_events, and frees its memory.  The lock
+ * is dropped around the free because dlm_free_ctxt_mem() may sleep. */
+static void dlm_ctxt_release(struct kref *kref)
+{
+	dlm_ctxt *dlm;
+
+	BUG_ON(!kref);
+
+	dlm = container_of(kref, dlm_ctxt, dlm_refs);
+
+	BUG_ON(dlm->num_joins);
+	BUG_ON(dlm->dlm_state == DLM_CTXT_JOINED);
+
+	/* we may still be in the list if we hit an error during join. */
+	list_del_init(&dlm->list);
+
+	spin_unlock(&dlm_domain_lock);
+
+	dlmprintk("freeing memory from domain %s\n", dlm->name);
+
+	wake_up(&dlm_domain_events);
+
+	dlm_free_ctxt_mem(dlm);
+
+	spin_lock(&dlm_domain_lock);
+}
+
+/* Drop a reference on the domain.  Taken under dlm_domain_lock
+ * because dlm_ctxt_release() expects to be entered holding it. */
+void dlm_put(dlm_ctxt *dlm)
+{
+	BUG_ON(!dlm);
+
+	spin_lock(&dlm_domain_lock);
+	kref_put(&dlm->dlm_refs, dlm_ctxt_release);
+	spin_unlock(&dlm_domain_lock);
+}
+
+/* Take a reference; caller must hold dlm_domain_lock (or otherwise
+ * guarantee the ctxt cannot be released concurrently). */
+static void __dlm_get(dlm_ctxt *dlm)
+{
+	kref_get(&dlm->dlm_refs);
+}
+
+/* given a questionable reference to a dlm object, gets a reference if
+ * it can find it in the list, otherwise returns NULL in which case
+ * you shouldn't trust your pointer. */
+/* Also safe when dlm is NULL: nothing in the list will match and
+ * NULL is returned. */
+dlm_ctxt *dlm_grab(dlm_ctxt *dlm)
+{
+	struct list_head *iter;
+	dlm_ctxt *target = NULL;
+
+	spin_lock(&dlm_domain_lock);
+
+	list_for_each(iter, &dlm_domains) {
+		target = list_entry (iter, dlm_ctxt, list);
+
+		if (target == dlm) {
+			__dlm_get(target);
+			break;
+		}
+
+		target = NULL;
+	}
+
+	spin_unlock(&dlm_domain_lock);
+
+	return target;
+}
+
+/* Take a reference on a dlm the caller already knows to be valid
+ * (contrast dlm_grab(), which validates list membership first). */
+void dlm_get(dlm_ctxt *dlm)
+{
+	BUG_ON(!dlm);
+
+	spin_lock(&dlm_domain_lock);
+	__dlm_get(dlm);
+	spin_unlock(&dlm_domain_lock);
+}
+
+/* Returns nonzero once the domain has completed its join (JOINED), or
+ * is already on its way out (IN_SHUTDOWN) -- both count as "fully
+ * joined" for callers waiting to issue lock requests. */
+int dlm_domain_fully_joined(dlm_ctxt *dlm)
+{
+	int ret;
+
+	spin_lock(&dlm_domain_lock);
+	ret = (dlm->dlm_state == DLM_CTXT_JOINED) ||
+		(dlm->dlm_state == DLM_CTXT_IN_SHUTDOWN);
+	spin_unlock(&dlm_domain_lock);
+
+	return ret;
+}
+
+/* Final teardown after we've left the domain: unhook heartbeat
+ * callbacks and network handlers, stop the worker and recovery
+ * threads, unlink from the global list and wake anyone waiting for
+ * this domain name to become free. */
+static void dlm_complete_dlm_shutdown(dlm_ctxt *dlm)
+{
+	hb_unregister_callback(&dlm->dlm_hb_up);
+	hb_unregister_callback(&dlm->dlm_hb_down);
+
+	dlm_unregister_domain_handlers(dlm);
+
+	dlm_complete_thread(dlm);
+	dlm_complete_recovery_thread(dlm);
+
+	/* We've left the domain. Now we can take ourselves out of the
+	 * list and allow the kref stuff to help us free the
+	 * memory. */
+	spin_lock(&dlm_domain_lock);
+	list_del_init(&dlm->list);
+	spin_unlock(&dlm_domain_lock);
+
+	/* Wake up anyone waiting for us to remove this domain */
+	wake_up(&dlm_domain_events);
+}
+
+/* Walk the whole resource hash and purge every lockres, migrating
+ * mastered resources away, as part of domain shutdown.
+ * dlm_purge_lockres() is expected to unhash the resource and return
+ * with dlm->spinlock held, which is why the inner loop can simply
+ * re-test list_empty().  Fix: dropped the unused local `ret`. */
+static void dlm_migrate_all_locks(dlm_ctxt *dlm)
+{
+	int i;
+	dlm_lock_resource *res;
+	struct list_head *iter;
+
+	dlmprintk("Migrating locks from domain %s\n", dlm->name);
+	spin_lock(&dlm->spinlock);
+	for (i=0; i<DLM_HASH_SIZE; i++) {
+		while (!list_empty(&dlm->resources[i])) {
+			res = list_entry(dlm->resources[i].next,
+				     dlm_lock_resource, list);
+			/* this should unhash the lockres 
+			 * and exit with dlm->spinlock */
+			dlmprintk("purging res=%p\n", res);
+			if (res->state & DLM_LOCK_RES_DIRTY ||
+			    !list_empty(&res->dirty)) {
+				dlmprintk0("this is probably a bug, dirty\n");
+				/* HACK!  this should absolutely go.
+				 * need to figure out why some empty 
+				 * lockreses are still marked dirty */
+				dlm_shuffle_lists(dlm, res);
+				spin_lock(&res->spinlock);
+				list_del_init(&res->dirty);
+				res->state &= ~DLM_LOCK_RES_DIRTY;
+				spin_unlock(&res->spinlock);
+			}
+			dlm_purge_lockres(dlm, res);
+		}
+	}
+	spin_unlock(&dlm->spinlock);
+
+	dlmprintk("DONE Migrating locks from domain %s\n", dlm->name);
+}
+
+/* wait_event() condition: true when no node is currently mid-join
+ * into this domain (joining_node back to OWNER_UNKNOWN). */
+static int dlm_no_joining_node(dlm_ctxt *dlm)
+{
+	int ret;
+
+	spin_lock(&dlm->spinlock);
+	ret = dlm->joining_node == DLM_LOCK_RES_OWNER_UNKNOWN;
+	spin_unlock(&dlm->spinlock);
+
+	return ret;
+}
+
+/* Transition the domain to LEAVING.  If a node is mid-join we drop
+ * both locks, sleep until the join settles, and retry -- hence the
+ * goto loop rather than waiting while holding spinlocks. */
+static void dlm_mark_domain_leaving(dlm_ctxt *dlm)
+{
+	/* Yikes, a double spinlock! I need domain_lock for the dlm
+	 * state and the dlm spinlock for join state... Sorry! */
+again:
+	spin_lock(&dlm_domain_lock);
+	spin_lock(&dlm->spinlock);
+
+	if (dlm->joining_node != DLM_LOCK_RES_OWNER_UNKNOWN) {
+		dlmprintk("Node %d is joining, we wait on it.\n",
+			  dlm->joining_node);
+		spin_unlock(&dlm->spinlock);
+		spin_unlock(&dlm_domain_lock);
+
+		wait_event(dlm->dlm_join_events, dlm_no_joining_node(dlm));
+		goto again;
+	}
+
+	dlm->dlm_state = DLM_CTXT_LEAVING;
+	spin_unlock(&dlm->spinlock);
+	spin_unlock(&dlm_domain_lock);
+}
+
+/* Print every node currently set in this domain's map.  Caller must
+ * hold dlm->spinlock.
+ * Fix: find_next_bit() returns NM_MAX_NODES (the bitmap size), never
+ * -1, when no further bit is set -- the old "!= -1" loop condition
+ * was always true and relied on an inner break.  Terminate on the
+ * real sentinel instead. */
+static void __dlm_print_nodes(dlm_ctxt *dlm)
+{
+	int node = -1;
+
+	assert_spin_locked(&dlm->spinlock);
+
+	printk("ocfs2_dlm: Nodes in my domain (\"%s\"):\n", dlm->name);
+
+	while ((node = find_next_bit(dlm->domain_map, NM_MAX_NODES, node + 1))
+	       < NM_MAX_NODES)
+		printk(" node %d\n", node);
+}
+
+/* Network handler for DLM_EXIT_DOMAIN_MSG: a remote node announces it
+ * is leaving, so clear its bit from our domain map.  dlm_grab()
+ * guards against the message racing with our own teardown; returning
+ * 0 either way. */
+static int dlm_exit_domain_handler(net_msg *msg, u32 len, void *data)
+{
+	dlm_ctxt *dlm = data;
+	unsigned int node;
+	dlm_exit_domain *exit_msg = (dlm_exit_domain *) msg->buf;
+
+	dlmprintk0("\n");
+
+	if (!dlm_grab(dlm))
+		return 0;
+
+	/* byte-swap the wire message in place */
+	dlm_exit_domain_to_host(exit_msg);
+
+	node = exit_msg->node_idx;
+
+	dlmprintk("Node %u leaves domain %s\n", node, dlm->name);
+
+	spin_lock(&dlm->spinlock);
+	clear_bit(node, dlm->domain_map);
+	__dlm_print_nodes(dlm);
+	spin_unlock(&dlm->spinlock);
+
+	dlm_put(dlm);
+
+	return 0;
+}
+
+/* Send our exit-domain announcement to one node.  Returns the
+ * net_send_message() status (<0 on error).
+ * NOTE(review): the to-net swab helper is spelled
+ * "dlm_exit_domin_to_net" where it is declared -- renaming it would
+ * have to happen there as well. */
+static int dlm_send_one_domain_exit(dlm_ctxt *dlm,
+				    unsigned int node)
+{
+	int status;
+	dlm_exit_domain leave_msg;
+
+	dlmprintk("Asking node %u if we can leave the domain %s me = %u\n",
+		  node, dlm->name, dlm->node_num);
+
+	memset(&leave_msg, 0, sizeof(leave_msg));
+	leave_msg.node_idx = dlm->node_num;
+
+	dlm_exit_domin_to_net(&leave_msg);
+
+	status = net_send_message(DLM_EXIT_DOMAIN_MSG, dlm->key,
+				  &leave_msg, sizeof(leave_msg), node,
+				  NULL);
+
+	dlmprintk("status return %d from net_send_message\n", status);
+
+	return status;
+}
+
+
+/* Tell every remaining node in the domain map that we are leaving.
+ * Fix: find_next_bit() returns NM_MAX_NODES (never -1) when the map
+ * is empty, so the old "!= -1" condition was always true and the
+ * inner "node > NM_MAX_NODES" break was off by one -- it let
+ * node == NM_MAX_NODES fall through and send an exit message to an
+ * out-of-range node index.  The loop now terminates on the real
+ * sentinel.  The search deliberately restarts at bit 0 each pass so a
+ * node whose send failed (clear_node = 0) is retried. */
+static void dlm_leave_domain(dlm_ctxt *dlm)
+{
+	int node, clear_node, status;
+
+	/* At this point we've migrated away all our locks and won't
+	 * accept mastership of new ones. The dlm is responsible for
+	 * almost nothing now. We make sure not to confuse any joining
+	 * nodes and then commence shutdown procedure. */
+
+	spin_lock(&dlm->spinlock);
+	/* Clear ourselves from the domain map */
+	clear_bit(dlm->node_num, dlm->domain_map);
+	while ((node = find_next_bit(dlm->domain_map, NM_MAX_NODES, 0))
+	       < NM_MAX_NODES) {
+		/* Drop the dlm spinlock. This is safe wrt the domain_map.
+		 * -nodes cannot be added now as the
+		 *   query_join_handlers knows to respond with OK_NO_MAP
+		 * -we catch the right network errors if a node is
+		 *   removed from the map while we're sending him the
+		 *   exit message. */
+		spin_unlock(&dlm->spinlock);
+
+		clear_node = 1;
+
+		status = dlm_send_one_domain_exit(dlm, node);
+		if (status < 0 &&
+		    status != -ENOPROTOOPT &&
+		    status != -ENOTCONN) {
+			printk("dlm_leave_domain: Error %d sending "
+			       "domain exit message to node %d\n", status,
+			       node);
+
+			/* Not sure what to do here but lets sleep for
+			 * a bit in case this was a transient
+			 * error... */
+			schedule();
+			clear_node = 0;
+		}
+
+		spin_lock(&dlm->spinlock);
+		/* If we're not clearing the node bit then we intend
+		 * to loop back around to try again. */
+		if (clear_node)
+			clear_bit(node, dlm->domain_map);
+	}
+	spin_unlock(&dlm->spinlock);
+}
+
+/* Public API: drop one join reference on a domain.  The last leaver
+ * marks the ctxt IN_SHUTDOWN (so new registrations wait) and then
+ * runs the full teardown: migrate locks away, announce LEAVING, send
+ * exit messages, and complete shutdown.  Finally drops the kref. */
+void dlm_unregister_domain(dlm_ctxt *dlm)
+{
+	int leave = 0;
+
+	BUG_ON(!dlm);
+
+	spin_lock(&dlm_domain_lock);
+	BUG_ON(dlm->dlm_state != DLM_CTXT_JOINED);
+	BUG_ON(!dlm->num_joins);
+
+	dlm->num_joins--;
+	if (!dlm->num_joins) {
+		/* We mark it "in shutdown" now so new register
+		 * requests wait until we've completely left the
+		 * domain. Don't use DLM_CTXT_LEAVING yet as we still
+		 * want new domain joins to communicate with us at
+		 * least until we've completed migration of our
+		 * resources. */
+		dlm->dlm_state = DLM_CTXT_IN_SHUTDOWN;
+		leave = 1;
+	}
+	spin_unlock(&dlm_domain_lock);
+
+	if (leave) {
+		dlmprintk("shutting down domain %s\n", dlm->name);
+		dlm_migrate_all_locks(dlm);
+		dlm_mark_domain_leaving(dlm);
+		dlm_leave_domain(dlm);
+		dlm_complete_dlm_shutdown(dlm);
+	}
+	dlm_put(dlm);
+}
+EXPORT_SYMBOL(dlm_unregister_domain);
+
+/* Network handler for a join query.  Replies with one of the
+ * dlm_query_join_response values (returned as the handler's int
+ * status): OK_NO_MAP when we have no such domain (or lost the race),
+ * DISALLOW while another node is mid-join, and OK when we record the
+ * querier as our joining node. */
+static int dlm_query_join_handler(net_msg *msg, u32 len, void *data)
+{
+	dlm_query_join_request *query;
+	enum dlm_query_join_response response;
+	dlm_ctxt *dlm = NULL;
+
+	query = (dlm_query_join_request *) msg->buf;
+	dlm_query_join_request_to_host(query);
+
+	dlmprintk("node %u wants to join domain %s\n", query->node_idx,
+		  query->domain);
+
+	response = JOIN_OK_NO_MAP;
+
+	spin_lock(&dlm_domain_lock);
+	dlm = __dlm_lookup_domain_full(query->domain, query->name_len);
+	/* Once the dlm ctxt is marked as leaving then we don't want
+	 * to be put in someone's domain map. */
+	if (dlm && dlm->dlm_state != DLM_CTXT_LEAVING) {
+		spin_lock(&dlm->spinlock);
+
+		if (dlm->dlm_state == DLM_CTXT_NEW &&
+		    dlm->joining_node == DLM_LOCK_RES_OWNER_UNKNOWN) {
+			/*If this is a brand new context and we
+			 * haven't started our join process yet, then
+			 * the other node won the race. */
+			response = JOIN_OK_NO_MAP;
+		} else if (dlm->joining_node != DLM_LOCK_RES_OWNER_UNKNOWN) {
+			/* Disallow parallel joins. */
+			response = JOIN_DISALLOW;
+		} else {
+			/* Alright we're fully a part of this domain
+			 * so we keep some state as to who's joining
+			 * and indicate to him that needs to be fixed
+			 * up. */
+			response = JOIN_OK;
+			__dlm_set_joining_node(dlm, query->node_idx);
+		}
+
+		spin_unlock(&dlm->spinlock);
+	}
+	spin_unlock(&dlm_domain_lock);
+
+	dlmprintk("We respond with %u\n", response);
+
+	return response;
+}
+
+/* Network handler for an assert-join: the node we previously answered
+ * JOIN_OK has committed.  Set it in our domain map and clear the
+ * joining-node bookkeeping.  Always returns 0. */
+static int dlm_assert_joined_handler(net_msg *msg, u32 len, void *data)
+{
+	dlm_assert_joined *assert;
+	dlm_ctxt *dlm = NULL;
+
+	assert = (dlm_assert_joined *) msg->buf;
+	dlm_assert_joined_to_host(assert);
+
+	dlmprintk("node %u asserts join on domain %s\n", assert->node_idx,
+		  assert->domain);
+
+	spin_lock(&dlm_domain_lock);
+	dlm = __dlm_lookup_domain_full(assert->domain, assert->name_len);
+	/* XXX should we consider no dlm ctxt an error? */
+	if (dlm) {
+		spin_lock(&dlm->spinlock);
+
+		/* Alright, this node has officially joined our
+		 * domain. Set him in the map and clean up our
+		 * leftover join state. */
+		BUG_ON(dlm->joining_node != assert->node_idx);
+		set_bit(assert->node_idx, dlm->domain_map);
+		__dlm_set_joining_node(dlm, DLM_LOCK_RES_OWNER_UNKNOWN);
+
+		__dlm_print_nodes(dlm);
+
+		spin_unlock(&dlm->spinlock);
+	}
+	spin_unlock(&dlm_domain_lock);
+
+	return 0;
+}
+
+/* Handler for DLM_CANCEL_JOIN_MSG: a node that was joining gave up;
+ * forget the join state we recorded for it.  Always returns 0. */
+static int dlm_cancel_join_handler(net_msg *msg, u32 len, void *data)
+{
+	dlm_cancel_join *cancel;
+	dlm_ctxt *dlm = NULL;
+
+	cancel = (dlm_cancel_join *) msg->buf;
+	dlm_cancel_join_to_host(cancel);
+
+	dlmprintk("node %u cancels join on domain %s\n", cancel->node_idx,
+		  cancel->domain);
+
+	spin_lock(&dlm_domain_lock);
+	dlm = __dlm_lookup_domain_full(cancel->domain, cancel->name_len);
+
+	if (dlm) {
+		spin_lock(&dlm->spinlock);
+
+		/* Yikes, this guy wants to cancel his join. No
+		 * problem, we simply cleanup our join state. */
+		/* NOTE(review): as with the assert handler, a cancel
+		 * from an unexpected node trips this BUG_ON -- verify. */
+		BUG_ON(dlm->joining_node != cancel->node_idx);
+		__dlm_set_joining_node(dlm, DLM_LOCK_RES_OWNER_UNKNOWN);
+
+		spin_unlock(&dlm->spinlock);
+	}
+	spin_unlock(&dlm_domain_lock);
+
+	return 0;
+}
+
+/* Send DLM_CANCEL_JOIN_MSG to a single node, telling it to drop any
+ * join state it recorded for us.  Returns 0 or the net layer's error.
+ * (The old 'goto bail' jumped to the label on the very next line --
+ * dead control flow, removed.) */
+static int dlm_send_one_join_cancel(dlm_ctxt *dlm,
+				    unsigned int node)
+{
+	int status;
+	dlm_cancel_join cancel_msg;
+
+	memset(&cancel_msg, 0, sizeof(cancel_msg));
+	cancel_msg.node_idx = dlm->node_num;
+	cancel_msg.name_len = strlen(dlm->name);
+	/* struct was zeroed above, so the copied name stays
+	 * NUL-terminated even though strncpy doesn't guarantee it */
+	strncpy(cancel_msg.domain, dlm->name, cancel_msg.name_len);
+
+	dlm_cancel_join_to_net(&cancel_msg);
+
+	status = net_send_message(DLM_CANCEL_JOIN_MSG, DLM_MOD_KEY,
+				  &cancel_msg, sizeof(cancel_msg), node,
+				  NULL);
+	if (status < 0)
+		dlmprintk("net_send_message returned %d!\n", status);
+
+	return status;
+}
+
+/* Send DLM_CANCEL_JOIN_MSG to every node set in node_map (except
+ * ourselves).  map_size is in bytes and must match the size of a
+ * NM_MAX_NODES bitmap.  All nodes are attempted; the first error is
+ * returned (0 on success). */
+static int dlm_send_join_cancels(dlm_ctxt *dlm,
+				 unsigned long *node_map,
+				 unsigned int map_size)
+{
+	int status, tmpstat;
+	unsigned int node;
+
+	/* map_size is in bytes; the previous check compared it against
+	 * a count of longs (BITS_TO_LONGS), so the function always
+	 * failed with -EINVAL for sizeof(long) > 1. */
+	if (map_size != (BITS_TO_LONGS(NM_MAX_NODES) *
+			 sizeof(unsigned long)))
+		return -EINVAL;
+
+	status = 0;
+	/* find_next_bit returns >= NM_MAX_NODES when no further bits
+	 * are set -- it never returns -1, so bound the loop on size */
+	node = -1;
+	while ((node = find_next_bit(node_map, NM_MAX_NODES, node + 1))
+	       < NM_MAX_NODES) {
+		if (node == dlm->node_num)
+			continue;
+
+		tmpstat = dlm_send_one_join_cancel(dlm, node);
+		if (tmpstat) {
+			dlmprintk("Error return %d cancelling join on node "
+				  "%d\n", tmpstat, node);
+			if (!status)
+				status = tmpstat;
+		}
+	}
+
+	return status;
+}
+
+/* Ask 'node' whether we may join the domain.  *response is only valid
+ * when 0 is returned.  A node whose dlm (or cluster net stack) is not
+ * up counts as a "yes, but not in the domain" answer. */
+static int dlm_request_join(dlm_ctxt *dlm,
+			    int node,
+			    enum dlm_query_join_response *response)
+{
+	int status, retval;
+	dlm_query_join_request join_msg;
+
+	dlmprintk("querying node %d\n", node);
+
+	memset(&join_msg, 0, sizeof(join_msg));
+	join_msg.node_idx = dlm->node_num;
+	join_msg.name_len = strlen(dlm->name);
+	/* struct was zeroed, so the name stays NUL-terminated */
+	strncpy(join_msg.domain, dlm->name, join_msg.name_len);
+
+	dlm_query_join_request_to_net(&join_msg);
+
+	/* retval receives the remote handler's return code, i.e. its
+	 * dlm_query_join_response */
+	status = net_send_message(DLM_QUERY_JOIN_MSG, DLM_MOD_KEY, &join_msg,
+				  sizeof(join_msg), node, &retval);
+	if (status < 0 && status != -ENOPROTOOPT && status != -ENOTCONN) {
+		dlmprintk("net_send_message returned %d!\n", status);
+		goto bail;
+	}
+
+	/* -ENOPROTOOPT from the net code means the other side isn't
+            listening for our message type -- that's fine, it means
+            his dlm isn't up, so we can consider him a 'yes' but not
+            joined into the domain. 
+	   -ENOTCONN is treated similarly -- it's returned from the
+            core kernel net code however and indicates that they don't
+            even have their cluster networking module loaded (bad
+            user!) */
+	if (status == -ENOPROTOOPT || status == -ENOTCONN) {
+		status = 0;
+		*response = JOIN_OK_NO_MAP;
+	} else if (retval == JOIN_DISALLOW ||
+		   retval == JOIN_OK ||
+		   retval == JOIN_OK_NO_MAP) {
+		*response = retval;
+	} else {
+		status = -EINVAL;
+		dlmprintk("invalid response %d from node %u\n", retval, node);
+	}
+
+	dlmprintk("status %d, node %d response is %d\n", status, node,
+		  *response);
+
+bail:
+	return status;
+}
+
+/* Tell 'node' that we are now officially part of the domain so it can
+ * fix up its domain map.  Returns the net layer's status. */
+static int dlm_send_one_join_assert(dlm_ctxt *dlm,
+				    unsigned int node)
+{
+	int status;
+	dlm_assert_joined assert_msg;
+
+	dlmprintk("Sending join assert to node %u\n", node);
+
+	memset(&assert_msg, 0, sizeof(assert_msg));
+	assert_msg.node_idx = dlm->node_num;
+	assert_msg.name_len = strlen(dlm->name);
+	/* struct was zeroed, so the name stays NUL-terminated */
+	strncpy(assert_msg.domain, dlm->name, assert_msg.name_len);
+
+	dlm_assert_joined_to_net(&assert_msg);
+
+	status = net_send_message(DLM_ASSERT_JOINED_MSG, DLM_MOD_KEY,
+				  &assert_msg, sizeof(assert_msg), node, NULL);
+	if (status < 0)
+		dlmprintk("net_send_message returned %d!\n", status);
+
+	return status;
+}
+
+/* Spin sending DLM_ASSERT_JOINED_MSG to each node in node_map until
+ * the message is delivered or the target node dies -- delivery is
+ * mandatory for a consistent domain map. */
+static void dlm_send_join_asserts(dlm_ctxt *dlm,
+				  unsigned long *node_map)
+{
+	int status, node, live;
+
+	status = 0;
+	node = -1;
+	/* find_next_bit returns >= NM_MAX_NODES when the map is
+	 * exhausted; it never returns -1, so bound the loop on size */
+	while ((node = find_next_bit(node_map, NM_MAX_NODES, node + 1))
+	       < NM_MAX_NODES) {
+		if (node == dlm->node_num)
+			continue;
+
+		do {
+			/* It is very important that this message be
+			 * received so we spin until either the node
+			 * has died or it gets the message. */
+			status = dlm_send_one_join_assert(dlm, node);
+
+			spin_lock(&dlm->spinlock);
+			live = test_bit(node, dlm->live_nodes_map);
+			spin_unlock(&dlm->spinlock);
+
+			if (status) {
+				dlmprintk("Error return %d asserting join on "
+					  "node %d\n", status, node);
+
+				/* give us some time between errors... */
+				if (live)
+					schedule();
+			}
+		} while (status && live);
+	}
+}
+
+/* Per-attempt join state: which nodes were alive when the pass began,
+ * and which of them answered JOIN_OK. */
+struct domain_join_ctxt {
+	unsigned long live_map[BITS_TO_LONGS(NM_MAX_NODES)];
+	unsigned long yes_resp_map[BITS_TO_LONGS(NM_MAX_NODES)];
+};
+
+/* Decide whether the join pass must be restarted: either a node
+ * explicitly disallowed us, or the set of live nodes changed while we
+ * were querying.  Returns nonzero to restart. */
+static int dlm_should_restart_join(dlm_ctxt *dlm,
+				   struct domain_join_ctxt *ctxt,
+				   enum dlm_query_join_response response)
+{
+	int ret;
+
+	if (response == JOIN_DISALLOW) {
+		dlmprintk("Latest response of disallow -- should restart\n");
+		return 1;
+	}
+
+	spin_lock(&dlm->spinlock);
+	/* For now, we restart the process if the node maps have
+	 * changed at all */
+	ret = memcmp(ctxt->live_map, dlm->live_nodes_map,
+		     sizeof(dlm->live_nodes_map));
+	spin_unlock(&dlm->spinlock);
+
+	if (ret)
+		dlmprintk("Node maps changed -- should restart\n");
+
+	return ret;
+}
+
+/* One pass of the join protocol: snapshot the live node map, query
+ * every live node, and on unanimous agreement install the domain map
+ * and assert the join.  Returns 0 on success, -EAGAIN when the pass
+ * must be retried (a node disallowed us or the node maps changed), or
+ * a negative error.  On error, cancels are sent to any nodes that had
+ * already said JOIN_OK. */
+static int dlm_try_to_join_domain(dlm_ctxt *dlm)
+{
+	int status = 0, tmpstat, node;
+	struct domain_join_ctxt *ctxt;
+	enum dlm_query_join_response response;
+
+	dlmprintk0("\n");
+
+	ctxt = kmalloc(sizeof(struct domain_join_ctxt), GFP_KERNEL);
+	if (!ctxt) {
+		dlmprintk("No memory for domain_join_ctxt\n");
+		status = -ENOMEM;
+		goto bail;
+	}
+	memset(ctxt, 0, sizeof(*ctxt));
+
+	/* group sem locking should work for us here -- we're already
+	 * registered for heartbeat events so filling this should be
+	 * atomic wrt getting those handlers called. */
+	hb_fill_node_map(dlm->live_nodes_map, sizeof(dlm->live_nodes_map));
+
+	spin_lock(&dlm->spinlock);
+	memcpy(ctxt->live_map, dlm->live_nodes_map, sizeof(ctxt->live_map));
+
+	__dlm_set_joining_node(dlm, dlm->node_num);
+
+	spin_unlock(&dlm->spinlock);
+
+	node = -1;
+	/* find_next_bit returns >= NM_MAX_NODES once the map is
+	 * exhausted; it never returns -1, so bound the loop on size */
+	while ((node = find_next_bit(ctxt->live_map, NM_MAX_NODES, node + 1))
+	       < NM_MAX_NODES) {
+		if (node == dlm->node_num)
+			continue;
+
+		status = dlm_request_join(dlm, node, &response);
+		if (status < 0) {
+			dlmprintk("%d return from request_join!\n", status);
+			goto bail;
+		}
+
+		/* Ok, either we got a response or the node doesn't have a
+		 * dlm up. */
+		if (response == JOIN_OK)
+			set_bit(node, ctxt->yes_resp_map);
+
+		if (dlm_should_restart_join(dlm, ctxt, response)) {
+			status = -EAGAIN;
+			goto bail;
+		}
+	}
+
+	dlmprintk("Yay, done querying nodes!\n");
+
+	/* Yay, everyone agrees we can join the domain. My domain is
+	 * comprised of all nodes who were put in the
+	 * yes_resp_map. Copy that into our domain map and send a join
+	 * assert message to clean up everyone else's state. */
+	spin_lock(&dlm->spinlock);
+	memcpy(dlm->domain_map, ctxt->yes_resp_map,
+	       sizeof(ctxt->yes_resp_map));
+	set_bit(dlm->node_num, dlm->domain_map);
+	spin_unlock(&dlm->spinlock);
+
+	dlm_send_join_asserts(dlm, ctxt->yes_resp_map);
+
+	spin_lock(&dlm->spinlock);
+	__dlm_set_joining_node(dlm, DLM_LOCK_RES_OWNER_UNKNOWN);
+	__dlm_print_nodes(dlm);
+	spin_unlock(&dlm->spinlock);
+
+bail:
+	if (ctxt) {
+		/* Do we need to send a cancel message to any nodes? */
+		if (status < 0) {
+			tmpstat = dlm_send_join_cancels(dlm,
+							ctxt->yes_resp_map,
+							sizeof(ctxt->yes_resp_map));
+			if (tmpstat < 0)
+				dlmprintk("%d return cancelling join!\n",
+					  tmpstat);
+		}
+		kfree(ctxt);
+	}
+
+	dlmprintk("returning %d\n", status);
+	return status;
+}
+
+/* Drop all per-domain net message handlers.  Note: this does not touch
+ * the heartbeat callbacks registered in dlm_register_domain_handlers. */
+static void dlm_unregister_domain_handlers(dlm_ctxt *dlm)
+{
+	net_unregister_handler_list(&dlm->dlm_domain_handlers);
+}
+
+/* Register the heartbeat up/down callbacks and every per-domain net
+ * message handler.  On failure, net handlers registered so far are
+ * unregistered before returning the error.
+ * NOTE(review): the heartbeat callbacks are NOT unwound on the bail
+ * path (dlm_unregister_domain_handlers only drops the net handler
+ * list) -- confirm the caller cleans them up on failure. */
+static int dlm_register_domain_handlers(dlm_ctxt *dlm)
+{
+	int status;
+
+	dlmprintk("registering handlers.\n");
+
+	hb_setup_callback(&dlm->dlm_hb_down, HB_NODE_DOWN_CB,
+			  dlm_hb_node_down_cb, dlm, DLM_HB_NODE_DOWN_PRI);
+	status = hb_register_callback(&dlm->dlm_hb_down);
+	if (status)
+		goto bail;
+
+	hb_setup_callback(&dlm->dlm_hb_up, HB_NODE_UP_CB, dlm_hb_node_up_cb,
+			  dlm, DLM_HB_NODE_UP_PRI);
+	status = hb_register_callback(&dlm->dlm_hb_up);
+	if (status)
+		goto bail;
+
+	status = net_register_handler(DLM_MASTER_REQUEST_MSG, dlm->key,
+				      sizeof(dlm_master_request), 
+				      dlm_master_request_handler,
+				      dlm, &dlm->dlm_domain_handlers);
+	if (status)
+		goto bail;
+
+	status = net_register_handler(DLM_ASSERT_MASTER_MSG, dlm->key,
+				      sizeof(dlm_assert_master), 
+				      dlm_assert_master_handler,
+				      dlm, &dlm->dlm_domain_handlers);
+	if (status)
+		goto bail;
+
+	status = net_register_handler(DLM_CREATE_LOCK_MSG, dlm->key,
+				      sizeof(dlm_create_lock), 
+				      dlm_create_lock_handler,
+				      dlm, &dlm->dlm_domain_handlers);
+	if (status)
+		goto bail;
+
+	status = net_register_handler(DLM_CONVERT_LOCK_MSG, dlm->key, 
+				      DLM_CONVERT_LOCK_MAX_LEN,
+				      dlm_convert_lock_handler,
+				      dlm, &dlm->dlm_domain_handlers);
+	if (status)
+		goto bail;
+
+	status = net_register_handler(DLM_UNLOCK_LOCK_MSG, dlm->key, 
+				      DLM_UNLOCK_LOCK_MAX_LEN,
+				      dlm_unlock_lock_handler,
+				      dlm, &dlm->dlm_domain_handlers);
+	if (status)
+		goto bail;
+
+	status = net_register_handler(DLM_PROXY_AST_MSG, dlm->key, 
+				      DLM_PROXY_AST_MAX_LEN,
+				      dlm_proxy_ast_handler,
+				      dlm, &dlm->dlm_domain_handlers);
+	if (status)
+		goto bail;
+
+	status = net_register_handler(DLM_EXIT_DOMAIN_MSG, dlm->key,
+				      sizeof(dlm_exit_domain),
+				      dlm_exit_domain_handler,
+				      dlm, &dlm->dlm_domain_handlers);
+	if (status)
+		goto bail;
+
+	status = net_register_handler(DLM_MIGRATE_REQUEST_MSG, dlm->key,
+				      sizeof(dlm_migrate_request), 
+				      dlm_migrate_request_handler, 
+				      dlm, &dlm->dlm_domain_handlers);
+	if (status)
+		goto bail;
+
+	status = net_register_handler(DLM_MIG_LOCKRES_MSG, dlm->key, 
+				      DLM_MIG_LOCKRES_MAX_LEN, 
+				      dlm_mig_lockres_handler, 
+				      dlm, &dlm->dlm_domain_handlers);
+	if (status)
+		goto bail;
+
+	status = net_register_handler(DLM_MASTER_REQUERY_MSG, dlm->key,
+				      sizeof(dlm_master_requery), 
+				      dlm_master_requery_handler, 
+				      dlm, &dlm->dlm_domain_handlers);
+	if (status)
+		goto bail;
+
+	status = net_register_handler(DLM_LOCK_REQUEST_MSG, dlm->key,
+				      sizeof(dlm_lock_request), 
+				      dlm_request_all_locks_handler, 
+				      dlm, &dlm->dlm_domain_handlers);
+	if (status)
+		goto bail;
+
+	status = net_register_handler(DLM_RECO_DATA_DONE_MSG, dlm->key,
+				      sizeof(dlm_reco_data_done), 
+				      dlm_reco_data_done_handler, 
+				      dlm, &dlm->dlm_domain_handlers);
+	if (status)
+		goto bail;
+
+	status = net_register_handler(DLM_BEGIN_RECO_MSG, dlm->key,
+				      sizeof(dlm_begin_reco), 
+				      dlm_begin_reco_handler, 
+				      dlm, &dlm->dlm_domain_handlers);
+	if (status)
+		goto bail;
+
+	status = net_register_handler(DLM_FINALIZE_RECO_MSG, dlm->key,
+				      sizeof(dlm_finalize_reco), 
+				      dlm_finalize_reco_handler, 
+				      dlm, &dlm->dlm_domain_handlers);
+	if (status)
+		goto bail;
+
+
+bail:
+	if (status)
+		dlm_unregister_domain_handlers(dlm);
+
+	return status;
+}
+
+/* Bring this node into the domain: register handlers, start the dlm
+ * thread, run the join protocol (backing off while we lose races),
+ * then start the recovery thread and mark the ctxt JOINED.  Waiters on
+ * dlm_domain_events are woken on both success and failure.
+ * NOTE(review): on failure, handlers/threads started earlier in this
+ * function are not stopped here -- confirm the caller's teardown path
+ * (dlm_put/unregister) covers them. */
+static int dlm_join_domain(dlm_ctxt *dlm)
+{
+	int status;
+
+	BUG_ON(!dlm);
+
+	dlmprintk("Join domain %s\n", dlm->name);
+
+	status = dlm_register_domain_handlers(dlm);
+	if (status) {
+		dlmprintk("Error %d registering handlers!\n", status);
+		goto bail;
+	}
+
+
+
+	status = dlm_launch_thread(dlm);
+	if (status < 0) {
+		dlmprintk("could not launch dlm thread!\n");
+		goto bail;
+	}
+
+
+
+	do {
+		status = dlm_try_to_join_domain(dlm);
+
+		/* If we're racing another node to the join, then we
+		 * need to back off temporarily and let them
+		 * complete. */
+		if (status == -EAGAIN) {
+			schedule();
+
+			if (signal_pending(current)) {
+				status = -EINTR;
+				goto bail;
+			}
+		}
+	} while (status == -EAGAIN);
+
+	if (status < 0) {
+		dlmprintk("Joining broke! %d\n", status);
+		goto bail;
+	}
+
+	status = dlm_launch_recovery_thread(dlm);
+	if (status < 0) {
+		dlmprintk("could not launch dlm recovery thread!\n");
+		goto bail;
+	}
+
+
+	spin_lock(&dlm_domain_lock);
+	dlm->num_joins++;
+	dlm->dlm_state = DLM_CTXT_JOINED;
+	spin_unlock(&dlm_domain_lock);
+
+	status = 0;
+bail:
+	wake_up(&dlm_domain_events);
+
+	return status;
+}
+
+/* Allocate and initialize a dlm_ctxt for 'domain'.  Returns NULL on
+ * allocation failure.  The new ctxt starts in DLM_CTXT_NEW holding a
+ * single reference (released via dlm_put -> dlm_ctxt_release). */
+static dlm_ctxt *dlm_alloc_ctxt(const char *domain,
+				u32 key)
+{
+	int i;
+	dlm_ctxt *dlm = NULL;
+
+	dlm = kmalloc(sizeof(dlm_ctxt), GFP_KERNEL);
+	if (!dlm) {
+		dlmprintk0("could not allocate dlm_ctxt\n");
+		goto leave;
+	}
+	memset(dlm, 0, sizeof(dlm_ctxt));
+
+	dlm->name = kmalloc(strlen(domain) + 1, GFP_KERNEL);
+	if (dlm->name == NULL) {
+		dlmprintk0("could not allocate dlm domain name\n");
+		kfree(dlm);
+		dlm = NULL;
+		goto leave;
+	}
+
+	/* one page of list heads serves as the lock resource hash */
+	dlm->resources = (struct list_head *) __get_free_page(GFP_KERNEL);
+	if (!dlm->resources) {
+		dlmprintk0("could not allocate dlm hash\n");
+		kfree(dlm->name);
+		kfree(dlm);
+		dlm = NULL;
+		goto leave;
+	}
+	memset(dlm->resources, 0, PAGE_SIZE);
+
+	for (i=0; i<DLM_HASH_SIZE; i++)
+		INIT_LIST_HEAD(&dlm->resources[i]);
+
+	/* safe: dlm->name was sized with strlen(domain) + 1 above */
+	strcpy(dlm->name, domain);
+	dlm->key = key;
+	dlm->node_num = nm_this_node();
+
+	spin_lock_init(&dlm->spinlock);
+	spin_lock_init(&dlm->master_lock);
+	INIT_LIST_HEAD(&dlm->list);
+	INIT_LIST_HEAD(&dlm->dirty_list);
+	INIT_LIST_HEAD(&dlm->reco.resources);
+	INIT_LIST_HEAD(&dlm->reco.received);
+	INIT_LIST_HEAD(&dlm->reco.node_data);
+	INIT_LIST_HEAD(&dlm->purge_list);
+	INIT_LIST_HEAD(&dlm->dlm_domain_handlers);
+
+	INIT_LIST_HEAD(&dlm->pending_asts);
+	INIT_LIST_HEAD(&dlm->pending_basts);
+
+	dlmprintk("dlm->recovery_map=%p, &(dlm->recovery_map[0])=%p\n",
+		  dlm->recovery_map, &(dlm->recovery_map[0]));
+
+	memset(dlm->recovery_map, 0, sizeof(dlm->recovery_map));
+	memset(dlm->live_nodes_map, 0, sizeof(dlm->live_nodes_map));
+	memset(dlm->domain_map, 0, sizeof(dlm->domain_map));
+
+	dlm->dlm_thread_task = NULL;
+	dlm->dlm_reco_thread_task = NULL;
+	init_waitqueue_head(&dlm->dlm_thread_wq);
+	init_waitqueue_head(&dlm->dlm_reco_thread_wq);
+	INIT_LIST_HEAD(&dlm->master_list);
+	INIT_LIST_HEAD(&dlm->mle_hb_events);
+	init_rwsem(&dlm->recovery_sem);
+
+	dlm->joining_node = DLM_LOCK_RES_OWNER_UNKNOWN;
+	init_waitqueue_head(&dlm->dlm_join_events);
+
+	dlm->reco.new_master = NM_INVALID_SLOT_NUM;
+	dlm->reco.dead_node = NM_INVALID_SLOT_NUM;
+	atomic_set(&dlm->local_resources, 0);
+	atomic_set(&dlm->remote_resources, 0);
+	atomic_set(&dlm->unknown_resources, 0);
+
+	spin_lock_init(&dlm->work_lock);
+	INIT_LIST_HEAD(&dlm->work_list);
+	INIT_WORK(&dlm->dispatched_work, dlm_dispatch_work, dlm);
+
+	kref_init(&dlm->dlm_refs, dlm_ctxt_release);
+	dlm->dlm_state = DLM_CTXT_NEW;
+
+	dlmprintk("context init: refcount %u\n",
+		  atomic_read(&dlm->dlm_refs.refcount));
+
+leave:
+	return dlm;
+}
+
+/*
+ * dlm_register_domain: one-time setup per "domain".  Looks up (or
+ * allocates and joins) the dlm_ctxt for 'domain', taking a reference
+ * on it.  Returns the ctxt or NULL on failure.
+ */
+dlm_ctxt * dlm_register_domain(const char *domain,
+			       u32 key)
+{
+	int ret;
+	dlm_ctxt *dlm = NULL;
+	dlm_ctxt *new_ctxt = NULL;
+
+	if (strlen(domain) > NM_MAX_NAME_LEN) {
+		dlmprintk0("domain name length too long\n");
+		goto leave;
+	}
+
+	if (!hb_check_local_node_heartbeating()) {
+		/* fixed typo in the log message: "ben" -> "been" */
+		dlmprintk0("the local node has not been configured, or is not "
+			   "heartbeating\n");
+		goto leave;
+	}
+
+	dlmprintk("register called for domain \"%s\"\n", domain);
+
+retry:
+	dlm = NULL;
+	if (signal_pending(current))
+		goto leave;
+
+	spin_lock(&dlm_domain_lock);
+
+	dlm = __dlm_lookup_domain(domain);
+	if (dlm) {
+		if (dlm->dlm_state != DLM_CTXT_JOINED) {
+			spin_unlock(&dlm_domain_lock);
+
+			dlmprintk("This ctxt is not joined yet!\n");
+			wait_event_interruptible(dlm_domain_events,
+						 dlm_wait_on_domain_helper(
+							 domain));
+			goto retry;
+		}
+
+		__dlm_get(dlm);
+		dlm->num_joins++;
+
+		spin_unlock(&dlm_domain_lock);
+		goto leave;
+	}
+
+	/* doesn't exist */
+	if (!new_ctxt) {
+		/* allocate outside the lock, then retry the lookup in
+		 * case someone else registered meanwhile */
+		spin_unlock(&dlm_domain_lock);
+
+		new_ctxt = dlm_alloc_ctxt(domain, key);
+		if (new_ctxt)
+			goto retry;
+		goto leave;
+	}
+
+	/* a little variable switch-a-roo here... */
+	dlm = new_ctxt;
+	new_ctxt = NULL;
+
+	/* add the new domain */
+	list_add_tail(&dlm->list, &dlm_domains);
+	spin_unlock(&dlm_domain_lock);
+
+	ret = dlm_join_domain(dlm);
+	if (ret) {
+		dlmprintk("return code %d from join_domain!\n", ret);
+		dlm_put(dlm);
+		dlm = NULL;
+	}
+
+leave:
+	if (new_ctxt)
+		dlm_free_ctxt_mem(new_ctxt);
+
+	return dlm;
+}
+EXPORT_SYMBOL(dlm_register_domain);
+
+
+/* Join-protocol handlers live for the lifetime of the module, not of
+ * any one domain. */
+static LIST_HEAD(dlm_join_handlers);
+
+static void dlm_unregister_net_handlers(void)
+{
+	net_unregister_handler_list(&dlm_join_handlers);
+}
+
+/* Register the module-wide join protocol handlers (query / assert /
+ * cancel).  On failure, any that did register are torn down. */
+static int dlm_register_net_handlers(void)
+{
+	int status = 0;
+
+	status = net_register_handler(DLM_QUERY_JOIN_MSG, DLM_MOD_KEY,
+				      sizeof(dlm_query_join_request),
+				      dlm_query_join_handler,
+				      NULL, &dlm_join_handlers);
+	if (status)
+		goto bail;
+
+	status = net_register_handler(DLM_ASSERT_JOINED_MSG, DLM_MOD_KEY,
+				      sizeof(dlm_assert_joined),
+				      dlm_assert_joined_handler,
+				      NULL, &dlm_join_handlers);
+	if (status)
+		goto bail;
+
+	status = net_register_handler(DLM_CANCEL_JOIN_MSG, DLM_MOD_KEY,
+				      sizeof(dlm_cancel_join),
+				      dlm_cancel_join_handler,
+				      NULL, &dlm_join_handlers);
+
+bail:
+	/* NOTE(review): earlier checks use 'if (status)' but this one is
+	 * 'status < 0' -- equivalent only if net_register_handler never
+	 * returns a positive nonzero; confirm. */
+	if (status < 0)
+		dlm_unregister_net_handlers();
+
+	return status;
+}
+
+/* Module init: register the join protocol handlers and create the
+ * debug proc entry. */
+static int __init dlm_init(void)
+{
+	int status;
+
+	dlmprintk0("Loaded dlm module\n");
+
+	status = dlm_register_net_handlers();
+	if (status)
+		/* NOTE(review): this discards the errno in 'status' and
+		 * returns -1 (-EPERM) -- intentional? */
+		return -1;
+
+	dlm_create_dlm_debug_proc_entry();
+
+	return 0;
+}
+
+/* Module teardown: drop the join protocol handlers. */
+static void __exit dlm_exit (void)
+{
+	dlm_unregister_net_handlers();
+	dlmprintk0("Unloaded dlm module\n");
+}				/* dlm_exit */
+
+MODULE_LICENSE ("GPL");
+MODULE_AUTHOR("Oracle");
+
+module_init (dlm_init);
+module_exit (dlm_exit);

Added: trunk/fs/ocfs2/dlm/dlmdomain.h
===================================================================
--- trunk/fs/ocfs2/dlm/dlmdomain.h	2005-03-22 23:03:52 UTC (rev 2037)
+++ trunk/fs/ocfs2/dlm/dlmdomain.h	2005-03-23 01:21:05 UTC (rev 2038)
@@ -0,0 +1,33 @@
+/* -*- mode: c; c-basic-offset: 8; -*-
+ * vim: noexpandtab sw=8 ts=8 sts=0:
+ *
+ * dlmdomain.h
+ *
+ * Copyright (C) 2004 Oracle.  All rights reserved.
+ *
+ * This program is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU General Public
+ * License as published by the Free Software Foundation; either
+ * version 2 of the License, or (at your option) any later version.
+ * 
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
+ * General Public License for more details.
+ * 
+ * You should have received a copy of the GNU General Public
+ * License along with this program; if not, write to the
+ * Free Software Foundation, Inc., 59 Temple Place - Suite 330,
+ * Boston, MA 021110-1307, USA.
+ *
+ */
+
+#ifndef DLMDOMAIN_H
+#define DLMDOMAIN_H
+
+extern spinlock_t dlm_domain_lock;
+extern struct list_head dlm_domains;
+
+dlm_ctxt * __dlm_lookup_domain(const char *domain);
+
+#endif

Modified: trunk/fs/ocfs2/dlm/dlmfs.c
===================================================================
--- trunk/fs/ocfs2/dlm/dlmfs.c	2005-03-22 23:03:52 UTC (rev 2037)
+++ trunk/fs/ocfs2/dlm/dlmfs.c	2005-03-23 01:21:05 UTC (rev 2038)
@@ -43,8 +43,7 @@
 #include "cluster/heartbeat.h"
 #include "cluster/tcp.h"
 
-#include "dlmcommon.h"
-#include "dlmmod.h"
+#include "dlmapi.h"
 
 #include "dlmfs_compat.h"
 #include "userdlm.h"

Modified: trunk/fs/ocfs2/dlm/dlmfs_compat.c
===================================================================
--- trunk/fs/ocfs2/dlm/dlmfs_compat.c	2005-03-22 23:03:52 UTC (rev 2037)
+++ trunk/fs/ocfs2/dlm/dlmfs_compat.c	2005-03-23 01:21:05 UTC (rev 2038)
@@ -33,8 +33,7 @@
 #include "cluster/nodemanager.h"
 #include "cluster/tcp.h"
 
-#include "dlmcommon.h"
-#include "dlmmod.h"
+#include "dlmapi.h"
 
 #include "dlmfs_compat.h"
 #include "userdlm.h"

Modified: trunk/fs/ocfs2/dlm/dlmlock.c
===================================================================
--- trunk/fs/ocfs2/dlm/dlmlock.c	2005-03-22 23:03:52 UTC (rev 2037)
+++ trunk/fs/ocfs2/dlm/dlmlock.c	2005-03-23 01:21:05 UTC (rev 2038)
@@ -44,9 +44,12 @@
 #include "cluster/nodemanager.h"
 #include "cluster/tcp.h"
 
+#include "dlmapi.h"
 #include "dlmcommon.h"
-#include "dlmmod.h"
 
+static spinlock_t dlm_cookie_lock = SPIN_LOCK_UNLOCKED;
+static u64 dlm_next_cookie = 1;
+
 static dlm_status dlm_send_remote_lock_request(dlm_ctxt *dlm, 
 					       dlm_lock_resource *res, 
 					       dlm_lock *lock, int flags);
@@ -341,3 +344,195 @@
 
 	return status;
 }
+
+/* fetch next node-local (u8 nodenum + u56 cookie) into u64 */
+static inline void dlm_get_next_cookie(u8 node_num, u64 *cookie)
+{
+	u64 tmpnode = node_num;
+
+	/* shift single byte of node num into top 8 bits */
+	tmpnode <<= 56;
+
+	spin_lock(&dlm_cookie_lock);
+	*cookie = (dlm_next_cookie | tmpnode);
+	/* on overflow of the low 56 bits, wrap back to 1 -- cookies on
+	 * this node are no longer guaranteed unique after that */
+	if (++dlm_next_cookie & 0xff00000000000000ull) {
+		dlmprintk0("eek! this node's cookie will now wrap!\n");
+		dlm_next_cookie = 1;
+	}
+	spin_unlock(&dlm_cookie_lock);
+}
+
+/* dlmlock: acquire a new lock on 'name', or convert an existing one
+ * (LKM_CONVERT with lksb->lockid set).  On a convert, ast/bast/data
+ * and lksb must match the original call or DLM_BADARGS is returned.
+ * Completion is signalled through the ast; errors are returned
+ * directly and mirrored into lksb->status.
+ * NOTE(review): the DLM_RECOVERING/DLM_MIGRATING retry loops below
+ * re-take recovery_sem and retry without sleeping -- confirm recovery
+ * always makes progress against this. */
+dlm_status dlmlock(dlm_ctxt *dlm, int mode, dlm_lockstatus *lksb, int flags, 
+		   const char *name, dlm_astlockfunc_t *ast, void *data, 
+		   dlm_bastlockfunc_t *bast)
+{
+	dlm_status status;
+	dlm_lock_resource *res = NULL;
+	dlm_lock *lock = NULL;
+	int convert = 0, recovery = 0;
+
+	if (!lksb)
+		return DLM_BADARGS;
+
+	status = DLM_BADPARAM;
+	if (mode != LKM_EXMODE && mode != LKM_PRMODE && mode != LKM_NLMODE)
+		goto error;
+
+	if (flags & ~LKM_VALID_FLAGS)
+		goto error;
+
+	convert = (flags & LKM_CONVERT);
+	recovery = (flags & LKM_RECOVERY);
+
+	/* LKM_RECOVERY is only valid on the special recovery lock and
+	 * never together with a convert */
+	if (recovery && (!dlm_is_recovery_lock(name, strlen(name)) ||
+		 convert) ) {
+		goto error;
+	}
+	if (convert && (flags & LKM_LOCAL)) {
+		dlmprintk0("strange LOCAL convert request!\n");
+		goto error;
+	}
+
+	if (convert) {
+		/* CONVERT request */
+
+		/* if converting, must pass in a valid dlm_lock */
+		if (!lksb->lockid || !lksb->lockid->lockres)
+			goto error;
+		lock = lksb->lockid;
+
+		/* XXX: for ocfs2 purposes, the ast/bast/astdata/lksb are 
+	 	 * static after the original lock call.  convert requests will 
+		 * ensure that everything is the same, or return DLM_BADARGS.
+	 	 * this means that DLM_DENIED_NOASTS will never be returned.
+	 	 */
+		if (lock->lksb != lksb || lock->ast != ast ||
+		    lock->bast != bast || lock->astdata != data) {
+			status = DLM_BADARGS;
+			dlmprintk("ERROR new args:  lksb=%p, ast=%p, bast=%p, "
+				  "astdata=%p\n", lksb, ast, bast, data);
+			dlmprintk("      orig args: lksb=%p, ast=%p, bast=%p, "
+				  "astdata=%p\n", lock->lksb, lock->ast, 
+				  lock->bast, lock->astdata);
+			goto error;
+		}
+		res = dlm_lockres_grab(dlm, lock->lockres);
+retry_convert:
+		down_read(&dlm->recovery_sem);
+
+		if (res->owner == dlm->node_num)
+			status = dlmconvert_master(dlm, res, lock, flags, mode);
+		else 
+			status = dlmconvert_remote(dlm, res, lock, flags, mode);
+		if (status == DLM_RECOVERING || status == DLM_MIGRATING) {
+			/* for now, see how this works without sleeping
+			 * and just retry right away.  I suspect the reco
+			 * or migration will complete fast enough that
+			 * no waiting will be necessary */
+			dlmprintk0("retrying convert with migration or "
+				   "recovery in progress\n");
+			up_read(&dlm->recovery_sem);
+			goto retry_convert;
+		}
+	} else {
+		/* LOCK request */
+		status = DLM_BADARGS;
+		if (!name)
+			goto error;
+
+		status = DLM_IVBUFLEN;
+		if (strlen(name) > DLM_LOCKID_NAME_MAX)
+			goto error;
+
+		lock = kmalloc(sizeof(dlm_lock), GFP_KERNEL);  /* dlm_lock */
+		if (!lock)
+			goto error;
+
+		lksb->lockid = lock;
+
+		/* the recovery lock must be usable while recovery is
+		 * in progress, so it skips the sem */
+		if (!recovery)
+			down_read(&dlm->recovery_sem);
+
+		/* find or create the lock resource */
+		res = dlm_get_lock_resource(dlm, name, flags);
+		if (!res) {
+			status = DLM_IVLOCKID;
+			goto up_error;
+		}
+
+		dlmprintk("type=%d, flags = 0x%x\n", mode, flags);
+		dlmprintk("creating lock: lock=%p res=%p\n", lock, res);
+
+#warning move this into dlm_init_lock
+		memset(lock, 0, sizeof(dlm_lock));
+		INIT_LIST_HEAD(&lock->list);
+		INIT_LIST_HEAD(&lock->ast_list);
+		INIT_LIST_HEAD(&lock->bast_list);
+		spin_lock_init(&lock->spinlock);
+		lock->lockres = res;
+		lock->ml.type = mode;
+		lock->ml.convert_type = LKM_IVMODE;
+		lock->ml.highest_blocked = LKM_IVMODE;
+		lock->ml.node = dlm->node_num;
+		lock->ast = ast;
+		lock->bast = bast;
+		lock->astdata = data;
+		lock->lksb = lksb;
+		lock->ast_pending = 0;
+		lock->bast_pending = 0;
+
+		dlm_get_next_cookie(lock->ml.node, &lock->ml.cookie);
+
+retry_lock:
+		if (flags & LKM_VALBLK) {
+			dlmprintk("LKM_VALBLK passed by caller\n");
+
+			/* LVB requests for non PR, PW or EX locks are
+			 * ignored. */
+			if (mode < LKM_PRMODE)
+				flags &= ~LKM_VALBLK;
+			else {
+				flags |= LKM_GET_LVB;
+				lock->lksb->flags |= DLM_LKSB_GET_LVB;
+			}
+		}
+
+		if (res->owner == dlm->node_num)
+			status = dlmlock_master(dlm, res, lock, flags);
+		else 
+			status = dlmlock_remote(dlm, res, lock, flags);
+
+		if (status == DLM_RECOVERING || status == DLM_MIGRATING) {
+			dlmprintk0("retrying lock with migration or "
+				   "recovery in progress\n");
+			up_read(&dlm->recovery_sem);
+			down_read(&dlm->recovery_sem);
+			goto retry_lock;
+		}
+
+		if (status != DLM_NORMAL) {
+			lock->lksb->flags &= ~DLM_LKSB_GET_LVB;
+			goto up_error;
+		}
+	}
+
+up_error:
+	/* fall-through on success is intentional: the read lock is
+	 * released here either way */
+	if (!recovery)
+		up_read(&dlm->recovery_sem);
+
+error:
+	if (status != DLM_NORMAL) {
+		if (lock && !convert) {
+			kfree(lock);
+			lksb->lockid = NULL;
+		}
+		// this is kind of unnecessary
+		lksb->status = status;
+	}
+
+	if (res)
+		dlm_lockres_put(dlm, res);
+
+	return status;
+}
+EXPORT_SYMBOL(dlmlock);

Modified: trunk/fs/ocfs2/dlm/dlmmaster.c
===================================================================
--- trunk/fs/ocfs2/dlm/dlmmaster.c	2005-03-22 23:03:52 UTC (rev 2037)
+++ trunk/fs/ocfs2/dlm/dlmmaster.c	2005-03-23 01:21:05 UTC (rev 2038)
@@ -44,8 +44,8 @@
 #include "cluster/nodemanager.h"
 #include "cluster/tcp.h"
 
+#include "dlmapi.h"
 #include "dlmcommon.h"
-#include "dlmmod.h"
 
 
 #ifdef DLM_MLE_DEBUG

Modified: trunk/fs/ocfs2/dlm/dlmrecovery.c
===================================================================
--- trunk/fs/ocfs2/dlm/dlmrecovery.c	2005-03-22 23:03:52 UTC (rev 2037)
+++ trunk/fs/ocfs2/dlm/dlmrecovery.c	2005-03-23 01:21:05 UTC (rev 2038)
@@ -45,8 +45,8 @@
 #include "cluster/nodemanager.h"
 #include "cluster/tcp.h"
 
+#include "dlmapi.h"
 #include "dlmcommon.h"
-#include "dlmmod.h"
 
 static void dlm_do_local_recovery_cleanup(dlm_ctxt *dlm, u8 dead_node);
 
@@ -119,9 +119,41 @@
 	spin_unlock(&dlm->spinlock);
 }
 
+/* Worker function used during recovery: splice the whole work queue
+ * off under work_lock, then run each item unlocked (items may sleep
+ * and use the network).  Each item carries a dlm reference which is
+ * dropped here. */
+void dlm_dispatch_work(void *data)
+{
+	dlm_ctxt *dlm = (dlm_ctxt *)data;
+	LIST_HEAD(tmp_list);
+	struct list_head *iter, *iter2;
+	dlm_work_item *item;
+	dlm_workfunc_t *workfunc;
 
+	DLM_ASSERT(dlm);
 
+	spin_lock(&dlm->work_lock);
+	list_splice_init(&dlm->work_list, &tmp_list);
+	spin_unlock(&dlm->work_lock);
 
+	list_for_each_safe(iter, iter2, &tmp_list) {
+		item = list_entry(iter, dlm_work_item, list);
+		DLM_ASSERT(item);
+		workfunc = item->func;
+		list_del_init(&item->list);
+
+		/* already have ref on dlm to avoid having
+		 * it disappear.  just double-check. */
+		DLM_ASSERT(item->dlm == dlm);
+		DLM_ASSERT(workfunc);
+	
+		/* this is allowed to sleep and
+		 * call network stuff */
+		workfunc(item, item->data);
+
+		dlm_put(dlm);
+		kfree(item);
+	}
+}
+
 /*
  * RECOVERY THREAD
  */

Modified: trunk/fs/ocfs2/dlm/dlmthread.c
===================================================================
--- trunk/fs/ocfs2/dlm/dlmthread.c	2005-03-22 23:03:52 UTC (rev 2037)
+++ trunk/fs/ocfs2/dlm/dlmthread.c	2005-03-23 01:21:05 UTC (rev 2038)
@@ -46,8 +46,8 @@
 #include "cluster/nodemanager.h"
 #include "cluster/tcp.h"
 
+#include "dlmapi.h"
 #include "dlmcommon.h"
-#include "dlmmod.h"
 
 extern spinlock_t dlm_domain_lock;
 extern struct list_head dlm_domains;
@@ -57,6 +57,26 @@
 
 #define dlm_lock_is_remote(dlm, lock)     ((lock)->ml.node != (dlm)->node_num)
 
+/* Sleep until none of 'flags' are set in res->state.  Called and
+ * returns with res->spinlock held, but drops it around schedule().
+ * will exit holding res->spinlock, but may drop in function */
+void __dlm_wait_on_lockres_flags(dlm_lock_resource *res, int flags)
+{
+	DECLARE_WAITQUEUE(wait, current);
+
+	assert_spin_locked(&res->spinlock);
+
+	add_wait_queue(&res->wq, &wait);
+repeat:
+	set_current_state(TASK_UNINTERRUPTIBLE);
+	if (res->state & flags) {
+		spin_unlock(&res->spinlock);
+		schedule();
+		spin_lock(&res->spinlock);
+		goto repeat;
+	}
+	remove_wait_queue(&res->wq, &wait);
+	/* use the task-state helper instead of assigning to
+	 * current->state directly */
+	__set_current_state(TASK_RUNNING);
+}
+
 int __dlm_lockres_unused(dlm_lock_resource *res)
 {
 	if (list_empty(&res->granted) &&

Modified: trunk/fs/ocfs2/dlm/dlmunlock.c
===================================================================
--- trunk/fs/ocfs2/dlm/dlmunlock.c	2005-03-22 23:03:52 UTC (rev 2037)
+++ trunk/fs/ocfs2/dlm/dlmunlock.c	2005-03-23 01:21:05 UTC (rev 2038)
@@ -44,8 +44,8 @@
 #include "cluster/nodemanager.h"
 #include "cluster/tcp.h"
 
+#include "dlmapi.h"
 #include "dlmcommon.h"
-#include "dlmmod.h"
 
 
 #define DLM_UNLOCK_FREE_LOCK    0x00000001
@@ -423,3 +423,76 @@
 	return status;
 }
 
+/* there seems to be no point in doing this async
+ * since (even for the remote case) there is really
+ * no work to queue up... so just do it and fire the
+ * unlockast by hand when done... */
+/* Release or cancel (LKM_CANCEL) the lock in lksb->lockid.  Flags may
+ * also carry LKM_VALBLK/LKM_INVVALBLK; VALBLK together with CANCEL is
+ * ignored.  The unlockast is invoked synchronously when requested by
+ * the unlock path. */
+dlm_status dlmunlock(dlm_ctxt *dlm, dlm_lockstatus *lksb, int flags, 
+		     dlm_astunlockfunc_t *unlockast, void *data)
+{
+	dlm_status status;
+	dlm_lock_resource *res;
+	dlm_lock *lock = NULL;
+	int call_ast = 0;
+
+	dlmprintk0("\n");
+
+	if (!lksb)
+		return DLM_BADARGS;
+
+	if (flags & ~(LKM_CANCEL | LKM_VALBLK | LKM_INVVALBLK))
+		return DLM_BADPARAM;
+
+	if ((flags & (LKM_VALBLK | LKM_CANCEL)) == (LKM_VALBLK | LKM_CANCEL)) {
+		dlmprintk0("VALBLK given with CANCEL: ignoring VALBLK\n");
+		flags &= ~LKM_VALBLK;
+	}
+
+	if (!lksb->lockid || !lksb->lockid->lockres)
+		return DLM_BADPARAM;
+
+	lock = lksb->lockid;
+	res = dlm_lockres_grab(dlm, lock->lockres);
+
+	/* lock/lockres were already checked non-NULL above; these are
+	 * belt and braces */
+	DLM_ASSERT(lock);
+	DLM_ASSERT(res);
+retry:
+	dlmprintk("lock=%p res=%p\n", lock, res);
+
+	if (res->owner == dlm->node_num) {
+		status = dlmunlock_master(dlm, res, lock, lksb, flags, 
+					  &call_ast);
+		dlmprintk("done calling dlmunlock_master: returned %d, "
+			  "call_ast is %d\n", status, call_ast);
+	} else {
+		status = dlmunlock_remote(dlm, res, lock, lksb, flags, 
+					  &call_ast);
+		dlmprintk("done calling dlmunlock_remote: returned %d, "
+			  "call_ast is %d\n", status, call_ast);
+	}
+
+	if (status == DLM_RECOVERING ||
+	    status == DLM_MIGRATING) {
+		/* NOTE(review): busy retry with no sleep -- confirm
+		 * recovery/migration always completes independently */
+		dlmprintk0("retrying unlock due to pending recovery "
+			   "or migration\n");
+		goto retry;
+	}
+	if (call_ast) {
+		dlmprintk("calling unlockast(%p, %d)\n",
+			  data, lksb->status);
+		(*unlockast)(data, lksb->status);
+	}
+
+	if (status == DLM_NORMAL) {
+		dlmprintk("kicking the thread\n");
+		dlm_kick_thread(dlm, res);
+	}
+
+	dlm_lockres_calc_usage(dlm, res);
+	dlm_lockres_put(dlm, res);
+
+	dlmprintk("returning status=%d!\n", status);
+	return status;
+}
+EXPORT_SYMBOL(dlmunlock);
+

Modified: trunk/fs/ocfs2/dlm/userdlm.c
===================================================================
--- trunk/fs/ocfs2/dlm/userdlm.c	2005-03-22 23:03:52 UTC (rev 2037)
+++ trunk/fs/ocfs2/dlm/userdlm.c	2005-03-23 01:21:05 UTC (rev 2038)
@@ -40,8 +40,7 @@
 #include "cluster/heartbeat.h"
 #include "cluster/tcp.h"
 
-#include "dlmcommon.h"
-#include "dlmmod.h"
+#include "dlmapi.h"
 
 #include "userdlm.h"
 

Modified: trunk/fs/ocfs2/dlmglue.c
===================================================================
--- trunk/fs/ocfs2/dlmglue.c	2005-03-22 23:03:52 UTC (rev 2037)
+++ trunk/fs/ocfs2/dlmglue.c	2005-03-23 01:21:05 UTC (rev 2038)
@@ -36,9 +36,9 @@
 #include <cluster/heartbeat.h>
 #include <cluster/nodemanager.h>
 #include <cluster/tcp.h>
-#include <dlm/dlmcommon.h>
-#include <dlm/dlmmod.h>
 
+#include <dlm/dlmapi.h>
+
 #include "ocfs_log.h"
 #include "ocfs.h"
 #include "ocfs2.h"

Modified: trunk/fs/ocfs2/heartbeat.c
===================================================================
--- trunk/fs/ocfs2/heartbeat.c	2005-03-22 23:03:52 UTC (rev 2037)
+++ trunk/fs/ocfs2/heartbeat.c	2005-03-23 01:21:05 UTC (rev 2038)
@@ -34,7 +34,6 @@
 
 #include <cluster/heartbeat.h>
 #include <cluster/nodemanager.h>
-#include <dlm/dlmcommon.h>
 
 #include "ocfs_log.h"
 #include "ocfs.h"

Modified: trunk/fs/ocfs2/ocfs.h
===================================================================
--- trunk/fs/ocfs2/ocfs.h	2005-03-22 23:03:52 UTC (rev 2037)
+++ trunk/fs/ocfs2/ocfs.h	2005-03-23 01:21:05 UTC (rev 2038)
@@ -45,9 +45,9 @@
 #include "cluster/nodemanager.h"
 #include "cluster/heartbeat.h"
 #include "cluster/tcp.h"
-#include "dlm/dlmcommon.h"
-#include "dlm/dlmmod.h"
 
+#include "dlm/dlmapi.h"
+
 /* convenience macro */
 
 #define OCFS_ASSERT(x)             do { if (!(x)) BUG(); } while (0)

Modified: trunk/fs/ocfs2/super.c
===================================================================
--- trunk/fs/ocfs2/super.c	2005-03-22 23:03:52 UTC (rev 2037)
+++ trunk/fs/ocfs2/super.c	2005-03-23 01:21:05 UTC (rev 2038)
@@ -43,7 +43,6 @@
 #include <linux/inet.h>
 
 #include <cluster/nodemanager.h>
-#include <dlm/dlmcommon.h>
 
 #include "ocfs_log.h"
 #include "ocfs.h"

Modified: trunk/fs/ocfs2/vote.c
===================================================================
--- trunk/fs/ocfs2/vote.c	2005-03-22 23:03:52 UTC (rev 2037)
+++ trunk/fs/ocfs2/vote.c	2005-03-23 01:21:05 UTC (rev 2038)
@@ -34,9 +34,9 @@
 #include <cluster/heartbeat.h>
 #include <cluster/nodemanager.h>
 #include <cluster/tcp.h>
-#include <dlm/dlmcommon.h>
-#include <dlm/dlmmod.h>
 
+#include <dlm/dlmapi.h>
+
 #include "ocfs_log.h"
 #include "ocfs.h"
 #include "ocfs2.h"



More information about the Ocfs2-commits mailing list