[Ocfs2-commits] mfasheh commits r1655 - branches/dlm-glue/src

svn-commits at oss.oracle.com svn-commits at oss.oracle.com
Fri Nov 19 17:44:30 CST 2004


Author: mfasheh
Date: 2004-11-19 17:44:28 -0600 (Fri, 19 Nov 2004)
New Revision: 1655

Added:
   branches/dlm-glue/src/slot_map.c
   branches/dlm-glue/src/slot_map.h
Modified:
   branches/dlm-glue/src/Makefile
   branches/dlm-glue/src/alloc.c
   branches/dlm-glue/src/dlmglue.c
   branches/dlm-glue/src/dlmglue.h
   branches/dlm-glue/src/heartbeat.c
   branches/dlm-glue/src/heartbeat.h
   branches/dlm-glue/src/inode.c
   branches/dlm-glue/src/journal.c
   branches/dlm-glue/src/localalloc.c
   branches/dlm-glue/src/namei.c
   branches/dlm-glue/src/ocfs.h
   branches/dlm-glue/src/ocfs2_fs.h
   branches/dlm-glue/src/ocfs_journal.h
   branches/dlm-glue/src/ocfs_log.h
   branches/dlm-glue/src/proc.c
   branches/dlm-glue/src/suballoc.c
   branches/dlm-glue/src/super.c
   branches/dlm-glue/src/sysfile.c
   branches/dlm-glue/src/sysfile.h
Log:
* Too many changes to list here. Highlights:
  - dlm-glue stuff is even more abstracted out. It could still use more
    abstraction, though.
  - We now have a cluster-wide super block lock.
  - We now use a disk slot map to figure out who's mounted and to translate
    between global node numbers and disk slots.
  - First cut at getting cluster recovery going with this new stuff.
  - Lots more...



Modified: branches/dlm-glue/src/Makefile
===================================================================
--- branches/dlm-glue/src/Makefile	2004-11-19 22:58:39 UTC (rev 1654)
+++ branches/dlm-glue/src/Makefile	2004-11-19 23:44:28 UTC (rev 1655)
@@ -79,6 +79,7 @@
 	localalloc.c		\
 	namei.c			\
 	proc.c			\
+	slot_map.c		\
 	suballoc.c		\
 	super.c			\
 	symlink.c		\
@@ -107,6 +108,7 @@
 	localalloc.h		\
 	namei.h			\
 	proc.h			\
+	slot_map.h		\
 	suballoc.h		\
 	super.h			\
 	symlink.h		\

Modified: branches/dlm-glue/src/alloc.c
===================================================================
--- branches/dlm-glue/src/alloc.c	2004-11-19 22:58:39 UTC (rev 1654)
+++ branches/dlm-glue/src/alloc.c	2004-11-19 23:44:28 UTC (rev 1655)
@@ -209,7 +209,7 @@
 			/* we always use node zeros suballocator */
 			eb->h_suballoc_node = 0;
 #else
-			eb->h_suballoc_node = osb->node_num;
+			eb->h_suballoc_node = osb->slot_num;
 #endif
 			eb->h_suballoc_bit = suballoc_bit_start;
 			eb->h_list.l_count = ocfs2_extent_recs_per_eb(osb->sb);

Modified: branches/dlm-glue/src/dlmglue.c
===================================================================
--- branches/dlm-glue/src/dlmglue.c	2004-11-19 22:58:39 UTC (rev 1654)
+++ branches/dlm-glue/src/dlmglue.c	2004-11-19 23:44:28 UTC (rev 1655)
@@ -32,6 +32,7 @@
 
 #include <dlmutil.h>
 #include <dlmcommon.h>
+#include <dlmhb.h>
 #include <dlmnm.h>
 #include <dlmnet.h>
 #include <dlmmod.h>
@@ -45,6 +46,7 @@
 #include "extent_map.h"
 #include "heartbeat.h"
 #include "inode.h"
+#include "slot_map.h"
 #include "util.h"
 
 #include "ocfs_journal.h"
@@ -52,6 +54,7 @@
 
 #define OCFS_DEBUG_CONTEXT    OCFS_DEBUG_CONTEXT_DLMGLUE
 
+#if 0
 #warning REMOVE THESE STUBS
 
 dlm_status dlmlock(dlm_ctxt *dlm,
@@ -88,6 +91,16 @@
 	kfree(dlm);
 }
 
+int hb_register_callback(int type, hb_cb_func *func, void *data, int priority)
+{
+	return 0;
+}
+int hb_unregister_callback(int type, hb_cb_func *func, void *data)
+{
+	return 0;
+}
+#endif
+
 /* lock ids are made up in the following manner:
  * name[0]     --> type
  * name[1-6]   --> 6 pad characters, reserved for now
@@ -99,7 +112,8 @@
 
 static char ocfs2_lock_type_char[OCFS_NUM_LOCK_TYPES] = {
 	[OCFS_TYPE_META]	'M',
-	[OCFS_TYPE_DATA] 	'D'
+	[OCFS_TYPE_DATA] 	'D',
+	[OCFS_TYPE_SUPER]       'S'
 };
 
 static int ocfs2_build_lock_name(enum ocfs2_lock_type type,
@@ -107,18 +121,49 @@
 				 u32 generation,
 				 char **ret);
 
-static void ocfs2_ast_func(void *opaque);
-/* so far, all locks have gotten along with the same BAST. */
-static void ocfs2_bast_func(void *opaque, int level);
+static void ocfs2_inode_ast_func(void *opaque);
+static void ocfs2_inode_bast_func(void *opaque, int level);
+static void ocfs2_super_ast_func(void *opaque);
+static void ocfs2_super_bast_func(void *opaque, int level);
+/* so far, all locks have gotten along with the same unlock ast */
+static void ocfs2_unlock_ast_func(void *opaque, dlm_status status);
 
 static dlm_astlockfunc_t *ocfs2_lock_type_asts[OCFS_NUM_LOCK_TYPES] = {
-	[OCFS_TYPE_META]	ocfs2_ast_func,
-	[OCFS_TYPE_DATA] 	ocfs2_ast_func
+	[OCFS_TYPE_META]	ocfs2_inode_ast_func,
+	[OCFS_TYPE_DATA] 	ocfs2_inode_ast_func,
+	[OCFS_TYPE_SUPER]	ocfs2_super_ast_func
 };
 static dlm_bastlockfunc_t *ocfs2_lock_type_basts[OCFS_NUM_LOCK_TYPES] = {
-	[OCFS_TYPE_META]	ocfs2_bast_func,
-	[OCFS_TYPE_DATA] 	ocfs2_bast_func
+	[OCFS_TYPE_META]	ocfs2_inode_bast_func,
+	[OCFS_TYPE_DATA] 	ocfs2_inode_bast_func,
+	[OCFS_TYPE_SUPER] 	ocfs2_super_bast_func
 };
+
+static inline int ocfs2_is_inode_lock(ocfs2_lock_res *lockres)
+{
+	return lockres->l_type == OCFS_TYPE_META || 
+		lockres->l_type == OCFS_TYPE_DATA;
+}
+
+static inline int ocfs2_is_super_lock(ocfs2_lock_res *lockres)
+{
+	return lockres->l_type == OCFS_TYPE_SUPER;
+}
+
+static inline ocfs_super * ocfs2_lock_res_super(ocfs2_lock_res *lockres)
+{
+	OCFS_ASSERT(ocfs2_is_super_lock(lockres));
+
+	return (ocfs_super *) lockres->l_priv;
+}
+
+static inline struct inode * ocfs2_lock_res_inode(ocfs2_lock_res *lockres)
+{
+	OCFS_ASSERT(ocfs2_is_inode_lock(lockres));
+
+	return (struct inode *) lockres->l_priv;
+}
+
 static int ocfs2_lock_create(ocfs_super *osb,
 			     ocfs2_lock_res *lockres,
 			     int level,
@@ -127,11 +172,15 @@
 						     int wanted);
 static int ocfs2_cluster_lock(ocfs_super *osb,
 			      ocfs2_lock_res *lockres,
-			      int level);
-static void ocfs2_unlock_ast_func(void *opaque, dlm_status status);
+			      int level,
+			      int lkm_flags);
+void ocfs2_cluster_unlock(ocfs_super *osb,
+			  ocfs2_lock_res *lockres,
+			  int level);
 static void ocfs2_inc_inode_seq(ocfs_super *osb,
 				struct inode *inode);
 static void ocfs2_schedule_blocked_inode(struct inode *inode);
+static void ocfs2_schedule_blocked_super(ocfs_super *osb);
 static inline void ocfs2_recover_from_dlm_error(ocfs2_lock_res *lockres,
 						int convert);
 static void ocfs2_vote_on_unlock(ocfs_super *osb,
@@ -168,7 +217,8 @@
 static int ocfs2_process_blocked_data(struct inode *inode,
 				      int *requeue);
 static int ocfs2_do_request_vote(ocfs_super *osb,
-				 struct inode *inode,
+				 u64 blkno,
+				 unsigned int generation,
 				 enum ocfs2_vote_request type);
 
 static inline int ocfs2_lvb_is_trustable(ocfs2_lock_res *lockres)
@@ -246,22 +296,24 @@
 	return (len);
 }
 
-int ocfs2_lock_res_init(ocfs2_lock_res *res,
-			enum ocfs2_lock_type type,
-			struct inode *inode)
+int ocfs2_inode_lock_res_init(ocfs2_lock_res *res,
+			      enum ocfs2_lock_type type,
+			      struct inode *inode)
 {
 	int status;
 
 	LOG_ENTRY();
 
+	OCFS_ASSERT(type == OCFS_TYPE_META ||
+		    type == OCFS_TYPE_DATA);
+
 	memset(res, 0, sizeof(ocfs2_lock_res));
-
 	spin_lock_init(&res->l_lock);
 	init_waitqueue_head(&res->l_event);
-	res->l_inode = inode;
 	res->l_type = type;
 	res->l_level = LKM_IVMODE;
 
+	res->l_priv = inode;
 	status = ocfs2_build_lock_name(type,
 				       OCFS_I(inode)->ip_blkno,
 				       inode->i_generation,
@@ -276,6 +328,35 @@
 	return status;
 }
 
+int ocfs2_super_lock_res_init(ocfs2_lock_res *res,
+     			      ocfs_super *osb)
+{
+	enum ocfs2_lock_type type = OCFS_TYPE_SUPER;
+	int status;
+
+	LOG_ENTRY();
+
+	memset(res, 0, sizeof(ocfs2_lock_res));
+	spin_lock_init(&res->l_lock);
+	init_waitqueue_head(&res->l_event);
+	res->l_type = type;
+	res->l_level = LKM_IVMODE;
+
+	res->l_priv = osb;
+	status = ocfs2_build_lock_name(type,
+				       0ULL,
+				       0,
+				       &res->l_name);
+	if (status < 0) {
+		LOG_ERROR_STATUS(status);
+		goto bail;
+	}
+
+bail:
+	LOG_EXIT_STATUS(status);
+	return status;
+}
+
 void ocfs2_lock_res_free(ocfs2_lock_res *res)
 {
 	if (res->l_name)
@@ -318,7 +399,7 @@
 	}
 }
 
-static inline void ocfs2_handle_downconvert_action(ocfs2_lock_res *lockres)
+static inline void ocfs2_generic_handle_downconvert_action(ocfs2_lock_res *lockres)
 {
 	OCFS_ASSERT(lockres->l_flags & OCFS2_LOCK_BUSY);
 	OCFS_ASSERT(lockres->l_flags & OCFS2_LOCK_ATTACHED);
@@ -331,16 +412,6 @@
 	wake_up_all(&lockres->l_event);
 }
 
-static inline void ocfs2_handle_data_convert_action(struct inode *inode,
-						    ocfs2_lock_res *lockres)
-{
-	OCFS_ASSERT(lockres->l_flags & OCFS2_LOCK_BUSY);
-	OCFS_ASSERT(lockres->l_flags & OCFS2_LOCK_ATTACHED);
-
-	lockres->l_level = lockres->l_requested;
-	lockres->l_flags &= ~OCFS2_LOCK_BUSY;
-}
-
 static void ocfs2_inc_inode_seq(ocfs_super *osb,
 				struct inode *inode)
 {
@@ -365,66 +436,63 @@
 		       atomic_read(seq));
 }
 
-static inline void ocfs2_handle_meta_convert_action(struct inode *inode,
-						    ocfs2_lock_res *lockres)
+static inline void ocfs2_generic_handle_convert_action(ocfs2_lock_res *lockres)
 {
-	ocfs_super *osb = OCFS2_SB(inode->i_sb);
-
 	OCFS_ASSERT(lockres->l_flags & OCFS2_LOCK_BUSY);
 	OCFS_ASSERT(lockres->l_flags & OCFS2_LOCK_ATTACHED);
 
 	/* Convert from RO to EX doesn't really need anything as our
 	 * information is already up to data. Convert from NL to
-	 * *anything* however should mark the inode as needing an
-	 * update. */
-	if (lockres->l_level == LKM_NLMODE) {
-		ocfs2_inc_inode_seq(osb, inode);
+	 * *anything* however should mark ourselves as needing an
+	 * update */
+	if (lockres->l_level == LKM_NLMODE)
 		lockres->l_flags |= OCFS2_LOCK_NEEDS_REFRESH;
-	}
 
 	lockres->l_level = lockres->l_requested;
 	lockres->l_flags &= ~OCFS2_LOCK_BUSY;
 }
 
-static inline void ocfs2_handle_attach_action(struct inode *inode,
-					      ocfs2_lock_res *lockres)
+static inline void ocfs2_handle_meta_convert_action(struct inode *inode,
+						    ocfs2_lock_res *lockres)
 {
 	ocfs_super *osb = OCFS2_SB(inode->i_sb);
 
+	/* generic_handle_convert_action will set the refresh flag for us. */
+	if (lockres->l_level == LKM_NLMODE)
+		ocfs2_inc_inode_seq(osb, inode);
+	ocfs2_generic_handle_convert_action(lockres);
+}
+
+static inline void ocfs2_generic_handle_attach_action(ocfs2_lock_res *lockres)
+{
 	OCFS_ASSERT(lockres->l_flags & OCFS2_LOCK_BUSY);
 	OCFS_ASSERT(!(lockres->l_flags & OCFS2_LOCK_ATTACHED));
 
-	/* skip the cache thunk for nlmode requests and local (new
-	 * inode) locks. */
 	if (lockres->l_requested > LKM_NLMODE &&
-	    !(lockres->l_flags & OCFS2_LOCK_LOCAL)) {
-		ocfs2_inc_inode_seq(osb, inode);
+	    !(lockres->l_flags & OCFS2_LOCK_LOCAL))
 		lockres->l_flags |= OCFS2_LOCK_NEEDS_REFRESH;
-	}
 
 	lockres->l_level = lockres->l_requested;
 	lockres->l_flags |= OCFS2_LOCK_ATTACHED;
-	/* should this part be in ocfs2_ast_func? */
 	lockres->l_flags &= ~OCFS2_LOCK_BUSY;
 }
 
