[Ocfs2-commits] khackel commits r2526 - branches/ocfs2-1.0/fs/ocfs2/dlm

svn-commits at oss.oracle.com svn-commits at oss.oracle.com
Thu Aug 18 16:55:12 CDT 2005


Author: khackel
Signed-off-by: mfasheh
Date: 2005-08-18 16:55:10 -0500 (Thu, 18 Aug 2005)
New Revision: 2526

Modified:
   branches/ocfs2-1.0/fs/ocfs2/dlm/dlmast.c
   branches/ocfs2-1.0/fs/ocfs2/dlm/dlmcommon.h
   branches/ocfs2-1.0/fs/ocfs2/dlm/dlmconvert.c
   branches/ocfs2-1.0/fs/ocfs2/dlm/dlmdebug.c
   branches/ocfs2-1.0/fs/ocfs2/dlm/dlmdomain.c
   branches/ocfs2-1.0/fs/ocfs2/dlm/dlmlock.c
   branches/ocfs2-1.0/fs/ocfs2/dlm/dlmmaster.c
   branches/ocfs2-1.0/fs/ocfs2/dlm/dlmrecovery.c
   branches/ocfs2-1.0/fs/ocfs2/dlm/dlmthread.c
Log:
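* merge from r2525 in HEAD:
  - fix mle node up/down: notify the mles attached to the heartbeat
    events when a node joins or leaves the domain
  - fix hang on migration: wait on a dedicated migration waitqueue and
    abort the migration if the target node goes down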

Signed-off-by: mfasheh



Modified: branches/ocfs2-1.0/fs/ocfs2/dlm/dlmast.c
===================================================================
--- branches/ocfs2-1.0/fs/ocfs2/dlm/dlmast.c	2005-08-18 21:47:09 UTC (rev 2525)
+++ branches/ocfs2-1.0/fs/ocfs2/dlm/dlmast.c	2005-08-18 21:55:10 UTC (rev 2526)
@@ -98,6 +98,11 @@
 	BUG_ON(!lock);
 
 	assert_spin_locked(&dlm->ast_lock);
+	if (!list_empty(&lock->ast_list)) {
+		mlog(ML_ERROR, "ast list not empty!!  pending=%d, newlevel=%d\n",
+		     lock->ast_pending, lock->ml.type);
+		BUG();
+	}
 	BUG_ON(!list_empty(&lock->ast_list));
 	if (lock->ast_pending)
 		mlog(0, "lock has an ast getting flushed right now\n");
@@ -235,7 +240,6 @@
 	dlm_bastlockfunc_t *fn = lock->bast;
 
 	mlog_entry_void();
-
 	BUG_ON(lock->ml.node != dlm->node_num);
 
 	(*fn)(lock->astdata, blocked_type);

Modified: branches/ocfs2-1.0/fs/ocfs2/dlm/dlmcommon.h
===================================================================
--- branches/ocfs2-1.0/fs/ocfs2/dlm/dlmcommon.h	2005-08-18 21:47:09 UTC (rev 2525)
+++ branches/ocfs2-1.0/fs/ocfs2/dlm/dlmcommon.h	2005-08-18 21:55:10 UTC (rev 2526)
@@ -125,6 +125,7 @@
 	wait_queue_head_t dlm_thread_wq;
 	wait_queue_head_t dlm_reco_thread_wq;
 	wait_queue_head_t ast_wq;
+	wait_queue_head_t migration_wq;
 
 	struct work_struct dispatched_work;
 	struct list_head work_list;
@@ -380,7 +381,6 @@
 	DLM_FINALIZE_RECO_MSG	 /* 518 */
 };
 
-
 struct dlm_reco_node_data
 {
 	int state;
@@ -973,13 +973,15 @@
 int dlm_hb_node_dead(struct dlm_ctxt *dlm, int node);
 int __dlm_hb_node_dead(struct dlm_ctxt *dlm, int node);
 
+int dlm_lockres_is_dirty(struct dlm_ctxt *dlm, struct dlm_lock_resource *res);
 int dlm_migrate_lockres(struct dlm_ctxt *dlm,
 			struct dlm_lock_resource *res,
 			u8 target);
 int dlm_finish_migration(struct dlm_ctxt *dlm,
 			 struct dlm_lock_resource *res,
 			 u8 old_master);
-void dlm_lockres_release_ast(struct dlm_lock_resource *res);
+void dlm_lockres_release_ast(struct dlm_ctxt *dlm,
+			     struct dlm_lock_resource *res);
 void __dlm_lockres_reserve_ast(struct dlm_lock_resource *res);
 
 int dlm_master_request_handler(o2net_msg *msg, u32 len, void *data);
@@ -991,6 +993,7 @@
 int dlm_reco_data_done_handler(o2net_msg *msg, u32 len, void *data);
 int dlm_begin_reco_handler(o2net_msg *msg, u32 len, void *data);
 int dlm_finalize_reco_handler(o2net_msg *msg, u32 len, void *data);
+void dlm_print_one_mle(struct dlm_master_list_entry *mle);
 
 int dlm_dispatch_assert_master(struct dlm_ctxt *dlm,
 			       struct dlm_lock_resource *res,

Modified: branches/ocfs2-1.0/fs/ocfs2/dlm/dlmconvert.c
===================================================================
--- branches/ocfs2-1.0/fs/ocfs2/dlm/dlmconvert.c	2005-08-18 21:47:09 UTC (rev 2525)
+++ branches/ocfs2-1.0/fs/ocfs2/dlm/dlmconvert.c	2005-08-18 21:55:10 UTC (rev 2526)
@@ -101,7 +101,7 @@
 	if (call_ast)
 		dlm_queue_ast(dlm, lock);
 	else
-		dlm_lockres_release_ast(res);
+		dlm_lockres_release_ast(dlm, res);
 
 	if (kick_thread)
 		dlm_kick_thread(dlm, res);
@@ -236,6 +236,9 @@
 
 unlock_exit:
 	spin_unlock(&lock->spinlock);
+	if (status == DLM_DENIED) {
+		__dlm_print_one_lock_resource(res);
+	}
 	if (status == DLM_NORMAL)
 		*kick_thread = 1;
 	return status;
@@ -401,7 +404,6 @@
 	return ret;
 }
 
-
 /* handler for DLM_CONVERT_LOCK_MSG on master site
  * locking:
  *   caller needs:  none
@@ -517,7 +519,7 @@
 	if (call_ast)
 		dlm_queue_ast(dlm, lock);
 	else
-		dlm_lockres_release_ast(res);
+		dlm_lockres_release_ast(dlm, res);
 
 	if (kick_thread)
 		dlm_kick_thread(dlm, res);

Modified: branches/ocfs2-1.0/fs/ocfs2/dlm/dlmdebug.c
===================================================================
--- branches/ocfs2-1.0/fs/ocfs2/dlm/dlmdebug.c	2005-08-18 21:47:09 UTC (rev 2525)
+++ branches/ocfs2-1.0/fs/ocfs2/dlm/dlmdebug.c	2005-08-18 21:55:10 UTC (rev 2526)
@@ -243,9 +243,12 @@
 		lock = list_entry(iter2, struct dlm_lock, list);
 		spin_lock(&lock->spinlock);
 		mlog(ML_NOTICE, "    type=%d, conv=%d, node=%u, "
-		       "cookie=%"MLFu64"\n", lock->ml.type,
-		       lock->ml.convert_type, lock->ml.node,
-		       lock->ml.cookie);
+		       "cookie=%"MLFu64", ast=(empty=%c,pend=%c), bast=(empty=%c,pend=%c)\n", 
+		       lock->ml.type, lock->ml.convert_type, lock->ml.node, lock->ml.cookie, 
+		       list_empty(&lock->ast_list) ? 'y' : 'n',
+		       lock->ast_pending ? 'y' : 'n',
+		       list_empty(&lock->bast_list) ? 'y' : 'n',
+		       lock->bast_pending ? 'y' : 'n');
 		spin_unlock(&lock->spinlock);
 	}
 	mlog(ML_NOTICE, "  converting queue: \n");
@@ -253,9 +256,12 @@
 		lock = list_entry(iter2, struct dlm_lock, list);
 		spin_lock(&lock->spinlock);
 		mlog(ML_NOTICE, "    type=%d, conv=%d, node=%u, "
-		       "cookie=%"MLFu64"\n", lock->ml.type,
-		       lock->ml.convert_type, lock->ml.node,
-		       lock->ml.cookie);
+		       "cookie=%"MLFu64", ast=(empty=%c,pend=%c), bast=(empty=%c,pend=%c)\n", 
+		       lock->ml.type, lock->ml.convert_type, lock->ml.node, lock->ml.cookie, 
+		       list_empty(&lock->ast_list) ? 'y' : 'n',
+		       lock->ast_pending ? 'y' : 'n',
+		       list_empty(&lock->bast_list) ? 'y' : 'n',
+		       lock->bast_pending ? 'y' : 'n');
 		spin_unlock(&lock->spinlock);
 	}
 	mlog(ML_NOTICE, "  blocked queue: \n");
@@ -263,9 +269,12 @@
 		lock = list_entry(iter2, struct dlm_lock, list);
 		spin_lock(&lock->spinlock);
 		mlog(ML_NOTICE, "    type=%d, conv=%d, node=%u, "
-		       "cookie=%"MLFu64"\n", lock->ml.type,
-		       lock->ml.convert_type, lock->ml.node,
-		       lock->ml.cookie);
+		       "cookie=%"MLFu64", ast=(empty=%c,pend=%c), bast=(empty=%c,pend=%c)\n", 
+		       lock->ml.type, lock->ml.convert_type, lock->ml.node, lock->ml.cookie, 
+		       list_empty(&lock->ast_list) ? 'y' : 'n',
+		       lock->ast_pending ? 'y' : 'n',
+		       list_empty(&lock->bast_list) ? 'y' : 'n',
+		       lock->bast_pending ? 'y' : 'n');
 		spin_unlock(&lock->spinlock);
 	}
 }

Modified: branches/ocfs2-1.0/fs/ocfs2/dlm/dlmdomain.c
===================================================================
--- branches/ocfs2-1.0/fs/ocfs2/dlm/dlmdomain.c	2005-08-18 21:47:09 UTC (rev 2525)
+++ branches/ocfs2-1.0/fs/ocfs2/dlm/dlmdomain.c	2005-08-18 21:55:10 UTC (rev 2526)
@@ -308,6 +308,7 @@
 	struct dlm_lock_resource *res;
 
 	mlog(0, "Migrating locks from domain %s\n", dlm->name);
+restart:
 	spin_lock(&dlm->spinlock);
 	for (i=0; i<DLM_HASH_SIZE; i++) {
 		while (!list_empty(&dlm->resources[i])) {
@@ -316,18 +317,17 @@
 			/* this should unhash the lockres
 			 * and exit with dlm->spinlock */
 			mlog(0, "purging res=%p\n", res);
-			if (res->state & DLM_LOCK_RES_DIRTY ||
-			    !list_empty(&res->dirty)) {
+			if (dlm_lockres_is_dirty(dlm, res)) {
 				/* HACK!  this should absolutely go.
 				 * need to figure out why some empty
 				 * lockreses are still marked dirty */
 				mlog(ML_ERROR, "lockres %.*s dirty!\n",
 				     res->lockname.len, res->lockname.name);
-				spin_lock(&res->spinlock);
-				dlm_shuffle_lists(dlm, res);
-				list_del_init(&res->dirty);
-				res->state &= ~DLM_LOCK_RES_DIRTY;
-				spin_unlock(&res->spinlock);
+
+				spin_unlock(&dlm->spinlock);
+				dlm_kick_thread(dlm, res);
+				wait_event(dlm->ast_wq, !dlm_lockres_is_dirty(dlm, res));
+				goto restart;
 			}
 			dlm_purge_lockres(dlm, res);
 		}
@@ -390,6 +390,8 @@
 	struct dlm_ctxt *dlm = data;
 	unsigned int node;
 	struct dlm_exit_domain *exit_msg = (struct dlm_exit_domain *) msg->buf;
+	struct dlm_master_list_entry *mle;
+	struct list_head *iter;
 
 	mlog_entry("%p %u %p", msg, len, data);
 
@@ -405,6 +407,13 @@
 	spin_lock(&dlm->spinlock);
 	clear_bit(node, dlm->domain_map);
 	__dlm_print_nodes(dlm);
+
+	/* notify any mles attached to the heartbeat events */
+	list_for_each(iter, &dlm->mle_hb_events) {
+		mle = list_entry(iter, struct dlm_master_list_entry, hb_events);
+		dlm_mle_node_down(dlm, mle, NULL, node);
+	}
+
 	spin_unlock(&dlm->spinlock);
 
 	dlm_put(dlm);
@@ -613,6 +622,8 @@
 {
 	struct dlm_assert_joined *assert;
 	struct dlm_ctxt *dlm = NULL;
+	struct dlm_master_list_entry *mle;
+	struct list_head *iter;
 
 	assert = (struct dlm_assert_joined *) msg->buf;
 	dlm_assert_joined_to_host(assert);
@@ -635,6 +646,12 @@
 
 		__dlm_print_nodes(dlm);
 
+		/* notify any mles attached to the heartbeat events */
+		list_for_each(iter, &dlm->mle_hb_events) {
+			mle = list_entry(iter, struct dlm_master_list_entry, hb_events);
+			dlm_mle_node_up(dlm, mle, NULL, assert->node_idx);
+		}
+
 		spin_unlock(&dlm->spinlock);
 	}
 	spin_unlock(&dlm_domain_lock);
@@ -1229,6 +1246,7 @@
 	init_waitqueue_head(&dlm->dlm_reco_thread_wq);
 	init_waitqueue_head(&dlm->reco.event);
 	init_waitqueue_head(&dlm->ast_wq);
+	init_waitqueue_head(&dlm->migration_wq);
 	INIT_LIST_HEAD(&dlm->master_list);
 	INIT_LIST_HEAD(&dlm->mle_hb_events);
 

Modified: branches/ocfs2-1.0/fs/ocfs2/dlm/dlmlock.c
===================================================================
--- branches/ocfs2-1.0/fs/ocfs2/dlm/dlmlock.c	2005-08-18 21:47:09 UTC (rev 2525)
+++ branches/ocfs2-1.0/fs/ocfs2/dlm/dlmlock.c	2005-08-18 21:55:10 UTC (rev 2526)
@@ -160,7 +160,7 @@
 	if (call_ast)
 		dlm_queue_ast(dlm, lock);
 	else
-		dlm_lockres_release_ast(res);
+		dlm_lockres_release_ast(dlm, res);
 
 	dlm_lockres_calc_usage(dlm, res);
 	if (kick_thread)

Modified: branches/ocfs2-1.0/fs/ocfs2/dlm/dlmmaster.c
===================================================================
--- branches/ocfs2-1.0/fs/ocfs2/dlm/dlmmaster.c	2005-08-18 21:47:09 UTC (rev 2525)
+++ branches/ocfs2-1.0/fs/ocfs2/dlm/dlmmaster.c	2005-08-18 21:55:10 UTC (rev 2526)
@@ -52,46 +52,51 @@
 #define MLOG_MASK_PREFIX (ML_DLM|ML_DLM_MASTER)
 #include "cluster/masklog.h"
 
-static void dlm_dump_mles(struct dlm_ctxt *dlm);
 
-static void dlm_dump_mles(struct dlm_ctxt *dlm)
+void dlm_print_one_mle(struct dlm_master_list_entry *mle)
 {
-	struct dlm_master_list_entry *mle;
-	struct list_head *iter;
 	int i = 0, refs;
 	char *type;
 	char attached;
 	u8 master;
 	unsigned int namelen;
 	const char *name;
+	struct kref *k;
 
+	k = &mle->mle_refs;
+	type = (mle->type == DLM_MLE_BLOCK ? "BLK" : "MAS");
+	refs = atomic_read(&k->refcount);
+	master = mle->master;
+	attached = (list_empty(&mle->hb_events) ? 'N' : 'Y');
+
+	if (mle->type == DLM_MLE_BLOCK) {
+		namelen = mle->u.name.len;
+		name = mle->u.name.name;
+	} else {
+		namelen = mle->u.res->lockname.len;
+		name = mle->u.res->lockname.name;
+	}
+
+	mlog(ML_NOTICE, "  #%3d: %3s  %3d  %3u   %3u %c    (%d)%.*s\n",
+		  i, type, refs, master, mle->new_master, attached,
+		  namelen, namelen, name);
+}
+
+			      
+static void dlm_dump_mles(struct dlm_ctxt *dlm);
+
+static void dlm_dump_mles(struct dlm_ctxt *dlm)
+{
+	struct dlm_master_list_entry *mle;
+	struct list_head *iter;
+	
 	mlog(ML_NOTICE, "dumping all mles for domain %s:\n", dlm->name);
-	mlog(ML_NOTICE, "  ####: type refs owner events? lockname\n");
+	mlog(ML_NOTICE, "  ####: type refs owner new events? lockname nodemap votemap respmap maybemap\n");
 	spin_lock(&dlm->master_lock);
-
 	list_for_each(iter, &dlm->master_list) {
-		struct kref *k;
 		mle = list_entry(iter, struct dlm_master_list_entry, list);
-
-		k = &mle->mle_refs;
-		type = (mle->type == DLM_MLE_BLOCK ? "BLK" : "MAS");
-		refs = atomic_read(&k->refcount);
-		master = mle->master;
-		attached = (list_empty(&mle->hb_events) ? 'N' : 'Y');
-
-		if (mle->type == DLM_MLE_BLOCK) {
-			namelen = mle->u.name.len;
-			name = mle->u.name.name;
-		} else {
-			namelen = mle->u.res->lockname.len;
-			name = mle->u.res->lockname.name;
-		}
-
-		mlog(ML_NOTICE, "  #%3d: %3s  %3d  %3u   %c    (%d)%.*s\n",
-			  i, type, refs, master, attached,
-			  namelen, namelen, name);
+		dlm_print_one_mle(mle);
 	}
-
 	spin_unlock(&dlm->master_lock);
 }
 
@@ -153,8 +158,9 @@
 				    struct dlm_lock_resource *res);
 static void dlm_remove_nonlocal_locks(struct dlm_ctxt *dlm,
 				      struct dlm_lock_resource *res);
-static void dlm_mark_lockres_migrating(struct dlm_ctxt *dlm,
-				       struct dlm_lock_resource *res);
+static int dlm_mark_lockres_migrating(struct dlm_ctxt *dlm,
+				       struct dlm_lock_resource *res,
+				       u8 target);
 
 
 int dlm_is_host_down(int errno)
@@ -837,6 +843,9 @@
 	ret = 0;   /* done */
 	if (assert) {
 		m = dlm->node_num;
+		mlog(0, "about to master %.*s here, this=%u\n",
+		     res->lockname.len, res->lockname.name, m);
+		dlm_print_one_mle(mle);
 		ret = dlm_do_assert_master(dlm, res->lockname.name,
 					   res->lockname.len, mle->vote_map, 0);
 		if (ret) {
@@ -1196,6 +1205,8 @@
 			 * caused all nodes up to this one to
 			 * create mles.  this node now needs to
 			 * go back and clean those up. */
+			mlog(0, "%u is the owner of %.*s, cleaning everyone else\n",
+			     dlm->node_num, res->lockname.len, res->lockname.name);
 			ret = dlm_dispatch_assert_master(dlm, res, 1,
 							 request->node_idx,
 							 flags);
@@ -1519,10 +1530,14 @@
 		spin_unlock(&mle->spinlock);
 
 		if (mle->type == DLM_MLE_MIGRATION && res) {
-			mlog(0, "finishing off migration of lockres\n");
+			mlog(0, "finishing off migration of lockres %.*s, "
+			     "from %u to %u\n",
+			       res->lockname.len, res->lockname.name,
+			       dlm->node_num, mle->new_master);
 			spin_lock(&res->spinlock);
 			res->state &= ~DLM_LOCK_RES_MIGRATING;
 			dlm_change_lockres_owner(dlm, res, mle->new_master);
+			BUG_ON(res->state & DLM_LOCK_RES_DIRTY);
 			spin_unlock(&res->spinlock);
 		}
 		/* master is known, detach if not already detached */
@@ -1616,6 +1631,8 @@
 
 	/* this call now finishes out the nodemap
 	 * even if one or more nodes die */
+	mlog(0, "worker about to master %.*s here, this=%u\n",
+		     res->lockname.len, res->lockname.name, dlm->node_num);
 	ret = dlm_do_assert_master(dlm, res->lockname.name,
 				   res->lockname.len,
 				   nodemap, flags);
@@ -1767,7 +1784,15 @@
 	 * set the MIGRATING flag and flush asts
 	 * if we fail after this we need to re-dirty the lockres
 	 */
-	dlm_mark_lockres_migrating(dlm, res);
+	if (dlm_mark_lockres_migrating(dlm, res, target) < 0) {
+		mlog(ML_ERROR, "tried to migrate %.*s to %u, but "
+		     "the target went down.\n", res->lockname.len,
+		     res->lockname.name, target);
+		spin_lock(&res->spinlock);
+		res->state &= ~DLM_LOCK_RES_MIGRATING;
+		spin_unlock(&res->spinlock);
+		ret = -EINVAL;
+	}
 
 fail:
 	if (oldmle) {
@@ -1917,10 +1942,43 @@
 	return ret;
 }
 
+static int dlm_migration_can_proceed(struct dlm_ctxt *dlm,
+				     struct dlm_lock_resource *res,
+				     u8 mig_target)
+{
+	int can_proceed;
+	spin_lock(&res->spinlock);
+	can_proceed = !!(res->state & DLM_LOCK_RES_MIGRATING);
+	spin_unlock(&res->spinlock);
 
-static void dlm_mark_lockres_migrating(struct dlm_ctxt *dlm,
-				       struct dlm_lock_resource *res)
+	/* target has died, so make the caller break out of the 
+	 * wait_event, but caller must recheck the domain_map */
+	spin_lock(&dlm->spinlock);
+	if (!test_bit(mig_target, dlm->domain_map))
+		can_proceed = 1;
+	spin_unlock(&dlm->spinlock);
+	return can_proceed;
+}
+
+int dlm_lockres_is_dirty(struct dlm_ctxt *dlm, struct dlm_lock_resource *res)
 {
+	int ret;
+	spin_lock(&res->spinlock);
+	ret = !!(res->state & DLM_LOCK_RES_DIRTY);
+	spin_unlock(&res->spinlock);
+	return ret;
+}
+
+
+static int dlm_mark_lockres_migrating(struct dlm_ctxt *dlm,
+				       struct dlm_lock_resource *res,
+				       u8 target)
+{
+	int ret = 0;
+
+	mlog(0, "dlm_mark_lockres_migrating: %.*s, from %u to %u\n",
+	       res->lockname.len, res->lockname.name, dlm->node_num,
+	       target);
 	/* need to set MIGRATING flag on lockres.  this is done by
 	 * ensuring that all asts have been flushed for this lockres. */
 	spin_lock(&res->spinlock);
@@ -1932,17 +1990,42 @@
 	spin_unlock(&res->spinlock);
 
 	/* now flush all the pending asts.. hang out for a bit */
-	dlm_flush_lockres_asts(dlm, res);
-	wait_event(dlm->ast_wq, dlm_lockres_asts_flushed(dlm, res));
-	dlm_lockres_release_ast(res);
+	dlm_kick_thread(dlm, res);
+	wait_event(dlm->ast_wq, !dlm_lockres_is_dirty(dlm, res));
+	dlm_lockres_release_ast(dlm, res);
 
+	mlog(0, "about to wait on migration_wq, dirty=%s\n",
+	       res->state & DLM_LOCK_RES_DIRTY ? "yes" : "no");
 	/* if the extra ref we just put was the final one, this
 	 * will pass thru immediately.  otherwise, we need to wait
 	 * for the last ast to finish. */
-	spin_lock(&res->spinlock);
-	__dlm_wait_on_lockres_flags_set(res, DLM_LOCK_RES_MIGRATING);
-	spin_unlock(&res->spinlock);
+again:
+	ret = wait_event_interruptible_timeout(dlm->migration_wq,
+		   dlm_migration_can_proceed(dlm, res, target),
+		   msecs_to_jiffies(1000));
+	if (ret < 0) {
+		mlog(0, "woken again: migrating? %s, dead? %s\n",
+		       res->state & DLM_LOCK_RES_MIGRATING ? "yes":"no",
+		       test_bit(target, dlm->domain_map) ? "no":"yes");
+	} else {
+		mlog(0, "all is well: migrating? %s, dead? %s\n",
+		       res->state & DLM_LOCK_RES_MIGRATING ? "yes":"no",
+		       test_bit(target, dlm->domain_map) ? "no":"yes");
+	}
+	if (!dlm_migration_can_proceed(dlm, res, target)) {
+		mlog(0, "trying again...\n");
+		goto again;
+	}
 
+	/* did the target go down or die? */
+	spin_lock(&dlm->spinlock);
+	if (!test_bit(target, dlm->domain_map)) {
+		mlog(ML_ERROR, "aha. migration target %u just went down\n",
+		     target);
+		ret = -EHOSTDOWN;
+	}
+	spin_unlock(&dlm->spinlock);
+
 	/*
 	 * at this point:
 	 *
@@ -1951,6 +2034,7 @@
 	 *   o all processes trying to reserve an ast on this
 	 *     lockres must wait for the MIGRATING flag to clear
 	 */
+	return ret;
 }
 
 /* last step in the migration process.
@@ -2372,7 +2456,8 @@
 		goto leave;
 	}
 
-	mlog(0, "doing assert master to all except the original node\n");
+	mlog(0, "doing assert master of %.*s to all except the original node\n",
+	     res->lockname.len, res->lockname.name);
 	/* this call now finishes out the nodemap
 	 * even if one or more nodes die */
 	ret = dlm_do_assert_master(dlm, res->lockname.name,
@@ -2386,7 +2471,8 @@
 
 	memset(iter.node_map, 0, sizeof(iter.node_map));
 	set_bit(old_master, iter.node_map);
-	mlog(0, "doing assert master back to %u\n", old_master);
+	mlog(0, "doing assert master of %.*s back to %u\n",
+	     res->lockname.len, res->lockname.name, old_master);
 	ret = dlm_do_assert_master(dlm, res->lockname.name,
 				   res->lockname.len, iter.node_map,
 				   DLM_ASSERT_MASTER_FINISH_MIGRATION);
@@ -2422,6 +2508,9 @@
 void __dlm_lockres_reserve_ast(struct dlm_lock_resource *res)
 {
 	assert_spin_locked(&res->spinlock);
+	if (res->state & DLM_LOCK_RES_MIGRATING) {
+		__dlm_print_one_lock_resource(res);
+	}
 	BUG_ON(res->state & DLM_LOCK_RES_MIGRATING);
 
 	atomic_inc(&res->asts_reserved);
@@ -2440,7 +2529,8 @@
  * or that a bast should be fired, because the new master will
  * shuffle the lists on this lockres as soon as it is migrated.
  */
-void dlm_lockres_release_ast(struct dlm_lock_resource *res)
+void dlm_lockres_release_ast(struct dlm_ctxt *dlm,
+			     struct dlm_lock_resource *res)
 {
 	if (!atomic_dec_and_lock(&res->asts_reserved, &res->spinlock))
 		return;
@@ -2455,4 +2545,5 @@
 	res->state |= DLM_LOCK_RES_MIGRATING;
 	spin_unlock(&res->spinlock);
 	wake_up(&res->wq);
+	wake_up(&dlm->migration_wq);
 }

Modified: branches/ocfs2-1.0/fs/ocfs2/dlm/dlmrecovery.c
===================================================================
--- branches/ocfs2-1.0/fs/ocfs2/dlm/dlmrecovery.c	2005-08-18 21:47:09 UTC (rev 2525)
+++ branches/ocfs2-1.0/fs/ocfs2/dlm/dlmrecovery.c	2005-08-18 21:55:10 UTC (rev 2526)
@@ -1828,6 +1828,9 @@
 
 	mlog(0, "node %u being removed from domain map!\n", idx);
 	clear_bit(idx, dlm->domain_map);
+	/* wake up migration waiters if a node goes down.
+	 * perhaps later we can genericize this for other waiters. */
+	wake_up(&dlm->migration_wq);
 
 	if (test_bit(idx, dlm->recovery_map))
 		mlog(0, "domain %s, node %u already added "

Modified: branches/ocfs2-1.0/fs/ocfs2/dlm/dlmthread.c
===================================================================
--- branches/ocfs2-1.0/fs/ocfs2/dlm/dlmthread.c	2005-08-18 21:47:09 UTC (rev 2525)
+++ branches/ocfs2-1.0/fs/ocfs2/dlm/dlmthread.c	2005-08-18 21:55:10 UTC (rev 2526)
@@ -80,26 +80,7 @@
 	current->state = TASK_RUNNING;
 }
 
-/* opposite of the above, waits until flags are SET */
-void __dlm_wait_on_lockres_flags_set(struct dlm_lock_resource *res, int flags)
-{
-	DECLARE_WAITQUEUE(wait, current);
 
-	assert_spin_locked(&res->spinlock);
-
-	add_wait_queue(&res->wq, &wait);
-repeat:
-	set_current_state(TASK_UNINTERRUPTIBLE);
-	if ((res->state & flags) != flags) {
-		spin_unlock(&res->spinlock);
-		schedule();
-		spin_lock(&res->spinlock);
-		goto repeat;
-	}
-	remove_wait_queue(&res->wq, &wait);
-	current->state = TASK_RUNNING;
-}
-
 static int __dlm_lockres_unused(struct dlm_lock_resource *res)
 {
 	if (list_empty(&res->granted) &&
@@ -542,7 +523,7 @@
 		/* drop the extra ref.
 		 * this may drop it completely. */
 		dlm_lock_put(lock);
-		dlm_lockres_release_ast(res);
+		dlm_lockres_release_ast(dlm, res);
 	}
 
 	while (!list_empty(&dlm->pending_basts)) {
@@ -590,10 +571,10 @@
 		/* drop the extra ref.
 		 * this may drop it completely. */
 		dlm_lock_put(lock);
-		dlm_lockres_release_ast(res);
+		dlm_lockres_release_ast(dlm, res);
 	}
-	spin_unlock(&dlm->ast_lock);
 	wake_up(&dlm->ast_wq);
+	spin_unlock(&dlm->ast_lock);
 }
 
 
@@ -646,6 +627,14 @@
 			 * dirty_list in this gap, but that is ok */
 
 			spin_lock(&res->spinlock);
+			if (res->owner != dlm->node_num) {
+				__dlm_print_one_lock_resource(res);
+				mlog(ML_ERROR, "inprog:%s, mig:%s, reco:%s, dirty:%s\n",
+				     res->state & DLM_LOCK_RES_IN_PROGRESS ? "yes" : "no",
+				     res->state & DLM_LOCK_RES_MIGRATING ? "yes" : "no",
+				     res->state & DLM_LOCK_RES_RECOVERING ? "yes" : "no",
+				     res->state & DLM_LOCK_RES_DIRTY ? "yes" : "no");
+			}
 			BUG_ON(res->owner != dlm->node_num);
 
 			/* it is now ok to move lockreses in these states



More information about the Ocfs2-commits mailing list