[Ocfs2-commits] jlbec commits r2790 - in branches/ocfs2-1.2-cert: . patches

svn-commits at oss.oracle.com
Mon Mar 27 16:29:14 CST 2006


Author: jlbec
Signed-off-by: mfasheh
Date: 2006-03-27 16:29:12 -0600 (Mon, 27 Mar 2006)
New Revision: 2790

Added:
   branches/ocfs2-1.2-cert/patches/debug-mastery.patch
   branches/ocfs2-1.2-cert/patches/dlm-eloop.patch
   branches/ocfs2-1.2-cert/patches/dlm-kurt-mar20-2.patch
   branches/ocfs2-1.2-cert/patches/em-silence-eexist.patch
   branches/ocfs2-1.2-cert/patches/extra-em-trace3.patch
   branches/ocfs2-1.2-cert/patches/fix-purge-lockres.patch
   branches/ocfs2-1.2-cert/patches/hold-recovery-ref.patch
   branches/ocfs2-1.2-cert/patches/leave-other-dead-nodes-on-recovery-list.patch
   branches/ocfs2-1.2-cert/patches/lockres-release-info.patch
   branches/ocfs2-1.2-cert/patches/lvb-recovery-fix.patch
   branches/ocfs2-1.2-cert/patches/mar20-full-3.patch
   branches/ocfs2-1.2-cert/patches/mar24-create-lock-handler.patch
   branches/ocfs2-1.2-cert/patches/mastery-restart-recovery.patch
   branches/ocfs2-1.2-cert/patches/ocfs2-disable_lvbs1.patch
   branches/ocfs2-1.2-cert/patches/ocfs2-extend_file_more_info_on_oops.patch
   branches/ocfs2-1.2-cert/patches/ocfs2-journal_start_stop_msgs.patch
   branches/ocfs2-1.2-cert/patches/ocfs2-reco_nofs.patch
   branches/ocfs2-1.2-cert/patches/ocfs2-validate_lvb_contents1.patch
   branches/ocfs2-1.2-cert/patches/ocfs2_dlm-do_lvb_puts_inline2.patch
   branches/ocfs2-1.2-cert/patches/series
   branches/ocfs2-1.2-cert/patches/two-stage-finalize.patch
Modified:
   branches/ocfs2-1.2-cert/
Log:

o Added quilt patches as of 2006.03.27 14:28
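o Note: patches/series is quilt's control file; it presumably lists the
  patches above in the order quilt applies them (e.g. "quilt push -a").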

Signed-off-by: mfasheh

Property changes on: branches/ocfs2-1.2-cert
___________________________________________________________________
Name: svn:ignore
   - configure
Config.make
config.cache
config.log
config.status
autom4te.cache
*.rpm
*.tar.gz
.*.sw?

   + configure
Config.make
config.cache
config.log
config.status
autom4te.cache
*.rpm
*.tar.gz
.*.sw?
.pc


Added: branches/ocfs2-1.2-cert/patches/debug-mastery.patch
===================================================================
--- branches/ocfs2-1.2-cert/patches/debug-mastery.patch	2006-03-27 22:25:16 UTC (rev 2789)
+++ branches/ocfs2-1.2-cert/patches/debug-mastery.patch	2006-03-27 22:29:12 UTC (rev 2790)
@@ -0,0 +1,46 @@
+Index: ocfs2-1.2/fs/ocfs2/dlm/dlmmaster.c
+===================================================================
+--- ocfs2-1.2.orig/fs/ocfs2/dlm/dlmmaster.c	2006-03-21 13:11:58.618620000 -0800
++++ ocfs2-1.2/fs/ocfs2/dlm/dlmmaster.c	2006-03-21 13:19:43.675820000 -0800
+@@ -1621,6 +1621,8 @@ again:
+ 	dlm_node_iter_init(nodemap, &iter);
+ 	while ((to = dlm_node_iter_next(&iter)) >= 0) {
+ 		int r = 0;
++		struct dlm_master_list_entry *mle = NULL;
++
+ 		mlog(0, "sending assert master to %d (%.*s)\n", to,
+ 		     namelen, lockname);
+ 		memset(&assert, 0, sizeof(assert));
+@@ -1645,7 +1647,16 @@ again:
+ 			/* ok, something horribly messed.  kill thyself. */
+ 			mlog(ML_ERROR,"during assert master of %.*s to %u, "
+ 			     "got %d.\n", namelen, lockname, to, r);
+-			dlm_dump_lock_resources(dlm);
++			spin_lock(&dlm->spinlock);
++			spin_lock(&dlm->master_lock);
++			if (dlm_find_mle(dlm, &mle, (char *)lockname,
++					 namelen)) {
++				dlm_print_one_mle(mle);
++				__dlm_put_mle(mle);
++			}
++			spin_unlock(&dlm->master_lock);
++			spin_unlock(&dlm->spinlock);
++			// dlm_dump_lock_resources(dlm);
+ 			BUG();
+ 		} else if (r == EAGAIN) {
+ 			mlog(0, "%.*s: node %u create mles on other "
+@@ -1909,12 +1920,13 @@ done:
+ 
+ kill:
+ 	/* kill the caller! */
++	__dlm_print_one_lock_resource(res);
+ 	spin_unlock(&res->spinlock);
+ 	spin_unlock(&dlm->spinlock);
+ 	dlm_lockres_put(res);
+ 	mlog(ML_ERROR, "Bad message received from another node.  Dumping state "
+ 	     "and killing the other node now!  This node is OK and can continue.\n");
+-	dlm_dump_lock_resources(dlm);
++	// dlm_dump_lock_resources(dlm);
+ 	dlm_put(dlm);
+ 	return -EINVAL;
+ }

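A note on debug-mastery.patch: on a failed master assertion the old code
dumped every lock resource in the domain via dlm_dump_lock_resources()
before calling BUG(); the new code prints only the master list entry
(MLE) for the contested name, looked up under the usual lock nesting
(dlm->spinlock taken outside dlm->master_lock).  A minimal sketch of
that pattern, assuming (as the hunk implies) that dlm_find_mle() returns
nonzero on a hit and hands back a referenced MLE:

	/* Sketch: print one MLE instead of dumping the world. */
	spin_lock(&dlm->spinlock);	/* outer lock */
	spin_lock(&dlm->master_lock);	/* guards the MLE list */
	if (dlm_find_mle(dlm, &mle, (char *)lockname, namelen)) {
		dlm_print_one_mle(mle);
		__dlm_put_mle(mle);	/* drop the lookup reference */
	}
	spin_unlock(&dlm->master_lock);
	spin_unlock(&dlm->spinlock);
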
Added: branches/ocfs2-1.2-cert/patches/dlm-eloop.patch
===================================================================
--- branches/ocfs2-1.2-cert/patches/dlm-eloop.patch	2006-03-27 22:25:16 UTC (rev 2789)
+++ branches/ocfs2-1.2-cert/patches/dlm-eloop.patch	2006-03-27 22:29:12 UTC (rev 2790)
@@ -0,0 +1,25 @@
+Index: fs/ocfs2/dlm/dlmmaster.c
+===================================================================
+--- fs/ocfs2/dlm/dlmmaster.c.orig	2006-03-22 14:36:02.303204000 -0800
++++ fs/ocfs2/dlm/dlmmaster.c	2006-03-23 18:44:36.290960000 -0800
+@@ -983,12 +983,14 @@ recheck:
+ 		spin_unlock(&res->spinlock);
+ 		/* this will cause the master to re-assert across
+ 		 * the whole cluster, freeing up mles */
+-		ret = dlm_do_master_request(mle, res->owner);
+-		if (ret < 0) {
+-			/* give recovery a chance to run */
+-			mlog(ML_ERROR, "link to %u went down?: %d\n", res->owner, ret);
+-			msleep(500);
+-			goto recheck;
++		if (res->owner != dlm->node_num) {
++			ret = dlm_do_master_request(mle, res->owner);
++			if (ret < 0) {
++				/* give recovery a chance to run */
++				mlog(ML_ERROR, "link to %u went down?: %d\n", res->owner, ret);
++				msleep(500);
++				goto recheck;
++			}
+ 		}
+ 		ret = 0;
+ 		goto leave;

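dlm-eloop.patch in one line: never send a master request to yourself.
If this node already owns the lockres, dlm_do_master_request() toward
res->owner targets the local node, and the error path above could retry
forever; the new guard restricts the request (and its 500ms retry) to
genuinely remote owners:

	/* Sketch of the guard shape added above. */
	if (res->owner != dlm->node_num) {	/* remote owners only */
		ret = dlm_do_master_request(mle, res->owner);
		if (ret < 0) {
			msleep(500);		/* let recovery run */
			goto recheck;
		}
	}
	ret = 0;				/* local owner: done */
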
Added: branches/ocfs2-1.2-cert/patches/dlm-kurt-mar20-2.patch
===================================================================
--- branches/ocfs2-1.2-cert/patches/dlm-kurt-mar20-2.patch	2006-03-27 22:25:16 UTC (rev 2789)
+++ branches/ocfs2-1.2-cert/patches/dlm-kurt-mar20-2.patch	2006-03-27 22:29:12 UTC (rev 2790)
@@ -0,0 +1,326 @@
+diff -u dlmthread.c dlmthread.c
+--- fs/ocfs2/dlm/dlmthread.c	(working copy)
++++ fs/ocfs2/dlm/dlmthread.c	(working copy)
+@@ -39,6 +39,7 @@
+ #include <linux/inet.h>
+ #include <linux/timer.h>
+ #include <linux/kthread.h>
++#include <linux/delay.h>
+ 
+ 
+ #include "cluster/heartbeat.h"
+@@ -166,6 +167,7 @@
+ 	} else if (ret < 0) {
+ 		mlog(ML_NOTICE, "lockres %.*s: migrate failed, retrying\n",
+ 		     lockres->lockname.len, lockres->lockname.name);
++		msleep(100);
+ 		goto again;
+ 	}
+ 
+diff -u dlmmaster.c dlmmaster.c
+--- fs/ocfs2/dlm/dlmmaster.c	(working copy)
++++ fs/ocfs2/dlm/dlmmaster.c	(working copy)
+@@ -1519,15 +1519,12 @@
+ 				mlog_errno(-ENOMEM);
+ 				goto send_response;
+ 			}
+-			spin_lock(&dlm->spinlock);
+-			dlm_init_mle(mle, DLM_MLE_BLOCK, dlm, NULL,
+-					 name, namelen);
+-			spin_unlock(&dlm->spinlock);
+ 			goto way_up_top;
+ 		}
+ 
+ 		// mlog(0, "this is second time thru, already allocated, "
+ 		// "add the block.\n");
++		dlm_init_mle(mle, DLM_MLE_BLOCK, dlm, NULL, name, namelen);
+ 		set_bit(request->node_idx, mle->maybe_map);
+ 		list_add(&mle->list, &dlm->master_list);
+ 		response = DLM_MASTER_RESP_NO;
+@@ -1700,7 +1697,7 @@
+ 		if (bit >= O2NM_MAX_NODES) {
+ 			/* not necessarily an error, though less likely.
+ 			 * could be master just re-asserting. */
+-			mlog(ML_ERROR, "no bits set in the maybe_map, but %u "
++			mlog(ML_NOTICE, "no bits set in the maybe_map, but %u "
+ 			     "is asserting! (%.*s)\n", assert->node_idx,
+ 			     namelen, name);
+ 		} else if (bit != assert->node_idx) {
+@@ -1712,13 +1709,30 @@
+ 				 * number winning the mastery will respond
+ 				 * YES to mastery requests, but this node
+ 				 * had no way of knowing.  let it pass. */
+-				mlog(ML_ERROR, "%u is the lowest node, "
++				mlog(ML_NOTICE, "%u is the lowest node, "
+ 				     "%u is asserting. (%.*s)  %u must "
+ 				     "have begun after %u won.\n", bit,
+ 				     assert->node_idx, namelen, name, bit,
+ 				     assert->node_idx);
+ 			}
+ 		}
++		if (mle->type == DLM_MLE_MIGRATION) {
++			if (flags & DLM_ASSERT_MASTER_MLE_CLEANUP) {
++				mlog(ML_NOTICE, "%s:%.*s: got cleanup assert"
++				     " from %u for migration\n",
++				     dlm->name, namelen, name,
++				     assert->node_idx);
++			} else if (!(flags & DLM_ASSERT_MASTER_FINISH_MIGRATION)) {
++				mlog(ML_NOTICE, "%s:%.*s: got unrelated assert"
++				     " from %u for migration, ignoring\n",
++				     dlm->name, namelen, name,
++				     assert->node_idx);
++				__dlm_put_mle(mle);
++				spin_unlock(&dlm->master_lock);
++				spin_unlock(&dlm->spinlock);
++				goto done;
++			}	
++		}
+ 	}
+ 	spin_unlock(&dlm->master_lock);
+ 
+@@ -2228,6 +2242,9 @@
+ 		dlm_mle_detach_hb_events(dlm, mle);
+ 		dlm_put_mle(mle);
+ 		dlm_put_mle_inuse(mle);
++		spin_lock(&res->spinlock);
++		res->state &= ~DLM_LOCK_RES_MIGRATING;
++		spin_unlock(&res->spinlock);
+ 		goto leave;
+ 	}
+ 
+@@ -2257,8 +2274,8 @@
+ 			/* avoid hang during shutdown when migrating lockres 
+ 			 * to a node which also goes down */
+ 			if (dlm_is_node_dead(dlm, target)) {
+-				mlog(0, "%s:%.*s: expected migration target %u "
+-				     "is no longer up.  restarting.\n",
++				mlog(ML_NOTICE, "%s:%.*s: expected migration "
++				     "target %u is no longer up, restarting\n",
+ 				     dlm->name, res->lockname.len,
+ 				     res->lockname.name, target);
+ 				ret = -ERESTARTSYS;
+@@ -2269,6 +2286,9 @@
+ 			dlm_mle_detach_hb_events(dlm, mle);
+ 			dlm_put_mle(mle);
+ 			dlm_put_mle_inuse(mle);
++			spin_lock(&res->spinlock);
++			res->state &= ~DLM_LOCK_RES_MIGRATING;
++			spin_unlock(&res->spinlock);
+ 			goto leave;
+ 		}
+ 		/* TODO: if node died: stop, clean up, return error */
+@@ -2671,6 +2691,7 @@
+ 			/* remove it from the list so that only one
+ 			 * mle will be found */
+ 			list_del_init(&tmp->list);
++			__dlm_mle_detach_hb_events(dlm, tmp);
+ 		}
+ 		spin_unlock(&tmp->spinlock);
+ 	}
+@@ -2764,14 +2785,15 @@
+ 
+ 		/* remove from the list early.  NOTE: unlinking
+ 		 * list_head while in list_for_each_safe */
++		__dlm_mle_detach_hb_events(dlm, mle);
+ 		spin_lock(&mle->spinlock);
+ 		list_del_init(&mle->list);
+ 		atomic_set(&mle->woken, 1);
+ 		spin_unlock(&mle->spinlock);
+ 		wake_up(&mle->wq);
+ 
+-		mlog(0, "node %u died during migration from "
+-		     "%u to %u!\n", dead_node,
++		mlog(ML_NOTICE, "%s: node %u died during migration from "
++		     "%u to %u!\n", dlm->name, dead_node,
+ 		     mle->master, mle->new_master);
+ 		/* if there is a lockres associated with this
+ 	 	 * mle, find it and set its owner to UNKNOWN */
+diff -u dlmrecovery.c dlmrecovery.c
+--- fs/ocfs2/dlm/dlmrecovery.c	(working copy)
++++ fs/ocfs2/dlm/dlmrecovery.c	(working copy)
+@@ -835,6 +835,7 @@
+ 	struct list_head *iter;
+ 	int ret;
+ 	u8 dead_node, reco_master;
++	int skip_all_done = 0;
+ 
+ 	dlm = item->dlm;
+ 	dead_node = item->u.ral.dead_node;
+@@ -874,12 +875,21 @@
+ 	dlm_move_reco_locks_to_list(dlm, &resources, dead_node);
+ 
+ 	/* now we can begin blasting lockreses without the dlm lock */
++
++	/* any errors returned will be due to the new_master dying,
++	 * the dlm_reco_thread should detect this */
+ 	list_for_each(iter, &resources) {
+ 		res = list_entry (iter, struct dlm_lock_resource, recovering);
+ 		ret = dlm_send_one_lockres(dlm, res, mres, reco_master,
+ 				   	DLM_MRES_RECOVERY);
+-		if (ret < 0)
++		if (ret < 0) {
+ 			mlog_errno(ret);
++			mlog(ML_ERROR, "%s: node %u went down while sending "
++			     "recovery state for dead node %u\n", dlm->name,
++			     reco_master, dead_node);
++			skip_all_done = 1;
++			break;
++		}
+ 	}
+ 
+ 	/* move the resources back to the list */
+@@ -887,9 +897,15 @@
+ 	list_splice_init(&resources, &dlm->reco.resources);
+ 	spin_unlock(&dlm->spinlock);
+ 
+-	ret = dlm_send_all_done_msg(dlm, dead_node, reco_master);
+-	if (ret < 0)
+-		mlog_errno(ret);
++	if (!skip_all_done) {
++		ret = dlm_send_all_done_msg(dlm, dead_node, reco_master);
++		if (ret < 0) {
++			mlog_errno(ret);
++			mlog(ML_ERROR, "%s: node %u went down while sending "
++			     "recovery all-done for dead node %u\n", dlm->name,
++			     reco_master, dead_node);
++		}
++	}
+ 
+ 	free_page((unsigned long)data);
+ }
+@@ -909,8 +925,14 @@
+ 
+ 	ret = o2net_send_message(DLM_RECO_DATA_DONE_MSG, dlm->key, &done_msg,
+ 				 sizeof(done_msg), send_to, &tmpret);
+-	/* negative status is ignored by the caller */
+-	if (ret >= 0)
++	if (ret < 0) {
++		if (!dlm_is_host_down(ret)) {
++			mlog_errno(ret);
++			mlog(ML_ERROR, "%s: unknown error sending data-done "
++			     "to %u\n", dlm->name, send_to);
++			BUG();
++		}
++	} else
+ 		ret = tmpret;
+ 	return ret;
+ }
+@@ -1136,8 +1158,9 @@
+ 		    ml->type == LKM_PRMODE) {
+ 			/* if it is already set, this had better be a PR
+ 			 * and it has to match */
+-			if (mres->lvb[0] && (ml->type == LKM_EXMODE ||
+-			    memcmp(mres->lvb, lock->lksb->lvb, DLM_LVB_LEN))) {
++			if (!dlm_lvb_is_empty(mres->lvb) && 
++			    (ml->type == LKM_EXMODE ||
++			     memcmp(mres->lvb, lock->lksb->lvb, DLM_LVB_LEN))) {
+ 				mlog(ML_ERROR, "mismatched lvbs!\n");
+ 				__dlm_print_one_lock_resource(lock->lockres);
+ 				BUG();
+@@ -1196,22 +1219,25 @@
+ 			 * we must send it immediately. */
+ 			ret = dlm_send_mig_lockres_msg(dlm, mres, send_to,
+ 						       res, total_locks);
+-			if (ret < 0) {
+-				// TODO
+-				mlog(ML_ERROR, "dlm_send_mig_lockres_msg "
+-				     "returned %d, TODO\n", ret);
+-				BUG();
+-			}
++			if (ret < 0)
++				goto error;
+ 		}
+ 	}
+ 	/* flush any remaining locks */
+ 	ret = dlm_send_mig_lockres_msg(dlm, mres, send_to, res, total_locks);
+-	if (ret < 0) {
+-		// TODO
+-		mlog(ML_ERROR, "dlm_send_mig_lockres_msg returned %d, "
+-		     "TODO\n", ret);
++	if (ret < 0)
++		goto error;
++	return ret;
++
++error:
++	mlog(ML_ERROR, "%s: dlm_send_mig_lockres_msg returned %d\n",
++	     dlm->name, ret);
++	if (!dlm_is_host_down(ret))
+ 		BUG();
+-	}
++	mlog(ML_NOTICE, "%s: node %u went down while sending %s "
++	     "lockres %.*s\n", dlm->name, send_to, 
++	     flags & DLM_MRES_RECOVERY ?  "recovery" : "migration",
++	     res->lockname.len, res->lockname.name);
+ 	return ret;
+ }
+ 
+@@ -1560,6 +1586,7 @@
+ 	ret += list_num;
+ 	return ret;
+ }
++
+ /* TODO: do ast flush business
+  * TODO: do MIGRATING and RECOVERING spinning
+  */
+@@ -1667,7 +1694,7 @@
+ 		lksb->flags |= (ml->flags &
+ 				(DLM_LKSB_PUT_LVB|DLM_LKSB_GET_LVB));
+ 			
+-		if (mres->lvb[0]) {
++		if (!dlm_lvb_is_empty(mres->lvb)) {
+ 			if (lksb->flags & DLM_LKSB_PUT_LVB) {
+ 				/* other node was trying to update
+ 				 * lvb when node died.  recreate the
+@@ -1678,8 +1705,9 @@
+ 				 * most recent valid lvb info */
+ 				BUG_ON(ml->type != LKM_EXMODE &&
+ 				       ml->type != LKM_PRMODE);
+-				if (res->lvb[0] && (ml->type == LKM_EXMODE ||
+-				    memcmp(res->lvb, mres->lvb, DLM_LVB_LEN))) {
++				if (!dlm_lvb_is_empty(res->lvb) && 
++				    (ml->type == LKM_EXMODE ||
++				     memcmp(res->lvb, mres->lvb, DLM_LVB_LEN))) {
+ 					mlog(ML_ERROR, "received bad lvb!\n");
+ 					__dlm_print_one_lock_resource(res);
+ 					BUG();
+only in patch2:
+unchanged:
+--- fs/ocfs2/dlm/dlmunlock.c	(revision 2787)
++++ fs/ocfs2/dlm/dlmunlock.c	(working copy)
+@@ -318,6 +318,16 @@ static enum dlm_status dlm_send_remote_u
+ 	size_t veclen = 1;
+ 
+ 	mlog_entry("%.*s\n", res->lockname.len, res->lockname.name);
++			
++	if (owner == dlm->node_num) {
++		/* ended up trying to contact ourselves.  this means
++		 * that the lockres had been remote but became local
++		 * via a migration.  just retry it, now as local */
++		mlog(0, "%s:%.*s: this node became the master due to a "
++		     "migration, re-evaluate now\n", dlm->name,
++		     res->lockname.len, res->lockname.name);
++		return DLM_FORWARD;
++	}
+ 
+ 	memset(&unlock, 0, sizeof(unlock));
+ 	unlock.node_idx = dlm->node_num;
+only in patch2:
+unchanged:
+--- fs/ocfs2/dlm/dlmcommon.h	(revision 2787)
++++ fs/ocfs2/dlm/dlmcommon.h	(working copy)
+@@ -300,6 +300,15 @@ enum dlm_lockres_list {
+ 	DLM_BLOCKED_LIST
+ };
+ 
++static inline int dlm_lvb_is_empty(char *lvb)
++{
++	int i;
++	for (i=0; i<DLM_LVB_LEN; i++)
++		if (lvb[i])
++			return 0;
++	return 1;
++}
++
+ static inline struct list_head *
+ dlm_list_idx_to_ptr(struct dlm_lock_resource *res, enum dlm_lockres_list idx)
+ {

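Easy to miss in dlm-kurt-mar20-2.patch: the old emptiness tests looked
only at lvb[0], so an LVB whose first byte happened to be zero but
carried data elsewhere was treated as empty and skipped the mismatch
checks.  The new dlm_lvb_is_empty() helper scans all DLM_LVB_LEN bytes.
A standalone illustration (not dlm code):

	/* Sketch: why testing lvb[0] alone is wrong. */
	char lvb[DLM_LVB_LEN] = { 0 };
	lvb[5] = 0xff;			/* real data, first byte still 0 */

	if (lvb[0])
		;	/* old test: never taken, the data is missed */
	if (!dlm_lvb_is_empty(lvb))
		;	/* new test: correctly sees the data */
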
Added: branches/ocfs2-1.2-cert/patches/em-silence-eexist.patch
===================================================================
--- branches/ocfs2-1.2-cert/patches/em-silence-eexist.patch	2006-03-27 22:25:16 UTC (rev 2789)
+++ branches/ocfs2-1.2-cert/patches/em-silence-eexist.patch	2006-03-27 22:29:12 UTC (rev 2790)
@@ -0,0 +1,73 @@
+Index: fs/ocfs2/extent_map.c
+===================================================================
+--- fs/ocfs2/extent_map.c	(revision 2787)
++++ fs/ocfs2/extent_map.c	(working copy)
+@@ -296,7 +296,7 @@ static int ocfs2_extent_map_find_leaf(st
+ 
+ 		ret = ocfs2_extent_map_insert(inode, rec,
+ 					      le16_to_cpu(el->l_tree_depth));
+-		if (ret) {
++		if (ret && (ret != -EEXIST)) {
+ 			mlog_errno(ret);
+ 			goto out_free;
+ 		}
+@@ -425,6 +425,11 @@ static int ocfs2_extent_map_insert_entry
+ /*
+  * Simple rule: on any return code other than -EAGAIN, anything left
+  * in the insert_context will be freed.
++ *
++ * Simple rule #2: A return code of -EEXIST from this function or
++ * its calls to ocfs2_extent_map_insert_entry() signifies that another
++ * thread beat us to the insert.  It is not an actual error, but it
++ * tells the caller we have no more work to do.
+  */
+ static int ocfs2_extent_map_try_insert(struct inode *inode,
+ 				       struct ocfs2_extent_rec *rec,
+@@ -446,23 +451,32 @@ static int ocfs2_extent_map_try_insert(s
+ 		goto out_unlock;
+ 	}
+ 
++	/* Since insert_entry failed, the map MUST have old_ent */
+ 	old_ent = ocfs2_extent_map_lookup(em, le32_to_cpu(rec->e_cpos),
+-					  le32_to_cpu(rec->e_clusters), NULL,
+-					  NULL);
+-
++					  le32_to_cpu(rec->e_clusters),
++					  NULL, NULL);
+ 	if (!old_ent)
+ 		BUG();
+ 
+-	ret = -EEXIST;
+-	if (old_ent->e_tree_depth < tree_depth)
++	if (old_ent->e_tree_depth < tree_depth) {
++		/* Another thread beat us to the lower tree_depth */
++		ret = -EEXIST;
+ 		goto out_unlock;
++	}
+ 
+ 	if (old_ent->e_tree_depth == tree_depth) {
++		/*
++		 * Another thread beat us to this tree_depth.
++		 * Let's make sure we agree with that thread (the
++		 * extent_rec should be identical).
++		 */
+ 		if (!memcmp(rec, &old_ent->e_rec,
+ 			    sizeof(struct ocfs2_extent_rec)))
+ 			ret = 0;
++		else
++			/* FIXME: Should this be ESRCH/EBADR??? */
++			ret = -EEXIST;
+ 
+-		/* FIXME: Should this be ESRCH/EBADR??? */
+ 		goto out_unlock;
+ 	}
+ 
+@@ -597,7 +611,7 @@ int ocfs2_extent_map_insert(struct inode
+ 						  tree_depth, &ctxt);
+ 	} while (ret == -EAGAIN);
+ 
+-	if (ret < 0)
++	if ((ret < 0) && (ret != -EEXIST))
+ 		mlog_errno(ret);
+ 
+ 	if (ctxt.left_ent)

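The convention written down above ("Simple rule #2") changes the caller
contract: -EEXIST from ocfs2_extent_map_try_insert() uniformly means
another thread already inserted an equivalent or deeper record, so it
is success-shaped and must not be logged as an error.  The resulting
caller pattern, matching the ocfs2_extent_map_insert() hunk:

	/* Sketch: -EAGAIN retries the insert, -EEXIST is benign. */
	do {
		ret = ocfs2_extent_map_try_insert(inode, rec,
						  tree_depth, &ctxt);
	} while (ret == -EAGAIN);

	if ((ret < 0) && (ret != -EEXIST))
		mlog_errno(ret);	/* only real failures get logged */
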
Added: branches/ocfs2-1.2-cert/patches/extra-em-trace3.patch
===================================================================
--- branches/ocfs2-1.2-cert/patches/extra-em-trace3.patch	2006-03-27 22:25:16 UTC (rev 2789)
+++ branches/ocfs2-1.2-cert/patches/extra-em-trace3.patch	2006-03-27 22:29:12 UTC (rev 2790)
@@ -0,0 +1,74 @@
+diff --git a/fs/ocfs2/extent_map.c b/fs/ocfs2/extent_map.c
+index e6f207e..3b8e393 100644
+--- a/fs/ocfs2/extent_map.c
++++ b/fs/ocfs2/extent_map.c
+@@ -455,13 +455,36 @@ static int ocfs2_extent_map_try_insert(s
+ 	BUG_ON(!old_ent);
+ 
+ 	ret = -EEXIST;
+-	if (old_ent->e_tree_depth < tree_depth)
++	if (old_ent->e_tree_depth < tree_depth) {
++		mlog(0, "Trying to add an extent record at tree depth"
++		        " %d for inode %llu, but the extent map already"
++		        " contains a record at tree depth %d\n",
++		     tree_depth,
++		     (unsigned long long)OCFS2_I(inode)->ip_blkno,
++		     old_ent->e_tree_depth);
++		mlog(0,
++		     "old_ent"
++		     " (depth %d, e_cpos %u, e_clusters %u, e_blkno %llu),"
++		     " new_ent"
++		     " (depth %d, e_cpos %u, e_clusters %u, e_blkno %llu)\n",
++		     old_ent->e_tree_depth,
++		     le32_to_cpu(old_ent->e_rec.e_cpos),
++		     le32_to_cpu(old_ent->e_rec.e_clusters),
++		     (unsigned long long)le64_to_cpu(old_ent->e_rec.e_blkno),
++		     tree_depth,
++		     le32_to_cpu(rec->e_cpos),
++		     le32_to_cpu(rec->e_clusters),
++		     (unsigned long long)le64_to_cpu(rec->e_blkno));
++		mlog_errno(ret);
+ 		goto out_unlock;
++	}
+ 
+ 	if (old_ent->e_tree_depth == tree_depth) {
+ 		if (!memcmp(rec, &old_ent->e_rec,
+ 			    sizeof(struct ocfs2_extent_rec)))
+ 			ret = 0;
++		else
++			mlog_errno(ret);
+ 
+ 		/* FIXME: Should this be ESRCH/EBADR??? */
+ 		goto out_unlock;
+@@ -511,16 +534,20 @@ static int ocfs2_extent_map_try_insert(s
+ 	if (ctxt->need_left) {
+ 		ret = ocfs2_extent_map_insert_entry(em,
+ 						    ctxt->left_ent);
+-		if (ret)
++		if (ret) {
++			mlog_errno(ret);
+ 			goto out_unlock;
++		}
+ 		ctxt->left_ent = NULL;
+ 	}
+ 
+ 	if (ctxt->need_right) {
+ 		ret = ocfs2_extent_map_insert_entry(em,
+ 						    ctxt->right_ent);
+-		if (ret)
++		if (ret) {
++			mlog_errno(ret);
+ 			goto out_unlock;
++		}
+ 		ctxt->right_ent = NULL;
+ 	}
+ 
+@@ -528,6 +555,8 @@ static int ocfs2_extent_map_try_insert(s
+ 
+ 	if (!ret)
+ 		ctxt->new_ent = NULL;
++	else
++		mlog_errno(ret);
+ 
+ out_unlock:
+ 	spin_unlock(&OCFS2_I(inode)->ip_lock);

Added: branches/ocfs2-1.2-cert/patches/fix-purge-lockres.patch
===================================================================
--- branches/ocfs2-1.2-cert/patches/fix-purge-lockres.patch	2006-03-27 22:25:16 UTC (rev 2789)
+++ branches/ocfs2-1.2-cert/patches/fix-purge-lockres.patch	2006-03-27 22:29:12 UTC (rev 2790)
@@ -0,0 +1,64 @@
+Index: fs/ocfs2/dlm/dlmthread.c
+===================================================================
+--- fs/ocfs2/dlm/dlmthread.c.orig	2006-03-22 14:36:02.385122000 -0800
++++ fs/ocfs2/dlm/dlmthread.c	2006-03-22 23:50:11.592040000 -0800
+@@ -57,6 +57,8 @@ extern spinlock_t dlm_domain_lock;
+ extern struct list_head dlm_domains;
+ 
+ static int dlm_thread(void *data);
++static void dlm_purge_lockres_now(struct dlm_ctxt *dlm,
++				  struct dlm_lock_resource *lockres);
+ 
+ #define dlm_lock_is_remote(dlm, lock)     ((lock)->ml.node != (dlm)->node_num)
+ 
+@@ -112,10 +114,23 @@ void __dlm_lockres_calc_usage(struct dlm
+ 			res->last_used = jiffies;
+ 			list_add_tail(&res->purge, &dlm->purge_list);
+ 			dlm->purge_count++;
++
++			/* if this node is not the owner, there is
++			 * no way to keep track of who the owner could be.
++			 * unhash it to avoid serious problems. */
++			if (res->owner != dlm->node_num) {
++				mlog(0, "%s:%.*s: doing immediate "
++				     "purge of lockres owned by %u\n",
++				     dlm->name, res->lockname.len,
++				     res->lockname.name, res->owner);
++
++				dlm_purge_lockres_now(dlm, res);
++			}
+ 		}
+ 	} else if (!list_empty(&res->purge)) {
+-		mlog(0, "removing lockres %.*s from purge list\n",
+-		     res->lockname.len, res->lockname.name);
++		mlog(0, "removing lockres %.*s from purge list, "
++		     "owner=%u\n", res->lockname.len, res->lockname.name,
++		     res->owner);
+ 
+ 		list_del_init(&res->purge);
+ 		dlm->purge_count--;
+@@ -181,6 +196,24 @@ finish:
+ 	__dlm_unhash_lockres(lockres);
+ }
+ 
++/* make an unused lockres go away immediately.
++ * as soon as the dlm spinlock is dropped, this lockres
++ * will not be found. kfree still happens on last put. */
++static void dlm_purge_lockres_now(struct dlm_ctxt *dlm,
++				  struct dlm_lock_resource *lockres)
++{
++	assert_spin_locked(&dlm->spinlock);
++	assert_spin_locked(&lockres->spinlock);
++
++	BUG_ON(!__dlm_lockres_unused(lockres));
++
++	if (!list_empty(&lockres->purge)) {
++		list_del_init(&lockres->purge);
++		dlm->purge_count--;
++	}
++	__dlm_unhash_lockres(lockres);
++}
++
+ static void dlm_run_purge_list(struct dlm_ctxt *dlm,
+ 			       int purge_now)
+ {

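The reasoning in fix-purge-lockres.patch: an unused lockres that this
node does not master cannot sit safely on the deferred purge list,
because this node has no way to track who the owner becomes (e.g.
across recovery) before the deferred purge runs.  So the lockres is
unhashed immediately, under both spinlocks; the final kfree still
happens on the last refcount put.  The call-site shape:

	/* Sketch: immediate purge of an unused, remotely owned res.
	 * Both locks must be held; the helper asserts this. */
	assert_spin_locked(&dlm->spinlock);
	assert_spin_locked(&res->spinlock);
	if (__dlm_lockres_unused(res) && res->owner != dlm->node_num)
		dlm_purge_lockres_now(dlm, res);  /* unhashed right now */
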
Added: branches/ocfs2-1.2-cert/patches/hold-recovery-ref.patch
===================================================================
--- branches/ocfs2-1.2-cert/patches/hold-recovery-ref.patch	2006-03-27 22:25:16 UTC (rev 2789)
+++ branches/ocfs2-1.2-cert/patches/hold-recovery-ref.patch	2006-03-27 22:29:12 UTC (rev 2790)
@@ -0,0 +1,73 @@
+Index: ocfs2-1.2/fs/ocfs2/dlm/dlmrecovery.c
+===================================================================
+--- ocfs2-1.2.orig/fs/ocfs2/dlm/dlmrecovery.c	2006-03-21 19:30:46.726473000 -0800
++++ ocfs2-1.2/fs/ocfs2/dlm/dlmrecovery.c	2006-03-22 13:20:39.267067000 -0800
+@@ -1783,8 +1783,14 @@ void dlm_move_lockres_to_recovery_list(s
+ 	struct dlm_lock *lock;
+ 
+ 	res->state |= DLM_LOCK_RES_RECOVERING;
+-	if (!list_empty(&res->recovering))
++	if (!list_empty(&res->recovering)) {
++		mlog(ML_NOTICE,
++		     "Recovering res %s:%.*s, is already on recovery list!\n",
++		     dlm->name, res->lockname.len, res->lockname.name);
+ 		list_del_init(&res->recovering);
++	}
++	/* We need to hold a reference while on the recovery list */
++	dlm_lockres_get(res);
+ 	list_add_tail(&res->recovering, &dlm->reco.resources);
+ 
+ 	/* find any pending locks and put them back on proper list */
+@@ -1873,9 +1879,11 @@ static void dlm_finish_local_lockres_rec
+ 			spin_lock(&res->spinlock);
+ 			dlm_change_lockres_owner(dlm, res, new_master);
+ 			res->state &= ~DLM_LOCK_RES_RECOVERING;
+-			__dlm_dirty_lockres(dlm, res);
++			if (!__dlm_lockres_unused(res))
++				__dlm_dirty_lockres(dlm, res);
+ 			spin_unlock(&res->spinlock);
+ 			wake_up(&res->wq);
++			dlm_lockres_put(res);
+ 		}
+ 	}
+ 
+@@ -1908,11 +1916,13 @@ static void dlm_finish_local_lockres_rec
+ 					     dlm->name, res->lockname.len,
+ 					     res->lockname.name, res->owner);
+ 					list_del_init(&res->recovering);
++					dlm_lockres_put(res);
+ 				}
+ 				spin_lock(&res->spinlock);
+ 				dlm_change_lockres_owner(dlm, res, new_master);
+ 				res->state &= ~DLM_LOCK_RES_RECOVERING;
+-				__dlm_dirty_lockres(dlm, res);
++				if (!__dlm_lockres_unused(res))
++					__dlm_dirty_lockres(dlm, res);
+ 				spin_unlock(&res->spinlock);
+ 				wake_up(&res->wq);
+ 			}
+Index: ocfs2-1.2/fs/ocfs2/dlm/dlmcommon.h
+===================================================================
+--- ocfs2-1.2.orig/fs/ocfs2/dlm/dlmcommon.h	2006-03-21 19:30:46.711489000 -0800
++++ ocfs2-1.2/fs/ocfs2/dlm/dlmcommon.h	2006-03-22 11:59:46.979544000 -0800
+@@ -858,6 +858,7 @@ int dlm_lock_basts_flushed(struct dlm_ct
+ 
+ 
+ int dlm_dump_all_mles(const char __user *data, unsigned int len);
++int __dlm_lockres_unused(struct dlm_lock_resource *res);
+ 
+ 
+ static inline const char * dlm_lock_mode_name(int mode)
+Index: ocfs2-1.2/fs/ocfs2/dlm/dlmthread.c
+===================================================================
+--- ocfs2-1.2.orig/fs/ocfs2/dlm/dlmthread.c	2006-03-22 11:58:41.192580000 -0800
++++ ocfs2-1.2/fs/ocfs2/dlm/dlmthread.c	2006-03-22 11:59:07.989988000 -0800
+@@ -82,7 +82,7 @@ repeat:
+ }
+ 
+ 
+-static int __dlm_lockres_unused(struct dlm_lock_resource *res)
++int __dlm_lockres_unused(struct dlm_lock_resource *res)
+ {
+ 	if (list_empty(&res->granted) &&
+ 	    list_empty(&res->converting) &&

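hold-recovery-ref.patch enforces the standard refcounting rule that
list membership owns a reference: the lockres takes dlm_lockres_get()
when added to dlm->reco.resources, and a matching dlm_lockres_put()
happens wherever an entry leaves that list, so recovery can never chase
a freed resource.  (It also exports __dlm_lockres_unused() so the
finish path only re-dirties resources that still hold locks.)  The
invariant in miniature:

	/* Sketch: a list entry holds its own reference. */
	dlm_lockres_get(res);		/* ref owned by the list */
	list_add_tail(&res->recovering, &dlm->reco.resources);

	/* ... recovery runs ... */

	list_del_init(&res->recovering);
	dlm_lockres_put(res);		/* list ref dropped */
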
Added: branches/ocfs2-1.2-cert/patches/leave-other-dead-nodes-on-recovery-list.patch
===================================================================
--- branches/ocfs2-1.2-cert/patches/leave-other-dead-nodes-on-recovery-list.patch	2006-03-27 22:25:16 UTC (rev 2789)
+++ branches/ocfs2-1.2-cert/patches/leave-other-dead-nodes-on-recovery-list.patch	2006-03-27 22:29:12 UTC (rev 2790)
@@ -0,0 +1,39 @@
+Index: ocfs2-1.2/fs/ocfs2/dlm/dlmrecovery.c
+===================================================================
+--- ocfs2-1.2.orig/fs/ocfs2/dlm/dlmrecovery.c	2006-03-24 16:47:23.774339000 -0800
++++ ocfs2-1.2/fs/ocfs2/dlm/dlmrecovery.c	2006-03-24 16:48:04.772260000 -0800
+@@ -1992,14 +1992,23 @@ static void dlm_finish_local_lockres_rec
+ 		bucket = &(dlm->lockres_hash[i]);
+ 		hlist_for_each_entry(res, hash_iter, bucket, hash_node) {
+ 			if (res->state & DLM_LOCK_RES_RECOVERING) {
++				mlog(ML_NOTICE,
++				     "Continue!  owner = %u, dead_node = %u, this = %u, name = %.*s\n",
++				     res->owner, dead_node,
++				     dlm->node_num,
++				     res->lockname.len,
++				     res->lockname.name);
++				continue;
++
++				/* The rest of this is a bug */
+ 				if (res->owner == dead_node) {
+-					mlog(0, "(this=%u) res %.*s owner=%u "
++					mlog(ML_NOTICE, "(this=%u) res %.*s owner=%u "
+ 					     "was not on recovering list, but "
+ 					     "clearing state anyway\n",
+ 					     dlm->node_num, res->lockname.len,
+ 					     res->lockname.name, new_master);
+ 				} else if (res->owner == dlm->node_num) {
+-					mlog(0, "(this=%u) res %.*s owner=%u "
++					mlog(ML_NOTICE, "(this=%u) res %.*s owner=%u "
+ 					     "was not on recovering list, "
+ 					     "owner is THIS node, clearing\n",
+ 					     dlm->node_num, res->lockname.len,
+@@ -2008,7 +2017,7 @@ static void dlm_finish_local_lockres_rec
+ 					continue;
+ 
+ 				if (!list_empty(&res->recovering)) {
+-					mlog(0, "%s:%.*s: lockres was "
++					mlog(ML_NOTICE, "%s:%.*s: lockres was "
+ 					     "marked RECOVERING, owner=%u\n",
+ 					     dlm->name, res->lockname.len,
+ 					     res->lockname.name, res->owner);

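Note the structure of this patch: the unconditional continue (plus the
"The rest of this is a bug" comment) deliberately short-circuits the
per-owner cleanup below it, leaving that code in the tree as dead code
for reference.  Any lockres still marked RECOVERING for another dead
node is now simply left on the recovery list rather than having its
state cleared early.  The workaround shape, as a sketch:

	/* Sketch: log and skip; everything after the continue is
	 * intentionally unreachable. */
	if (res->state & DLM_LOCK_RES_RECOVERING) {
		mlog(ML_NOTICE, "Continue!  ...\n");
		continue;
	}
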
Added: branches/ocfs2-1.2-cert/patches/lockres-release-info.patch
===================================================================
--- branches/ocfs2-1.2-cert/patches/lockres-release-info.patch	2006-03-27 22:25:16 UTC (rev 2789)
+++ branches/ocfs2-1.2-cert/patches/lockres-release-info.patch	2006-03-27 22:29:12 UTC (rev 2790)
@@ -0,0 +1,25 @@
+Index: ocfs2-1.2/fs/ocfs2/dlm/dlmmaster.c
+===================================================================
+--- ocfs2-1.2.orig/fs/ocfs2/dlm/dlmmaster.c	2006-03-20 17:08:31.633920000 -0800
++++ ocfs2-1.2/fs/ocfs2/dlm/dlmmaster.c	2006-03-21 13:09:51.785260000 -0800
+@@ -613,6 +613,20 @@ static void dlm_lockres_release(struct k
+ 	mlog(0, "destroying lockres %.*s\n", res->lockname.len,
+ 	     res->lockname.name);
+ 
++	if (!hlist_unhashed(&res->hash_node) ||
++	    !list_empty(&res->granted) ||
++	    !list_empty(&res->converting) ||
++	    !list_empty(&res->blocked) ||
++	    !list_empty(&res->dirty) ||
++	    !list_empty(&res->recovering) ||
++	    !list_empty(&res->purge)) {
++		mlog(ML_ERROR,
++		     "Going to BUG for resource %.*s."
++		     "  We're on a list!\n",
++		     res->lockname.len, res->lockname.name);
++		dlm_print_one_lock_resource(res);
++	}
++
+ 	/* By the time we're ready to blow this guy away, we shouldn't
+ 	 * be on any lists. */
+ 	BUG_ON(!hlist_unhashed(&res->hash_node));

Added: branches/ocfs2-1.2-cert/patches/lvb-recovery-fix.patch
===================================================================
--- branches/ocfs2-1.2-cert/patches/lvb-recovery-fix.patch	2006-03-27 22:25:16 UTC (rev 2789)
+++ branches/ocfs2-1.2-cert/patches/lvb-recovery-fix.patch	2006-03-27 22:29:12 UTC (rev 2790)
@@ -0,0 +1,139 @@
+Index: fs/ocfs2/dlm/dlmrecovery.c
+===================================================================
+--- fs/ocfs2/dlm/dlmrecovery.c.orig	2006-03-22 14:36:11.772185000 -0800
++++ fs/ocfs2/dlm/dlmrecovery.c	2006-03-23 18:31:54.877480000 -0800
+@@ -1606,6 +1606,66 @@ dlm_list_num_to_pointer(struct dlm_lock_
+  * TODO: do MIGRATING and RECOVERING spinning
+  */
+ 
++#define DLM_OCFS2_SEC_SHIFT  (64 - 34)
++#define DLM_OCFS2_NSEC_MASK  ((1ULL << DLM_OCFS2_SEC_SHIFT) - 1)
++
++struct floo {
++	__be32       lvb_old_seq;
++	__be32       lvb_version;
++	__be32       lvb_iclusters;
++	__be32       lvb_iuid;
++	__be32       lvb_igid;
++	__be16       lvb_imode;
++	__be16       lvb_inlink;
++	__be64       lvb_iatime_packed;
++	__be64       lvb_ictime_packed;
++	__be64       lvb_imtime_packed;
++	__be64       lvb_isize;
++	__be32       lvb_reserved[2];
++};
++
++// OLDSEQ-- VERSION- CLUSTERS UIUD---- IGID---- MODE NLNK ATIMEPACKED----- CTIMEPACKED----- MTIMEPACKED----- ISIZE----------- RESERVED--------
++// 00000000 00000001 00000001 0000c09f 00000262 41ff 0006 10f45a50844bfa5b 110885ed11acf024 110885ed11acf024 0000000000003000 0000000000000000
++static inline void dlm_print_ocfs2_lvb(unsigned char *lvb)
++{
++	struct floo *raw = (struct floo *)lvb;
++	u32 clusters, uid, gid, oldseq, vers;
++	u16 mode, nlink;
++	u64 isize, atime, mtime, ctime;
++	/* just do some lame decoding, doesn't need to be too
++	 * accurate, just cut the encoded value into smaller values */
++	
++	
++        oldseq  = be32_to_cpu(raw->lvb_old_seq);
++        vers    = be32_to_cpu(raw->lvb_version);
++        clusters= be32_to_cpu(raw->lvb_iclusters);
++        isize   = be64_to_cpu(raw->lvb_isize);
++	uid     = be32_to_cpu(raw->lvb_iuid);
++        gid     = be32_to_cpu(raw->lvb_igid);
++        mode    = be16_to_cpu(raw->lvb_imode);
++        nlink   = be16_to_cpu(raw->lvb_inlink);
++	/* just print out the tv_sec portion */
++        atime   = be64_to_cpu(raw->lvb_iatime_packed) >> DLM_OCFS2_SEC_SHIFT;
++        mtime   = be64_to_cpu(raw->lvb_imtime_packed) >> DLM_OCFS2_SEC_SHIFT;
++        ctime   = be64_to_cpu(raw->lvb_ictime_packed) >> DLM_OCFS2_SEC_SHIFT;
++	printk("[%u:%u:%u:%llu:%u:%u:%u:%u:%llu:%llu:%llu]", oldseq, vers,
++	       clusters, (unsigned long long)isize, uid, gid, mode,
++	       nlink, (unsigned long long)atime,
++	       (unsigned long long)mtime, (unsigned long long)ctime);
++}
++
++static inline void dlm_print_lvb(unsigned char *lvb)
++{
++#if 0
++	int i;
++	for (i=0; i<DLM_LVB_LEN; i++)
++		printk("%02x", (unsigned char)lvb[i]);
++#endif
++	
++	dlm_print_ocfs2_lvb(lvb);
++}
++
++
+ /*
+ * NOTE about in-flight requests during migration:
+ *
+@@ -1708,13 +1768,21 @@ static int dlm_process_recovery_data(str
+ 		}
+ 		lksb->flags |= (ml->flags &
+ 				(DLM_LKSB_PUT_LVB|DLM_LKSB_GET_LVB));
+-			
++
++		if (ml->type == LKM_NLMODE)
++			goto skip_lvb;
++
+ 		if (!dlm_lvb_is_empty(mres->lvb)) {
+ 			if (lksb->flags & DLM_LKSB_PUT_LVB) {
+ 				/* other node was trying to update
+ 				 * lvb when node died.  recreate the
+ 				 * lksb with the updated lvb. */
+ 				memcpy(lksb->lvb, mres->lvb, DLM_LVB_LEN);
++				/* the lock resource lvb update must happen
++				 * NOW, before the spinlock is dropped.
++				 * we no longer wait for the AST to update
++				 * the lvb. */
++				memcpy(res->lvb, mres->lvb, DLM_LVB_LEN);
+ 			} else {
+ 				/* otherwise, the node is sending its 
+ 				 * most recent valid lvb info */
+@@ -1723,17 +1791,19 @@ static int dlm_process_recovery_data(str
+ 				if (!dlm_lvb_is_empty(res->lvb) && 
+ 				    (ml->type == LKM_EXMODE ||
+ 				     memcmp(res->lvb, mres->lvb, DLM_LVB_LEN))) {
+-					int i;
++					u64 c = be64_to_cpu(lock->ml.cookie);
+ 					mlog(ML_ERROR, "%s:%.*s: received bad "
+-					     "lvb! type=%d\n", dlm->name,
+-					     res->lockname.len, 
+-					     res->lockname.name, ml->type);
++					     "lvb! type=%d, convtype=%d, "
++					     "node=%u, cookie=%u:%llu\n",
++					     dlm->name, res->lockname.len, 
++					     res->lockname.name, ml->type,
++					     ml->convert_type, ml->node,
++					     dlm_get_lock_cookie_node(c),
++					     dlm_get_lock_cookie_seq(c));
+ 					printk("lockres lvb=[");
+-					for (i=0; i<DLM_LVB_LEN; i++)
+-						printk("%02x", res->lvb[i]);
++					dlm_print_lvb(res->lvb);
+ 					printk("]\nmigrated lvb=[");
+-					for (i=0; i<DLM_LVB_LEN; i++)
+-						printk("%02x", mres->lvb[i]);
++					dlm_print_lvb(mres->lvb);
+ 					printk("]\n");
+ 					dlm_print_one_lock_resource(res);
+ 					BUG();
+@@ -1741,7 +1811,7 @@ static int dlm_process_recovery_data(str
+ 				memcpy(res->lvb, mres->lvb, DLM_LVB_LEN);
+ 			}
+ 		}
+-
++skip_lvb:
+ 
+ 		/* NOTE:
+ 		 * wrt lock queue ordering and recovery:
+@@ -1762,6 +1832,7 @@ static int dlm_process_recovery_data(str
+ 		bad = 0;
+ 		spin_lock(&res->spinlock);
+ 		list_for_each_entry(lock, queue, list) {
++#warning does this need be64_to_cpu conversion?
+ 			if (lock->ml.cookie == ml->cookie) {
+ 				u64 c = lock->ml.cookie;
+ 				mlog(ML_ERROR, "%s:%.*s: %u:%llu: lock already "

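The struct floo decoder (the name is scratch code in this debugging
patch) leans on ocfs2's packed-timespec layout in the LVB: each
timestamp is a single be64 with tv_sec in the top 34 bits and tv_nsec
in the low 30 (DLM_OCFS2_SEC_SHIFT = 64 - 34 = 30, and 10^9 ns fits in
30 bits).  The patch only shows the unpack; assuming the symmetric
pack, the pair looks like:

	/* Sketch; the pack direction is an assumption, the unpack
	 * matches what dlm_print_ocfs2_lvb() does above. */
	static inline u64 dlm_pack_timespec(u64 sec, u32 nsec)
	{
		return (sec << DLM_OCFS2_SEC_SHIFT) |
		       ((u64)nsec & DLM_OCFS2_NSEC_MASK);
	}

	u64 sec  = packed >> DLM_OCFS2_SEC_SHIFT;
	u64 nsec = packed &  DLM_OCFS2_NSEC_MASK;
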
Added: branches/ocfs2-1.2-cert/patches/mar20-full-3.patch
===================================================================
--- branches/ocfs2-1.2-cert/patches/mar20-full-3.patch	2006-03-27 22:25:16 UTC (rev 2789)
+++ branches/ocfs2-1.2-cert/patches/mar20-full-3.patch	2006-03-27 22:29:12 UTC (rev 2790)
@@ -0,0 +1,1094 @@
+Index: fs/ocfs2/dlm/dlmthread.c
+===================================================================
+--- fs/ocfs2/dlm/dlmthread.c	(revision 2787)
++++ fs/ocfs2/dlm/dlmthread.c	(working copy)
+@@ -39,6 +39,7 @@
+ #include <linux/inet.h>
+ #include <linux/timer.h>
+ #include <linux/kthread.h>
++#include <linux/delay.h>
+ 
+ 
+ #include "cluster/heartbeat.h"
+@@ -166,6 +167,7 @@ again:
+ 	} else if (ret < 0) {
+ 		mlog(ML_NOTICE, "lockres %.*s: migrate failed, retrying\n",
+ 		     lockres->lockname.len, lockres->lockname.name);
++		msleep(100);
+ 		goto again;
+ 	}
+ 
+@@ -658,8 +660,9 @@ static int dlm_thread(void *data)
+ 			 * spinlock and do NOT have the dlm lock.
+ 			 * safe to reserve/queue asts and run the lists. */
+ 
+-			mlog(0, "calling dlm_shuffle_lists with dlm=%p, "
+-			     "res=%p\n", dlm, res);
++			mlog(0, "calling dlm_shuffle_lists with dlm=%s, "
++			     "res=%.*s\n", dlm->name,
++			     res->lockname.len, res->lockname.name);
+ 
+ 			/* called while holding lockres lock */
+ 			dlm_shuffle_lists(dlm, res);
+Index: fs/ocfs2/dlm/dlmmaster.c
+===================================================================
+--- fs/ocfs2/dlm/dlmmaster.c	(revision 2787)
++++ fs/ocfs2/dlm/dlmmaster.c	(working copy)
+@@ -73,6 +73,7 @@ struct dlm_master_list_entry
+ 	wait_queue_head_t wq;
+ 	atomic_t woken;
+ 	struct kref mle_refs;
++	int inuse;
+ 	unsigned long maybe_map[BITS_TO_LONGS(O2NM_MAX_NODES)];
+ 	unsigned long vote_map[BITS_TO_LONGS(O2NM_MAX_NODES)];
+ 	unsigned long response_map[BITS_TO_LONGS(O2NM_MAX_NODES)];
+@@ -123,15 +124,30 @@ static inline int dlm_mle_equal(struct d
+ 	return 1;
+ }
+ 
++#define dlm_print_nodemap(m)  _dlm_print_nodemap(m,#m)
++void _dlm_print_nodemap(unsigned long *map, const char *mapname)
++{
++	int i;
++	printk("%s=[ ", mapname);
++	for (i=0; i<O2NM_MAX_NODES; i++)
++		if (test_bit(i, map))
++			printk("%d ", i);
++	printk("]");
++}
++
+ void dlm_print_one_mle(struct dlm_master_list_entry *mle)
+ {
+-	int i = 0, refs;
++	int refs;
+ 	char *type;
+ 	char attached;
+ 	u8 master;
+ 	unsigned int namelen;
+ 	const char *name;
+ 	struct kref *k;
++	unsigned long *maybe = mle->maybe_map, 
++		      *vote = mle->vote_map, 
++		      *resp = mle->response_map, 
++		      *node = mle->node_map;
+ 
+ 	k = &mle->mle_refs;
+ 	if (mle->type == DLM_MLE_BLOCK)
+@@ -151,10 +167,19 @@ void dlm_print_one_mle(struct dlm_master
+ 		namelen = mle->u.res->lockname.len;
+ 		name = mle->u.res->lockname.name;
+ 	}
+-
+-	mlog(ML_NOTICE, "  #%3d: %3s  %3d  %3u   %3u %c    (%d)%.*s\n",
+-		  i, type, refs, master, mle->new_master, attached,
+-		  namelen, namelen, name);
++	
++	mlog(ML_NOTICE, "%.*s: %3s refs=%3d mas=%3u new=%3u evt=%c inuse=%d ",
++		  namelen, name, type, refs, master, mle->new_master, attached,
++		  mle->inuse);
++	dlm_print_nodemap(maybe);
++	printk(", ");
++	dlm_print_nodemap(vote);
++	printk(", ");
++	dlm_print_nodemap(resp);
++	printk(", ");
++	dlm_print_nodemap(node);
++	printk(", ");
++	printk("\n");
+ }
+ 
+ 			      
+@@ -166,7 +191,6 @@ static void dlm_dump_mles(struct dlm_ctx
+ 	struct list_head *iter;
+ 	
+ 	mlog(ML_NOTICE, "dumping all mles for domain %s:\n", dlm->name);
+-	mlog(ML_NOTICE, "  ####: type refs owner new events? lockname nodemap votemap respmap maybemap\n");
+ 	spin_lock(&dlm->master_lock);
+ 	list_for_each(iter, &dlm->master_list) {
+ 		mle = list_entry(iter, struct dlm_master_list_entry, list);
+@@ -310,6 +334,31 @@ static inline void dlm_mle_detach_hb_eve
+ 	__dlm_mle_detach_hb_events(dlm, mle);
+ 	spin_unlock(&dlm->spinlock);
+ }
++	
++static void dlm_get_mle_inuse(struct dlm_master_list_entry *mle)
++{
++	struct dlm_ctxt *dlm;
++	dlm = mle->dlm;
++
++	assert_spin_locked(&dlm->spinlock);
++	assert_spin_locked(&dlm->master_lock);
++	mle->inuse++;
++	kref_get(&mle->mle_refs);
++}
++
++static void dlm_put_mle_inuse(struct dlm_master_list_entry *mle)
++{
++	struct dlm_ctxt *dlm;
++	dlm = mle->dlm;
++	
++	spin_lock(&dlm->spinlock);
++	spin_lock(&dlm->master_lock);
++	mle->inuse--;
++	__dlm_put_mle(mle);
++	spin_unlock(&dlm->master_lock);
++	spin_unlock(&dlm->spinlock);
++
++}
+ 
+ /* remove from list and free */
+ static void __dlm_put_mle(struct dlm_master_list_entry *mle)
+@@ -319,9 +368,13 @@ static void __dlm_put_mle(struct dlm_mas
+ 
+ 	assert_spin_locked(&dlm->spinlock);
+ 	assert_spin_locked(&dlm->master_lock);
+-	BUG_ON(!atomic_read(&mle->mle_refs.refcount));
+-
+-	kref_put(&mle->mle_refs, dlm_mle_release);
++	if (!atomic_read(&mle->mle_refs.refcount)) {
++		/* this may or may not crash, but who cares.
++		 * it's a BUG. */
++		mlog(ML_ERROR, "bad mle: %p\n", mle);
++		dlm_print_one_mle(mle);
++	} else
++		kref_put(&mle->mle_refs, dlm_mle_release);
+ }
+ 
+ 
+@@ -364,6 +417,7 @@ static void dlm_init_mle(struct dlm_mast
+ 	memset(mle->response_map, 0, sizeof(mle->response_map));
+ 	mle->master = O2NM_MAX_NODES;
+ 	mle->new_master = O2NM_MAX_NODES;
++	mle->inuse = 0;
+ 
+ 	if (mle->type == DLM_MLE_MASTER) {
+ 		BUG_ON(!res);
+@@ -784,7 +838,7 @@ lookup:
+ 	 * if so, the creator of the BLOCK may try to put the last
+ 	 * ref at this time in the assert master handler, so we
+ 	 * need an extra one to keep from a bad ptr deref. */
+-	dlm_get_mle(mle);
++	dlm_get_mle_inuse(mle);
+ 	spin_unlock(&dlm->master_lock);
+ 	spin_unlock(&dlm->spinlock);
+ 
+@@ -806,6 +860,7 @@ lookup:
+ 		} 
+ 
+ 		dlm_kick_recovery_thread(dlm);
++		msleep(100);
+ 		dlm_wait_for_recovery(dlm);
+ 
+ 		spin_lock(&dlm->spinlock);
+@@ -873,7 +928,7 @@ wait:
+ 	dlm_mle_detach_hb_events(dlm, mle);
+ 	dlm_put_mle(mle);
+ 	/* put the extra ref */
+-	dlm_put_mle(mle);
++	dlm_put_mle_inuse(mle);
+ 
+ wake_waiters:
+ 	spin_lock(&res->spinlock);
+@@ -955,6 +1010,12 @@ recheck:
+ 		     "rechecking now\n", dlm->name, res->lockname.len,
+ 		     res->lockname.name);
+ 		goto recheck;
++	} else {
++		if (!voting_done) {
++			mlog(0, "map not changed and voting not done "
++			     "for %s:%.*s\n", dlm->name, res->lockname.len,
++			     res->lockname.name);
++		}
+ 	}
+ 
+ 	if (m != O2NM_MAX_NODES) {
+@@ -1458,15 +1519,12 @@ way_up_top:
+ 				mlog_errno(-ENOMEM);
+ 				goto send_response;
+ 			}
+-			spin_lock(&dlm->spinlock);
+-			dlm_init_mle(mle, DLM_MLE_BLOCK, dlm, NULL,
+-					 name, namelen);
+-			spin_unlock(&dlm->spinlock);
+ 			goto way_up_top;
+ 		}
+ 
+ 		// mlog(0, "this is second time thru, already allocated, "
+ 		// "add the block.\n");
++		dlm_init_mle(mle, DLM_MLE_BLOCK, dlm, NULL, name, namelen);
+ 		set_bit(request->node_idx, mle->maybe_map);
+ 		list_add(&mle->list, &dlm->master_list);
+ 		response = DLM_MASTER_RESP_NO;
+@@ -1639,7 +1697,7 @@ int dlm_assert_master_handler(struct o2n
+ 		if (bit >= O2NM_MAX_NODES) {
+ 			/* not necessarily an error, though less likely.
+ 			 * could be master just re-asserting. */
+-			mlog(ML_ERROR, "no bits set in the maybe_map, but %u "
++			mlog(ML_NOTICE, "no bits set in the maybe_map, but %u "
+ 			     "is asserting! (%.*s)\n", assert->node_idx,
+ 			     namelen, name);
+ 		} else if (bit != assert->node_idx) {
+@@ -1651,13 +1709,30 @@ int dlm_assert_master_handler(struct o2n
+ 				 * number winning the mastery will respond
+ 				 * YES to mastery requests, but this node
+ 				 * had no way of knowing.  let it pass. */
+-				mlog(ML_ERROR, "%u is the lowest node, "
++				mlog(ML_NOTICE, "%u is the lowest node, "
+ 				     "%u is asserting. (%.*s)  %u must "
+ 				     "have begun after %u won.\n", bit,
+ 				     assert->node_idx, namelen, name, bit,
+ 				     assert->node_idx);
+ 			}
+ 		}
++		if (mle->type == DLM_MLE_MIGRATION) {
++			if (flags & DLM_ASSERT_MASTER_MLE_CLEANUP) {
++				mlog(ML_NOTICE, "%s:%.*s: got cleanup assert"
++				     " from %u for migration\n",
++				     dlm->name, namelen, name,
++				     assert->node_idx);
++			} else if (!(flags & DLM_ASSERT_MASTER_FINISH_MIGRATION)) {
++				mlog(ML_NOTICE, "%s:%.*s: got unrelated assert"
++				     " from %u for migration, ignoring\n",
++				     dlm->name, namelen, name,
++				     assert->node_idx);
++				__dlm_put_mle(mle);
++				spin_unlock(&dlm->master_lock);
++				spin_unlock(&dlm->spinlock);
++				goto done;
++			}	
++		}
+ 	}
+ 	spin_unlock(&dlm->master_lock);
+ 
+@@ -1672,7 +1747,8 @@ int dlm_assert_master_handler(struct o2n
+ 			goto kill;
+ 		}
+ 		if (!mle) {
+-			if (res->owner != assert->node_idx) {
++			if (res->owner != DLM_LOCK_RES_OWNER_UNKNOWN &&
++			    res->owner != assert->node_idx) {
+ 				mlog(ML_ERROR, "assert_master from "
+ 					  "%u, but current owner is "
+ 					  "%u! (%.*s)\n",
+@@ -1725,6 +1801,7 @@ ok:
+ 	if (mle) {
+ 		int extra_ref = 0;
+ 		int nn = -1;
++		int rr, err = 0;
+ 		
+ 		spin_lock(&mle->spinlock);
+ 		if (mle->type == DLM_MLE_BLOCK || mle->type == DLM_MLE_MIGRATION)
+@@ -1744,27 +1821,64 @@ ok:
+ 		wake_up(&mle->wq);
+ 		spin_unlock(&mle->spinlock);
+ 
+-		if (mle->type == DLM_MLE_MIGRATION && res) {
+-			mlog(0, "finishing off migration of lockres %.*s, "
+-			     "from %u to %u\n",
+-			       res->lockname.len, res->lockname.name,
+-			       dlm->node_num, mle->new_master);
++		if (res) {
+ 			spin_lock(&res->spinlock);
+-			res->state &= ~DLM_LOCK_RES_MIGRATING;
+-			dlm_change_lockres_owner(dlm, res, mle->new_master);
+-			BUG_ON(res->state & DLM_LOCK_RES_DIRTY);
++			if (mle->type == DLM_MLE_MIGRATION) {
++				mlog(0, "finishing off migration of lockres %.*s, "
++			     		"from %u to %u\n",
++			       		res->lockname.len, res->lockname.name,
++			       		dlm->node_num, mle->new_master);
++				res->state &= ~DLM_LOCK_RES_MIGRATING;
++				dlm_change_lockres_owner(dlm, res, mle->new_master);
++				BUG_ON(res->state & DLM_LOCK_RES_DIRTY);
++			} else {
++				dlm_change_lockres_owner(dlm, res, mle->master);
++			}
+ 			spin_unlock(&res->spinlock);
+ 		}
+-		/* master is known, detach if not already detached */
+-		dlm_mle_detach_hb_events(dlm, mle);
+-		dlm_put_mle(mle);
+ 		
++		/* master is known, detach if not already detached. 
++		 * ensures that only one assert_master call will happen
++		 * on this mle. */
++		spin_lock(&dlm->spinlock);
++		spin_lock(&dlm->master_lock);
++
++		rr = atomic_read(&mle->mle_refs.refcount);
++		if (mle->inuse > 0) {
++			if (extra_ref && rr < 3)
++				err = 1;
++			else if (!extra_ref && rr < 2)
++				err = 1;
++		} else {
++			if (extra_ref && rr < 2)
++				err = 1;
++			else if (!extra_ref && rr < 1)
++				err = 1;
++		}
++		if (err) {
++			mlog(ML_ERROR, "%s:%.*s: got assert master from %u "
++			     "that will mess up this node, refs=%d, extra=%d, "
++			     "inuse=%d\n", dlm->name, namelen, name,
++			     assert->node_idx, rr, extra_ref, mle->inuse);
++			dlm_print_one_mle(mle);
++		}
++		list_del_init(&mle->list);
++		__dlm_mle_detach_hb_events(dlm, mle);
++		__dlm_put_mle(mle);
+ 		if (extra_ref) {
+ 			/* the assert master message now balances the extra
+ 		 	 * ref given by the master / migration request message.
+ 		 	 * if this is the last put, it will be removed
+ 		 	 * from the list. */
+-			dlm_put_mle(mle);
++			__dlm_put_mle(mle);
++		}
++		spin_unlock(&dlm->master_lock);
++		spin_unlock(&dlm->spinlock);
++	} else if (res) {
++		if (res->owner != assert->node_idx) {
++			mlog(ML_NOTICE, "assert_master from %u, but current "
++			     "owner is %u (%.*s), no mle\n", assert->node_idx,
++			     res->owner, namelen, name);
+ 		}
+ 	}
+ 
+@@ -2110,7 +2224,7 @@ fail:
+ 	 * take both dlm->spinlock and dlm->master_lock */
+ 	spin_lock(&dlm->spinlock);
+ 	spin_lock(&dlm->master_lock);
+-	dlm_get_mle(mle);
++	dlm_get_mle_inuse(mle);
+ 	spin_unlock(&dlm->master_lock);
+ 	spin_unlock(&dlm->spinlock);
+ 
+@@ -2127,7 +2241,10 @@ fail:
+ 		/* migration failed, detach and clean up mle */
+ 		dlm_mle_detach_hb_events(dlm, mle);
+ 		dlm_put_mle(mle);
+-		dlm_put_mle(mle);
++		dlm_put_mle_inuse(mle);
++		spin_lock(&res->spinlock);
++		res->state &= ~DLM_LOCK_RES_MIGRATING;
++		spin_unlock(&res->spinlock);
+ 		goto leave;
+ 	}
+ 
+@@ -2157,8 +2274,8 @@ fail:
+ 			/* avoid hang during shutdown when migrating lockres 
+ 			 * to a node which also goes down */
+ 			if (dlm_is_node_dead(dlm, target)) {
+-				mlog(0, "%s:%.*s: expected migration target %u "
+-				     "is no longer up.  restarting.\n",
++				mlog(ML_NOTICE, "%s:%.*s: expected migration "
++				     "target %u is no longer up, restarting\n",
+ 				     dlm->name, res->lockname.len,
+ 				     res->lockname.name, target);
+ 				ret = -ERESTARTSYS;
+@@ -2168,7 +2285,10 @@ fail:
+ 			/* migration failed, detach and clean up mle */
+ 			dlm_mle_detach_hb_events(dlm, mle);
+ 			dlm_put_mle(mle);
+-			dlm_put_mle(mle);
++			dlm_put_mle_inuse(mle);
++			spin_lock(&res->spinlock);
++			res->state &= ~DLM_LOCK_RES_MIGRATING;
++			spin_unlock(&res->spinlock);
+ 			goto leave;
+ 		}
+ 		/* TODO: if node died: stop, clean up, return error */
+@@ -2184,7 +2304,7 @@ fail:
+ 
+ 	/* master is known, detach if not already detached */
+ 	dlm_mle_detach_hb_events(dlm, mle);
+-	dlm_put_mle(mle);
++	dlm_put_mle_inuse(mle);
+ 	ret = 0;
+ 
+ 	dlm_lockres_calc_usage(dlm, res);
+@@ -2571,6 +2691,7 @@ static int dlm_add_migration_mle(struct 
+ 			/* remove it from the list so that only one
+ 			 * mle will be found */
+ 			list_del_init(&tmp->list);
++			__dlm_mle_detach_hb_events(dlm, tmp);
+ 		}
+ 		spin_unlock(&tmp->spinlock);
+ 	}
+@@ -2664,14 +2785,15 @@ top:
+ 
+ 		/* remove from the list early.  NOTE: unlinking
+ 		 * list_head while in list_for_each_safe */
++		__dlm_mle_detach_hb_events(dlm, mle);
+ 		spin_lock(&mle->spinlock);
+ 		list_del_init(&mle->list);
+ 		atomic_set(&mle->woken, 1);
+ 		spin_unlock(&mle->spinlock);
+ 		wake_up(&mle->wq);
+ 
+-		mlog(0, "node %u died during migration from "
+-		     "%u to %u!\n", dead_node,
++		mlog(ML_NOTICE, "%s: node %u died during migration from "
++		     "%u to %u!\n", dlm->name, dead_node,
+ 		     mle->master, mle->new_master);
+ 		/* if there is a lockres associated with this
+ 	 	 * mle, find it and set its owner to UNKNOWN */
+Index: fs/ocfs2/dlm/dlmunlock.c
+===================================================================
+--- fs/ocfs2/dlm/dlmunlock.c	(revision 2787)
++++ fs/ocfs2/dlm/dlmunlock.c	(working copy)
+@@ -318,6 +318,16 @@ static enum dlm_status dlm_send_remote_u
+ 	size_t veclen = 1;
+ 
+ 	mlog_entry("%.*s\n", res->lockname.len, res->lockname.name);
++			
++	if (owner == dlm->node_num) {
++		/* ended up trying to contact ourselves.  this means
++		 * that the lockres had been remote but became local
++		 * via a migration.  just retry it, now as local */
++		mlog(0, "%s:%.*s: this node became the master due to a "
++		     "migration, re-evaluate now\n", dlm->name,
++		     res->lockname.len, res->lockname.name);
++		return DLM_FORWARD;
++	}
+ 
+ 	memset(&unlock, 0, sizeof(unlock));
+ 	unlock.node_idx = dlm->node_num;
+Index: fs/ocfs2/dlm/dlmcommon.h
+===================================================================
+--- fs/ocfs2/dlm/dlmcommon.h	(revision 2787)
++++ fs/ocfs2/dlm/dlmcommon.h	(working copy)
+@@ -300,6 +300,15 @@ enum dlm_lockres_list {
+ 	DLM_BLOCKED_LIST
+ };
+ 
++static inline int dlm_lvb_is_empty(char *lvb)
++{
++	int i;
++	for (i=0; i<DLM_LVB_LEN; i++)
++		if (lvb[i])
++			return 0;
++	return 1;
++}
++
+ static inline struct list_head *
+ dlm_list_idx_to_ptr(struct dlm_lock_resource *res, enum dlm_lockres_list idx)
+ {
+Index: fs/ocfs2/dlm/dlmrecovery.c
+===================================================================
+--- fs/ocfs2/dlm/dlmrecovery.c	(revision 2787)
++++ fs/ocfs2/dlm/dlmrecovery.c	(working copy)
+@@ -115,12 +115,31 @@ static u64 dlm_get_next_mig_cookie(void)
+ 	return c;
+ }
+ 
++static inline void dlm_set_reco_dead_node(struct dlm_ctxt *dlm,
++					  u8 dead_node)
++{
++	assert_spin_locked(&dlm->spinlock);
++	if (dlm->reco.dead_node != dead_node)
++		mlog(ML_NOTICE, "%s: changing dead_node from %u to %u\n",
++		     dlm->name, dlm->reco.dead_node, dead_node);
++	dlm->reco.dead_node = dead_node;
++}
++
++static inline void dlm_set_reco_master(struct dlm_ctxt *dlm,
++				       u8 master)
++{
++	assert_spin_locked(&dlm->spinlock);
++	mlog(ML_NOTICE, "%s: changing new_master from %u to %u\n",
++	     dlm->name, dlm->reco.new_master, master);
++	dlm->reco.new_master = master;
++}
++
+ static inline void dlm_reset_recovery(struct dlm_ctxt *dlm)
+ {
+ 	spin_lock(&dlm->spinlock);
+ 	clear_bit(dlm->reco.dead_node, dlm->recovery_map);
+-	dlm->reco.dead_node = O2NM_INVALID_NODE_NUM;
+-	dlm->reco.new_master = O2NM_INVALID_NODE_NUM;
++	dlm_set_reco_dead_node(dlm, O2NM_INVALID_NODE_NUM);
++	dlm_set_reco_master(dlm, O2NM_INVALID_NODE_NUM);
+ 	spin_unlock(&dlm->spinlock);
+ }
+ 
+@@ -220,6 +239,53 @@ void dlm_complete_recovery_thread(struct
+  *
+  */
+ 
++static void dlm_print_reco_junk(struct dlm_ctxt *dlm)
++{
++	struct dlm_reco_node_data *ndata;
++	struct dlm_lock_resource *res;
++	
++	mlog(ML_NOTICE, "%s(%d): recovery info, state=%s, "
++		     "dead=%u, master=%u\n", dlm->name,
++		     dlm->dlm_reco_thread_task->pid, 
++		     dlm->reco.state & DLM_RECO_STATE_ACTIVE ? "ACTIVE" : "inactive",
++		     dlm->reco.dead_node, dlm->reco.new_master);
++
++	list_for_each_entry(ndata, &dlm->reco.node_data, list) {
++		char *st = "unknown";
++		switch (ndata->state) {
++			case DLM_RECO_NODE_DATA_INIT:
++				st = "init";
++				break;
++			case DLM_RECO_NODE_DATA_REQUESTING:
++				st = "requesting";
++				break;
++			case DLM_RECO_NODE_DATA_DEAD:
++				st = "dead";
++				break;
++			case DLM_RECO_NODE_DATA_RECEIVING:
++				st = "receiving";
++				break;
++			case DLM_RECO_NODE_DATA_REQUESTED:
++				st = "requested";
++				break;
++			case DLM_RECO_NODE_DATA_DONE:
++				st = "done";
++				break;
++			case DLM_RECO_NODE_DATA_FINALIZE_SENT:
++				st = "finalize-sent";
++				break;
++			default:
++				st = "bad";
++				break;
++		}
++		mlog(ML_NOTICE, "%s: reco state, node %u, state=%s\n", 
++		     dlm->name, ndata->node_num, st);
++	}
++	list_for_each_entry(res, &dlm->reco.resources, recovering) {
++		mlog(ML_NOTICE, "%s: lockres %.*s on recovering list\n",
++		     dlm->name, res->lockname.len, res->lockname.name);
++	}
++}
+ 
+ #define DLM_RECO_THREAD_TIMEOUT_MS (5 * 1000)
+ 
+@@ -267,7 +333,7 @@ int dlm_is_node_dead(struct dlm_ctxt *dl
+ {
+ 	int dead;
+ 	spin_lock(&dlm->spinlock);
+-	dead = test_bit(node, dlm->domain_map);
++	dead = !test_bit(node, dlm->domain_map);
+ 	spin_unlock(&dlm->spinlock);
+ 	return dead;
+ }
+@@ -308,7 +374,28 @@ static int dlm_in_recovery(struct dlm_ct
+ 
+ void dlm_wait_for_recovery(struct dlm_ctxt *dlm)
+ {
+-	wait_event(dlm->reco.event, !dlm_in_recovery(dlm));
++	if (dlm_in_recovery(dlm)) {
++		mlog(ML_NOTICE, "%s: reco thread %d in recovery: "
++		     "state=%d, master=%u, dead=%u\n",
++		     dlm->name, dlm->dlm_reco_thread_task->pid,
++		     dlm->reco.state, dlm->reco.new_master,
++		     dlm->reco.dead_node);
++		dlm_print_reco_junk(dlm);
++	}
++		
++	while (1) {
++		if (wait_event_timeout(dlm->reco.event, 
++				       !dlm_in_recovery(dlm), 
++				       msecs_to_jiffies(5000)))
++			break;
++		mlog(ML_NOTICE, "%s: reco thread %d still in recovery: "
++		     "state=%d, master=%u, dead=%u\n",
++		     dlm->name, dlm->dlm_reco_thread_task->pid,
++		     dlm->reco.state, dlm->reco.new_master,
++		     dlm->reco.dead_node);
++		dlm_print_reco_junk(dlm);
++	}
++	// wait_event(dlm->reco.event, !dlm_in_recovery(dlm));
+ }
+ 
+ static void dlm_begin_recovery(struct dlm_ctxt *dlm)
+@@ -341,7 +428,7 @@ int dlm_do_recovery(struct dlm_ctxt *dlm
+ 		mlog(0, "new master %u died while recovering %u!\n",
+ 		     dlm->reco.new_master, dlm->reco.dead_node);
+ 		/* unset the new_master, leave dead_node */
+-		dlm->reco.new_master = O2NM_INVALID_NODE_NUM;
++		dlm_set_reco_master(dlm, O2NM_INVALID_NODE_NUM);
+ 	}
+ 
+ 	/* select a target to recover */
+@@ -350,14 +437,14 @@ int dlm_do_recovery(struct dlm_ctxt *dlm
+ 
+ 		bit = find_next_bit (dlm->recovery_map, O2NM_MAX_NODES+1, 0);
+ 		if (bit >= O2NM_MAX_NODES || bit < 0)
+-			dlm->reco.dead_node = O2NM_INVALID_NODE_NUM;
++			dlm_set_reco_dead_node(dlm, O2NM_INVALID_NODE_NUM);
+ 		else
+-			dlm->reco.dead_node = bit;
++			dlm_set_reco_dead_node(dlm, bit);
+ 	} else if (!test_bit(dlm->reco.dead_node, dlm->recovery_map)) {
+ 		/* BUG? */
+ 		mlog(ML_ERROR, "dead_node %u no longer in recovery map!\n",
+ 		     dlm->reco.dead_node);
+-		dlm->reco.dead_node = O2NM_INVALID_NODE_NUM;
++		dlm_set_reco_dead_node(dlm, O2NM_INVALID_NODE_NUM);
+ 	}
+ 
+ 	if (dlm->reco.dead_node == O2NM_INVALID_NODE_NUM) {
+@@ -366,7 +453,8 @@ int dlm_do_recovery(struct dlm_ctxt *dlm
+ 		/* return to main thread loop and sleep. */
+ 		return 0;
+ 	}
+-	mlog(0, "recovery thread found node %u in the recovery map!\n",
++	mlog(ML_NOTICE, "%s(%d):recovery thread found node %u in the recovery map!\n",
++	     dlm->name, dlm->dlm_reco_thread_task->pid,
+ 	     dlm->reco.dead_node);
+ 	spin_unlock(&dlm->spinlock);
+ 
+@@ -389,8 +477,8 @@ int dlm_do_recovery(struct dlm_ctxt *dlm
+ 		}
+ 		mlog(0, "another node will master this recovery session.\n");
+ 	}
+-	mlog(0, "dlm=%s, new_master=%u, this node=%u, dead_node=%u\n",
+-	     dlm->name, dlm->reco.new_master,
++	mlog(ML_NOTICE, "dlm=%s (%d), new_master=%u, this node=%u, dead_node=%u\n",
++	     dlm->name, dlm->dlm_reco_thread_task->pid, dlm->reco.new_master,
+ 	     dlm->node_num, dlm->reco.dead_node);
+ 
+ 	/* it is safe to start everything back up here
+@@ -402,7 +490,8 @@ int dlm_do_recovery(struct dlm_ctxt *dlm
+ 	return 0;
+ 
+ master_here:
+-	mlog(0, "mastering recovery of %s:%u here(this=%u)!\n",
++	mlog(ML_NOTICE, "(%d) mastering recovery of %s:%u here(this=%u)!\n",
++	     dlm->dlm_reco_thread_task->pid,
+ 	     dlm->name, dlm->reco.dead_node, dlm->node_num);
+ 
+ 	status = dlm_remaster_locks(dlm, dlm->reco.dead_node);
+@@ -414,7 +503,7 @@ master_here:
+ 		msleep(100);
+ 	} else {
+ 		/* success!  see if any other nodes need recovery */
+-		mlog(0, "DONE mastering recovery of %s:%u here(this=%u)!\n",
++		mlog(ML_NOTICE, "DONE mastering recovery of %s:%u here(this=%u)!\n",
+ 		     dlm->name, dlm->reco.dead_node, dlm->node_num);
+ 		dlm_reset_recovery(dlm);
+ 	}
+@@ -544,11 +633,19 @@ static int dlm_remaster_locks(struct dlm
+ 					goto leave;
+ 				case DLM_RECO_NODE_DATA_RECEIVING:
+ 				case DLM_RECO_NODE_DATA_REQUESTED:
++					mlog(0, "%s: node %u still in state %s\n",
++					     dlm->name, ndata->node_num,
++					     ndata->state==DLM_RECO_NODE_DATA_RECEIVING ?
++					     "receiving" : "requested");
+ 					all_nodes_done = 0;
+ 					break;
+ 				case DLM_RECO_NODE_DATA_DONE:
++					mlog(0, "%s: node %u state is done\n",
++					     dlm->name, ndata->node_num);
+ 					break;
+ 				case DLM_RECO_NODE_DATA_FINALIZE_SENT:
++					mlog(0, "%s: node %u state is finalize\n",
++					     dlm->name, ndata->node_num);
+ 					break;
+ 			}
+ 		}
+@@ -573,7 +670,7 @@ static int dlm_remaster_locks(struct dlm
+ 			spin_unlock(&dlm->spinlock);
+ 			mlog(0, "should be done with recovery!\n");
+ 
+-			mlog(0, "finishing recovery of %s at %lu, "
++			mlog(ML_NOTICE, "finishing recovery of %s at %lu, "
+ 			     "dead=%u, this=%u, new=%u\n", dlm->name,
+ 			     jiffies, dlm->reco.dead_node,
+ 			     dlm->node_num, dlm->reco.new_master);
+@@ -690,6 +787,15 @@ int dlm_request_all_locks_handler(struct
+ 	if (!dlm_grab(dlm))
+ 		return -EINVAL;
+ 
++	if (lr->dead_node != dlm->reco.dead_node) {
++		mlog(ML_ERROR, "%s: node %u sent dead_node=%u, but local "
++		     "dead_node is %u\n", dlm->name, lr->node_idx,
++		     lr->dead_node, dlm->reco.dead_node);
++		dlm_print_reco_junk(dlm);
++		/* this is a hack */
++		dlm_put(dlm);
++		return -ENOMEM;
++	}
+ 	BUG_ON(lr->dead_node != dlm->reco.dead_node);
+ 
+ 	item = kcalloc(1, sizeof(*item), GFP_KERNEL);
+@@ -729,12 +835,16 @@ static void dlm_request_all_locks_worker
+ 	struct list_head *iter;
+ 	int ret;
+ 	u8 dead_node, reco_master;
++	int skip_all_done = 0;
+ 
+ 	dlm = item->dlm;
+ 	dead_node = item->u.ral.dead_node;
+ 	reco_master = item->u.ral.reco_master;
+ 	mres = (struct dlm_migratable_lockres *)data;
+ 
++	mlog(ML_NOTICE, "%s: recovery worker started, dead=%u, master=%u\n",
++	     dlm->name, dead_node, reco_master);
++
+ 	if (dead_node != dlm->reco.dead_node ||
+ 	    reco_master != dlm->reco.new_master) {
+ 		/* show extra debug info if the recovery state is messed */
+@@ -765,12 +875,21 @@ static void dlm_request_all_locks_worker
+ 	dlm_move_reco_locks_to_list(dlm, &resources, dead_node);
+ 
+ 	/* now we can begin blasting lockreses without the dlm lock */
++
++	/* any errors returned will be due to the new_master dying,
++	 * the dlm_reco_thread should detect this */
+ 	list_for_each(iter, &resources) {
+ 		res = list_entry (iter, struct dlm_lock_resource, recovering);
+ 		ret = dlm_send_one_lockres(dlm, res, mres, reco_master,
+ 				   	DLM_MRES_RECOVERY);
+-		if (ret < 0)
++		if (ret < 0) {
+ 			mlog_errno(ret);
++			mlog(ML_ERROR, "%s: node %u went down while sending "
++			     "recovery state for dead node %u\n", dlm->name,
++			     reco_master, dead_node);
++			skip_all_done = 1;
++			break;
++		}
+ 	}
+ 
+ 	/* move the resources back to the list */
+@@ -778,9 +897,15 @@ static void dlm_request_all_locks_worker
+ 	list_splice_init(&resources, &dlm->reco.resources);
+ 	spin_unlock(&dlm->spinlock);
+ 
+-	ret = dlm_send_all_done_msg(dlm, dead_node, reco_master);
+-	if (ret < 0)
+-		mlog_errno(ret);
++	if (!skip_all_done) {
++		ret = dlm_send_all_done_msg(dlm, dead_node, reco_master);
++		if (ret < 0) {
++			mlog_errno(ret);
++			mlog(ML_ERROR, "%s: node %u went down while sending "
++			     "recovery all-done for dead node %u\n", dlm->name,
++			     reco_master, dead_node);
++		}
++	}
+ 
+ 	free_page((unsigned long)data);
+ }
+@@ -794,14 +919,20 @@ static int dlm_send_all_done_msg(struct 
+ 	memset(&done_msg, 0, sizeof(done_msg));
+ 	done_msg.node_idx = dlm->node_num;
+ 	done_msg.dead_node = dead_node;
+-	mlog(0, "sending DATA DONE message to %u, "
++	mlog(ML_NOTICE, "sending DATA DONE message to %u, "
+ 	     "my node=%u, dead node=%u\n", send_to, done_msg.node_idx,
+ 	     done_msg.dead_node);
+ 
+ 	ret = o2net_send_message(DLM_RECO_DATA_DONE_MSG, dlm->key, &done_msg,
+ 				 sizeof(done_msg), send_to, &tmpret);
+-	/* negative status is ignored by the caller */
+-	if (ret >= 0)
++	if (ret < 0) {
++		if (!dlm_is_host_down(ret)) {
++			mlog_errno(ret);
++			mlog(ML_ERROR, "%s: unknown error sending data-done "
++			     "to %u\n", dlm->name, send_to);
++			BUG();
++		}
++	} else
+ 		ret = tmpret;
+ 	return ret;
+ }
+@@ -821,6 +952,11 @@ int dlm_reco_data_done_handler(struct o2
+ 	mlog(0, "got DATA DONE: dead_node=%u, reco.dead_node=%u, "
+ 	     "node_idx=%u, this node=%u\n", done->dead_node,
+ 	     dlm->reco.dead_node, done->node_idx, dlm->node_num);
++	if (done->dead_node != dlm->reco.dead_node) {
++		mlog(ML_ERROR, "got DATA DONE: dead_node=%u, reco.dead_node=%u, "
++		     "node_idx=%u, this node=%u\n", done->dead_node,
++		     dlm->reco.dead_node, done->node_idx, dlm->node_num);
++	}
+ 	BUG_ON(done->dead_node != dlm->reco.dead_node);
+ 
+ 	spin_lock(&dlm_reco_state_lock);
+@@ -1022,8 +1158,9 @@ static int dlm_add_lock_to_array(struct 
+ 		    ml->type == LKM_PRMODE) {
+ 			/* if it is already set, this had better be a PR
+ 			 * and it has to match */
+-			if (mres->lvb[0] && (ml->type == LKM_EXMODE ||
+-			    memcmp(mres->lvb, lock->lksb->lvb, DLM_LVB_LEN))) {
++			if (!dlm_lvb_is_empty(mres->lvb) && 
++			    (ml->type == LKM_EXMODE ||
++			     memcmp(mres->lvb, lock->lksb->lvb, DLM_LVB_LEN))) {
+ 				mlog(ML_ERROR, "mismatched lvbs!\n");
+ 				__dlm_print_one_lock_resource(lock->lockres);
+ 				BUG();
+@@ -1082,22 +1219,25 @@ int dlm_send_one_lockres(struct dlm_ctxt
+ 			 * we must send it immediately. */
+ 			ret = dlm_send_mig_lockres_msg(dlm, mres, send_to,
+ 						       res, total_locks);
+-			if (ret < 0) {
+-				// TODO
+-				mlog(ML_ERROR, "dlm_send_mig_lockres_msg "
+-				     "returned %d, TODO\n", ret);
+-				BUG();
+-			}
++			if (ret < 0)
++				goto error;
+ 		}
+ 	}
+ 	/* flush any remaining locks */
+ 	ret = dlm_send_mig_lockres_msg(dlm, mres, send_to, res, total_locks);
+-	if (ret < 0) {
+-		// TODO
+-		mlog(ML_ERROR, "dlm_send_mig_lockres_msg returned %d, "
+-		     "TODO\n", ret);
++	if (ret < 0)
++		goto error;
++	return ret;
++
++error:
++	mlog(ML_ERROR, "%s: dlm_send_mig_lockres_msg returned %d\n",
++	     dlm->name, ret);
++	if (!dlm_is_host_down(ret))
+ 		BUG();
+-	}
++	mlog(ML_NOTICE, "%s: node %u went down while sending %s "
++	     "lockres %.*s\n", dlm->name, send_to, 
++	     flags & DLM_MRES_RECOVERY ?  "recovery" : "migration",
++	     res->lockname.len, res->lockname.name);
+ 	return ret;
+ }
+ 
+@@ -1446,6 +1586,7 @@ dlm_list_num_to_pointer(struct dlm_lock_
+ 	ret += list_num;
+ 	return ret;
+ }
++
+ /* TODO: do ast flush business
+  * TODO: do MIGRATING and RECOVERING spinning
+  */
+@@ -1482,7 +1623,7 @@ static int dlm_process_recovery_data(str
+ 	struct dlm_lock *newlock = NULL;
+ 	struct dlm_lockstatus *lksb = NULL;
+ 	int ret = 0;
+-	int i;
++	int i, bad;
+ 	struct list_head *iter;
+ 	struct dlm_lock *lock = NULL;
+ 
+@@ -1553,7 +1694,7 @@ static int dlm_process_recovery_data(str
+ 		lksb->flags |= (ml->flags &
+ 				(DLM_LKSB_PUT_LVB|DLM_LKSB_GET_LVB));
+ 			
+-		if (mres->lvb[0]) {
++		if (!dlm_lvb_is_empty(mres->lvb)) {
+ 			if (lksb->flags & DLM_LKSB_PUT_LVB) {
+ 				/* other node was trying to update
+ 				 * lvb when node died.  recreate the
+@@ -1564,8 +1705,9 @@ static int dlm_process_recovery_data(str
+ 				 * most recent valid lvb info */
+ 				BUG_ON(ml->type != LKM_EXMODE &&
+ 				       ml->type != LKM_PRMODE);
+-				if (res->lvb[0] && (ml->type == LKM_EXMODE ||
+-				    memcmp(res->lvb, mres->lvb, DLM_LVB_LEN))) {
++				if (!dlm_lvb_is_empty(res->lvb) && 
++				    (ml->type == LKM_EXMODE ||
++				     memcmp(res->lvb, mres->lvb, DLM_LVB_LEN))) {
+ 					mlog(ML_ERROR, "received bad lvb!\n");
+ 					__dlm_print_one_lock_resource(res);
+ 					BUG();
+@@ -1591,9 +1733,33 @@ static int dlm_process_recovery_data(str
+ 		 * relative to each other, but clearly *not*
+ 		 * preserved relative to locks from other nodes.
+ 		 */
++		bad = 0;
+ 		spin_lock(&res->spinlock);
+-		dlm_lock_get(newlock);
+-		list_add_tail(&newlock->list, queue);
++		list_for_each_entry(lock, queue, list) {
++			if (lock->ml.cookie == ml->cookie) {
++				u64 c = lock->ml.cookie;
++				mlog(ML_ERROR, "%s:%.*s: %u:%llu: lock already "
++				     "exists on this lockres!\n", dlm->name,
++				     res->lockname.len, res->lockname.name,
++				     dlm_get_lock_cookie_node(c),
++				     dlm_get_lock_cookie_seq(c));
++
++				mlog(ML_NOTICE, "sent lock: type=%d, conv=%d, "
++				     "node=%u, cookie=%u:%llu, queue=%d\n", 
++				     "node=%u, cookie=%u:%llu, queue=%d\n",
++				     ml->type, ml->convert_type, ml->node,
++				     dlm_get_lock_cookie_seq(ml->cookie),
++				     ml->list);
++
++				__dlm_print_one_lock_resource(res);
++				bad = 1;
++				break;
++			}
++		}
++		if (!bad) {
++			dlm_lock_get(newlock);
++			list_add_tail(&newlock->list, queue);
++		}
+ 		spin_unlock(&res->spinlock);
+ 	}
+ 	mlog(0, "done running all the locks\n");
+@@ -2048,7 +2214,7 @@ int dlm_pick_recovery_master(struct dlm_
+ 	struct dlm_lockstatus lksb;
+ 	int status = -EINVAL;
+ 
+-	mlog(0, "starting recovery of %s at %lu, dead=%u, this=%u\n",
++	mlog(ML_NOTICE, "starting recovery of %s at %lu, dead=%u, this=%u\n",
+ 	     dlm->name, jiffies, dlm->reco.dead_node, dlm->node_num);
+ again:	
+ 	memset(&lksb, 0, sizeof(lksb));
+@@ -2056,17 +2222,17 @@ again:	
+ 	ret = dlmlock(dlm, LKM_EXMODE, &lksb, LKM_NOQUEUE|LKM_RECOVERY,
+ 		      DLM_RECOVERY_LOCK_NAME, dlm_reco_ast, dlm, dlm_reco_bast);
+ 
+-	mlog(0, "%s: dlmlock($RECOVERY) returned %d, lksb=%d\n",
++	mlog(ML_NOTICE, "%s: dlmlock($RECOVERY) returned %d, lksb=%d\n",
+ 	     dlm->name, ret, lksb.status);
+ 
+ 	if (ret == DLM_NORMAL) {
+-		mlog(0, "dlm=%s dlmlock says I got it (this=%u)\n",
++		mlog(ML_NOTICE, "dlm=%s dlmlock says I got it (this=%u)\n",
+ 		     dlm->name, dlm->node_num);
+ 		
+ 		/* got the EX lock.  check to see if another node 
+ 		 * just became the reco master */
+ 		if (dlm_reco_master_ready(dlm)) {
+-			mlog(0, "%s: got reco EX lock, but %u will "
++			mlog(ML_NOTICE, "%s: got reco EX lock, but %u will "
+ 			     "do the recovery\n", dlm->name,
+ 			     dlm->reco.new_master);
+ 			status = -EEXIST;
+@@ -2077,7 +2243,7 @@ again:	
+ 			spin_lock(&dlm->spinlock);
+ 			if (dlm->reco.dead_node == O2NM_INVALID_NODE_NUM) {
+ 				status = -EINVAL;	
+-				mlog(0, "%s: got reco EX lock, but "
++				mlog(ML_NOTICE, "%s: got reco EX lock, but "
+ 				     "node got recovered already\n", dlm->name);
+ 				if (dlm->reco.new_master != O2NM_INVALID_NODE_NUM) {
+ 					mlog(ML_ERROR, "%s: new master is %u "
+@@ -2092,7 +2258,7 @@ again:	
+ 		/* if this node has actually become the recovery master,
+ 		 * set the master and send the messages to begin recovery */
+ 		if (!status) {
+-			mlog(0, "%s: dead=%u, this=%u, sending "
++			mlog(ML_NOTICE, "%s: dead=%u, this=%u, sending "
+ 			     "begin_reco now\n", dlm->name, 
+ 			     dlm->reco.dead_node, dlm->node_num);
+ 			status = dlm_send_begin_reco_message(dlm,
+@@ -2102,7 +2268,7 @@ again:	
+ 
+ 			/* set the new_master to this node */
+ 			spin_lock(&dlm->spinlock);
+-			dlm->reco.new_master = dlm->node_num;
++			dlm_set_reco_master(dlm, dlm->node_num);
+ 			spin_unlock(&dlm->spinlock);
+ 		}
+ 
+@@ -2123,7 +2289,7 @@ again:	
+ 			mlog(ML_ERROR, "dlmunlock returned %d\n", ret);
+ 		}
+ 	} else if (ret == DLM_NOTQUEUED) {
+-		mlog(0, "dlm=%s dlmlock says another node got it (this=%u)\n",
++		mlog(ML_NOTICE, "dlm=%s dlmlock says another node got it (this=%u)\n",
+ 		     dlm->name, dlm->node_num);
+ 		/* another node is master. wait on
+ 		 * reco.new_master != O2NM_INVALID_NODE_NUM 
+@@ -2132,12 +2298,12 @@ again:	
+ 					 dlm_reco_master_ready(dlm),
+ 					 msecs_to_jiffies(1000));
+ 		if (!dlm_reco_master_ready(dlm)) {
+-			mlog(0, "%s: reco master taking awhile\n",
++			mlog(ML_NOTICE, "%s: reco master taking awhile\n",
+ 			     dlm->name);
+ 			goto again;
+ 		}
+ 		/* another node has informed this one that it is reco master */
+-		mlog(0, "%s: reco master %u is ready to recover %u\n",
++		mlog(ML_NOTICE, "%s: reco master %u is ready to recover %u\n",
+ 		     dlm->name, dlm->reco.new_master, dlm->reco.dead_node);
+ 		status = -EEXIST;
+ 	} else {
+@@ -2171,7 +2337,7 @@ static int dlm_send_begin_reco_message(s
+ 
+ 	mlog_entry("%u\n", dead_node);
+ 
+-	mlog(0, "dead node is %u\n", dead_node);
++	mlog(ML_NOTICE, "%s: dead node is %u\n", dlm->name, dead_node);
+ 
+ 	spin_lock(&dlm->spinlock);
+ 	dlm_node_iter_init(dlm->domain_map, &iter);
+@@ -2244,8 +2410,9 @@ int dlm_begin_reco_handler(struct o2net_
+ 	if (!dlm_grab(dlm))
+ 		return 0;
+ 
+-	mlog(0, "node %u wants to recover node %u\n",
+-		  br->node_idx, br->dead_node);
++	mlog(ML_NOTICE, "%s: node %u wants to recover node %u (%u:%u)\n",
++	     dlm->name, br->node_idx, br->dead_node,
++	     dlm->reco.dead_node, dlm->reco.new_master);
+ 
+ 	dlm_fire_domain_eviction_callbacks(dlm, br->dead_node);
+ 
+@@ -2267,8 +2434,8 @@ int dlm_begin_reco_handler(struct o2net_
+ 		     "node %u changing it to %u\n", dlm->name, 
+ 		     dlm->reco.dead_node, br->node_idx, br->dead_node);
+ 	}
+-	dlm->reco.new_master = br->node_idx;
+-	dlm->reco.dead_node = br->dead_node;
++	dlm_set_reco_master(dlm, br->node_idx);
++	dlm_set_reco_dead_node(dlm, br->dead_node);
+ 	if (!test_bit(br->dead_node, dlm->recovery_map)) {
+ 		mlog(0, "recovery master %u sees %u as dead, but this "
+ 		     "node has not yet.  marking %u as dead\n",
+@@ -2287,6 +2454,11 @@ int dlm_begin_reco_handler(struct o2net_
+ 	spin_unlock(&dlm->spinlock);
+ 
+ 	dlm_kick_recovery_thread(dlm);
++	
++	mlog(ML_NOTICE, "%s: recovery started by node %u, for %u (%u:%u)\n",
++	     dlm->name, br->node_idx, br->dead_node,
++	     dlm->reco.dead_node, dlm->reco.new_master);
++
+ 	dlm_put(dlm);
+ 	return 0;
+ }
+@@ -2299,7 +2471,7 @@ static int dlm_send_finalize_reco_messag
+ 	int nodenum;
+ 	int status;
+ 
+-	mlog(0, "finishing recovery for node %s:%u\n",
++	mlog(ML_NOTICE, "finishing recovery for node %s:%u\n",
+ 	     dlm->name, dlm->reco.dead_node);
+ 
+ 	spin_lock(&dlm->spinlock);
+@@ -2344,8 +2516,9 @@ int dlm_finalize_reco_handler(struct o2n
+ 	if (!dlm_grab(dlm))
+ 		return 0;
+ 
+-	mlog(0, "node %u finalizing recovery of node %u\n",
+-	     fr->node_idx, fr->dead_node);
++	mlog(ML_NOTICE, "%s: node %u finalizing recovery of node %u (%u:%u)\n",
++	     dlm->name, fr->node_idx, fr->dead_node,
++	     dlm->reco.dead_node, dlm->reco.new_master);
+ 
+ 	spin_lock(&dlm->spinlock);
+ 
+@@ -2369,6 +2542,9 @@ int dlm_finalize_reco_handler(struct o2n
+ 	dlm_reset_recovery(dlm);
+ 
+ 	dlm_kick_recovery_thread(dlm);
++	mlog(ML_NOTICE, "%s: recovery done, reco master was %u, dead now %u, master now %u\n",
++	     dlm->name, fr->node_idx, dlm->reco.dead_node, dlm->reco.new_master);
++
+ 	dlm_put(dlm);
+ 	return 0;
+ }
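
One note on the dlm_is_node_dead() hunk near the top of this patch: the old
code returned the raw test_bit() result, i.e. "node is in the domain map",
which is the opposite of dead.  The negation is the entire fix.  A minimal
userspace sketch of the corrected predicate (toy bitmap helpers, not the
kernel's):

#include <stdio.h>

static unsigned long domain_map;        /* bit set = node is alive */

static int test_bit_toy(int nr, unsigned long map)
{
        return (int)((map >> nr) & 1UL);
}

/* corrected predicate: dead means NOT in the domain map */
static int is_node_dead(int node)
{
        return !test_bit_toy(node, domain_map);
}

int main(void)
{
        domain_map = 1UL << 3;                          /* only node 3 alive */
        printf("node 3 dead? %d\n", is_node_dead(3));   /* 0 */
        printf("node 5 dead? %d\n", is_node_dead(5));   /* 1 */
        return 0;
}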

Added: branches/ocfs2-1.2-cert/patches/mar24-create-lock-handler.patch
===================================================================
--- branches/ocfs2-1.2-cert/patches/mar24-create-lock-handler.patch	2006-03-27 22:25:16 UTC (rev 2789)
+++ branches/ocfs2-1.2-cert/patches/mar24-create-lock-handler.patch	2006-03-27 22:29:12 UTC (rev 2790)
@@ -0,0 +1,39 @@
+Index: fs/ocfs2/dlm/dlmlock.c
+===================================================================
+--- fs/ocfs2/dlm/dlmlock.c.orig	2006-03-16 18:17:21.358926000 -0800
++++ fs/ocfs2/dlm/dlmlock.c	2006-03-24 15:20:26.372564000 -0800
+@@ -280,6 +280,14 @@ static enum dlm_status dlm_send_remote_l
+ 	if (tmpret >= 0) {
+ 		// successfully sent and received
+ 		ret = status;  // this is already a dlm_status
++		if (ret == DLM_RECOVERING) {
++			mlog(ML_ERROR, "%s:%.*s: BUG.  this is a stale lockres "
++			     "no longer owned by %u.  that node is coming back "
++			     "up currently.\n", dlm->name, create.namelen,
++			     create.name, res->owner);
++			dlm_print_one_lock_resource(res);
++			BUG();
++		}
+ 	} else {
+ 		mlog_errno(tmpret);
+ 		if (dlm_is_host_down(tmpret)) {
+@@ -428,11 +436,16 @@ int dlm_create_lock_handler(struct o2net
+ 	if (!dlm_grab(dlm))
+ 		return DLM_REJECTED;
+ 
+-	mlog_bug_on_msg(!dlm_domain_fully_joined(dlm),
+-			"Domain %s not fully joined!\n", dlm->name);
+-
+ 	name = create->name;
+ 	namelen = create->namelen;
++	status = DLM_RECOVERING;
++	if (!dlm_domain_fully_joined(dlm)) {
++		mlog(ML_ERROR, "Domain %s not fully joined, but node %u is "
++		     "sending a create_lock message for lock %.*s!\n",
++		     dlm->name, create->node_idx, namelen, name);
++		dlm_error(status);
++		goto leave;
++	}
+ 
+ 	status = DLM_IVBUFLEN;
+ 	if (namelen > DLM_LOCKID_NAME_MAX) {
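
The interesting shape of this patch is the demotion of a fatal assertion into
a status the sender can act on: the handler now answers DLM_RECOVERING when
the domain is not fully joined, and dlm_send_remote_lock_request() treats
DLM_RECOVERING on a lockres it believes is valid as proof of a stale owner.
A hedged sketch of the handler half (status values and names are
illustrative, not the dlm's real ABI):

#include <stdio.h>

enum toy_status { TOY_OK = 0, TOY_RECOVERING = 1 };

/* before: assert(fully_joined) took the whole node down.
 * after: reject the message and let the sender retry or die. */
static enum toy_status handle_create_lock(int domain_fully_joined)
{
        if (!domain_fully_joined) {
                fprintf(stderr, "domain not fully joined, rejecting\n");
                return TOY_RECOVERING;
        }
        /* ... normal lock creation would continue here ... */
        return TOY_OK;
}

int main(void)
{
        printf("joining node: status=%d\n", handle_create_lock(0));
        printf("joined node:  status=%d\n", handle_create_lock(1));
        return 0;
}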

Added: branches/ocfs2-1.2-cert/patches/mastery-restart-recovery.patch
===================================================================
--- branches/ocfs2-1.2-cert/patches/mastery-restart-recovery.patch	2006-03-27 22:25:16 UTC (rev 2789)
+++ branches/ocfs2-1.2-cert/patches/mastery-restart-recovery.patch	2006-03-27 22:29:12 UTC (rev 2790)
@@ -0,0 +1,120 @@
+Index: fs/ocfs2/dlm/dlmmaster.c
+===================================================================
+--- fs/ocfs2/dlm/dlmmaster.c.orig	2006-03-27 14:23:30.045812000 -0800
++++ fs/ocfs2/dlm/dlmmaster.c	2006-03-27 14:23:30.283574000 -0800
+@@ -856,6 +856,7 @@ lookup:
+ 	spin_unlock(&dlm->master_lock);
+ 	spin_unlock(&dlm->spinlock);
+ 
++redo_request:
+ 	while (wait_on_recovery) {
+ 		/* any cluster changes that occurred after dropping the
+ 		 * dlm spinlock would be detectable be a change on the mle,
+@@ -893,7 +894,6 @@ lookup:
+ 	if (blocked)
+ 		goto wait;
+ 
+-redo_request:
+ 	ret = -EINVAL;
+ 	dlm_node_iter_init(mle->vote_map, &iter);
+ 	while ((nodenum = dlm_node_iter_next(&iter)) >= 0) {
+@@ -918,7 +918,8 @@ wait:
+ 	/* keep going until the response map includes all nodes */
+ 	ret = dlm_wait_for_lock_mastery(dlm, res, mle, &blocked);
+ 	if (ret < 0) {
+-		mlog(0, "%s:%.*s: node map changed, redo the "
++		wait_on_recovery = 1;
++		mlog(ML_NOTICE, "%s:%.*s: node map changed, redo the "
+ 		     "master request now, blocked=%d\n",
+ 		     dlm->name, res->lockname.len,
+ 		     res->lockname.name, blocked);
+@@ -1199,7 +1200,60 @@ static int dlm_restart_lock_mastery(stru
+ 			set_bit(node, mle->vote_map);
+ 		} else {
+ 			mlog(ML_ERROR, "node down! %d\n", node);
++			if (blocked) {
++				int lowest = find_next_bit(mle->maybe_map,
++						       O2NM_MAX_NODES, 0);
++
++				/* act like it was never there */
++				clear_bit(node, mle->maybe_map);
+ 
++			       	if (node == lowest) {
++					mlog(ML_ERROR, "expected master %u died"
++					    " while this node was blocked "
++					    "waiting on it!\n", node);
++					lowest = find_next_bit(mle->maybe_map,
++						       	O2NM_MAX_NODES,
++						       	lowest+1);
++					if (lowest < O2NM_MAX_NODES) {
++						mlog(ML_NOTICE, "%s:%.*s:still "
++						     "blocked. waiting on %u "
++						     "now\n", dlm->name,
++						     res->lockname.len,
++						     res->lockname.name,
++						     lowest);
++					} else {
++						/* mle is an MLE_BLOCK, but 
++						 * there is now nothing left to
++						 * block on.  we need to return
++						 * all the way back out and try
++						 * again with an MLE_MASTER. 
++						 * dlm_do_local_recovery_cleanup
++						 * has already run, so the mle 
++						 * refcount is ok */
++						mlog(ML_NOTICE, "%s:%.*s: no "
++						     "longer blocking. try to "
++						     "master this here\n",
++						     dlm->name, 
++						     res->lockname.len,
++						     res->lockname.name);
++						mle->type = DLM_MLE_MASTER;
++						mle->u.res = res;
++					}
++				}
++			}
++
++			/* now blank out everything, as if we had never 
++			 * contacted anyone */
++			memset(mle->maybe_map, 0, sizeof(mle->maybe_map));
++			memset(mle->response_map, 0, sizeof(mle->response_map));
++			/* reset the vote_map to the current node_map */
++			memcpy(mle->vote_map, mle->node_map,
++			       sizeof(mle->node_map));
++			/* put myself into the maybe map */
++			if (mle->type != DLM_MLE_BLOCK)
++				set_bit(dlm->node_num, mle->maybe_map);
++
++#if 0
+ 			/* if the node wasn't involved in mastery skip it,
+ 			 * but clear it out from the maps so that it will
+ 			 * not affect mastery of this lockres */
+@@ -1207,7 +1261,6 @@ static int dlm_restart_lock_mastery(stru
+ 			clear_bit(node, mle->vote_map);
+ 			if (!test_bit(node, mle->maybe_map))
+ 				goto next;
+-
+ 			/* if we're already blocked on lock mastery, and the
+ 			 * dead node wasn't the expected master, or there is
+ 			 * another node in the maybe_map, keep waiting */
+@@ -1253,7 +1306,6 @@ static int dlm_restart_lock_mastery(stru
+ 				ret = -EAGAIN;
+ 				goto next;
+ 			}
+-
+ 			clear_bit(node, mle->maybe_map);
+ 			if (node > dlm->node_num)
+ 				goto next;
+@@ -1263,9 +1315,12 @@ static int dlm_restart_lock_mastery(stru
+ 			 * in the vote_map, removing this node. */
+ 			memset(mle->response_map, 0,
+ 			       sizeof(mle->response_map));
++#endif
+ 		}
+ 		ret = -EAGAIN;
++#if 0
+ next:
++#endif
+ 		node = dlm_bitmap_diff_iter_next(&bdi, &sc);
+ 	}
+ 	return ret;
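
The core move in dlm_restart_lock_mastery() above: when a node dies, drop it
from maybe_map, and if it was the lowest set bit (the expected master),
re-derive the expected master; if nothing is left to block on, the MLE_BLOCK
converts to MLE_MASTER and mastery restarts from scratch.  A small
self-contained model of that bitmap walk (find_lowest() is a stand-in for the
kernel's find_next_bit()):

#include <stdio.h>

#define MAX_NODES 64

static int find_lowest(unsigned long map)
{
        int i;

        for (i = 0; i < MAX_NODES; i++)
                if ((map >> i) & 1UL)
                        return i;
        return MAX_NODES;       /* nothing left to block on */
}

int main(void)
{
        unsigned long maybe_map = (1UL << 2) | (1UL << 5);
        int expected = find_lowest(maybe_map);  /* node 2 */

        /* node 2 dies: act like it was never there */
        maybe_map &= ~(1UL << expected);

        int next = find_lowest(maybe_map);
        if (next < MAX_NODES)
                printf("still blocked, now waiting on node %d\n", next);
        else
                printf("no longer blocked, master it here\n");
        return 0;
}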

Added: branches/ocfs2-1.2-cert/patches/ocfs2-disable_lvbs1.patch
===================================================================
--- branches/ocfs2-1.2-cert/patches/ocfs2-disable_lvbs1.patch	2006-03-27 22:25:16 UTC (rev 2789)
+++ branches/ocfs2-1.2-cert/patches/ocfs2-disable_lvbs1.patch	2006-03-27 22:29:12 UTC (rev 2790)
@@ -0,0 +1,13 @@
+Index: fs/ocfs2/dlmglue.c
+===================================================================
+--- fs/ocfs2/dlmglue.c	(revision 2786)
++++ fs/ocfs2/dlmglue.c	(working copy)
+@@ -1312,6 +1312,8 @@ static inline int ocfs2_meta_lvb_is_trus
+ {
+ 	struct ocfs2_meta_lvb *lvb = (struct ocfs2_meta_lvb *) lockres->l_lksb.lvb;
+ 
++	return 0;
++
+ 	/* Old OCFS2 versions stored a "sequence" in the lvb to
+ 	 * determine whether the information could be trusted. We
+ 	 * don't want to use an lvb populated from a node running the
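
The mechanism is worth spelling out: the unconditional return 0 stubs out
ocfs2_meta_lvb_is_trustable() while leaving its body in place, so every meta
lock update takes the read-from-disk path and the change stays trivially
revertable (the remaining statements are simply dead code).  A toy
illustration of the effect:

#include <stdio.h>

/* stand-in for the stubbed-out trust check */
static int lvb_is_trustable(void)
{
        return 0;       /* hard-disabled: callers always hit the disk */
        /* original heuristics would sit below, now unreachable */
}

int main(void)
{
        printf(lvb_is_trustable() ? "trust LVB\n" : "refresh from disk\n");
        return 0;
}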

Added: branches/ocfs2-1.2-cert/patches/ocfs2-extend_file_more_info_on_oops.patch
===================================================================
--- branches/ocfs2-1.2-cert/patches/ocfs2-extend_file_more_info_on_oops.patch	2006-03-27 22:25:16 UTC (rev 2789)
+++ branches/ocfs2-1.2-cert/patches/ocfs2-extend_file_more_info_on_oops.patch	2006-03-27 22:29:12 UTC (rev 2790)
@@ -0,0 +1,40 @@
+Index: fs/ocfs2/file.c
+===================================================================
+--- fs/ocfs2/file.c	(revision 2787)
++++ fs/ocfs2/file.c	(working copy)
+@@ -778,18 +778,23 @@ restart_all:
+ 		status = -EIO;
+ 		goto leave;
+ 	}
+-	mlog_bug_on_msg(i_size_read(inode) !=
+-			(le64_to_cpu(fe->i_size) - *bytes_extended),
+-			"Inode %"MLFu64" i_size = %lld, dinode i_size "
+-			"= %"MLFu64", bytes_extended = %"MLFu64", new_i_size "
+-			"= %"MLFu64"\n", OCFS2_I(inode)->ip_blkno,
+-			i_size_read(inode), le64_to_cpu(fe->i_size),
+-			*bytes_extended, new_i_size);
+-	mlog_bug_on_msg(new_i_size < i_size_read(inode),
+-			"Inode %"MLFu64", i_size = %lld, new sz = %"MLFu64"\n",
+-			OCFS2_I(inode)->ip_blkno, i_size_read(inode),
+-			new_i_size);
+-
++	if (i_size_read(inode) != (le64_to_cpu(fe->i_size) - *bytes_extended)) {
++		mlog(ML_ERROR, "Inode %"MLFu64" i_size = %lld, dinode i_size "
++		     "= %"MLFu64", bytes_extended = %"MLFu64", new_i_size "
++		     "= %"MLFu64"\n", OCFS2_I(inode)->ip_blkno,
++		     i_size_read(inode), le64_to_cpu(fe->i_size),
++		     *bytes_extended, new_i_size);
++		mlog_meta_lvb(ML_ERROR, &OCFS2_I(inode)->ip_meta_lockres);
++		BUG();
++	}
++	if (new_i_size < i_size_read(inode)) {
++		mlog(ML_ERROR,
++		     "Inode %"MLFu64", i_size = %lld, new sz = %"MLFu64"\n",
++		     OCFS2_I(inode)->ip_blkno, i_size_read(inode),
++		     new_i_size);
++		mlog_meta_lvb(ML_ERROR, &OCFS2_I(inode)->ip_meta_lockres);
++		BUG();
++	}
+ 	if (i_size_read(inode) == new_i_size)
+   		goto leave;
+ 
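
The pattern in this hunk: the mlog_bug_on_msg() one-liners could only print
their own format string, so they are open-coded as if + mlog + BUG, which
buys room to dump the meta LVB before dying.  A toy version of that
open-coding (abort() standing in for BUG()):

#include <stdio.h>
#include <stdlib.h>

static void dump_lvb_state(void)
{
        fprintf(stderr, "lvb: <hex dump of the meta lvb would go here>\n");
}

static void check_sizes(long long i_size, long long disk_size)
{
        if (i_size != disk_size) {
                fprintf(stderr, "size mismatch: %lld vs %lld\n",
                        i_size, disk_size);
                dump_lvb_state();       /* the extra context on oops */
                abort();                /* stands in for BUG() */
        }
}

int main(void)
{
        check_sizes(4096, 4096);        /* fine */
        check_sizes(4096, 8192);        /* aborts with context */
        return 0;
}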

Added: branches/ocfs2-1.2-cert/patches/ocfs2-journal_start_stop_msgs.patch
===================================================================
--- branches/ocfs2-1.2-cert/patches/ocfs2-journal_start_stop_msgs.patch	2006-03-27 22:25:16 UTC (rev 2789)
+++ branches/ocfs2-1.2-cert/patches/ocfs2-journal_start_stop_msgs.patch	2006-03-27 22:29:12 UTC (rev 2790)
@@ -0,0 +1,37 @@
+Index: fs/ocfs2/journal.c
+===================================================================
+--- fs/ocfs2/journal.c	(revision 2787)
++++ fs/ocfs2/journal.c	(working copy)
+@@ -1029,6 +1031,8 @@ static int __ocfs2_recovery_thread(void 
+ 	}
+ 
+ restart:
++
++	mlog(ML_NOTICE, "Begin recovery pass on volume %s\n", osb->uuid_str);
+ 	status = ocfs2_super_lock(osb, 1);
+ 	if (status < 0) {
+ 		mlog_errno(status);
+@@ -1043,6 +1047,7 @@ restart:
+ 			break;
+ 		}
+ 
++		mlog(ML_NOTICE, "Try to recover node %d\n", node_num);
+ 		status = ocfs2_recover_node(osb, node_num);
+ 		if (status < 0) {
+ 			mlog(ML_ERROR,
+@@ -1052,11 +1057,13 @@ restart:
+ 			mlog(ML_ERROR, "Volume requires unmount.\n");
+ 			continue;
+ 		}
+-
++		mlog(ML_NOTICE, "Remove %d from recovery map\n", node_num);
+ 		ocfs2_recovery_map_clear(osb, node_num);
+ 	}
+ 	ocfs2_super_unlock(osb, 1);
+ 
++	mlog(ML_NOTICE, "Complete recovery pass on volume %s\n", osb->uuid_str);
++	
+ 	/* We always run recovery on our own orphan dir - the dead
+ 	 * node(s) may have voted "no" on an inode delete earlier. A
+ 	 * revote is therefore required. */
+

Added: branches/ocfs2-1.2-cert/patches/ocfs2-reco_nofs.patch
===================================================================
--- branches/ocfs2-1.2-cert/patches/ocfs2-reco_nofs.patch	2006-03-27 22:25:16 UTC (rev 2789)
+++ branches/ocfs2-1.2-cert/patches/ocfs2-reco_nofs.patch	2006-03-27 22:29:12 UTC (rev 2790)
@@ -0,0 +1,40 @@
+Index: fs/ocfs2/journal.c
+===================================================================
+--- fs/ocfs2/journal.c	(revision 2787)
++++ fs/ocfs2/journal.c	(working copy)
+@@ -858,9 +858,11 @@ static int ocfs2_force_read_journal(stru
+ 		if (p_blocks > CONCURRENT_JOURNAL_FILL)
+ 			p_blocks = CONCURRENT_JOURNAL_FILL;
+ 
++		/* We are reading journal data which should not
++		 * be put in the uptodate cache */
+ 		status = ocfs2_read_blocks(OCFS2_SB(inode->i_sb),
+ 					   p_blkno, p_blocks, bhs, 0,
+-					   inode);
++					   NULL);
+ 		if (status < 0) {
+ 			mlog_errno(status);
+ 			goto bail;
+Index: fs/ocfs2/uptodate.c
+===================================================================
+--- fs/ocfs2/uptodate.c	(revision 2787)
++++ fs/ocfs2/uptodate.c	(working copy)
+@@ -335,7 +335,7 @@ static void __ocfs2_set_buffer_uptodate(
+ 	mlog(0, "Inode %"MLFu64", block %llu, expand = %d\n",
+ 	     oi->ip_blkno, (unsigned long long) block, expand_tree);
+ 
+-	new = kmem_cache_alloc(ocfs2_uptodate_cachep, GFP_KERNEL);
++	new = kmem_cache_alloc(ocfs2_uptodate_cachep, GFP_NOFS);
+ 	if (!new) {
+ 		mlog_errno(-ENOMEM);
+ 		return;
+@@ -347,7 +347,7 @@ static void __ocfs2_set_buffer_uptodate(
+ 		 * has no way of tracking that. */
+ 		for(i = 0; i < OCFS2_INODE_MAX_CACHE_ARRAY; i++) {
+ 			tree[i] = kmem_cache_alloc(ocfs2_uptodate_cachep,
+-						   GFP_KERNEL);
++						   GFP_NOFS);
+ 			if (!tree[i]) {
+ 				mlog_errno(-ENOMEM);
+ 				goto out_free;
+
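
Two independent fixes ride in this patch: journal blocks read during recovery
are no longer inserted into the uptodate cache (the NULL inode argument), and
the uptodate-cache allocations switch from GFP_KERNEL to GFP_NOFS so that
memory reclaim cannot re-enter the filesystem from a recovery path that
already holds locks.  A toy model of the allocation rule (flags and behavior
heavily simplified):

#include <stdio.h>

#define TOY_GFP_KERNEL 0x1      /* reclaim may recurse into the fs */
#define TOY_GFP_NOFS   0x2      /* reclaim allowed, but never via the fs */

static const char *toy_alloc(int flags, int called_from_fs)
{
        if ((flags & TOY_GFP_KERNEL) && called_from_fs)
                /* reclaim could call back into the fs that already
                 * holds recovery locks -> potential deadlock */
                return "unsafe: GFP_KERNEL inside an fs path";
        return "safe allocation";
}

int main(void)
{
        printf("%s\n", toy_alloc(TOY_GFP_KERNEL, 1));   /* the bug */
        printf("%s\n", toy_alloc(TOY_GFP_NOFS, 1));     /* the fix */
        return 0;
}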

Added: branches/ocfs2-1.2-cert/patches/ocfs2-validate_lvb_contents1.patch
===================================================================
--- branches/ocfs2-1.2-cert/patches/ocfs2-validate_lvb_contents1.patch	2006-03-27 22:25:16 UTC (rev 2789)
+++ branches/ocfs2-1.2-cert/patches/ocfs2-validate_lvb_contents1.patch	2006-03-27 22:29:12 UTC (rev 2790)
@@ -0,0 +1,62 @@
+Index: fs/ocfs2/dlmglue.c
+===================================================================
+--- fs/ocfs2/dlmglue.c	(revision 2787)
++++ fs/ocfs2/dlmglue.c	(working copy)
+@@ -1377,6 +1377,32 @@ static inline void ocfs2_complete_lock_r
+ 	mlog_exit_void();
+ }
+ 
++static void ocfs2_validate_lvb(struct ocfs2_lock_res *lockres,
++			       struct ocfs2_dinode *di)
++{
++	struct ocfs2_meta_lvb *lvb;
++
++	lvb = (struct ocfs2_meta_lvb *) lockres->l_lksb.lvb;
++	if (be32_to_cpu(lvb->lvb_iclusters) != le32_to_cpu(di->i_clusters)
++	    || be64_to_cpu(lvb->lvb_isize) != le64_to_cpu(di->i_size)
++	    || be32_to_cpu(lvb->lvb_iuid) != le32_to_cpu(di->i_uid)
++	    || be32_to_cpu(lvb->lvb_igid) != le32_to_cpu(di->i_gid)
++	    || be16_to_cpu(lvb->lvb_imode) != le16_to_cpu(di->i_mode)
++	    || be16_to_cpu(lvb->lvb_inlink) != le16_to_cpu(di->i_links_count)) {
++		mlog(ML_ERROR, "LVB and disk information for inode %llu don't "
++		     "match!\n", (unsigned long long)le64_to_cpu(di->i_blkno));
++		mlog_meta_lvb(ML_ERROR, lockres);
++		mlog(ML_ERROR, "Dinode info: clusters %u, size %llu, uid %u, "
++		     "gid %u, mode %u, links count %u\n",
++		     le32_to_cpu(di->i_clusters),
++		     (unsigned long long)le64_to_cpu(di->i_size),
++		     le32_to_cpu(di->i_uid), le32_to_cpu(di->i_gid),
++		     le16_to_cpu(di->i_mode), le16_to_cpu(di->i_links_count));
++		mlog(ML_ERROR, "Lockres flags: 0x%lx level %d ro %u ex %u requested %u blocking %u\n", lockres->l_flags, lockres->l_level, lockres->l_ro_holders ,lockres->l_ex_holders,lockres->l_requested,lockres->l_blocking);
++		BUG();
++	}
++}
++
+ /* may or may not return a bh if it went to disk. */
+ static int ocfs2_meta_lock_update(struct inode *inode,
+ 				  struct buffer_head **bh)
+@@ -1412,7 +1438,8 @@ static int ocfs2_meta_lock_update(struct
+ 	 * map (directories, bitmap files, etc) */
+ 	ocfs2_extent_map_trunc(inode, 0);
+ 
+-	if (ocfs2_meta_lvb_is_trustable(lockres)) {
++//	if (ocfs2_meta_lvb_is_trustable(lockres)) {
++	if (0) {
+ 		mlog(0, "Trusting LVB on inode %"MLFu64"\n",
+ 		     oi->ip_blkno);
+ 		ocfs2_refresh_inode_from_lvb(inode);
+@@ -1453,7 +1480,13 @@ static int ocfs2_meta_lock_update(struct
+ 				le64_to_cpu(fe->i_dtime),
+ 				le32_to_cpu(fe->i_flags));
+ 
+-		ocfs2_refresh_inode(inode, fe);
++		if (ocfs2_meta_lvb_is_trustable(lockres)) {
++			/* Refresh from lvb to maintain the same behavior */
++			ocfs2_validate_lvb(lockres, fe);
++			ocfs2_refresh_inode_from_lvb(inode);
++		} else {
++			ocfs2_refresh_inode(inode, fe);
++		}
+ 	}
+ 
+ #ifdef OCFS2_DELETE_INODE_WORKAROUND
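
One subtlety in ocfs2_validate_lvb(): the LVB fields travel big-endian while
the dinode fields are little-endian on disk, so each comparison converts both
sides to host order first (be32_to_cpu vs le32_to_cpu).  A runnable miniature
of that rule, using ntohl() as a stand-in for be32_to_cpu() and assuming a
little-endian host for the le32 side:

#include <stdio.h>
#include <stdint.h>
#include <arpa/inet.h>

/* assumption: little-endian host, so the le32 value needs no swap here */
static uint32_t le32_to_host(uint32_t v)
{
        return v;
}

int main(void)
{
        uint32_t lvb_clusters = htonl(42);      /* big-endian, from the LVB */
        uint32_t di_clusters  = 42;             /* little-endian, from disk */

        if (ntohl(lvb_clusters) != le32_to_host(di_clusters))
                printf("LVB and disk disagree -> BUG()\n");
        else
                printf("LVB matches disk\n");
        return 0;
}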

Added: branches/ocfs2-1.2-cert/patches/ocfs2_dlm-do_lvb_puts_inline2.patch
===================================================================
--- branches/ocfs2-1.2-cert/patches/ocfs2_dlm-do_lvb_puts_inline2.patch	2006-03-27 22:25:16 UTC (rev 2789)
+++ branches/ocfs2-1.2-cert/patches/ocfs2_dlm-do_lvb_puts_inline2.patch	2006-03-27 22:29:12 UTC (rev 2790)
@@ -0,0 +1,40 @@
+Index: fs/ocfs2/dlm/dlmconvert.c
+===================================================================
+--- fs/ocfs2/dlm/dlmconvert.c	(revision 2787)
++++ fs/ocfs2/dlm/dlmconvert.c	(working copy)
+@@ -214,6 +214,9 @@ grant:
+ 	if (lock->ml.node == dlm->node_num)
+ 		mlog(0, "doing in-place convert for nonlocal lock\n");
+ 	lock->ml.type = type;
++	if (lock->lksb->flags & DLM_LKSB_PUT_LVB)
++		memcpy(res->lvb, lock->lksb->lvb, DLM_LVB_LEN);
++
+ 	status = DLM_NORMAL;
+ 	*call_ast = 1;
+ 	goto unlock_exit;
+Index: fs/ocfs2/dlm/dlmast.c
+===================================================================
+--- fs/ocfs2/dlm/dlmast.c	(revision 2787)
++++ fs/ocfs2/dlm/dlmast.c	(working copy)
+@@ -197,12 +197,20 @@ static void dlm_update_lvb(struct dlm_ct
+ 				  lock->ml.node == dlm->node_num ? "master" :
+ 				  "remote");
+ 			memcpy(lksb->lvb, res->lvb, DLM_LVB_LEN);
+-		} else if (lksb->flags & DLM_LKSB_PUT_LVB) {
++		}
++		/* Do nothing for lvb put requests - they should be
++		 * done in place when the lock is downconverted -
++		 * otherwise we risk racing gets and puts which could
++		 * result in old lvb data being propagated. We leave
++		 * the flag set however ... */
++#if 0
++		else if (lksb->flags & DLM_LKSB_PUT_LVB) {
+ 			mlog(0, "setting lvb from lockres for %s node\n",
+ 				  lock->ml.node == dlm->node_num ? "master" :
+ 				  "remote");
+ 			memcpy(res->lvb, lksb->lvb, DLM_LVB_LEN);
+ 		}
++#endif
+ 		spin_unlock(&res->spinlock);
+ 	}
+ 
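
Design note on this one: the LVB "put" now happens inline in the convert
path, at the same point the lock's mode changes, and the AST path only ever
reads; deferring the put to AST delivery could let a stale value overwrite a
newer one, which is the race the comment describes.  A toy model of that
ordering, with a mutex standing in for the resource lock (compile with
-pthread):

#include <stdio.h>
#include <string.h>
#include <pthread.h>

static pthread_mutex_t res_lock = PTHREAD_MUTEX_INITIALIZER;
static char res_lvb[64];        /* master copy held on the resource */

/* put the caller's LVB at downconvert time, together with the
 * mode change -- never later, when it might already be stale */
static void downconvert(const char *lksb_lvb)
{
        pthread_mutex_lock(&res_lock);
        strncpy(res_lvb, lksb_lvb, sizeof(res_lvb) - 1);
        /* ... the mode change lives in this same critical section ... */
        pthread_mutex_unlock(&res_lock);
}

int main(void)
{
        downconvert("new-lvb-contents");
        printf("res lvb: %s\n", res_lvb);
        return 0;
}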

Added: branches/ocfs2-1.2-cert/patches/series
===================================================================
--- branches/ocfs2-1.2-cert/patches/series	2006-03-27 22:25:16 UTC (rev 2789)
+++ branches/ocfs2-1.2-cert/patches/series	2006-03-27 22:29:12 UTC (rev 2790)
@@ -0,0 +1,16 @@
+em-silence-eexist.patch -p0
+mar20-full-3.patch -p0
+ocfs2-extend_file_more_info_on_oops.patch -p0
+ocfs2-journal_start_stop_msgs.patch -p0
+ocfs2-reco_nofs.patch -p0
+ocfs2_dlm-do_lvb_puts_inline2.patch -p0
+lockres-release-info.patch 
+debug-mastery.patch 
+hold-recovery-ref.patch 
+two-stage-finalize.patch -p0
+fix-purge-lockres.patch -p0
+dlm-eloop.patch -p0
+lvb-recovery-fix.patch -p0
+mar24-create-lock-handler.patch -p0
+mastery-restart-recovery.patch -p0
+leave-other-dead-nodes-on-recovery-list.patch 
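
For anyone replaying this series: series is quilt's ordered patch list,
applied top to bottom by quilt push -a.  A trailing -p0 means that patch
carries tree-relative paths like fs/ocfs2/...; the entries without an option
fall back to quilt's default strip level of -p1, which matches the
ocfs2-1.2/... paths those patches use.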

Added: branches/ocfs2-1.2-cert/patches/two-stage-finalize.patch
===================================================================
--- branches/ocfs2-1.2-cert/patches/two-stage-finalize.patch	2006-03-27 22:25:16 UTC (rev 2789)
+++ branches/ocfs2-1.2-cert/patches/two-stage-finalize.patch	2006-03-27 22:29:12 UTC (rev 2790)
@@ -0,0 +1,332 @@
+Index: fs/ocfs2/dlm/dlmcommon.h
+===================================================================
+--- fs/ocfs2/dlm/dlmcommon.h.orig	2006-03-22 14:36:02.379128000 -0800
++++ fs/ocfs2/dlm/dlmcommon.h	2006-03-22 14:36:02.448059000 -0800
+@@ -61,7 +61,8 @@ static inline int dlm_is_recovery_lock(c
+ 	return 0;
+ }
+ 
+-#define DLM_RECO_STATE_ACTIVE  0x0001
++#define DLM_RECO_STATE_ACTIVE    0x0001
++#define DLM_RECO_STATE_FINALIZE  0x0002
+ 
+ struct dlm_recovery_ctxt
+ {
+@@ -618,7 +619,8 @@ struct dlm_finalize_reco
+ {
+ 	u8 node_idx;
+ 	u8 dead_node;
+-	__be16 pad1;
++	u8 flags;
++	u8 pad1;
+ 	__be32 pad2;
+ };
+ 
+Index: fs/ocfs2/dlm/dlmrecovery.c
+===================================================================
+--- fs/ocfs2/dlm/dlmrecovery.c.orig	2006-03-22 14:36:02.372135000 -0800
++++ fs/ocfs2/dlm/dlmrecovery.c	2006-03-22 14:36:02.462056000 -0800
+@@ -134,12 +134,18 @@ static inline void dlm_set_reco_master(s
+ 	dlm->reco.new_master = master;
+ }
+ 
+-static inline void dlm_reset_recovery(struct dlm_ctxt *dlm)
++static inline void __dlm_reset_recovery(struct dlm_ctxt *dlm)
+ {
+-	spin_lock(&dlm->spinlock);
++	assert_spin_locked(&dlm->spinlock);
+ 	clear_bit(dlm->reco.dead_node, dlm->recovery_map);
+ 	dlm_set_reco_dead_node(dlm, O2NM_INVALID_NODE_NUM);
+ 	dlm_set_reco_master(dlm, O2NM_INVALID_NODE_NUM);
++}
++
++static inline void dlm_reset_recovery(struct dlm_ctxt *dlm)
++{
++	spin_lock(&dlm->spinlock);
++	__dlm_reset_recovery(dlm);
+ 	spin_unlock(&dlm->spinlock);
+ }
+ 
+@@ -380,7 +386,7 @@ void dlm_wait_for_recovery(struct dlm_ct
+ 		     dlm->name, dlm->dlm_reco_thread_task->pid,
+ 		     dlm->reco.state, dlm->reco.new_master,
+ 		     dlm->reco.dead_node);
+-		dlm_print_reco_junk(dlm);
++		//dlm_print_reco_junk(dlm);
+ 	}
+ 		
+ 	while (1) {
+@@ -393,7 +399,7 @@ void dlm_wait_for_recovery(struct dlm_ct
+ 		     dlm->name, dlm->dlm_reco_thread_task->pid,
+ 		     dlm->reco.state, dlm->reco.new_master,
+ 		     dlm->reco.dead_node);
+-		dlm_print_reco_junk(dlm);
++		//dlm_print_reco_junk(dlm);
+ 	}
+ 	// wait_event(dlm->reco.event, !dlm_in_recovery(dlm));
+ }
+@@ -429,6 +435,7 @@ int dlm_do_recovery(struct dlm_ctxt *dlm
+ 		     dlm->reco.new_master, dlm->reco.dead_node);
+ 		/* unset the new_master, leave dead_node */
+ 		dlm_set_reco_master(dlm, O2NM_INVALID_NODE_NUM);
++#warning need to make a decision here whether to move lockreses off of dlm->reco.resources or leave them
+ 	}
+ 
+ 	/* select a target to recover */
+@@ -521,6 +528,7 @@ static int dlm_remaster_locks(struct dlm
+ 	int all_nodes_done;
+ 	int destroy = 0;
+ 	int pass = 0;
++	unsigned long long mlg;
+ 
+ 	status = dlm_init_recovery_area(dlm, dead_node);
+ 	if (status < 0)
+@@ -559,9 +567,9 @@ static int dlm_remaster_locks(struct dlm
+ 				BUG();
+ 				break;
+ 			case DLM_RECO_NODE_DATA_DEAD:
+-				mlog(0, "node %u died after requesting "
+-				     "recovery info for node %u\n",
+-				     ndata->node_num, dead_node);
++				mlog(ML_ERROR, "%s:node %u died after "
++				     "requesting recovery info for node %u\n",
++				     dlm->name, ndata->node_num, dead_node);
+ 				// start all over
+ 				destroy = 1;
+ 				status = -EAGAIN;
+@@ -593,6 +601,7 @@ static int dlm_remaster_locks(struct dlm
+ 	while (1) {
+ 		/* check all the nodes now to see if we are
+ 		 * done, or if anyone died */
++		pass++;
+ 		all_nodes_done = 1;
+ 		spin_lock(&dlm_reco_state_lock);
+ 		list_for_each(iter, &dlm->reco.node_data) {
+@@ -633,7 +642,13 @@ static int dlm_remaster_locks(struct dlm
+ 					goto leave;
+ 				case DLM_RECO_NODE_DATA_RECEIVING:
+ 				case DLM_RECO_NODE_DATA_REQUESTED:
+-					mlog(0, "%s: node %u still in state %s\n",
++					if (pass % 1000 == 0)
++						mlg = ML_ERROR;
++					else if (pass % 100 == 0)
++						mlg = ML_NOTICE;
++					else
++						mlg = 0;
++					mlog(mlg, "%s: node %u still in state %s\n",
+ 					     dlm->name, ndata->node_num,
+ 					     ndata->state==DLM_RECO_NODE_DATA_RECEIVING ?
+ 					     "receiving" : "requested");
+@@ -651,7 +666,7 @@ static int dlm_remaster_locks(struct dlm
+ 		}
+ 		spin_unlock(&dlm_reco_state_lock);
+ 
+-		mlog(0, "pass #%d, all_nodes_done?: %s\n", ++pass,
++		mlog(0, "pass #%d, all_nodes_done?: %s\n", pass,
+ 		     all_nodes_done?"yes":"no");
+ 		if (all_nodes_done) {
+ 			int ret;
+@@ -1708,8 +1723,19 @@ static int dlm_process_recovery_data(str
+ 				if (!dlm_lvb_is_empty(res->lvb) && 
+ 				    (ml->type == LKM_EXMODE ||
+ 				     memcmp(res->lvb, mres->lvb, DLM_LVB_LEN))) {
+-					mlog(ML_ERROR, "received bad lvb!\n");
+-					__dlm_print_one_lock_resource(res);
++					int i;
++					mlog(ML_ERROR, "%s:%.*s: received bad "
++					     "lvb! type=%d\n", dlm->name,
++					     res->lockname.len, 
++					     res->lockname.name, ml->type);
++					printk("lockres lvb=[");
++					for (i=0; i<DLM_LVB_LEN; i++)
++						printk("%02x", res->lvb[i]);
++					printk("]\nmigrated lvb=[");
++					for (i=0; i<DLM_LVB_LEN; i++)
++						printk("%02x", mres->lvb[i]);
++					printk("]\n");
++					dlm_print_one_lock_resource(res);
+ 					BUG();
+ 				}
+ 				memcpy(res->lvb, mres->lvb, DLM_LVB_LEN);
+@@ -2099,6 +2125,20 @@ void __dlm_hb_node_down(struct dlm_ctxt 
+ {
+ 	assert_spin_locked(&dlm->spinlock);
+ 
++	if (dlm->reco.new_master == idx) {
++		mlog(ML_NOTICE, "%s: recovery master %d just died\n",
++		     dlm->name, idx);
++		if (dlm->reco.state & DLM_RECO_STATE_FINALIZE) {
++			/* finalize1 was reached, so it is safe to clear
++			 * the new_master and dead_node.  that recovery
++			 * is complete. */
++			mlog(ML_NOTICE, "%s: dead master %d had reached "
++			     "finalize1 state, clearing\n", dlm->name, idx);
++			dlm->reco.state &= ~DLM_RECO_STATE_FINALIZE;
++			__dlm_reset_recovery(dlm);
++		}
++	}
++
+ 	/* check to see if the node is already considered dead */
+ 	if (!test_bit(idx, dlm->live_nodes_map)) {
+ 		mlog(0, "for domain %s, node %d is already dead. "
+@@ -2405,6 +2445,13 @@ retry:
+ 			 * another ENOMEM */
+ 			msleep(100);
+ 			goto retry;
++		} else if (ret == EAGAIN) {
++			mlog(ML_NOTICE, "%s: trying to start recovery of node "
++			     "%u, but node %u is waiting for last recovery "
++			     "to complete, backoff for a bit\n", dlm->name,
++			     dead_node, nodenum);
++			msleep(100);
++			goto retry;
+ 		}
+ 	}
+ 
+@@ -2419,6 +2466,17 @@ int dlm_begin_reco_handler(struct o2net_
+ 	/* ok to return 0, domain has gone away */
+ 	if (!dlm_grab(dlm))
+ 		return 0;
++	
++	spin_lock(&dlm->spinlock);
++	if (dlm->reco.state & DLM_RECO_STATE_FINALIZE) {
++		mlog(ML_NOTICE, "%s: node %u wants to recover node %u (%u:%u) "
++		     "but this node is in finalize state, waiting on finalize2\n",
++		     dlm->name, br->node_idx, br->dead_node,
++		     dlm->reco.dead_node, dlm->reco.new_master);
++		spin_unlock(&dlm->spinlock);
++		return EAGAIN;
++	}
++	spin_unlock(&dlm->spinlock);
+ 
+ 	mlog(ML_NOTICE, "%s: node %u wants to recover node %u (%u:%u)\n",
+ 	     dlm->name, br->node_idx, br->dead_node,
+@@ -2473,6 +2531,7 @@ int dlm_begin_reco_handler(struct o2net_
+ 	return 0;
+ }
+ 
++#define DLM_FINALIZE_STAGE2  0x01
+ static int dlm_send_finalize_reco_message(struct dlm_ctxt *dlm)
+ {
+ 	int ret = 0;
+@@ -2480,25 +2539,31 @@ static int dlm_send_finalize_reco_messag
+ 	struct dlm_node_iter iter;
+ 	int nodenum;
+ 	int status;
++	int stage = 1;
+ 
+-	mlog(ML_NOTICE, "finishing recovery for node %s:%u\n",
+-	     dlm->name, dlm->reco.dead_node);
++	mlog(ML_NOTICE, "finishing recovery for node %s:%u, "
++	     "stage %d\n", dlm->name, dlm->reco.dead_node, stage);
+ 
+ 	spin_lock(&dlm->spinlock);
+ 	dlm_node_iter_init(dlm->domain_map, &iter);
+ 	spin_unlock(&dlm->spinlock);
+ 
++stage2:
+ 	memset(&fr, 0, sizeof(fr));
+ 	fr.node_idx = dlm->node_num;
+ 	fr.dead_node = dlm->reco.dead_node;
++	if (stage == 2)
++		fr.flags |= DLM_FINALIZE_STAGE2; 
+ 
+ 	while ((nodenum = dlm_node_iter_next(&iter)) >= 0) {
+ 		if (nodenum == dlm->node_num)
+ 			continue;
+ 		ret = o2net_send_message(DLM_FINALIZE_RECO_MSG, dlm->key,
+ 					 &fr, sizeof(fr), nodenum, &status);
+-		if (ret >= 0) {
++		if (ret >= 0)
+ 			ret = status;
++		if (ret < 0) {
++			mlog_errno(ret);
+ 			if (dlm_is_host_down(ret)) {
+ 				/* this has no effect on this recovery 
+ 				 * session, so set the status to zero to 
+@@ -2507,12 +2572,15 @@ static int dlm_send_finalize_reco_messag
+ 				     "node finished recovery.\n", nodenum);
+ 				ret = 0;
+ 			}
+-		}
+-		if (ret < 0) {
+-			mlog_errno(ret);
+ 			break;
+ 		}
+ 	}
++	if (stage == 1) {
++		/* reset the node_iter back to the top and send finalize2 */
++		iter.curnode = -1;
++		stage = 2;
++		goto stage2;
++	}
+ 
+ 	return ret;
+ }
+@@ -2521,14 +2589,18 @@ int dlm_finalize_reco_handler(struct o2n
+ {
+ 	struct dlm_ctxt *dlm = data;
+ 	struct dlm_finalize_reco *fr = (struct dlm_finalize_reco *)msg->buf;
++	int stage = 1;
+ 
+ 	/* ok to return 0, domain has gone away */
+ 	if (!dlm_grab(dlm))
+ 		return 0;
+ 
+-	mlog(ML_NOTICE, "%s: node %u finalizing recovery of node %u (%u:%u)\n",
+-	     dlm->name, fr->node_idx, fr->dead_node,
+-	     dlm->reco.dead_node, dlm->reco.new_master);
++	if (fr->flags & DLM_FINALIZE_STAGE2)
++		stage = 2;
++
++	mlog(ML_NOTICE, "%s: node %u finalizing recovery stage%d of "
++	    "node %u (%u:%u)\n", dlm->name, fr->node_idx, stage,
++	    fr->dead_node, dlm->reco.dead_node, dlm->reco.new_master);
+ 
+ 	spin_lock(&dlm->spinlock);
+ 
+@@ -2545,13 +2617,38 @@ int dlm_finalize_reco_handler(struct o2n
+ 		BUG();
+ 	}
+ 
+-	dlm_finish_local_lockres_recovery(dlm, fr->dead_node, fr->node_idx);
+-
+-	spin_unlock(&dlm->spinlock);
+-
+-	dlm_reset_recovery(dlm);
++	switch (stage) {
++		case 1:
++			dlm_finish_local_lockres_recovery(dlm, fr->dead_node, fr->node_idx);
++			if (dlm->reco.state & DLM_RECO_STATE_FINALIZE) {
++				mlog(ML_ERROR, "%s: received finalize1 from "
++				     "new master %u for dead node %u, but "
++				     "this node has already received it!\n",
++				     dlm->name, fr->node_idx, fr->dead_node);
++				dlm_print_reco_junk(dlm);
++				BUG();
++			}
++			dlm->reco.state |= DLM_RECO_STATE_FINALIZE;
++			spin_unlock(&dlm->spinlock);
++			break;
++		case 2:
++			if (!(dlm->reco.state & DLM_RECO_STATE_FINALIZE)) {
++				mlog(ML_ERROR, "%s: received finalize2 from "
++				     "new master %u for dead node %u, but "
++				     "this node did not have finalize1!\n",
++				     dlm->name, fr->node_idx, fr->dead_node);
++				dlm_print_reco_junk(dlm);
++				BUG();
++			}
++			dlm->reco.state &= ~DLM_RECO_STATE_FINALIZE;
++			spin_unlock(&dlm->spinlock);
++			dlm_reset_recovery(dlm);
++			dlm_kick_recovery_thread(dlm);
++			break;
++		default:
++			BUG();
++	}
+ 
+-	dlm_kick_recovery_thread(dlm);
+ 	mlog(ML_NOTICE, "%s: recovery done, reco master was %u, dead now %u, master now %u\n",
+ 	     dlm->name, fr->node_idx, dlm->reco.dead_node, dlm->reco.new_master);
+ 
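
The handshake this patch builds, in short: finalize is split so that every
node first marks DLM_RECO_STATE_FINALIZE (stage 1), and only on stage 2
clears it and resets its recovery state; begin_reco requests arriving in
between are bounced with EAGAIN and retried, and a recovery master dying
between the stages tells survivors the session got far enough to reset
locally.  A compact model of that state machine (the flag name mirrors the
patch; everything else is simplified):

#include <stdio.h>
#include <stdlib.h>

#define RECO_STATE_FINALIZE 0x02

static int reco_state;

static void handle_finalize(int stage)
{
        if (stage == 1) {
                if (reco_state & RECO_STATE_FINALIZE)
                        abort();        /* duplicate finalize1: fatal */
                reco_state |= RECO_STATE_FINALIZE;
        } else {
                if (!(reco_state & RECO_STATE_FINALIZE))
                        abort();        /* finalize2 without finalize1 */
                reco_state &= ~RECO_STATE_FINALIZE;
                /* only now is it safe to reset recovery state */
        }
}

/* if the recovery master dies in between, the flag tells the
 * survivor whether the dead master's session may be reset */
static void master_died(void)
{
        if (reco_state & RECO_STATE_FINALIZE) {
                reco_state &= ~RECO_STATE_FINALIZE;
                printf("master died after finalize1: reset recovery\n");
        } else {
                printf("master died mid-recovery: wait for a new master\n");
        }
}

int main(void)
{
        handle_finalize(1);
        master_died();          /* finalize1 was seen -> safe reset */
        return 0;
}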