-/* can we get a lock type in this proto to? */
-static void ocfs2_ast_func(void *opaque)
+static void ocfs2_inode_ast_func(void *opaque)
 {
 	ocfs2_lock_res *lockres = opaque;
-	struct inode *inode = lockres->l_inode;
+	struct inode *inode = ocfs2_lock_res_inode(lockres);
+	ocfs_super *osb = OCFS2_SB(inode->i_sb);
 	dlm_lockstatus *lksb;
 
 #ifdef OCFS2_VERBOSE_LOCKING_TRACE
 	printk("AST fired for inode %llu\n", OCFS_I(inode)->ip_blkno);
 #endif
-	OCFS_ASSERT(lockres->l_type == OCFS_TYPE_META ||
-		    lockres->l_type == OCFS_TYPE_DATA);
+	OCFS_ASSERT(ocfs2_is_inode_lock(lockres));
 
 	spin_lock(&lockres->l_lock);
 	lksb = &(lockres->l_lksb);
 	if (lksb->status != DLM_NORMAL) {
-		printk("ocfs2_meta_ast_func: lksb status value of %u on "
+		printk("ocfs2_inode_ast_func: lksb status value of %u on "
 		       "inode %llu\n", lksb->status, OCFS_I(inode)->ip_blkno);
 		spin_unlock(&lockres->l_lock);
 		return;
@@ -432,20 +500,30 @@
 
 	switch(lockres->l_action) {
 	case OCFS2_AST_ATTACH:
-		ocfs2_handle_attach_action(inode, lockres);
+		if (lockres->l_type == OCFS_TYPE_META &&
+		    lockres->l_requested > LKM_NLMODE &&
+		    !(lockres->l_flags & OCFS2_LOCK_LOCAL))
+			ocfs2_inc_inode_seq(osb, inode);
+
+		ocfs2_generic_handle_attach_action(lockres);
 		break;
 	case OCFS2_AST_CONVERT:
 		if (lockres->l_type == OCFS_TYPE_META)
 			ocfs2_handle_meta_convert_action(inode, lockres);
 		else
-			ocfs2_handle_data_convert_action(inode, lockres);
+			ocfs2_generic_handle_convert_action(lockres);
 		break;
 	case OCFS2_AST_DOWNCONVERT:
-		ocfs2_handle_downconvert_action(lockres);
+		ocfs2_generic_handle_downconvert_action(lockres);
 		break;
 	default:
 		BUG();
 	}
+
+	/* data locking ignores refresh flag for now. */
+	if (lockres->l_type == OCFS_TYPE_DATA)
+		lockres->l_flags &= ~OCFS2_LOCK_NEEDS_REFRESH;
+
 	/* set it to something invalid so if we get called again we
 	 * can catch it. */
 	lockres->l_action = OCFS2_AST_INVALID;
@@ -453,21 +531,80 @@
 	wake_up_all(&lockres->l_event);
 }
 
-static void ocfs2_bast_func(void *opaque, int level)
+static void ocfs2_generic_handle_bast(ocfs2_lock_res *lockres, int level)
 {
+	spin_lock(&lockres->l_lock);
+	lockres->l_flags |= OCFS2_LOCK_BLOCKED;
+	if (level > lockres->l_blocking)
+		lockres->l_blocking = level;
+	spin_unlock(&lockres->l_lock);
+}
+
+static void ocfs2_inode_bast_func(void *opaque, int level)
+{
 	ocfs2_lock_res *lockres = opaque;
-	struct inode *inode = lockres->l_inode;
+	struct inode *inode = ocfs2_lock_res_inode(lockres);
 	ocfs_super *osb = OCFS2_SB(inode->i_sb);
 
 #ifdef OCFS2_VERBOSE_LOCKING_TRACE
 	printk("BAST fired for inode %llu\n", OCFS_I(inode)->ip_blkno);
 #endif
+	ocfs2_generic_handle_bast(lockres, level);
+
+	ocfs2_schedule_blocked_inode(inode);
+	ocfs2_kick_vote_thread(osb);
+}
+
+static void ocfs2_super_ast_func(void *opaque)
+{
+	ocfs2_lock_res *lockres = opaque;
+	dlm_lockstatus *lksb;
+
+#ifdef OCFS2_VERBOSE_LOCKING_TRACE
+	printk("AST fired for inode %llu\n", OCFS_I(inode)->ip_blkno);
+#endif
+	OCFS_ASSERT(ocfs2_is_super_lock(lockres));
+
 	spin_lock(&lockres->l_lock);
-	lockres->l_flags |= OCFS2_LOCK_BLOCKED;
-	if (level > lockres->l_blocking)
-		lockres->l_blocking = level;
+	lksb = &(lockres->l_lksb);
+	if (lksb->status != DLM_NORMAL) {
+		printk("ocfs2_super_ast_func: lksb status value of %u!\n",
+		       lksb->status);
+		spin_unlock(&lockres->l_lock);
+		return;
+	}
+
+	switch(lockres->l_action) {
+	case OCFS2_AST_ATTACH:
+		ocfs2_generic_handle_attach_action(lockres);
+		break;
+	case OCFS2_AST_CONVERT:
+		ocfs2_generic_handle_convert_action(lockres);
+		break;
+	case OCFS2_AST_DOWNCONVERT:
+		ocfs2_generic_handle_downconvert_action(lockres);
+		break;
+	default:
+		BUG();
+	}
+	/* set it to something invalid so if we get called again we
+	 * can catch it. */
+	lockres->l_action = OCFS2_AST_INVALID;
 	spin_unlock(&lockres->l_lock);
-	ocfs2_schedule_blocked_inode(inode);
+	wake_up_all(&lockres->l_event);
+}
+
+static void ocfs2_super_bast_func(void *opaque, int level)
+{
+	ocfs2_lock_res *lockres = opaque;
+	ocfs_super *osb = ocfs2_lock_res_super(lockres);
+
+#ifdef OCFS2_VERBOSE_LOCKING_TRACE
+	printk("Superblock BAST fired\n");
+#endif
+	ocfs2_generic_handle_bast(lockres, level);
+
+	ocfs2_schedule_blocked_super(osb);
 	ocfs2_kick_vote_thread(osb);
 }
 
@@ -570,7 +707,8 @@
 
 static int ocfs2_cluster_lock(ocfs_super *osb,
 			      ocfs2_lock_res *lockres,
-			      int level)
+			      int level,
+			      int lkm_flags)
 {
 	int ret;
 	enum ocfs2_lock_type type = lockres->l_type;
@@ -578,6 +716,12 @@
 
 	LOG_ENTRY();
 
+#warning "this is ignored for now!"
+	/* Still waiting for this to be implemented in dlmmod, for now
+	 * we fake a response */
+	if (lkm_flags & LKM_NOQUEUE)
+		return -EAGAIN;
+
 again:
 	if (signal_pending(current)) {
 		ret = -EINTR;
@@ -634,8 +778,13 @@
 				 lockres,
 				 ocfs2_lock_type_basts[type]);
 		if (status != DLM_NORMAL) {
-			LOG_ERROR_ARGS("Dlm returns %d\n", status);
-			ret = -ENOENT;
+			if ((lkm_flags & LKM_NOQUEUE) &&
+			    (status == DLM_NOTQUEUED))
+				ret = -EAGAIN;
+			else {
+				LOG_ERROR_ARGS("Dlm returns %d\n", status);
+				ret = -ENOENT;
+			}
 			ocfs2_recover_from_dlm_error(lockres, 1);
 			goto bail;
 		}
@@ -655,6 +804,16 @@
 	return ret;
 }
 
+void ocfs2_cluster_unlock(ocfs_super *osb,
+			  ocfs2_lock_res *lockres,
+			  int level)
+{
+	spin_lock(&lockres->l_lock);
+	ocfs2_dec_holders(lockres, level);
+	ocfs2_vote_on_unlock(osb, lockres);
+	spin_unlock(&lockres->l_lock);
+}
+
 /* Grants us an EX lock on the data and metadata resources, skipping
  * the normal cluster directory lookup. Use this ONLY on newly created
  * inodes which other nodes can't possibly see, and which haven't been
@@ -722,7 +881,7 @@
 
 	level = write ? LKM_EXMODE : LKM_PRMODE;
 
-	status = ocfs2_cluster_lock(OCFS2_SB(inode->i_sb), lockres, level);
+	status = ocfs2_cluster_lock(OCFS2_SB(inode->i_sb), lockres, level, 0);
 	if (status < 0 && status != -EINTR)
 		LOG_ERROR_STATUS(status);
 
@@ -760,15 +919,10 @@
 void ocfs2_data_unlock(struct inode *inode,
 		       int write)
 {
-	int level;
+	int level = write ? LKM_EXMODE : LKM_PRMODE;
 	ocfs2_lock_res *lockres = &OCFS_I(inode)->ip_data_lockres;
 
-	level = write ? LKM_EXMODE : LKM_PRMODE;
-
-	spin_lock(&lockres->l_lock);
-	ocfs2_dec_holders(lockres, level);
-	ocfs2_vote_on_unlock(OCFS2_SB(inode->i_sb), lockres);
-	spin_unlock(&lockres->l_lock);
+	ocfs2_cluster_unlock(OCFS2_SB(inode->i_sb), lockres, level);
 }
 
 static inline int ocfs2_wait_on_recovery(ocfs_super *osb)
@@ -860,17 +1014,18 @@
 	}
 }
 
-/* may or may not return a bh if it went to disk. */
-static int ocfs2_meta_lock_update(struct inode *inode,
-				  struct buffer_head **bh)
+/* Determine whether a lock resource needs to be refreshed, and
+ * arbitrate who gets to refresh it.
+ *
+ * -1 means error, 0 means no refresh needed, > 0 means you need to
+ *   refresh this and you MUST call ocfs2_complete_lock_res_refresh
+ *   afterwards. */
+static int ocfs2_should_refresh_lock_res(ocfs2_lock_res *lockres)
 {
+
 	int status = 0;
-	u32 trustable_clusters = 0;
-	ocfs2_lock_res *lockres;
-	ocfs2_dinode *fe;
+	LOG_ENTRY();
 
-	lockres = &OCFS_I(inode)->ip_meta_lockres;
-
 refresh_check:
 	spin_lock(&lockres->l_lock);
 	if (!(lockres->l_flags & OCFS2_LOCK_NEEDS_REFRESH)) {
@@ -892,6 +1047,46 @@
 	lockres->l_flags |= OCFS2_LOCK_REFRESHING;
 	spin_unlock(&lockres->l_lock);
 
+	status = 1;
+bail:
+	LOG_EXIT_STATUS(status);
+	return status;
+}
+
+/* If status is non zero, I'll mark it as not being in refresh
+ * anymore, but I won't clear the needs refresh flag. */
+static inline void ocfs2_complete_lock_res_refresh(ocfs2_lock_res *lockres,
+						   int status)
+{
+	spin_lock(&lockres->l_lock);
+	lockres->l_flags &= ~OCFS2_LOCK_REFRESHING;
+	if (!status)
+		lockres->l_flags &= ~OCFS2_LOCK_NEEDS_REFRESH;
+	spin_unlock(&lockres->l_lock);
+
+	wake_up_all(&lockres->l_event);
+}
+
+/* may or may not return a bh if it went to disk. */
+static int ocfs2_meta_lock_update(struct inode *inode,
+				  struct buffer_head **bh)
+{
+	int status;
+	u32 trustable_clusters = 0;
+	ocfs2_lock_res *lockres;
+	ocfs2_dinode *fe;
+
+	lockres = &OCFS_I(inode)->ip_meta_lockres;
+
+	status = ocfs2_should_refresh_lock_res(lockres);
+	if (!status)
+		goto bail;
+	if (status < 0) {
+		if (status != -EINTR)
+			LOG_ERROR_STATUS(status);
+		goto bail;
+	}
+
 	/* we don't want to use the LVB for bitmap files as the
 	 * used/set bit union is not currently sent over the wire. */
 	if (!(OCFS_I(inode)->ip_flags & OCFS_INODE_BITMAP) &&
@@ -932,22 +1127,18 @@
 	ocfs2_set_local_seq_from_lvb(lockres);
 	ocfs2_reset_meta_lvb_values(inode);
 
-	spin_lock(&lockres->l_lock);
-	lockres->l_flags &= ~OCFS2_LOCK_REFRESHING;
-	lockres->l_flags &= ~OCFS2_LOCK_NEEDS_REFRESH;
-	spin_unlock(&lockres->l_lock);
-
-	wake_up_all(&lockres->l_event);
+	ocfs2_complete_lock_res_refresh(lockres, 0);
 bail:
 	return status;
 }
 
-int ocfs2_meta_lock(struct inode *inode,
-		    ocfs_journal_handle *handle,
-		    struct buffer_head **ret_bh,
-		    int ex)
+int ocfs2_meta_lock_flags(struct inode *inode,
+			  ocfs_journal_handle *handle,
+			  struct buffer_head **ret_bh,
+			  int ex,
+			  int flags)
 {
-	int status, level;
+	int status, level, dlm_flags;
 	ocfs2_lock_res *lockres;
 	ocfs_super *osb = OCFS2_SB(inode->i_sb);
 	struct buffer_head *bh = NULL;
@@ -964,9 +1155,7 @@
 	       ex ? "EXMODE" : "PRMODE", OCFS_I(inode)->ip_blkno);
 #endif
 
-	/* we skip recovery wait on journal inodes as those can be
-	 * locked from ocfs_recover_node. */
-	if (!INODE_JOURNAL(inode)) {
+	if (!(flags & OCFS2_META_LOCK_RECOVERY)) {
 		status = ocfs2_wait_on_recovery(osb);
 		if (status < 0)
 			goto bail;
@@ -974,15 +1163,18 @@
 
 	lockres = &OCFS_I(inode)->ip_meta_lockres;
 	level = ex ? LKM_EXMODE : LKM_PRMODE;
+	dlm_flags = 0;
+	if (flags & OCFS2_META_LOCK_NOQUEUE)
+		dlm_flags |= LKM_NOQUEUE;
 
-	status = ocfs2_cluster_lock(osb, lockres, level);
+	status = ocfs2_cluster_lock(osb, lockres, level, dlm_flags);
 	if (status < 0) {
-		if (status != -EINTR)
+		if (status != -EINTR && status != -EAGAIN)
 			LOG_ERROR_STATUS(status);
 		goto bail;
 	}
 
-	if (!INODE_JOURNAL(inode)) {
+	if (!(flags & OCFS2_META_LOCK_RECOVERY)) {
 		status = ocfs2_wait_on_recovery(osb);
 		if (status < 0)
 			goto bail;
@@ -1023,17 +1215,63 @@
 void ocfs2_meta_unlock(struct inode *inode,
 		       int ex)
 {
-	int level;
+	int level = ex ? LKM_EXMODE : LKM_PRMODE;
 	ocfs2_lock_res *lockres = &OCFS_I(inode)->ip_meta_lockres;
 
-	level = ex ? LKM_EXMODE : LKM_PRMODE;
+	ocfs2_cluster_unlock(OCFS2_SB(inode->i_sb), lockres, level);
+}
 
-	spin_lock(&lockres->l_lock);
-	ocfs2_dec_holders(lockres, level);
-	ocfs2_vote_on_unlock(OCFS2_SB(inode->i_sb), lockres);
-	spin_unlock(&lockres->l_lock);
+int ocfs2_super_lock(ocfs_super *osb,
+		     int ex)
+{
+	int status;
+	int level = ex ? LKM_EXMODE : LKM_PRMODE;
+	ocfs2_lock_res *lockres = &osb->super_lockres;
+	struct buffer_head *bh;
+	ocfs2_slot_info *si = osb->slot_info;
+
+	LOG_ENTRY();
+
+	status = ocfs2_cluster_lock(osb, lockres, level, 0);
+	if (status < 0) {
+		if (status != -EINTR)
+			LOG_ERROR_STATUS(status);
+		goto bail;
+	}
+
+	/* The super block lock path is really in the best position to
+	 * know when resources covered by the lock need to be
+	 * refreshed, so we do it here. Of course, making sense of
+	 * everything is up to the caller :) */
+	status = ocfs2_should_refresh_lock_res(lockres);
+	if (status < 0) {
+		if (status != -EINTR)
+			LOG_ERROR_STATUS(status);
+		goto bail;
+	}
+	if (status) {
+		bh = si->si_bh;
+		status = ocfs_read_block(osb, bh->b_blocknr, &bh, 0,
+					 si->si_inode);
+		if (status < 0)
+			LOG_ERROR_STATUS(status);
+
+		ocfs2_complete_lock_res_refresh(lockres, status);
+	}
+bail:
+	LOG_EXIT_STATUS(status);
+	return status;
 }
 
+void ocfs2_super_unlock(ocfs_super *osb,
+			int ex)
+{
+	int level = ex ? LKM_EXMODE : LKM_PRMODE;
+	ocfs2_lock_res *lockres = &osb->super_lockres;
+
+	ocfs2_cluster_unlock(osb, lockres, level);
+}
+
 int ocfs2_dlm_init(ocfs_super *osb)
 {
 	int status, pid;
@@ -1071,8 +1309,10 @@
 	}
 
 	osb->dlm = dlm;
-	/* sets osb->dlm */
-	status = 0;
+
+	status = ocfs2_super_lock_res_init(&osb->super_lockres, osb);
+	if (status < 0)
+		LOG_ERROR_STATUS(status);
 bail:
 
 	LOG_EXIT_STATUS(status);
@@ -1096,21 +1336,10 @@
 	if (wait_on_vote_task)
 		wait_for_completion(&osb->vote_event_complete);
 
+	ocfs2_lock_res_free(&osb->super_lockres);
 	dlm_unregister_domain(osb->dlm);
 }
 
-int ocfs2_find_slot(ocfs_super *osb)
-{
-#warning "finish this"
-
-	/* TODO: We take a lock on the super block, read in our node
-	 * map and find ourselves a slot. Right now hard code things
-	 * such that slot_num == global_node_num. */
-
-	osb->slot_num = osb->global_node_num;
-	return 0;
-}
-
 static void ocfs2_unlock_ast_func(void *opaque, dlm_status status)
 {
 	ocfs2_lock_res *lockres = opaque;
@@ -1449,6 +1678,69 @@
 					new_level, 0);
 }
 
+/* TODO: This is very generic, and looks much like
+ * ocfs2_process_blocked_data. Lets try to find a way to combine these
+ * two. */
+static void ocfs2_process_blocked_super(ocfs_super *osb)
+{
+	int status = 0;
+	int new_level;
+	ocfs2_lock_res *lockres = &osb->super_lockres;
+
+	spin_lock(&lockres->l_lock);
+	if (!(lockres->l_flags & OCFS2_LOCK_BLOCKED)) {
+		spin_unlock(&lockres->l_lock);
+		return;
+	}
+
+	if (lockres->l_flags & OCFS2_LOCK_BUSY) {
+		if (lockres->l_unlock_action == OCFS2_UNLOCK_CANCEL_CONVERT) {
+			/* If we're already trying to cancel a lock conversion
+			 * then just drop the spinlock and requeue ourselves
+			 * to check again later. */
+			spin_unlock(&lockres->l_lock);
+		} else
+			status = __ocfs2_cancel_convert(osb,
+							lockres);
+		if (status < 0)
+			LOG_ERROR_STATUS(status);
+		goto requeue;
+	}
+
+	/* if we're blocking an exclusive and we have *any* holders,
+	 * then requeue. */
+	if ((lockres->l_blocking == LKM_EXMODE) 
+	    && (lockres->l_ex_holders || lockres->l_ro_holders)) {
+		spin_unlock(&lockres->l_lock);
+		goto requeue;
+	}
+
+	/* If it's a PR we're blocking, then only
+	 * requeue if we've got anyone holding an EX */
+	if (lockres->l_blocking == LKM_PRMODE &&
+	    lockres->l_ex_holders) {
+		spin_unlock(&lockres->l_lock);
+		goto requeue;
+	}
+
+	/* if we get here, then we know that we have no incompatible
+	 * holders, and since we're marked, anyone asking for an
+	 * incompatible lock will block. We can safely downconvert
+	 * now. */
+	new_level = ocfs2_highest_compat_lock_level(lockres->l_blocking);
+	spin_unlock(&lockres->l_lock);
+
+	status = __ocfs2_downconvert_lock(osb, lockres, new_level, 0);
+	if (status < 0) {
+		LOG_ERROR_STATUS(status);
+		goto requeue;
+	}
+
+	return;
+requeue:
+	ocfs2_schedule_blocked_super(osb);
+}
+
 static void ocfs2_process_blocked_inode(struct inode *inode)
 {
 	int status;
@@ -1467,6 +1759,13 @@
 		ocfs2_schedule_blocked_inode(inode);
 }
 
+static void ocfs2_schedule_blocked_super(ocfs_super *osb)
+{
+	spin_lock(&osb->vote_task_lock);
+	osb->blocked_super_count++;
+	spin_unlock(&osb->vote_task_lock);
+}
+
 static void ocfs2_schedule_blocked_inode(struct inode *inode)
 {
 	ocfs_super *osb = OCFS2_SB(inode->i_sb);
@@ -1485,6 +1784,23 @@
 	spin_unlock(&osb->vote_task_lock);
 }
 
+static void ocfs2_process_mount_request(ocfs_super *osb,
+					unsigned int node_num)
+{
+	printk("MOUNT vote from node %u\n", node_num);
+	/* The other node only sends us this message when he has an EX
+	 * on the superblock, so our recovery threads (if having been
+	 * launched) are waiting on it.*/
+	ocfs_recovery_map_clear(osb, node_num);
+}
+
+static void ocfs2_process_umount_request(ocfs_super *osb,
+					 unsigned int node_num)
+{
+	printk("UMOUNT vote from node %u\n", node_num);
+	ocfs_node_map_set_bit(osb, &osb->umount_map, node_num);
+}
+
 static int ocfs2_process_delete_request(struct inode *inode)
 {
 	int response = -EBUSY;
@@ -1537,8 +1853,8 @@
 	return response;
 }
 
-static int ocfs2_process_dentry_request(struct inode *inode,
-					int rename)
+static void ocfs2_process_dentry_request(struct inode *inode,
+					 int rename)
 {
 	d_prune_aliases (inode);
 
@@ -1549,9 +1865,6 @@
 		else
 			inode->i_nlink--;
 	}
-
-	/* we always vote yes on this request type. */
-	return 0;
 }
 
 static void ocfs2_process_vote(ocfs_super *osb,
@@ -1563,6 +1876,18 @@
 
 	OCFS_ASSERT(!memcmp(msg->m_hdr.h_uuid, osb->uuid, MAX_VOL_ID_LENGTH));
 
+	switch (msg->m_request) {
+	case OCFS2_VOTE_REQ_UMOUNT:
+		ocfs2_process_umount_request(osb, msg->m_req_node);
+		goto respond;
+		break;
+	case OCFS2_VOTE_REQ_MOUNT:
+		ocfs2_process_mount_request(osb, msg->m_req_node);
+		goto respond;
+		break;
+	}
+
+	/* If we get here, then the request is against an inode. */
 	inode = ocfs_ilookup(osb, msg->m_blkno);
 	if (!inode)
 		goto respond;
@@ -1576,7 +1901,7 @@
 	case OCFS2_VOTE_REQ_RENAME:
 		rename = 1;
 	case OCFS2_VOTE_REQ_UNLINK:
-		vote_response = ocfs2_process_dentry_request(inode, rename);
+		ocfs2_process_dentry_request(inode, rename);
 		break;
 	default:
 		printk("ocfs2_process_vote: node %u, invalid request: %u\n",
@@ -1598,6 +1923,16 @@
 	ocfs2_vote_work *work;
 
 	spin_lock(&osb->vote_task_lock);
+	if (osb->blocked_super_count) {
+		OCFS_ASSERT(osb->blocked_super_count == 1);
+		osb->blocked_super_count = 0;
+		spin_unlock(&osb->vote_task_lock);
+
+		ocfs2_process_blocked_super(osb);
+
+		spin_lock(&osb->vote_task_lock);
+	}
+
 	processed = osb->blocked_inode_count;
 	while (processed) {
 		OCFS_ASSERT(!list_empty(&osb->blocked_inode_list));
@@ -1634,6 +1969,16 @@
 	spin_unlock(&osb->vote_task_lock);
 }
 
+static inline int ocfs2_vote_thread_has_work(ocfs_super *osb)
+{
+	if (list_empty(&osb->blocked_inode_list) &&
+	    list_empty(&osb->vote_list) &&
+	    !osb->blocked_super_count)
+		return 0;
+
+	return 1;
+}
+
 static int ocfs2_vote_thread(void *arg)
 {
 	int status = 0;
@@ -1651,8 +1996,7 @@
 
 	while (1) {
 		if (osb->vote_exit) {
-			if (list_empty(&osb->blocked_inode_list) &&
-			    list_empty(&osb->vote_list))
+			if (!ocfs2_vote_thread_has_work(osb))
 				break;
 			/* don't want to sleep if we're supposed to quit. */
 			atomic_set(&osb->wake_vote_task, 1);
@@ -1679,7 +2023,8 @@
 }
 
 static int ocfs2_do_request_vote(ocfs_super *osb,
-				 struct inode *inode,
+				 u64 blkno,
+				 unsigned int generation,
 				 enum ocfs2_vote_request type)
 {
 	int status;
@@ -1688,7 +2033,8 @@
 
 	OCFS_ASSERT(type == OCFS2_VOTE_REQ_DELETE ||
 		    type == OCFS2_VOTE_REQ_UNLINK ||
-		    type == OCFS2_VOTE_REQ_RENAME);
+		    type == OCFS2_VOTE_REQ_RENAME ||
+		    type == OCFS2_VOTE_REQ_UMOUNT);
 
 	request = kmalloc(sizeof(*request), GFP_KERNEL);
 	if (!request) {
@@ -1709,8 +2055,8 @@
 	request->m_hdr.h_type = OCFS2_MESSAGE_TYPE_VOTE;
 	request->m_req_node = osb->node_num;
 	request->m_request = type;
-	request->m_blkno = OCFS_I(inode)->ip_blkno;
-	request->m_generation = inode->i_generation;
+	request->m_blkno = blkno;
+	request->m_generation = generation;
 
 	/* register for the response here */
 	/* send the broadcast request here */
@@ -1750,7 +2096,17 @@
 		if (ocfs_node_map_is_only(osb, &osb->node_map, osb->node_num))
 			return 0;
 
-		status = ocfs2_do_request_vote(osb, inode, type);
+		status = ocfs2_super_lock(osb, 0);
+		if (status < 0) {
+			if (status != -EINTR)
+				LOG_ERROR_STATUS(status);
+			break;
+		}
+
+		status = ocfs2_do_request_vote(osb, OCFS_I(inode)->ip_blkno,
+					       inode->i_generation, type);
+
+		ocfs2_super_unlock(osb, 0);
 	}
 	return status;
 }
@@ -1769,3 +2125,39 @@
 {
 	return ocfs2_request_vote(inode, OCFS2_VOTE_REQ_RENAME);
 }
+
+int ocfs2_request_mount_vote(ocfs_super *osb)
+{
+	int status;
+
+	status = -EAGAIN;
+	while (status == -EAGAIN) {
+		if (signal_pending(current))
+			return -EINTR;
+
+		if (ocfs_node_map_is_only(osb, &osb->node_map, osb->node_num))
+			return 0;
+
+		status = ocfs2_do_request_vote(osb, 0ULL, 0,
+					       OCFS2_VOTE_REQ_MOUNT);
+	}
+	return status;
+}
+
+int ocfs2_request_umount_vote(ocfs_super *osb)
+{
+	int status;
+
+	status = -EAGAIN;
+	while (status == -EAGAIN) {
+		if (signal_pending(current))
+			return -EINTR;
+
+		if (ocfs_node_map_is_only(osb, &osb->node_map, osb->node_num))
+			return 0;
+
+		status = ocfs2_do_request_vote(osb, 0ULL, 0,
+					       OCFS2_VOTE_REQ_UMOUNT);
+	}
+	return status;
+}

Modified: branches/dlm-glue/src/dlmglue.h
===================================================================
--- branches/dlm-glue/src/dlmglue.h	2004-11-19 22:58:39 UTC (rev 1654)
+++ branches/dlm-glue/src/dlmglue.h	2004-11-19 23:44:28 UTC (rev 1655)
@@ -1,7 +1,7 @@
 /* -*- mode: c; c-basic-offset: 8; -*-
  * vim: noexpandtab sw=8 ts=8 sts=0:
  *
- * middle.h
+ * dlmglue.h
  *
  * description here
  *
@@ -79,10 +79,11 @@
 
 int ocfs2_dlm_init(ocfs_super *osb);
 void ocfs2_dlm_shutdown(ocfs_super *osb);
-int ocfs2_find_slot(ocfs_super *osb);
-int ocfs2_lock_res_init(ocfs2_lock_res *res,
-			enum ocfs2_lock_type type,
-			struct inode *inode);
+int ocfs2_inode_lock_res_init(ocfs2_lock_res *res,
+			      enum ocfs2_lock_type type,
+			      struct inode *inode);
+int ocfs2_super_lock_res_init(ocfs2_lock_res *res,
+			      ocfs_super *osb);
 void ocfs2_lock_res_free(ocfs2_lock_res *res);
 int ocfs2_create_new_inode_locks(struct inode *inode);
 int ocfs2_drop_inode_locks(struct inode *inode);
@@ -90,14 +91,24 @@
 		    int write);
 void ocfs2_data_unlock(struct inode *inode,
 		       int write);
-int ocfs2_meta_lock(struct inode *inode,
-		    ocfs_journal_handle *handle,
-		    struct buffer_head **ret_bh,
-		    int ex);
+/* don't wait on recovery. */
+#define OCFS2_META_LOCK_RECOVERY	(0x01)
+/* Instruct the dlm not to queue ourselves on the other node. */
+#define OCFS2_META_LOCK_NOQUEUE		(0x02)
+/* 99% of the time we don't want to supply any additional flags --
+ * those are for very specific cases only. */
+#define ocfs2_meta_lock(i, h, b, e) ocfs2_meta_lock_flags(i, h, b, e, 0)
+int ocfs2_meta_lock_flags(struct inode *inode,
+			  ocfs_journal_handle *handle,
+			  struct buffer_head **ret_bh,
+			  int ex,
+			  int flags);
 void ocfs2_meta_unlock(struct inode *inode,
 		       int ex);
-int ocfs2_super_lock(ocfs_super *osb, int ex);
-void ocfs2_super_unlock(ocfs_super *osb);
+int ocfs2_super_lock(ocfs_super *osb,
+		     int ex);
+void ocfs2_super_unlock(ocfs_super *osb,
+			int ex);
 
 static inline void ocfs2_kick_vote_thread(ocfs_super *osb)
 {
@@ -107,6 +118,8 @@
 int ocfs2_request_delete_vote(struct inode *inode);
 int ocfs2_request_unlink_vote(struct inode *inode);
 int ocfs2_request_rename_vote(struct inode *inode);
+int ocfs2_request_mount_vote(ocfs_super *osb);
+int ocfs2_request_umount_vote(ocfs_super *osb);
 
 static inline void ocfs2_lvb_set_trunc_clusters(struct inode *inode,
 						unsigned int trunc_clusters)
@@ -158,7 +171,9 @@
 	OCFS2_VOTE_REQ_INVALID = 0,
 	OCFS2_VOTE_REQ_DELETE,
 	OCFS2_VOTE_REQ_UNLINK,
-	OCFS2_VOTE_REQ_RENAME
+	OCFS2_VOTE_REQ_RENAME,
+	OCFS2_VOTE_REQ_MOUNT,
+	OCFS2_VOTE_REQ_UMOUNT
 };
 
 #endif

Modified: branches/dlm-glue/src/heartbeat.c
===================================================================
--- branches/dlm-glue/src/heartbeat.c	2004-11-19 22:58:39 UTC (rev 1654)
+++ branches/dlm-glue/src/heartbeat.c	2004-11-19 23:44:28 UTC (rev 1655)
@@ -50,11 +50,16 @@
 #define OCFS_DEBUG_CONTEXT      OCFS_DEBUG_CONTEXT_HEARTBEAT
 
 #define OCFS2_HB_NODE_DOWN_PRI     (0x0000001)
+#define OCFS2_HB_NODE_UP_PRI	   OCFS2_HB_NODE_DOWN_PRI
 
-static void ocfs2_hb_node_down_cb(void *ptr1,
-				  void *ptr2,
+static void ocfs2_hb_node_down_cb(struct inode *group,
+				  struct inode *node,
 				  int node_num,
 				  void *data);
+static void ocfs2_hb_node_up_cb(struct inode *group,
+				struct inode *node,
+				int node_num,
+				void *data);
 
 static void ocfs_node_map_init(ocfs_super *osb, ocfs_node_map *map);
 static void __ocfs_node_map_dup(ocfs_super *osb,
@@ -75,40 +80,79 @@
 	spin_lock_init(&osb->node_map_lock);
 	ocfs_node_map_init(osb, &osb->node_map);
 	ocfs_node_map_init(osb, &osb->recovery_map);
+	ocfs_node_map_init(osb, &osb->umount_map);
 }
 
-static void ocfs2_hb_node_down_cb(void *ptr1,
-				  void *ptr2,
+static void ocfs2_hb_node_down_cb(struct inode *group,
+				  struct inode *node,
 				  int node_num,
 				  void *data)
 {
 	ocfs_super *osb = data;
 
-	if (atomic_read(&osb->vol_state) != VOLUME_MOUNTED) {
-		printk("ocfs2: Ignoring node down callback for node %d\n",
-		       node_num);
+	ocfs_node_map_clear_bit(osb, &osb->node_map, node_num);
+	if (osb->group_inode != group)
 		return;
-	}
 
-	OCFS_ASSERT(osb->global_node_num != node_num);
+	OCFS_ASSERT(osb->node_num != node_num);
 
 	printk("ocfs2: node down event for %d\n", node_num);
-	/* uhm, do some recovery stuff.. */
+
+	if (ocfs_node_map_test_bit(osb, &osb->umount_map, node_num)) {
+		/* If a node is in the umount map, then we've been
+		 * expecting him to go down and we know ahead of time
+		 * that recovery is not necessary. */
+		ocfs_node_map_clear_bit(osb, &osb->umount_map, node_num);
+		return;
+	}
+
+	ocfs_recovery_thread(osb, node_num);
 }
 
+static void ocfs2_hb_node_up_cb(struct inode *group,
+				struct inode *node,
+				int node_num,
+				void *data)
+{
+	ocfs_super *osb = data;
+
+	ocfs_node_map_set_bit(osb, &osb->node_map, node_num);
+	if (osb->group_inode != group)
+		return;
+
+	OCFS_ASSERT(osb->node_num != node_num);
+
+	printk("ocfs2: node up event for %d\n", node_num);
+	ocfs_node_map_clear_bit(osb, &osb->umount_map, node_num);
+}
+
 /* Most functions here are just stubs for now... */
 int ocfs2_register_hb_callbacks(ocfs_super *osb)
 {
 	int status;
-	ocfs_node_map_set_bit(osb, &osb->node_map, osb->node_num);
 
 	status = hb_register_callback(HB_NODE_DOWN_CB,
 				      ocfs2_hb_node_down_cb,
 				      osb,
 				      OCFS2_HB_NODE_DOWN_PRI);
+	if (status < 0) {
+		LOG_ERROR_STATUS(status);
+		goto bail;
+	}
+
+	status = hb_register_callback(HB_NODE_UP_CB,
+				      ocfs2_hb_node_up_cb,
+				      osb,
+				      OCFS2_HB_NODE_UP_PRI);
 	if (status < 0)
 		LOG_ERROR_STATUS(status);
 
+	status = hb_fill_node_map(osb->group_inode, &osb->node_map.map,
+				  sizeof(osb->node_map.map));
+	if (status < 0)
+		LOG_ERROR_STATUS(status);
+
+bail:
 	return status;
 }
 
@@ -121,7 +165,11 @@
 	if (status < 0)
 		LOG_ERROR_STATUS(status);
 
-	ocfs_node_map_clear_bit(osb, &osb->node_map, osb->node_num);
+	status = hb_unregister_callback(HB_NODE_UP_CB,
+					ocfs2_hb_node_up_cb, osb);
+	if (status < 0)
+		LOG_ERROR_STATUS(status);
+
 }
 
 /* special case -1 for now
@@ -259,10 +307,47 @@
 	ocfs_node_map_clear_bit(osb, &osb->recovery_map, num);
 }
 
+int ocfs_node_map_first_set_bit(ocfs_super *osb,
+				ocfs_node_map *map)
+{
+	int i, ret = -1;
 
+	spin_lock(&osb->node_map_lock);
+	for(i = 0; i < map->num_nodes; i++)
+		if (test_bit(i, map->map)) {
+			ret = i;
+			break;
+		}
+	spin_unlock(&osb->node_map_lock);
+	return ret;
+}
+
 #if 0
 /* unused (for now) node map functions. */
 
+/* Uses the heartbeat api to test whether global node node_num is
+ * heartbeating: fills tmpmap.map (the bitmap that the size describes),
+ * then returns ocfs_node_map_test_bit() for the node, or < 0 if the
+ * fill fails. Warning: this function can sleep in hb_fill_node_map() */
+int ocfs2_is_node_alive(ocfs_super *osb,
+			unsigned int node_num)
+{
+	int ret;
+	ocfs_node_map tmpmap;
+
+	ocfs_node_map_init(osb, &tmpmap);
+	ret = hb_fill_node_map(osb->group_inode, &tmpmap.map,
+			       sizeof(tmpmap.map));
+	if (ret < 0) {
+		LOG_ERROR_STATUS(ret);
+		goto bail;
+	}
+
+	ret = ocfs_node_map_test_bit(osb, &tmpmap, node_num);
+bail:
+	return ret;
+}
+
 static int ocfs_node_map_stringify(ocfs_node_map *map, char **str)
 {
 	int i, n;
@@ -347,12 +432,3 @@
 }
 #endif
 
-#if 0
-				if (node_map[i].miss_cnt >= MISS_COUNT_NODE_DEAD) {
-					ocfs_recovery_map_set(osb, i);
-					ocfs_publish_map_clear(&osb->publ_map, i);
-
-					/* Ok, we'd better recover him now...*/
-					ocfs_recovery_thread(osb, i);
-				}
-#endif

Modified: branches/dlm-glue/src/heartbeat.h
===================================================================
--- branches/dlm-glue/src/heartbeat.h	2004-11-19 22:58:39 UTC (rev 1654)
+++ branches/dlm-glue/src/heartbeat.h	2004-11-19 23:44:28 UTC (rev 1655)
@@ -44,6 +44,8 @@
 int ocfs_node_map_test_bit(ocfs_super *osb,
 			   ocfs_node_map *map,
 			   int bit);
+int ocfs_node_map_first_set_bit(ocfs_super *osb,
+				ocfs_node_map *map);
 void ocfs_recovery_map_set(ocfs_super *osb,
 			   int num);
 void ocfs_recovery_map_clear(ocfs_super *osb,

Modified: branches/dlm-glue/src/inode.c
===================================================================
--- branches/dlm-glue/src/inode.c	2004-11-19 22:58:39 UTC (rev 1654)
+++ branches/dlm-glue/src/inode.c	2004-11-19 23:44:28 UTC (rev 1655)
@@ -426,14 +426,14 @@
 		    break;
 	}
 
-	status = ocfs2_lock_res_init(&OCFS_I(inode)->ip_meta_lockres,
-				     OCFS_TYPE_META, inode);
+	status = ocfs2_inode_lock_res_init(&OCFS_I(inode)->ip_meta_lockres,
+					   OCFS_TYPE_META, inode);
 	if (status < 0) {
 		LOG_ERROR_STATUS(status);
 		goto bail;
 	}
-	status = ocfs2_lock_res_init(&OCFS_I(inode)->ip_data_lockres,
-				     OCFS_TYPE_DATA, inode);
+	status = ocfs2_inode_lock_res_init(&OCFS_I(inode)->ip_data_lockres,
+					   OCFS_TYPE_DATA, inode);
 	if (status < 0)
 		LOG_ERROR_STATUS(status);
 bail:

Modified: branches/dlm-glue/src/journal.c
===================================================================
--- branches/dlm-glue/src/journal.c	2004-11-19 22:58:39 UTC (rev 1654)
+++ branches/dlm-glue/src/journal.c	2004-11-19 23:44:28 UTC (rev 1655)
@@ -42,6 +42,7 @@
 #include "journal.h"
 #include "localalloc.h"
 #include "namei.h"
+#include "slot_map.h"
 #include "super.h"
 #include "util.h"
 #include "sysfile.h"
@@ -64,6 +65,8 @@
 static void ocfs_commit_unstarted_handle(ocfs_journal_handle *handle);
 static int ocfs_journal_toggle_dirty(ocfs_super *osb,
 				     int dirty);
+static int ocfs2_trylock_journal(ocfs_super *osb,
+				 int slot_num);
 
 /* 
  * ocfs_commit_cache()
@@ -619,7 +622,8 @@
 		BUG();
 
 	/* already have the inode for our journal */
-	inode = ocfs_get_system_file_inode(osb, JOURNAL_SYSTEM_INODE, osb->node_num);
+	inode = ocfs_get_system_file_inode(osb, JOURNAL_SYSTEM_INODE, 
+					   osb->slot_num);
 	if (inode == NULL) {
 		LOG_ERROR_STR("access error");
 		status = -EACCES;
@@ -680,8 +684,6 @@
 
 	/* yay, pass the proper info back to our journal structure. */
 	osb->journal->osb = osb;
-	/* eventually this will be a value passed into us */
-	osb->journal->node_num = osb->node_num;
 	osb->journal->k_journal = k_journal;
 	osb->journal->k_inode = inode;
 	osb->journal->version = OCFS_JOURNAL_CURRENT_VERSION;
@@ -962,75 +964,93 @@
 	return(status);
 }
 
-struct ocfs_recover_arg {
-	ocfs_super *osb;
-	int node_num;
-};
-
 static int __ocfs_recovery_thread(void *arg)
 {
-	struct ocfs_recover_arg *recover_arg = arg;
-	ocfs_super *osb = recover_arg->osb;
-	int node_num = recover_arg->node_num;
+	ocfs_super *osb = arg;
 	int status = 0;
+	int node_num;
 	char proc[16];
 
-	LOG_ENTRY_ARGS("(node_num=%d, osb->node_num = %d)\n", node_num,
-		       osb->node_num);
+	LOG_ENTRY();
 
 	sprintf (proc, "ocfs2rec-%d", osb->osb_id);
 	ocfs_daemonize (proc, strlen(proc), 0);
 
-#ifdef HAVE_NPTL
-	spin_lock_irq (&current->sighand->siglock);
-	sigfillset(&current->blocked);
-	recalc_sigpending();
-	spin_unlock_irq (&current->sighand->siglock);
-#else
-	spin_lock_irq(&current->sigmask_lock);
-	sigfillset(&current->blocked);
-	recalc_sigpending(current);
-	spin_unlock_irq(&current->sigmask_lock);
-#endif
+	status = ocfs_wait_on_mount(osb);
+	if (status < 0) {
+		if (status == -EBUSY)
+			status = 0;
+		goto bail;
+	}
 
-	status = ocfs_recover_node(osb, node_num);
-	if (status < 0)
+restart:
+	status = ocfs2_super_lock(osb, 1);
+	if (status < 0) {
 		LOG_ERROR_STATUS(status);
+		goto bail;
+	}
 
+	while(!ocfs_node_map_is_empty(osb, &osb->recovery_map)) {
+		node_num = ocfs_node_map_first_set_bit(osb,
+						       &osb->recovery_map);
+		if (node_num < 0) {
+			LOG_TRACE_ARGS("Out of nodes to recover.\n");
+			break;
+		}
+
+		ocfs_recovery_map_clear(osb, node_num);
+		/* TODO: Figure out how we're going to save all the
+		 * local alloc stuff for after recovery on all nodes
+		 * is complete? */
+		status = ocfs_recover_node(osb, node_num);
+		if (status < 0) {
+			printk("ocfs2: Error %d recovering node %d on device "
+				"(%u,%u)!\n", status, node_num,
+			       MAJOR(osb->sb->s_dev),MINOR(osb->sb->s_dev));
+			printk("ocfs2: Volume requires unmount.\n");
+			continue;
+		}
+		atomic_dec(&osb->num_recovery_threads);
+	}
+	ocfs2_super_unlock(osb, 1);
+
+bail:
+	down(&osb->recovery_lock);
+	if (!ocfs_node_map_is_empty(osb, &osb->recovery_map)) {
+		up(&osb->recovery_lock);
+		goto restart;
+	}
+	osb->recovery_launched = 0;
+	up(&osb->recovery_lock);
+
+	wake_up_all(&osb->recovery_event);
+
 	LOG_EXIT_STATUS(status);
-
-	kfree(arg);
 	return status;
 }
 
-void ocfs_recovery_thread(ocfs_super *osb, int node_num) 
+void ocfs_recovery_thread(ocfs_super *osb, int node_num)
 {
-	struct ocfs_recover_arg *arg;
+	LOG_ENTRY_ARGS("(node_num=%d, osb->node_num = %d)\n",
+		       node_num, osb->node_num);
 
-	LOG_ENTRY_ARGS("(node_num=%d, osb->node_num = %d)\n", node_num,
-		       osb->node_num);
-
+	down(&osb->recovery_lock);
 	/* atomic_inc this here and let recover_vol dec it when
 	 * done. We do it this way to avoid races with umount. People
 	 * waiting on recovery will wait on this value to drop back
 	 * down to zero. */
 	atomic_inc(&osb->num_recovery_threads);
+	ocfs_recovery_map_set(osb, node_num);
 
-	arg = kmalloc(sizeof(struct ocfs_recover_arg), GFP_KERNEL);
-	if (arg == NULL) {
-		LOG_ERROR_STATUS(-ENOMEM);
-		goto done;
-	}
-
-	arg->osb = osb;
-	arg->node_num = node_num;
-
 	LOG_TRACE_STR("starting recovery thread...");
 
-	kernel_thread(__ocfs_recovery_thread, arg,
-		      CLONE_VM | CLONE_FS | CLONE_FILES);
+	if (!osb->recovery_launched) {
+		kernel_thread(__ocfs_recovery_thread, osb,
+			      CLONE_VM | CLONE_FS | CLONE_FILES);
+		osb->recovery_launched = 1;
+	}
 
-done:
+	up(&osb->recovery_lock);
 	wake_up_all(&osb->recovery_event);
 
 	LOG_EXIT();
@@ -1039,43 +1059,44 @@
 
 static int ocfs_recover_node(ocfs_super *osb, int node_num) 
 {
-	int status = -1;
-	int tmpstat;
+	int status = 0;
+//	int tmpstat;
+	int slot_num;
 	ocfs2_dinode *fe;
 	ocfs2_dinode *local_alloc = NULL;
 	struct inode *inode = NULL;
 	journal_t *k_journal = NULL;
 	struct buffer_head *bh = NULL;
 	ocfs_journal * journal = NULL;
-	int recovery_lock = 0, got_lock = 0, clean_orphans = 0;
+	int got_lock = 0, clean_orphans = 0;
+	ocfs2_slot_info *si = osb->slot_info;
 
-	LOG_ENTRY_ARGS("(node_num=%d, osb->node_num = %d)\n", node_num,
-		       osb->node_num);
+	LOG_ENTRY_ARGS("(node_num=%d, osb->node_num = %d)\n",
+		       node_num, osb->node_num);
 
-	if (!osb || (node_num >= osb->max_nodes)) {
-		LOG_ERROR_STATUS (status = -EINVAL);
+	printk("ocfs2_recover_node: checking node %d\n", node_num);
+
+	/* Should not ever be called to recover ourselves -- in that
+	 * case we should've called ocfs_journal_load instead. */
+	if (osb->node_num == node_num)
+		BUG();
+
+	ocfs2_update_slot_info(si);
+	slot_num = ocfs2_node_num_to_slot(si, node_num);
+	if (slot_num == OCFS_INVALID_NODE_NUM) {
+		printk("ocfs2_recover_node: no slot for this node, so no "
+		       "recovery required.\n");
 		goto done;
 	}
 
-	status = ocfs_wait_on_mount(osb);
-	if (status < 0) {
-		if (status == -EBUSY)
-			status = 0;
-		goto done;
-	}
+	printk("ocfs2_recover_node: node %d was using slot %d\n", node_num,
+	       slot_num);
+
 	journal = osb->journal;
 
-	/* Grab the local recovery resource to ensure no other thread
-	 * comes in from this node for recovery */
-	down(&(osb->recovery_lock));
-	recovery_lock = 1;
-	if (osb->disable_recovery) {
-		LOG_TRACE_STR("Shutting down so skipping reovery.");
-		goto done;
-	}
-
 	/* Ok, look up the inode for our journal */
-	inode = ocfs_get_system_file_inode(osb, JOURNAL_SYSTEM_INODE, node_num);
+	inode = ocfs_get_system_file_inode(osb, JOURNAL_SYSTEM_INODE,
+					   slot_num);
 	if (inode == NULL) {
 		LOG_ERROR_STR("access error");
 		status = -EACCES;
@@ -1091,12 +1112,8 @@
 
 	SET_INODE_JOURNAL(inode);
 
-	/* Should not ever be called to recover ourselves -- in that
-	 * case we should've called ocfs_journal_load instead. */
-	if (osb->node_num == node_num)
-		BUG();
-
-	status = ocfs2_meta_lock(inode, NULL, &bh, 1);
+	status = ocfs2_meta_lock_flags(inode, NULL, &bh, 1,
+				       OCFS2_META_LOCK_RECOVERY);
 	if (status < 0) {
 		LOG_TRACE_ARGS("status returned from ocfs2_meta_lock=%d\n", 
 			       status);
@@ -1105,17 +1122,19 @@
 		goto done;
 	}
 	got_lock = 1;
-	
+
 	fe = (ocfs2_dinode *) bh->b_data;
 
 	if (!(fe->id1.journal1.i_flags & OCFS2_JOURNAL_DIRTY_FL)) {
 		LOG_TRACE_ARGS("No recovery required for node %d\n", node_num);
-		status = 0;
+		printk("ocfs2_recover_node: No recovery required for node "
+		       "%d\n", node_num);
 		goto clear_node;
 	}
 
-	printk("ocfs2: Recovering node %d from device (%u,%u)\n", node_num, 
-	       MAJOR(osb->sb->s_dev), MINOR(osb->sb->s_dev));
+	printk("ocfs2: Recovering node %d from slot %d on device (%u,%u)\n",
+	       node_num, slot_num, MAJOR(osb->sb->s_dev),
+	       MINOR(osb->sb->s_dev));
 	clean_orphans = 1;
 
 	OCFS_I(inode)->ip_clusters = fe->i_clusters;
@@ -1164,8 +1183,9 @@
 	/* shutdown the journal */
 	journal_destroy(k_journal);
 
+#warning "we can't complete local alloc recovery in this function!"
 	/* recover his local alloc file, AFTER recovering his journal... */
-	status = ocfs_begin_local_alloc_recovery(osb, node_num, &local_alloc);
+	status = ocfs_begin_local_alloc_recovery(osb, slot_num, &local_alloc);
 	if (status < 0) {
 		LOG_ERROR_STATUS(status);
 		goto done;
@@ -1174,11 +1194,11 @@
 	status = 0;
 
 clear_node:
-	ocfs_recovery_map_clear(osb, node_num);
+	ocfs2_clear_slot(si, slot_num);
+	status = ocfs2_update_disk_slots(osb, si);
+	if (status < 0)
+		LOG_ERROR_STATUS(status);
 done:
-	if (recovery_lock)
-		up(&(osb->recovery_lock));
-
 	/* drop the lock on this nodes journal */
 	if (got_lock)
 		ocfs2_meta_unlock(inode, 1);
@@ -1188,28 +1208,114 @@
 
 	if (bh)
 		brelse(bh);
-
+#if 0
 	if (local_alloc && !status) {
 		tmpstat = ocfs_complete_local_alloc_recovery(osb, local_alloc);
 		if (tmpstat < 0)
 			LOG_ERROR_STATUS(tmpstat);
 	}
-
+#endif
 	if (local_alloc)
 		kfree(local_alloc);
-
+#if 0
 	if (clean_orphans && !status) {
 		tmpstat = ocfs_recover_orphans(osb);
 		if (tmpstat < 0)
 			LOG_ERROR_STATUS(tmpstat);
 	}
+#endif
 
-	atomic_dec(&osb->num_recovery_threads);
-
 	LOG_EXIT_STATUS(status);
 	return(status);
 }
 
+/* Test node liveness by trylocking his journal; a lock we get is dropped
+ * here. Return 0 if we got the lock, -EAGAIN if node is still alive (no
+ * lock) and < 0 on error. Lock failures of -EAGAIN/-EINTR go unlogged. */
+static int ocfs2_trylock_journal(ocfs_super *osb,
+				 int slot_num)
+{
+	int status, flags;
+	struct inode *inode = NULL;
+
+	inode = ocfs_get_system_file_inode(osb, JOURNAL_SYSTEM_INODE,
+					   slot_num);
+	if (inode == NULL) {
+		LOG_ERROR_STR("access error");
+		status = -EACCES;
+		goto bail;
+	}
+	if (is_bad_inode (inode)) {
+		LOG_ERROR_STR("access error (bad inode)");
+		iput (inode);
+		inode = NULL;
+		status = -EACCES;
+		goto bail;
+	}
+	SET_INODE_JOURNAL(inode);
+
+	flags = OCFS2_META_LOCK_RECOVERY|OCFS2_META_LOCK_NOQUEUE;
+	status = ocfs2_meta_lock_flags(inode, NULL, NULL, 1, flags);
+	if (status < 0) {
+		if (status != -EAGAIN && status != -EINTR)
+			LOG_ERROR_STATUS(status);
+		goto bail;
+	}
+
+	ocfs2_meta_unlock(inode, 1);
+bail:
+	if (inode)
+		iput(inode);
+
+	return status;
+}
+
+/* Call this underneath ocfs2_super_lock, with the slot info struct already
+ * updated from disk. Returns 0, or a non -EAGAIN journal trylock error. */
+int ocfs2_mark_dead_nodes(ocfs_super *osb)
+{
+	int status, i, node_num;
+	ocfs2_slot_info *si = osb->slot_info;
+
+	/* This is called with the super block cluster lock, so we
+	 * know that the slot map can't change underneath us. */
+
+	spin_lock(&si->si_lock);
+	for(i = 0; i < si->si_num_slots; i++) {
+		node_num = si->si_global_node_nums[i];
+		if (i == osb->slot_num)
+			continue;
+		if (node_num == OCFS_INVALID_NODE_NUM)
+			continue;
+		if (ocfs_node_map_test_bit(osb, &osb->recovery_map, node_num))
+			continue;
+		spin_unlock(&si->si_lock);
+
+		/* Ok, we have a slot occupied by another node which
+		 * is not in the recovery map. We trylock his journal
+		 * file (si_lock is not held here) to test if he's alive. */
+		status = ocfs2_trylock_journal(osb, i);
+		if (!status) {
+			/* We got the lock, so he is not alive. Called
+			 * from mount, the recovery thread can't race us
+			 * on setting / checking the recovery bits. */
+			ocfs_recovery_thread(osb, node_num);
+		} else if ((status < 0) && (status != -EAGAIN)) {
+			if (status != -EINTR)
+				LOG_ERROR_STATUS(status);
+			goto bail;
+		}
+
+		spin_lock(&si->si_lock);
+	}
+	spin_unlock(&si->si_lock);
+
+	status = 0;
+bail:
+	LOG_EXIT_STATUS(status);
+	return status;
+}
+
 int ocfs_recover_orphans(ocfs_super *osb)
 {
 	int status = 0;
@@ -1222,8 +1328,6 @@
 	struct ocfs2_dir_entry *de;
 	struct super_block *sb = osb->sb;
 
-	down(&osb->orphan_recovery_lock);
-
 	orphan_dir_inode = ocfs_get_system_file_inode(osb, 
 						      ORPHAN_DIR_SYSTEM_INODE, 
 						      -1);
@@ -1321,8 +1425,6 @@
 	}
 
 bail:
-	up(&osb->orphan_recovery_lock);
-
 	if (have_disk_lock)
 		ocfs2_meta_unlock(orphan_dir_inode, 0);
 

Modified: branches/dlm-glue/src/localalloc.c
===================================================================
--- branches/dlm-glue/src/localalloc.c	2004-11-19 22:58:39 UTC (rev 1654)
+++ branches/dlm-glue/src/localalloc.c	2004-11-19 23:44:28 UTC (rev 1655)
@@ -147,7 +147,7 @@
 
 	/* read the alloc off disk */
 	inode = ocfs_get_system_file_inode(osb, LOCAL_ALLOC_SYSTEM_INODE, 
-					   osb->node_num);
+					   osb->slot_num);
 	if (!inode) {
 		LOG_ERROR_STATUS(status=-EINVAL);
 		goto bail;
@@ -236,7 +236,7 @@
 	local_alloc_inode = 
 		ocfs_get_system_file_inode(osb, 
 					   LOCAL_ALLOC_SYSTEM_INODE,
-					   osb->node_num);
+					   osb->slot_num);
 	if (!local_alloc_inode) {
 		status = -ENOENT;
 		LOG_ERROR_STATUS(status);
@@ -348,7 +348,7 @@
  * caller to process with ocfs_complete_local_alloc_recovery
  */
 int ocfs_begin_local_alloc_recovery(ocfs_super *osb, 
-				    int node_num, 
+				    int slot_num, 
 				    ocfs2_dinode **alloc_copy)
 {
 	int status = 0;
@@ -356,13 +356,13 @@
 	struct inode *inode = NULL;
 	ocfs2_dinode *alloc;
 
-	LOG_ENTRY_ARGS("(node_num = %d)\n", node_num);
+	LOG_ENTRY_ARGS("(slot_num = %d)\n", slot_num);
 
 	*alloc_copy = NULL;
 
 	inode = ocfs_get_system_file_inode(osb, 
 					   LOCAL_ALLOC_SYSTEM_INODE, 
-					   node_num);
+					   slot_num);
 	if (!inode) {
 		LOG_ERROR_STATUS(status=-EINVAL);
 		goto bail;
@@ -501,7 +501,7 @@
 	local_alloc_inode = 
 		ocfs_get_system_file_inode(osb, 
 					   LOCAL_ALLOC_SYSTEM_INODE,
-					   osb->node_num);
+					   osb->slot_num);
 	if (!local_alloc_inode) {
 		status = -ENOENT;
 		LOG_ERROR_STATUS(status);

Modified: branches/dlm-glue/src/namei.c
===================================================================
--- branches/dlm-glue/src/namei.c	2004-11-19 22:58:39 UTC (rev 1654)
+++ branches/dlm-glue/src/namei.c	2004-11-19 23:44:28 UTC (rev 1655)
@@ -523,7 +523,7 @@
 	fe->i_generation = cpu_to_le32(inode->i_generation);
 	fe->i_blkno = fe_blkno;
 	fe->i_suballoc_bit = suballoc_bit;
-	fe->i_suballoc_node = osb->node_num;
+	fe->i_suballoc_node = osb->slot_num;
 	fe->i_uid = current->fsuid;
 	if (dir->i_mode & S_ISGID) {
 		fe->i_gid = dir->i_gid;

Modified: branches/dlm-glue/src/ocfs.h
===================================================================
--- branches/dlm-glue/src/ocfs.h	2004-11-19 22:58:39 UTC (rev 1654)
+++ branches/dlm-glue/src/ocfs.h	2004-11-19 23:44:28 UTC (rev 1655)
@@ -189,6 +189,7 @@
 enum ocfs2_lock_type {
 	OCFS_TYPE_META = 0,
 	OCFS_TYPE_DATA,
+	OCFS_TYPE_SUPER,
 	OCFS_NUM_LOCK_TYPES
 };
 
@@ -204,8 +205,7 @@
 #define OCFS2_LOCK_REFRESHING    (0x00000020)
 
 typedef struct _ocfs2_lock_res {
-	/* do i need this or can't i just use container_of? */
-	struct inode            *l_inode;
+	void                    *l_priv;
 	spinlock_t               l_lock;
 
 	enum ocfs2_lock_type     l_type;
@@ -343,6 +343,7 @@
 } ocfs_alloc_stats;
 
 struct _ocfs_journal;
+struct _ocfs2_slot_info;
 
 /*
  * ocfs_super
@@ -360,9 +361,12 @@
 	struct inode *sys_root_inode;
 	struct inode *system_inodes[NUM_SYSTEM_INODES];
 
+	struct _ocfs2_slot_info *slot_info;
+
 	spinlock_t node_map_lock;
 	ocfs_node_map node_map;
 	ocfs_node_map recovery_map;
+	ocfs_node_map umount_map;
 
 	/* new */
 	u32 num_clusters;
@@ -384,8 +388,7 @@
 	u16 max_nodes;
 	u16 num_nodes;
 	s16 node_num;
-	s16 global_node_num;
-	unsigned int slot_num;
+	s16 slot_num;
 	int reclaim_id;		/* reclaim the original node number*/
 	int s_sectsize_bits;
 	int s_clustersize;
@@ -393,8 +396,8 @@
 	struct proc_dir_entry *proc_sub_dir; /* points to /proc/fs/ocfs2/<maj_min> */
 
 	atomic_t vol_state;
-	struct semaphore orphan_recovery_lock;
 	struct semaphore recovery_lock;
+	int recovery_launched;
 	int disable_recovery;
 	atomic_t num_recovery_threads;
 	wait_queue_head_t flush_event;
@@ -402,15 +405,24 @@
 	struct _ocfs_journal *journal;
 	atomic_t clean_buffer_seq;
 	spinlock_t clean_buffer_lock;
+
 	int have_local_alloc;
 	struct buffer_head *local_alloc_bh;
+
+	/* Next two fields are for local node slot recovery during
+	 * mount. */
+	int dirty;
+	ocfs2_dinode *local_alloc_copy;
+
 	ocfs_dlm_stats net_reqst_stats;	/* stats of netdlm vote requests */
 	ocfs_dlm_stats net_reply_stats;	/* stats of netdlm vote reponses */
 	ocfs_alloc_stats alloc_stats;
 	char dev_str[20];		/* "major,minor" of the device */
 
 	char *group_name;
+	struct inode *group_inode;
 	dlm_ctxt *dlm;
+	ocfs2_lock_res super_lockres;
 
 	wait_queue_head_t recovery_event;
 
@@ -419,6 +431,7 @@
 	wait_queue_head_t vote_event;
 	atomic_t wake_vote_task;
 	int vote_exit;
+	int blocked_super_count;
 	struct list_head blocked_inode_list;
 	int blocked_inode_count;
 	struct list_head vote_list;
@@ -434,7 +447,6 @@
 	kmem_cache_t *inode_cache;
 	kmem_cache_t *lock_cache;
 	__u32 flags;
-	__s16 pref_node_num;		/* preferred... osb has the real one */
 	char *node_name;		/* human readable node identification */
 	char *cluster_name;		/* unused */
 	int comm_info_read;		/* ipc info loaded from config file */

Modified: branches/dlm-glue/src/ocfs2_fs.h
===================================================================
--- branches/dlm-glue/src/ocfs2_fs.h	2004-11-19 22:58:39 UTC (rev 1654)
+++ branches/dlm-glue/src/ocfs2_fs.h	2004-11-19 23:44:28 UTC (rev 1655)
@@ -116,13 +116,13 @@
 #define OCFS2_DEFAULT_JOURNAL_SIZE	(8 * ONE_MEGA_BYTE)
 #define OCFS2_MIN_JOURNAL_SIZE		(4 * ONE_MEGA_BYTE)
 
-
 /* System file index */
 enum {
 	BAD_BLOCK_SYSTEM_INODE = 0,
 	GLOBAL_INODE_ALLOC_SYSTEM_INODE,
+	SLOT_MAP_SYSTEM_INODE,
+#define OCFS2_FIRST_ONLINE_SYSTEM_INODE SLOT_MAP_SYSTEM_INODE
 	DLM_SYSTEM_INODE,
-#define OCFS2_FIRST_ONLINE_SYSTEM_INODE DLM_SYSTEM_INODE
 	GLOBAL_BITMAP_SYSTEM_INODE,
 	ORPHAN_DIR_SYSTEM_INODE,
 #define OCFS2_LAST_GLOBAL_SYSTEM_INODE ORPHAN_DIR_SYSTEM_INODE
@@ -140,6 +140,7 @@
 	[GLOBAL_INODE_ALLOC_SYSTEM_INODE] 	"global_inode_alloc",
 
 	/* These are used by the running filesystem */
+	[SLOT_MAP_SYSTEM_INODE]			"slot_map",
 	[DLM_SYSTEM_INODE]			"dlm",
 	[GLOBAL_BITMAP_SYSTEM_INODE]		"global_bitmap",
 	[ORPHAN_DIR_SYSTEM_INODE]		"orphan_dir",
@@ -174,9 +175,11 @@
  */
 #define OCFS2_DIR_PAD			4
 #define OCFS2_DIR_ROUND			(OCFS2_DIR_PAD - 1)
-#define OCFS2_DIR_REC_LEN(name_len)	(((name_len) + 12 + \
+#define OCFS2_DIR_MEMBER_LEN 		offsetof(struct ocfs2_dir_entry, name)
+#define OCFS2_DIR_REC_LEN(name_len)	(((name_len) + OCFS2_DIR_MEMBER_LEN + \
                                           OCFS2_DIR_ROUND) & \
 					 ~OCFS2_DIR_ROUND)
+
 #define OCFS2_LINK_MAX		32000
 
 #define S_SHIFT			12

Modified: branches/dlm-glue/src/ocfs_journal.h
===================================================================
--- branches/dlm-glue/src/ocfs_journal.h	2004-11-19 22:58:39 UTC (rev 1654)
+++ branches/dlm-glue/src/ocfs_journal.h	2004-11-19 23:44:28 UTC (rev 1655)
@@ -59,7 +59,6 @@
 					       * which we usually run
 					       * from (recovery,
 					       * etc)                     */
-	__u32                     node_num;   /* Whose journal are we?    */
 	struct buffer_head        *lockbh;    /* Journal disk lock, used 
 						 to access file entry	  */
 	atomic_t                  num_trans;  /* Number of transactions 
@@ -179,7 +178,7 @@
 
 /*
  *  Journal Control:
- *  Initialize, Load, Shutdown, Wipe, Create a journal.
+ *  Initialize, Load, Shutdown, Wipe a journal.
  *  
  *  ocfs_journal_init     - Initialize journal structures in the OSB.
  *  ocfs_journal_load     - Load the given journal off disk. Replay it if
@@ -189,6 +188,8 @@
  *  ocfs_journal_wipe     - Wipe transactions from a journal. Optionally 
  *                          zero out each block.
  *  ocfs_recovery_thread  - Perform recovery on a node. osb is our own osb.
+ *  ocfs2_mark_dead_nodes - Start recovery on nodes we won't get a heartbeat
+ *                          event on.
  *  ocfs_start_checkpoint - Kick the commit thread to do a checkpoint.
  */
 int    ocfs_journal_init(struct _ocfs_super *osb, int *dirty);
@@ -196,6 +197,7 @@
 int    ocfs_journal_wipe(ocfs_journal *journal, int full);
 int    ocfs_journal_load(ocfs_journal *journal);
 void   ocfs_recovery_thread(struct _ocfs_super *osb, int node_num);
+int    ocfs2_mark_dead_nodes(ocfs_super *osb);
 static inline void ocfs_start_checkpoint(struct _ocfs_super *osb)
 {
 	atomic_set(&osb->flush_event_woken, 1);

Modified: branches/dlm-glue/src/ocfs_log.h
===================================================================
--- branches/dlm-glue/src/ocfs_log.h	2004-11-19 22:58:39 UTC (rev 1654)
+++ branches/dlm-glue/src/ocfs_log.h	2004-11-19 23:44:28 UTC (rev 1655)
@@ -100,7 +100,7 @@
 #define OCFS_DEBUG_CONTEXT_ALLOC       0x00000001	/* alloc.c    */
 #define OCFS_DEBUG_CONTEXT_DIR         0x00000002	/* dir.c      */
 #define OCFS_DEBUG_CONTEXT_EXTMAP      0x00000004	/* extmap.c   */
-#define OCFS_DEBUG_CONTEXT_UNUSED1     0x00000008	/*            */
+#define OCFS_DEBUG_CONTEXT_SLOTMAP     0x00000008	/*            */
 #define OCFS_DEBUG_CONTEXT_IOCTL       0x00000010	/* ioctl.c    */
 #define OCFS_DEBUG_CONTEXT_UNUSED2     0x00000020	/*            */
 #define OCFS_DEBUG_CONTEXT_PROC        0x00000040	/* proc.c     */

Modified: branches/dlm-glue/src/proc.c
===================================================================
--- branches/dlm-glue/src/proc.c	2004-11-19 22:58:39 UTC (rev 1654)
+++ branches/dlm-glue/src/proc.c	2004-11-19 23:44:28 UTC (rev 1655)
@@ -51,6 +51,7 @@
 static int ocfs_proc_dlm_stats(char *page, char **start, off_t off, int count, int *eof, void *data);
 static int ocfs_proc_version (char *page, char **start, off_t off, int count, int *eof, void *data);
 static int ocfs_proc_nodenum (char *page, char **start, off_t off, int count, int *eof, void *data);
+static int ocfs_proc_slotnum (char *page, char **start, off_t off, int count, int *eof, void *data);
 static int ocfs_proc_nodename (char *page, char **start, off_t off, int count, int *eof, void *data);
 static int ocfs_proc_mountpoint (char *page, char **start, off_t off, int count, int *eof, void *data);
 static int ocfs_proc_statistics (char *page, char **start, off_t off, int count, int *eof, void *data);
@@ -75,6 +76,7 @@
 ocfs_proc_list sub_dir[] = {
 	{ "nodenum", NULL, ocfs_proc_nodenum },
 	{ "mountpoint", NULL, ocfs_proc_mountpoint },
+	{ "slotnum", NULL, ocfs_proc_slotnum },
 	{ "statistics", NULL, ocfs_proc_statistics },
 	{ "lockstat", NULL, ocfs_proc_dlm_stats },
 	{ "device", NULL, ocfs_proc_device },
@@ -297,6 +299,29 @@
 }				/* ocfs_proc_nodenum */
 
 /*
+ * ocfs_proc_slotnum()
+ *
+ */
+static int ocfs_proc_slotnum (char *page, char **start, off_t off,
+			      int count, int *eof, void *data)
+{
+	int len;
+	int ret;
+	ocfs_super *osb;
+
+	LOG_ENTRY ();
+
+	osb = data;
+	sprintf (page, "%d\n", osb->slot_num);
+	len = strlen (page);
+
+	ret = ocfs_proc_calc_metrics (page, start, off, count, eof, len);
+
+	LOG_EXIT_INT (ret);
+	return ret;
+}				/* ocfs_proc_slotnum */
+
+/*
  * ocfs_proc_nodename()
  *
  */
@@ -397,46 +422,25 @@
 				 int count, int *eof, void *data)
 {
 	int len;
-	char *pubmap = NULL;
 	ocfs_super *osb;
-	int ret = 0, i;
-	char *ptr;
+	int ret = 0;
 
 	LOG_ENTRY ();
 
 	osb = data;
 
-	pubmap = ocfs_malloc (100);
-	if (!pubmap) {
-		LOG_ERROR_STATUS (-ENOMEM);
-		goto bail;
-	}
-
-	ptr = pubmap;
-	for (i = 0; i < osb->max_nodes; i++) {
-		if (ocfs_node_map_test_bit(osb, &osb->node_map, i))
-			ptr += sprintf (ptr, "%d ", i);
-	}
-	if (pubmap != ptr)
-		*(ptr - 1) = '\0';
-
 #define PROC_STATS                             \
-  "Publish map              : %s\n"		\
   "Number of nodes          : %u\n"		\
   "Cluster size             : %d\n"		\
   "Volume size              : %llu\n"		\
   "Open Transactions:       : %u\n"		
 
-	len = sprintf (page, PROC_STATS, pubmap,
-		       osb->num_nodes, osb->s_clustersize, 
+	len = sprintf (page, PROC_STATS, osb->num_nodes, osb->s_clustersize, 
 		       ocfs2_clusters_to_bytes(osb->sb, osb->num_clusters),
 		       atomic_read(&osb->journal->num_trans));
 
 	ret = ocfs_proc_calc_metrics (page, start, off, count, eof, len);
 
-bail:
-	if (pubmap)
-		kfree(pubmap);
 	LOG_EXIT_INT (ret);
 	return ret;
 }				/* ocfs_proc_statistics */

Added: branches/dlm-glue/src/slot_map.c
===================================================================
--- branches/dlm-glue/src/slot_map.c	2004-11-19 22:58:39 UTC (rev 1654)
+++ branches/dlm-glue/src/slot_map.c	2004-11-19 23:44:28 UTC (rev 1655)
@@ -0,0 +1,280 @@
+/* -*- mode: c; c-basic-offset: 8; -*-
+ * vim: noexpandtab sw=8 ts=8 sts=0:
+ *
+ * slot_map.c
+ *
+ *
+ *
+ * Copyright (C) 2002, 2004 Oracle.  All rights reserved.
+ *
+ * This program is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU General Public
+ * License as published by the Free Software Foundation; either
+ * version 2 of the License, or (at your option) any later version.
+ * 
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
+ * General Public License for more details.
+ * 
+ * You should have received a copy of the GNU General Public
+ * License along with this program; if not, write to the
+ * Free Software Foundation, Inc., 59 Temple Place - Suite 330,
+ * Boston, MA 02111-1307, USA.
+ */
+
+#include "ocfs_compat.h"
+
+#include <linux/types.h>
+#include <linux/slab.h>
+#include <linux/highmem.h>
+#include <linux/smp_lock.h>
+
+#include "ocfs_log.h"
+#include "ocfs.h"
+#include "ocfs2.h"
+
+#include "dlmglue.h"
+#include "extent_map.h"
+#include "slot_map.h"
+#include "sysfile.h"
+
+#include "buffer_head_io.h"
+
+#define OCFS_DEBUG_CONTEXT  OCFS_DEBUG_CONTEXT_SLOTMAP
+
+/* ocfs2_update_disk_slots() is already declared, with external
+ * linkage, by slot_map.h (included above) and is defined non-static
+ * below, so it must not be re-declared static here. */
+
+/* Helpers that take no lock themselves; every caller in this file
+ * invokes them with si->si_lock held. */
+static s16 __ocfs2_node_num_to_slot(ocfs2_slot_info *si,
+				    s16 global);
+static void __ocfs2_fill_slot(ocfs2_slot_info *si,
+			      s16 slot_num,
+			      s16 node_num);
+
+/* Refresh the in-memory slot map from the cached slot block buffer
+ * (si->si_bh), converting each of the si_size entries from
+ * little-endian disk order. Runs under si->si_lock. */
+void ocfs2_update_slot_info(ocfs2_slot_info *si)
+{
+	s16 *on_disk;
+	int slot;
+
+	spin_lock(&si->si_lock);
+
+	/* No block read is issued here: taking ocfs2_super_lock is
+	 * expected to have left us with the most recent copy. */
+	on_disk = (s16 *) si->si_bh->b_data;
+
+	slot = 0;
+	while (slot < si->si_size) {
+		si->si_global_node_nums[slot] = le16_to_cpu(on_disk[slot]);
+		slot++;
+	}
+
+	spin_unlock(&si->si_lock);
+}
+
+/* Copy our in-memory slot map into its destination buffer head, in
+ * little-endian order, and write that block out. Returns the status
+ * of ocfs_write_block(); a negative status is logged first. */
+int ocfs2_update_disk_slots(ocfs_super *osb,
+			    ocfs2_slot_info *si)
+{
+	s16 *on_disk = (s16 *) si->si_bh->b_data;
+	int slot, status;
+
+	/* Only the copy into the buffer happens under the lock; the
+	 * write is issued after it has been dropped. */
+	spin_lock(&si->si_lock);
+	slot = 0;
+	while (slot < si->si_size) {
+		on_disk[slot] = cpu_to_le16(si->si_global_node_nums[slot]);
+		slot++;
+	}
+	spin_unlock(&si->si_lock);
+
+	status = ocfs_write_block(osb, si->si_bh, si->si_inode);
+	if (status >= 0)
+		return status;
+
+	LOG_ERROR_STATUS(status);
+	return status;
+}
+
+/* Scan the first si_num_slots entries of the slot map for the global
+ * node number @global. Returns the index of the first match, or
+ * OCFS_INVALID_NODE_NUM when no entry matches. Takes no lock itself. */
+static s16 __ocfs2_node_num_to_slot(ocfs2_slot_info *si,
+				    s16 global)
+{
+	int slot;
+
+	for (slot = 0; slot < si->si_num_slots; slot++)
+		if (si->si_global_node_nums[slot] == global)
+			return (s16) slot;
+
+	return OCFS_INVALID_NODE_NUM;
+}
+
+/* Locked lookup of a global node number in the slot map.
+ *
+ * Takes si->si_lock around __ocfs2_node_num_to_slot() and returns its
+ * result: the slot index holding @global, or OCFS_INVALID_NODE_NUM if
+ * no slot holds it. */
+s16 ocfs2_node_num_to_slot(ocfs2_slot_info *si,
+			   s16 global)
+{
+	s16 ret;
+
+	spin_lock(&si->si_lock);
+	ret = __ocfs2_node_num_to_slot(si, global);
+	spin_unlock(&si->si_lock);
+
+	return ret;
+}
+
+/* Record node_num as the occupant of slot_num in the in-memory slot
+ * map. Takes no lock itself; the callers in this file hold
+ * si->si_lock around it. Nothing is written to disk here. */
+static void __ocfs2_fill_slot(ocfs2_slot_info *si,
+			      s16 slot_num,
+			      s16 node_num)
+{
+	/* the slot must be a real index below si_num_slots */
+	OCFS_ASSERT(slot_num != OCFS_INVALID_NODE_NUM);
+	OCFS_ASSERT(slot_num < si->si_num_slots);
+	/* node_num is either the "free slot" marker or a node number
+	 * below OCFS2_MAX_NODES */
+	OCFS_ASSERT((node_num == OCFS_INVALID_NODE_NUM) || 
+		    (node_num < OCFS2_MAX_NODES));
+
+	si->si_global_node_nums[slot_num] = node_num;
+}
+
+/* Mark slot_num as unoccupied in the in-memory slot map, under
+ * si->si_lock. The on-disk copy is not touched by this call. */
+void ocfs2_clear_slot(ocfs2_slot_info *si,
+		      s16 slot_num)
+{
+	const s16 nobody = OCFS_INVALID_NODE_NUM;
+
+	spin_lock(&si->si_lock);
+	__ocfs2_fill_slot(si, slot_num, nobody);
+	spin_unlock(&si->si_lock);
+}
+
+/* Allocate the slot info structure for osb and load the slot map
+ * block.
+ *
+ * The first si_num_slots (osb->max_nodes) entries start out as
+ * OCFS_INVALID_NODE_NUM. The slot map system inode is looked up, its
+ * first block is mapped and read, and on success the inode, buffer
+ * head and structure are published through osb->slot_info.
+ *
+ * Returns a negative error code on failure, in which case everything
+ * acquired so far is released and osb->slot_info is left untouched;
+ * otherwise returns the non-negative status of the block read. */
+int ocfs2_init_slot_info(ocfs_super *osb)
+{
+	int status, i;
+	u64 blkno;
+	struct inode *inode = NULL;
+	struct buffer_head *bh = NULL;
+	ocfs2_slot_info *si;
+
+	si = kmalloc(sizeof(ocfs2_slot_info), GFP_KERNEL);
+	if (!si) {
+		status = -ENOMEM;
+		LOG_ERROR_STATUS(status);
+		goto bail;
+	}
+	memset(si, 0, sizeof(ocfs2_slot_info));
+	spin_lock_init(&si->si_lock);
+	si->si_num_slots = osb->max_nodes;
+	si->si_size = OCFS2_MAX_NODES;
+
+	for(i = 0; i < si->si_num_slots; i++)
+		si->si_global_node_nums[i] = OCFS_INVALID_NODE_NUM;
+
+	inode = ocfs_get_system_file_inode(osb, SLOT_MAP_SYSTEM_INODE, -1);
+	if (!inode) {
+		LOG_ERROR_STATUS(status = -EINVAL);
+		goto bail;
+	}
+	/* Hand the inode reference to si right away, so that a failure
+	 * below drops it through ocfs2_free_slot_info() instead of
+	 * leaking it. */
+	si->si_inode = inode;
+
+	status = ocfs2_extent_map_get_blocks(inode, 0ULL, 1, &blkno, NULL);
+	if (status < 0) {
+		LOG_ERROR_STATUS(status);
+		goto bail;
+	}
+
+	status = ocfs_read_block(osb, blkno, &bh, 0, inode);
+	if (status < 0) {
+		LOG_ERROR_STATUS(status);
+		goto bail;
+	}
+
+	si->si_bh = bh;
+	osb->slot_info = si;
+bail:
+	if (status < 0 && si)
+		ocfs2_free_slot_info(si);
+
+	return status;
+}
+
+/* Drop the inode and buffer head references held by si, when set,
+ * then free the structure itself. si is dereferenced unconditionally
+ * and so must not be NULL. */
+void ocfs2_free_slot_info(ocfs2_slot_info *si)
+{
+	struct inode *inode = si->si_inode;
+	struct buffer_head *bh = si->si_bh;
+
+	if (inode != NULL)
+		iput(inode);
+	if (bh != NULL)
+		brelse(bh);
+
+	kfree(si);
+}
+
+/* Claim a slot in the slot map for this node and write the map out.
+ *
+ * The in-memory map is refreshed first. A slot that already carries
+ * our node number is reused (with a warning); otherwise the first
+ * free slot is taken. Returns -EINVAL when no slot is free, else the
+ * status of ocfs2_update_disk_slots(). */
+int ocfs2_find_slot(ocfs_super *osb)
+{
+	ocfs2_slot_info *si = osb->slot_info;
+	s16 mine;
+	int status;
+
+	LOG_ENTRY();
+
+	ocfs2_update_slot_info(si);
+
+	spin_lock(&si->si_lock);
+
+	/* Look for ourselves first and reuse the slot if we are
+	 * already in the map. Perhaps this should be remembered for
+	 * our own journal recovery? Possibly not, though the user
+	 * certainly needs to be warned. */
+	mine = __ocfs2_node_num_to_slot(si, osb->node_num);
+	if (mine != OCFS_INVALID_NODE_NUM) {
+		printk("ocfs2: slot %d is already allocated to this node!\n",
+		       mine);
+	} else {
+		/* not in the map yet: take the first free slot */
+		mine = __ocfs2_node_num_to_slot(si, OCFS_INVALID_NODE_NUM);
+		if (mine == OCFS_INVALID_NODE_NUM) {
+			spin_unlock(&si->si_lock);
+			printk("ocfs2: no free slots available!\n");
+			status = -EINVAL;
+			goto bail;
+		}
+	}
+
+	__ocfs2_fill_slot(si, mine, osb->node_num);
+	osb->slot_num = mine;
+
+	spin_unlock(&si->si_lock);
+
+	printk("ocfs2: taking node slot %d\n", osb->slot_num);
+
+	status = ocfs2_update_disk_slots(osb, si);
+	if (status < 0)
+		LOG_ERROR_STATUS(status);
+
+bail:
+	LOG_EXIT_STATUS(status);
+	return status;
+}
+
+/* Give up this node's slot and tear down osb->slot_info.
+ *
+ * Does nothing when osb has no slot info. Otherwise requests the
+ * umount vote, refreshes the in-memory map, marks our slot as free,
+ * resets osb->slot_num and writes the map back out. Whether or not
+ * those steps succeed, osb->slot_info is cleared and the structure is
+ * released together with the inode and buffer head it holds. */
+void ocfs2_put_slot(ocfs_super *osb)
+{
+	int status;
+	ocfs2_slot_info *si = osb->slot_info;
+
+	if (!si)
+		return;
+
+	status = ocfs2_request_umount_vote(osb);
+	if (status < 0) {
+		LOG_ERROR_STATUS(status);
+		goto bail;
+	}
+
+	/* so what happens if someone does recovery while we're
+	 * waiting for the ex? */
+
+	/* cluster lock */
+
+	ocfs2_update_slot_info(si);
+
+	spin_lock(&si->si_lock);
+	__ocfs2_fill_slot(si, osb->slot_num, OCFS_INVALID_NODE_NUM);
+	osb->slot_num = OCFS_INVALID_NODE_NUM;
+	spin_unlock(&si->si_lock);
+
+	/* capture the write status so a failed update is reported */
+	status = ocfs2_update_disk_slots(osb, si);
+	if (status < 0)
+		LOG_ERROR_STATUS(status);
+
+bail:
+	osb->slot_info = NULL;
+	/* a bare kfree() would leak si->si_inode and si->si_bh */
+	ocfs2_free_slot_info(si);
+}
+

Added: branches/dlm-glue/src/slot_map.h
===================================================================
--- branches/dlm-glue/src/slot_map.h	2004-11-19 22:58:39 UTC (rev 1654)
+++ branches/dlm-glue/src/slot_map.h	2004-11-19 23:44:28 UTC (rev 1655)
@@ -0,0 +1,55 @@
+/* -*- mode: c; c-basic-offset: 8; -*-
+ * vim: noexpandtab sw=8 ts=8 sts=0:
+ *
+ * slot_map.h
+ *
+ * In-memory slot map (ocfs2_slot_info) and the functions that manage it.
+ *
+ * Copyright (C) 2002, 2004 Oracle.  All rights reserved.
+ *
+ * This program is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU General Public
+ * License as published by the Free Software Foundation; either
+ * version 2 of the License, or (at your option) any later version.
+ * 
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
+ * General Public License for more details.
+ * 
+ * You should have received a copy of the GNU General Public
+ * License along with this program; if not, write to the
+ * Free Software Foundation, Inc., 59 Temple Place - Suite 330,
+ * Boston, MA 02111-1307, USA.
+ */
+
+
+#ifndef SLOTMAP_H
+#define SLOTMAP_H
+
+/* In-memory copy of the slot map: which global node number occupies
+ * each slot. */
+typedef struct _ocfs2_slot_info {
+	/* taken by slot_map.c around accesses to si_global_node_nums */
+	spinlock_t si_lock;
+
+	/* slot map system file inode; released with iput() in
+	 * ocfs2_free_slot_info() */
+       	struct inode *si_inode;
+	/* buffer head of the slot map block; released with brelse() in
+	 * ocfs2_free_slot_info() */
+	struct buffer_head *si_bh;
+	/* set from osb->max_nodes; upper bound for slot lookups */
+	unsigned int si_num_slots;
+	/* set to OCFS2_MAX_NODES; number of entries copied to and from
+	 * the slot map block */
+	unsigned int si_size;
+	/* global node number per slot; OCFS_INVALID_NODE_NUM marks a
+	 * free slot */
+	s16 si_global_node_nums[OCFS2_MAX_NODES];
+} ocfs2_slot_info;
+
+/* allocate / release the slot info structure hanging off the osb */
+int ocfs2_init_slot_info(ocfs_super *osb);
+void ocfs2_free_slot_info(ocfs2_slot_info *si);
+
+/* claim a slot for this node / give it back */
+int ocfs2_find_slot(ocfs_super *osb);
+void ocfs2_put_slot(ocfs_super *osb);
+
+/* copy the slot map from its block buffer into memory, and from
+ * memory back into the buffer followed by a block write */
+void ocfs2_update_slot_info(ocfs2_slot_info *si);
+int ocfs2_update_disk_slots(ocfs_super *osb,
+			    ocfs2_slot_info *si);
+
+/* locked lookup of a global node number; returns
+ * OCFS_INVALID_NODE_NUM when it is not in the map */
+s16 ocfs2_node_num_to_slot(ocfs2_slot_info *si,
+			   s16 global);
+/* mark a slot as free in the in-memory map only */
+void ocfs2_clear_slot(ocfs2_slot_info *si,
+		      s16 slot_num);
+
+#endif

Modified: branches/dlm-glue/src/suballoc.c
===================================================================
--- branches/dlm-glue/src/suballoc.c	2004-11-19 22:58:39 UTC (rev 1654)
+++ branches/dlm-glue/src/suballoc.c	2004-11-19 23:44:28 UTC (rev 1655)
@@ -491,7 +491,7 @@
 #ifndef OCFS_USE_ALL_METADATA_SUBALLOCATORS
 	alloc_inode = ocfs_get_system_file_inode(osb, EXTENT_ALLOC_SYSTEM_INODE, 0);
 #else
-	alloc_inode = ocfs_get_system_file_inode(osb, EXTENT_ALLOC_SYSTEM_INODE, osb->node_num);
+	alloc_inode = ocfs_get_system_file_inode(osb, EXTENT_ALLOC_SYSTEM_INODE, osb->slot_num);
 #endif
 	if (!alloc_inode) {
 		status = -ENOMEM;
@@ -540,7 +540,7 @@
 	(*ac)->ac_handle = handle;
 	(*ac)->ac_which = OCFS_AC_USE_INODE;
 
-	alloc_inode = ocfs_get_system_file_inode(osb, INODE_ALLOC_SYSTEM_INODE, osb->node_num);
+	alloc_inode = ocfs_get_system_file_inode(osb, INODE_ALLOC_SYSTEM_INODE, osb->slot_num);
 	if (!alloc_inode) {
 		status = -ENOMEM;
 		LOG_ERROR_STATUS(status);

Modified: branches/dlm-glue/src/super.c
===================================================================
--- branches/dlm-glue/src/super.c	2004-11-19 22:58:39 UTC (rev 1654)
+++ branches/dlm-glue/src/super.c	2004-11-19 23:44:28 UTC (rev 1655)
@@ -61,6 +61,7 @@
 #include "journal.h"
 #include "localalloc.h"
 #include "proc.h"
+#include "slot_map.h"
 #include "super.h"
 #include "sysfile.h"
 #include "util.h"
@@ -156,6 +157,7 @@
 static int ocfs_init_local_system_inodes(ocfs_super *osb);
 static int ocfs_release_system_inodes(ocfs_super *osb);
 static int ocfs2_fill_node_info(ocfs_super *osb);
+static int ocfs2_complete_mount_recovery(ocfs_super *osb);
 static int ocfs_check_volume(ocfs_super * osb);
 static int ocfs_verify_volume(ocfs2_dinode *di, struct buffer_head *bh,
 			      __u32 sectsize);
@@ -239,7 +241,7 @@
 
 	for (i = OCFS2_FIRST_ONLINE_SYSTEM_INODE;
 	     i <= OCFS2_LAST_GLOBAL_SYSTEM_INODE; i++) {
-		new = ocfs_get_system_file_inode(osb, i, osb->node_num);
+		new = ocfs_get_system_file_inode(osb, i, osb->slot_num);
 		if (!new) {
 			ocfs_release_system_inodes(osb);
 			LOG_ERROR_STATUS(status = -EINVAL);
@@ -268,7 +270,7 @@
 	LOG_ENTRY();
 
 	for (i = OCFS2_LAST_GLOBAL_SYSTEM_INODE + 1; i < NUM_SYSTEM_INODES ; i++) {
-		new = ocfs_get_system_file_inode(osb, i, osb->node_num);
+		new = ocfs_get_system_file_inode(osb, i, osb->slot_num);
 		if (!new) {
 			ocfs_release_system_inodes(osb);
 			LOG_ERROR_STATUS(status = -EINVAL);
@@ -364,11 +366,20 @@
 
 	sb->s_root = root;
 
-	printk ("ocfs2: Mounting device (%u,%u) on %s (node %d)\n",
+	printk ("ocfs2: Mounting device (%u,%u) on %s (node %d, slot %d)\n",
 		MAJOR(sb->s_dev), MINOR(sb->s_dev),
-		OcfsGlobalCtxt.node_name, osb->node_num);
+		OcfsGlobalCtxt.node_name, osb->node_num, osb->slot_num);
 
 	atomic_set(&osb->vol_state, VOLUME_MOUNTED);
+
+	if (osb->dirty) {
+		/* This must happen *after* setting the volume to
+		 * MOUNTED as we may sleep on any recovery threads. */
+		status = ocfs2_complete_mount_recovery(osb);
+		if (status < 0)
+			LOG_EXIT_STATUS(status);
+	}
+
 	LOG_EXIT_STATUS(status);
 	return status;		
 
@@ -845,15 +856,14 @@
 		goto bail;
 	}
 
-	osb->global_node_num = osb->node_num = nm_this_node(group);
+	osb->group_inode = group;
+	osb->node_num = nm_this_node(group);
 
-	printk("ocfs2: I am node %u, a member of group %s\n", osb->node_num,
+	printk("ocfs2: I am node %d, a member of group %s\n", osb->node_num,
 	       osb->group_name);
 
 	status = 0;
 bail:
-	if (group)
-		iput(group);
 
 	return status;
 }
@@ -864,10 +874,10 @@
  */
 static int ocfs_mount_volume (struct super_block *sb, int reclaim_id, struct inode *root)
 {
-	int status;
-	ocfs_super *osb;
+	int status, sector_size;
+	int unlock_super = 0;
+	ocfs_super *osb = NULL;
 	struct buffer_head *bh = NULL;
-	int sector_size;
 
 	LOG_ENTRY ();
 
@@ -920,6 +930,13 @@
 		goto leave;
 	}
 
+	status = ocfs2_super_lock(osb, 1);
+	if (status < 0) {
+		LOG_ERROR_STATUS (status);
+		goto leave;
+	}
+	unlock_super = 1;
+
 	/* This will load up the node map and add ourselves to it. */
 	status = ocfs2_find_slot(osb);
 	if (status < 0) {
@@ -946,7 +963,19 @@
 		goto leave;
 	}
 
+	/* This should be sent *after* we recovered our journal as it
+	 * will cause other nodes to unmark us as needing
+	 * recovery. However, we need to send it *before* dropping the
+	 * super block lock as otherwise their recovery threads might
+	 * try to clean us up while we're live! */
+	status = ocfs2_request_mount_vote(osb);
+	if (status < 0)
+		LOG_ERROR_STATUS (status);
+
 leave:
+	if (unlock_super)
+		ocfs2_super_unlock(osb, 1);
+
 	if (bh != NULL)
 		brelse(bh);
 	LOG_EXIT_STATUS (status);
@@ -989,6 +1018,8 @@
 	/* Dismount */
 	OCFS_SET_FLAG (osb->osb_flags, OCFS_OSB_FLAGS_BEING_DISMOUNTED);
 
+	ocfs2_put_slot(osb);
+
 	ocfs2_dlm_shutdown(osb);
 
 	ocfs2_clear_hb_callbacks(osb);
@@ -1103,7 +1134,6 @@
 		 MAJOR(osb->sb->s_dev), MINOR(osb->sb->s_dev));
 
 	init_MUTEX (&(osb->recovery_lock));
-	init_MUTEX (&(osb->orphan_recovery_lock));
 
 	osb->disable_recovery = 0;
 
@@ -1115,6 +1145,7 @@
 	spin_lock_init (&osb->clean_buffer_lock);
 
 	osb->node_num = OCFS_INVALID_NODE_NUM;
+	osb->slot_num = OCFS_INVALID_NODE_NUM;
 
 	osb->have_local_alloc = 0;
 	osb->local_alloc_bh = NULL;
@@ -1236,8 +1267,11 @@
 	printk("cluster bitmap inode: %llu, clusters per group: %u\n",
 	       osb->bitmap_blkno, osb->bitmap_cpg);
 
-	/* We might need to add a variable in Global List of osb to */
-	/* delay any creation, if any other node is already creating a file */
+	status = ocfs2_init_slot_info(osb);
+	if (status < 0) {
+		LOG_ERROR_STATUS(status);
+		goto bail;
+	}
 
 	/*  Link this osb onto the global linked list of all osb structures. */
 	/*  The Global Link List is mainted for the whole driver . */
@@ -1324,6 +1358,35 @@
 	return status;
 }				/* ocfs_verify_volume */
 
+/* Second half of local node recovery. It has to run after all other
+ * nodes needing recovery have been discovered and recovered.
+ *
+ * Takes over osb->local_alloc_copy and always frees it. When
+ * osb->dirty is set, completes local alloc recovery and then recovers
+ * orphans. osb->dirty is cleared unless local alloc recovery fails.
+ * Returns the status of the last step attempted, or 0 if none ran. */
+static int ocfs2_complete_mount_recovery(ocfs_super *osb)
+{
+	ocfs2_dinode *la_copy;
+	int status = 0;
+
+	/* detach the saved copy from the osb before working on it */
+	la_copy = osb->local_alloc_copy;
+	osb->local_alloc_copy = NULL;
+
+	if (!osb->dirty)
+		goto clear_dirty;
+
+	status = ocfs_complete_local_alloc_recovery(osb, la_copy);
+	if (status < 0) {
+		LOG_ERROR_STATUS(status);
+		goto out_free;
+	}
+
+	status = ocfs_recover_orphans(osb);
+	if (status < 0)
+		LOG_ERROR_STATUS(status);
+
+clear_dirty:
+	osb->dirty = 0;
+
+out_free:
+	if (la_copy)
+		kfree(la_copy);
+	return status;
+}
+
 /*
  * ocfs_check_volume()
  *
@@ -1331,7 +1394,6 @@
 static int ocfs_check_volume (ocfs_super * osb)
 {
 	int status = 0;
-	int node_num = osb->node_num;
 	int dirty;
 	ocfs2_dinode *local_alloc = NULL; /* only used if we
 					   * recover
@@ -1366,7 +1428,7 @@
 	if (dirty) {
 		/* recover my local alloc if we didn't unmount cleanly. */
 		status = ocfs_begin_local_alloc_recovery(osb,
-							 node_num,
+							 osb->slot_num,
 							 &local_alloc);
 		if (status < 0) {
 			LOG_ERROR_STATUS(status);
@@ -1379,21 +1441,26 @@
 	LOG_TRACE_STR("Journal loaded.");
 
 	status = ocfs_load_local_alloc(osb);
-	if (status < 0)
+	if (status < 0) {
 		LOG_ERROR_STATUS(status);
+		goto finally;
+	}
 
 	if (dirty) {
-		status = ocfs_complete_local_alloc_recovery(osb, local_alloc);
-		if (status < 0) {
-			LOG_ERROR_STATUS(status);
-			goto finally;
-		}
-
-		status = ocfs_recover_orphans(osb);
-		if (status < 0)
-			LOG_ERROR_STATUS(status);
+		/* Recovery will be completed after we've mounted the
+		 * rest of the volume. */
+		osb->dirty = 1;
+		osb->local_alloc_copy = local_alloc;
+		local_alloc = NULL;
 	}
 
+	/* go through each journal, trylock it and if you get the
+	 * lock, and it's marked as dirty, set the bit in the recover
+	 * map and launch a recovery thread for it. */
+	status = ocfs2_mark_dead_nodes(osb);
+	if (status < 0)
+		LOG_ERROR_STATUS(status);
+
 finally:
 	if (local_alloc)
 		kfree(local_alloc);
@@ -1424,6 +1491,12 @@
 		list_del (&(osb->osb_next));
 	up (&(OcfsGlobalCtxt.global_res));
 
+	if (osb->slot_info)
+		ocfs2_free_slot_info(osb->slot_info);
+
+	if (osb->group_inode)
+		iput(osb->group_inode);
+
 	/* FIXME
 	 * This belongs in journal shutdown, but because we have to
 	 * allocate osb->journal at the start of ocfs_initalize_osb(),
@@ -1432,7 +1505,8 @@
 	kfree(osb->journal);
 	if (osb->group_name)
 		kfree(osb->group_name);
-
+	if (osb->local_alloc_copy)
+		kfree(osb->local_alloc_copy);
 	memset (osb, 0, sizeof (ocfs_super));
 
 	LOG_EXIT ();

Modified: branches/dlm-glue/src/sysfile.c
===================================================================
--- branches/dlm-glue/src/sysfile.c	2004-11-19 22:58:39 UTC (rev 1654)
+++ branches/dlm-glue/src/sysfile.c	2004-11-19 23:44:28 UTC (rev 1655)
@@ -46,10 +46,10 @@
 /* Tracing */
 #define OCFS_DEBUG_CONTEXT    OCFS_DEBUG_CONTEXT_SYSFILE
 
-static struct inode * _ocfs_get_system_file_inode(ocfs_super *osb, int type, __u32 node);
+static struct inode * _ocfs_get_system_file_inode(ocfs_super *osb, int type, __u32 slot);
 
 static inline int is_global_system_inode(int type);
-static inline int is_in_system_inode_array(ocfs_super *osb, int type, __u32 node);
+static inline int is_in_system_inode_array(ocfs_super *osb, int type, __u32 slot);
 
 static inline int is_global_system_inode(int type)
 {
@@ -57,19 +57,19 @@
 		type <= OCFS2_LAST_GLOBAL_SYSTEM_INODE);
 }
 
-static inline int is_in_system_inode_array(ocfs_super *osb, int type, __u32 node)
+static inline int is_in_system_inode_array(ocfs_super *osb, int type, __u32 slot)
 {
-	return (node == osb->node_num || is_global_system_inode(type));
+	return (slot == osb->slot_num || is_global_system_inode(type));
 }
 
 struct inode *ocfs_get_system_file_inode(ocfs_super *osb, int type,
-					 __u32 node)
+					 __u32 slot)
 {
 	struct inode *inode = NULL;
 	struct inode **arr = NULL;
 
 	/* avoid the lookup if cached in local system file array */
-	if (is_in_system_inode_array(osb, type, node))
+	if (is_in_system_inode_array(osb, type, slot))
 		arr = &(osb->system_inodes[type]);
 
 	if (arr && ((inode = *arr) != NULL)) {
@@ -82,7 +82,7 @@
 	}
 	
 	/* this gets one ref thru iget */
-	inode = _ocfs_get_system_file_inode(osb, type, node);
+	inode = _ocfs_get_system_file_inode(osb, type, slot);
 
 	/* add one more if putting into array for first time */
 	if (arr && inode) {
@@ -93,7 +93,7 @@
 	return inode;
 }
 
-static struct inode * _ocfs_get_system_file_inode(ocfs_super *osb, int type, __u32 node)
+static struct inode * _ocfs_get_system_file_inode(ocfs_super *osb, int type, __u32 slot)
 {
 	char namebuf[40];
 	struct inode *inode = NULL;
@@ -104,7 +104,7 @@
 
 	ocfs2_sprintf_system_inode_name(namebuf,
 					sizeof(namebuf),
-					type, node);
+					type, slot);
 	
 	status = ocfs_find_files_on_disk(osb, namebuf, strlen(namebuf),
 					 &blkno, osb->sys_root_inode, 

Modified: branches/dlm-glue/src/sysfile.h
===================================================================
--- branches/dlm-glue/src/sysfile.h	2004-11-19 22:58:39 UTC (rev 1654)
+++ branches/dlm-glue/src/sysfile.h	2004-11-19 23:44:28 UTC (rev 1655)
@@ -26,6 +26,6 @@
 #ifndef OCFS2_SYSFILE_H
 #define OCFS2_SYSFILE_H
 
-struct inode * ocfs_get_system_file_inode(ocfs_super *osb, int type, __u32 node);
+struct inode * ocfs_get_system_file_inode(ocfs_super *osb, int type, __u32 slot);
 
 #endif /* OCFS2_SYSFILE_H */



More information about the Ocfs2-commits mailing list