[Ocfs2-commits] khackel commits r2384 - trunk/fs/ocfs2/dlm

svn-commits at oss.oracle.com svn-commits at oss.oracle.com
Fri Jun 10 01:25:09 CDT 2005


Author: khackel
Signed-off-by: mfasheh
Date: 2005-06-10 01:25:08 -0500 (Fri, 10 Jun 2005)
New Revision: 2384

Modified:
   trunk/fs/ocfs2/dlm/dlmcommon.h
   trunk/fs/ocfs2/dlm/dlmmaster.c
   trunk/fs/ocfs2/dlm/dlmrecovery.c
   trunk/fs/ocfs2/dlm/dlmthread.c
Log:
* fixes bug 379: fully implements lock mastery recovery after another
  node dies while the local node is attempting to master a lock
* adds a nolock version of dlm_kick_thread to be called at the end
  of recovery while still holding dlm and lockres locks
* de-complicate some of the error paths in lock mastery to move forward
  in some safe cases that did not require special handling
* remove the error field in the master list entry
* added a slightly more fine grained checking of node up/down events
  during lock mastery, to avoid many costly error paths

Signed-off-by: mfasheh



Modified: trunk/fs/ocfs2/dlm/dlmcommon.h
===================================================================
--- trunk/fs/ocfs2/dlm/dlmcommon.h	2005-06-10 02:28:10 UTC (rev 2383)
+++ trunk/fs/ocfs2/dlm/dlmcommon.h	2005-06-10 06:25:08 UTC (rev 2384)
@@ -312,7 +312,6 @@
 	unsigned long node_map[BITS_TO_LONGS(O2NM_MAX_NODES)];
 	u8 master;
 	u8 new_master;
-	u8 error;
 	enum dlm_mle_type type;    // BLOCK or MASTER
 	struct o2hb_callback_func mle_hb_up;
 	struct o2hb_callback_func mle_hb_down;
@@ -890,6 +889,8 @@
 
 u8 dlm_nm_this_node(dlm_ctxt *dlm);
 void dlm_kick_thread(dlm_ctxt *dlm, dlm_lock_resource *res);
+void __dlm_kick_thread(dlm_ctxt *dlm, dlm_lock_resource *res);
+	
 
 int dlm_nm_init(dlm_ctxt *dlm);
 int dlm_heartbeat_init(dlm_ctxt *dlm);

Modified: trunk/fs/ocfs2/dlm/dlmmaster.c
===================================================================
--- trunk/fs/ocfs2/dlm/dlmmaster.c	2005-06-10 02:28:10 UTC (rev 2383)
+++ trunk/fs/ocfs2/dlm/dlmmaster.c	2005-06-10 06:25:08 UTC (rev 2384)
@@ -38,6 +38,7 @@
 #include <linux/socket.h>
 #include <linux/inet.h>
 #include <linux/spinlock.h>
+#include <linux/delay.h>
 
 
 #include "cluster/heartbeat.h"
@@ -59,13 +60,13 @@
 	struct list_head *iter;
 	int i = 0, refs;
 	char *type;
-	char err, attached;
+	char attached;
 	u8 master;
 	unsigned int namelen;
 	const char *name;
 
 	mlog(ML_NOTICE, "dumping all mles for domain %s:\n", dlm->name);
-	mlog(ML_NOTICE, "  ####: type refs owner events? err?     lockname\n");
+	mlog(ML_NOTICE, "  ####: type refs owner events? lockname\n");
 	spin_lock(&dlm->master_lock);
 
 	list_for_each(iter, &dlm->master_list) {
@@ -74,7 +75,6 @@
 		
 		k = &mle->mle_refs;
 		type = (mle->type == DLM_MLE_BLOCK ? "BLK" : "MAS");
-		err = (mle->error ? 'Y' : 'N');
 		refs = atomic_read(&k->refcount);
 		master = mle->master;
 		attached = (list_empty(&mle->hb_events) ? 'N' : 'Y');
@@ -87,8 +87,8 @@
 			name = mle->u.res->lockname.name;
 		}
 
-		mlog(ML_NOTICE, "  #%3d: %3s  %3d  %3u   %c       %c    (%d)%.*s\n",
-			  i, type, refs, master, attached, err,
+		mlog(ML_NOTICE, "  #%3d: %3s  %3d  %3u   %c    (%d)%.*s\n",
+			  i, type, refs, master, attached, 
 			  namelen, namelen, name);
 	}
 
@@ -137,7 +137,7 @@
 				     dlm_master_list_entry *mle,
 				     int blocked);
 static int dlm_restart_lock_mastery(dlm_ctxt *dlm, dlm_lock_resource *res,
-				    dlm_master_list_entry *mle);
+				    dlm_master_list_entry *mle, int blocked);
 static int dlm_add_migration_mle(dlm_ctxt *dlm, 
 				 dlm_lock_resource *res, 
 				 dlm_master_list_entry *mle, 
@@ -150,7 +150,28 @@
 static void dlm_mark_lockres_migrating(dlm_ctxt *dlm, dlm_lock_resource *res);
 
 
+static int dlm_is_host_down(int errno)
+{
+	switch (errno) {
+		case -EBADF:
+		case -ECONNREFUSED:
+		case -ENOTCONN:
+		case -ECONNRESET:
+		case -EPIPE:
+		case -EHOSTDOWN:
+		case -EHOSTUNREACH:
+		case -ETIMEDOUT:
+		case -ECONNABORTED:
+		case -ENETDOWN:
+		case -ENETUNREACH:
+		case -ENETRESET:
+		case -ESHUTDOWN:
+			return 1;
+	}
+	return 0;
+}
 
+
 /*
  * MASTER LIST FUNCTIONS
  */
@@ -249,7 +270,6 @@
 	memset(mle->response_map, 0, sizeof(mle->response_map));
 	mle->master = O2NM_MAX_NODES;
 	mle->new_master = O2NM_MAX_NODES;
-	mle->error = 0;
 
 	if (mle->type == DLM_MLE_MASTER) {
 		DLM_ASSERT(res);
@@ -661,6 +681,7 @@
 		goto wait;
 	}
 
+redo_request:
 	ret = -EINVAL;
 	dlm_node_iter_init(mle->vote_map, &iter);
 	while ((nodenum = dlm_node_iter_next(&iter)) >= 0) {
@@ -675,18 +696,26 @@
 
 wait:
 	/* keep going until the response map includes all nodes */
-	ret = -EAGAIN;
-	while (ret == -EAGAIN) {
-		ret = dlm_wait_for_lock_mastery(dlm, res, mle, blocked);
-		if (ret == -EINVAL) {
-			mlog(ML_ERROR, "some error occurred.  restarting "
-			     "lock mastery!\n");
-			/* TODO: figure out how restart this */
+	ret = dlm_wait_for_lock_mastery(dlm, res, mle, blocked);
+	if (ret < 0) {
+		if (blocked) {
+		       	if (mle->type == DLM_MLE_MASTER) {
+				mlog(0, "mle changed to a MASTER due "
+				     "to node death. restart.\n");
+				goto redo_request;
+			}
+			/* should never happen for a BLOCK */
+			mlog(ML_ERROR, "mle type=%d\n", mle->type);
 			BUG();
 		}
+		mlog(0, "node map changed, redo the "
+		     "master request now\n");
+		goto redo_request;
 	}
-	if (ret == 0)
-		mlog(0, "lockres mastered by %u\n", res->owner);
+	
+	mlog(0, "lockres mastered by %u\n", res->owner);
+	/* make sure we never continue without this */
+	DLM_ASSERT(res->owner != O2NM_MAX_NODES);
 
 	/* master is known, detach if not already detached */
 	dlm_mle_detach_hb_events(dlm, mle);
@@ -707,10 +736,14 @@
 				     int blocked)
 {
 	u8 m;
-	int ret = 0, tmpret, bit;
-	int map_changed = 0, voting_done = 0;
-	int assert = 0, sleep;
+	int ret, bit;
+	int map_changed, voting_done;
+	int assert, sleep;
 
+recheck:
+	ret = 0;
+	assert = 0;
+
 	/* check if another node has already become the owner */
 	spin_lock(&res->spinlock);
 	if (res->owner != DLM_LOCK_RES_OWNER_UNKNOWN) {
@@ -727,20 +760,16 @@
 			     sizeof(mle->vote_map)) == 0);
 
 	/* restart if we hit any errors */
-	if (mle->error || map_changed) {
-		if (mle->error) {
-			mlog(0, "another node got error %d, restarting\n",
-			     mle->error);
-			mle->error = 0;
+	if (map_changed) {
+		mlog(0, "node map changed, restarting\n");
+		ret = dlm_restart_lock_mastery(dlm, res, mle, blocked);
+		spin_unlock(&mle->spinlock);
+		if (ret < 0) {
+			mlog_errno(ret);
+			goto leave;
 		}
-		if (map_changed)
-			mlog(0, "node map changed, restarting\n");
-		spin_unlock(&mle->spinlock);
-		tmpret = dlm_restart_lock_mastery(dlm, res, mle);
-		if (tmpret < 0)
-			mlog_errno(tmpret);
-		ret = -EINVAL;
-		goto leave;
+		mlog(0, "restart lock mastery succeeded, rechecking now\n");
+		goto recheck;
 	}
 
 	if (m != O2NM_MAX_NODES) {
@@ -770,19 +799,13 @@
 	/* sleep if we haven't finished voting yet */
 	if (sleep) {
 		atomic_set(&mle->woken, 0);
-		ret = wait_event_interruptible_timeout(mle->wq, 
-					(atomic_read(&mle->woken) == 1), 
-					msecs_to_jiffies(5000));
-
-		if (ret >= 0 && !atomic_read(&mle->woken)) {
-			mlog(0, "timed out during lock mastery: "
-			     "vote_map=%0lx, response_map=%0lx\n",
-			     mle->vote_map[0], mle->response_map[0]);
+		wait_event(mle->wq, (atomic_read(&mle->woken) == 1));
+		if (res->owner == O2NM_MAX_NODES) {
+			mlog(0, "waiting again\n");
+			goto recheck;
 		}
-		/* unless we are aborting, need to recheck and 
-		 * maybe sleep again */
-		if (ret != -ERESTARTSYS)
-			ret = -EAGAIN;
+		mlog(0, "done waiting, master is %u\n", res->owner);
+		ret = 0;
 		goto leave;
 	}
 
@@ -792,14 +815,16 @@
 		ret = dlm_do_assert_master(dlm, res->lockname.name, 
 					   res->lockname.len, mle->vote_map, 0);
 		if (ret) {
+			/* This is a failure in the network path,
+			 * not in the response to the assert_master
+			 * (any nonzero response is a BUG on this node).
+			 * Most likely a socket just got disconnected
+			 * due to node death. */
 			mlog_errno(ret);
-
-			tmpret = dlm_restart_lock_mastery(dlm, res, mle);
-			if (tmpret < 0)
-				mlog_errno(tmpret);
-			ret = -EINVAL;
-			goto leave;
 		}
+		/* no longer need to restart lock mastery.  
+		 * all living nodes have been contacted. */
+		ret = 0;
 	}
 
 	/* set the lockres owner */
@@ -811,24 +836,186 @@
 	return ret;
 }
 
+struct dlm_bitmap_diff_iter
+{
+	int curnode;
+	unsigned long *orig_bm;
+	unsigned long *cur_bm;
+	unsigned long diff_bm[BITS_TO_LONGS(O2NM_MAX_NODES)];
+};
+
+enum dlm_node_state_change
+{
+	NODE_DOWN = -1,
+	NODE_NO_CHANGE = 0,
+	NODE_UP
+};
+
+static void dlm_bitmap_diff_iter_init(struct dlm_bitmap_diff_iter *iter,
+				      unsigned long *orig_bm, 
+				      unsigned long *cur_bm)
+{
+	unsigned long p1, p2;
+	int i;
+
+	iter->curnode = -1;
+	iter->orig_bm = orig_bm;
+	iter->cur_bm = cur_bm;
+
+	for (i = 0; i < BITS_TO_LONGS(O2NM_MAX_NODES); i++) {
+       		p1 = *(iter->orig_bm + i);
+	       	p2 = *(iter->cur_bm + i);
+		iter->diff_bm[i] = (p1 & ~p2) | (p2 & ~p1);
+	}
+}
+
+static int dlm_bitmap_diff_iter_next(struct dlm_bitmap_diff_iter *iter,
+				     enum dlm_node_state_change *state)
+{
+	int bit;
+
+	if (iter->curnode >= O2NM_MAX_NODES)
+		return -ENOENT;
+
+	bit = find_next_bit(iter->diff_bm, O2NM_MAX_NODES, 
+			    iter->curnode+1);
+	if (bit >= O2NM_MAX_NODES) {
+		iter->curnode = O2NM_MAX_NODES;
+		return -ENOENT;
+	}
+
+	/* if it was there in the original then this node died */
+	if (test_bit(bit, iter->orig_bm))
+		*state = NODE_DOWN;
+	else
+		*state = NODE_UP;
+
+	iter->curnode = bit;
+	return bit;
+}
+
+
 static int dlm_restart_lock_mastery(dlm_ctxt *dlm, dlm_lock_resource *res,
-				    dlm_master_list_entry *mle)
+				    dlm_master_list_entry *mle, int blocked)
 {
-	mlog(0, "something happened such that the whole master process needs "
-	     "to be restarted!\n");
-	return 0;
+	struct dlm_bitmap_diff_iter bdi;
+	enum dlm_node_state_change sc;
+	int node;
+	int ret = 0;
+
+	mlog(0, "something happened such that the "
+	     "master process may need to be restarted!\n");
+
+	assert_spin_locked(&mle->spinlock);
+
+	dlm_bitmap_diff_iter_init(&bdi, mle->vote_map, mle->node_map);
+	node = dlm_bitmap_diff_iter_next(&bdi, &sc);
+	while (node >= 0) {
+		if (sc == NODE_UP) {
+			/* a node came up.  easy.  might not even need 
+			 * to talk to it if its node number is higher
+			 * or if we are already blocked. */
+			mlog(0, "node up! %d\n", node);
+			if (blocked)
+				goto next;
+
+			if (node > dlm->node_num) {
+				mlog(0, "node > this node. skipping.\n");
+				goto next;
+			}
+
+			/* redo the master request, but only for the new node */
+			mlog(0, "sending request to new node\n");
+			clear_bit(node, mle->response_map);
+			set_bit(node, mle->vote_map);
+		} else {
+			mlog(ML_ERROR, "node down! %d\n", node);
+		
+			/* if the node wasn't involved in mastery skip it */
+			if (!test_bit(node, mle->maybe_map))
+				goto next;
+
+			/* if we're already blocked on lock mastery, and the
+			 * dead node wasn't the expected master, or there is
+			 * another node in the maybe_map, keep waiting */
+			if (blocked) {
+				int lowest = find_next_bit(mle->maybe_map, 
+						       O2NM_MAX_NODES, 0);
+		
+				/* act like it was never there */
+				clear_bit(node, mle->maybe_map);
+				clear_bit(node, mle->vote_map);
+				clear_bit(node, mle->response_map);
+
+			       	if (node != lowest)
+					goto next;
+
+				mlog(ML_ERROR, "expected master %u died while "
+				     "this node was blocked waiting on it!\n", 
+				     node);
+				lowest = find_next_bit(mle->maybe_map, 
+						       O2NM_MAX_NODES, 
+						       lowest+1);
+				if (lowest < O2NM_MAX_NODES) {
+					mlog(0, "still blocked. waiting "
+					     "on %u now\n", lowest);
+					goto next;
+				} 
+
+				/* mle is an MLE_BLOCK, but there is now 
+				 * nothing left to block on.  we need to return
+				 * all the way back out and try again with
+				 * an MLE_MASTER. dlm_do_local_recovery_cleanup
+				 * has already run, so the mle refcount is ok */
+				mlog(0, "no longer blocking. we can "
+				     "try to master this here\n");
+				mle->type = DLM_MLE_MASTER;
+				memset(mle->maybe_map, 0, 
+				       sizeof(mle->maybe_map));
+				memset(mle->response_map, 0, 
+				       sizeof(mle->maybe_map));
+				memcpy(mle->vote_map, mle->node_map,
+				       sizeof(mle->node_map));
+				mle->u.res = res;
+
+				ret = -EAGAIN;
+				goto next;
+			}
+
+			if (node > dlm->node_num)
+				goto next;
+
+			mlog(0, "dead node in map!\n");
+			/* yuck. go back and re-contact all nodes 
+			 * in the vote_map, removing this node. */
+			clear_bit(node, mle->maybe_map);
+			clear_bit(node, mle->vote_map);
+			memset(mle->response_map, 0, 
+			       sizeof(mle->response_map));
+		}
+		ret = -EAGAIN;
+next:
+		node = dlm_bitmap_diff_iter_next(&bdi, &sc);
+	}
+	return ret;
 }
 
 
 /*
  * DLM_MASTER_REQUEST_MSG
+ * 
+ * returns: 0 on success, 
+ *          -errno on a network error
+ *
+ * on error, the caller should assume the target node is "dead"
+ * 
  */
 
 static int dlm_do_master_request(dlm_master_list_entry *mle, int to)
 {
 	dlm_ctxt *dlm = mle->dlm;
 	dlm_master_request request;
-	int ret, response=0;
+	int ret, response=0, resend;
 
 	memset(&request, 0, sizeof(request));
 	request.node_idx = dlm->node_num;
@@ -845,43 +1032,68 @@
 	}
 
 	dlm_master_request_to_net(&request);
+again:
 	ret = o2net_send_message(DLM_MASTER_REQUEST_MSG, dlm->key, &request,
 				 sizeof(request), to, &response);
 	if (ret < 0)  {
-		mlog_errno(ret);
+		if (ret == -ESRCH) {
+			/* should never happen */
+			mlog(ML_ERROR, "TCP stack not ready!\n");
+			BUG();
+		} else if (ret == -EINVAL) {
+			mlog(ML_ERROR, "bad args passed to o2net!\n");
+			BUG();
+		} else if (ret == -ENOMEM) {
+			mlog(ML_ERROR, "out of memory while trying to send "
+			     "network message!  retrying\n");
+			/* this is totally crude */
+			msleep(50);
+			goto again;
+		} else if (!dlm_is_host_down(ret)) {
+			/* not a network error. bad. */
+			mlog_errno(ret);
+			mlog(ML_ERROR, "unhandled error!");
+			BUG();
+		}
+		/* all other errors should be network errors,
+		 * and likely indicate node death */
+		mlog(ML_ERROR, "link to %d went down!\n", to);
 		goto out;
 	}
 
+	ret = 0;
+	resend = 0;
 	spin_lock(&mle->spinlock);
 	switch (response) {
 		case DLM_MASTER_RESP_YES:
 			set_bit(to, mle->response_map);
-			// mlog(0, "woot!  node %u is the "
-			// "master!\n", to);
+			mlog(0, "node %u is the master, response=YES\n", to);
 			mle->master = to;
 			break;
 		case DLM_MASTER_RESP_NO:
-			// mlog(0, "node %u is not the "
-			// "master, not in-progress\n", to);
+			mlog(0, "node %u not master, response=NO\n", to);
 			set_bit(to, mle->response_map);
 			break;
 		case DLM_MASTER_RESP_MAYBE:
-			// mlog(0, "node %u is not the "
-			// "master, but IS in-progress\n", to);
+			mlog(0, "node %u not master, response=MAYBE\n", to);
 			set_bit(to, mle->response_map);
 			set_bit(to, mle->maybe_map);
 			break;
 		case DLM_MASTER_RESP_ERROR:
-			mlog(0, "node %u hit an -ENOMEM! try everything "
-			     "again\n", to);
-			mle->error = 1;
+			mlog(0, "node %u hit an error, resending\n", to);
+			resend = 1;
+			response = 0;
 			break;
 		default:
-			mlog(0, "bad response! %u\n", response);
-			ret = -EINVAL;
-			break;
+			mlog(ML_ERROR, "bad response! %u\n", response);
+			BUG();
 	}
 	spin_unlock(&mle->spinlock);
+	if (resend) {
+		/* this is also totally crude */
+		msleep(50);
+		goto again;
+	}
 
 out:
 	return ret;
@@ -1112,9 +1324,6 @@
 	int ret = 0;
 
 	DLM_ASSERT(namelen <= O2NM_MAX_NAME_LEN);
-	DLM_ASSERT(dlm);
-	DLM_ASSERT(lockname);
-	DLM_ASSERT(nodemap);
 
 	/* note that if this nodemap is empty, it returns 0 */
 	dlm_node_iter_init(nodemap, &iter);
@@ -1132,10 +1341,15 @@
 		tmpret = o2net_send_message(DLM_ASSERT_MASTER_MSG, dlm->key, 
 					    &assert, sizeof(assert), to, &r);
 		if (tmpret < 0) {
-			// TODO
-			// mlog(0, "assert_master returned %d!\n", tmpret);
-			ret = tmpret;
-			break;
+			mlog(ML_ERROR, "assert_master returned %d!\n", tmpret);
+			if (!dlm_is_host_down(tmpret)) {
+				mlog(ML_ERROR, "unhandled error!\n");
+				BUG();
+			}
+			/* a node died.  finish out the rest of the nodes. */
+			mlog(ML_ERROR, "link to %d went down!\n", to);
+			/* any nonzero status return will do */
+			ret = tmpret; 
 		} else if (r < 0) {
 			/* ok, something horribly messed.  kill thyself. */
 			mlog(ML_ERROR,"during assert master of %.*s to %u, "
@@ -1351,36 +1565,35 @@
 	request_from = item->u.am.request_from;
 	flags = item->u.am.flags;
 
-	do {
-		spin_lock(&dlm->spinlock);
-		memcpy(nodemap, dlm->domain_map, sizeof(nodemap));
-		spin_unlock(&dlm->spinlock);
+	spin_lock(&dlm->spinlock);
+	memcpy(nodemap, dlm->domain_map, sizeof(nodemap));
+	spin_unlock(&dlm->spinlock);
 
-		clear_bit(dlm->node_num, nodemap);
-		if (ignore_higher) {
-			/* if is this just to clear up mles for nodes below 
-			 * this node, do not send the message to the original
-			 * caller or any node number higher than this */
-			clear_bit(request_from, nodemap);
-			bit = dlm->node_num;
-			while (1) {
-				bit = find_next_bit(nodemap, O2NM_MAX_NODES, 
-						    bit+1);
-			       	if (bit >= O2NM_MAX_NODES)
-					break;
-				clear_bit(bit, nodemap);
-			}
+	clear_bit(dlm->node_num, nodemap);
+	if (ignore_higher) {
+		/* if is this just to clear up mles for nodes below 
+		 * this node, do not send the message to the original
+		 * caller or any node number higher than this */
+		clear_bit(request_from, nodemap);
+		bit = dlm->node_num;
+		while (1) {
+			bit = find_next_bit(nodemap, O2NM_MAX_NODES, 
+					    bit+1);
+		       	if (bit >= O2NM_MAX_NODES)
+				break;
+			clear_bit(bit, nodemap);
 		}
+	}
 
-		ret = dlm_do_assert_master(dlm, res->lockname.name,
-					   res->lockname.len, 
-					   nodemap, flags);
-		if (ret < 0) {
-			/* no choice but to try again.
-			 * maybe a node died. */ 
-			mlog_errno(ret);
-		}
-	} while (ret < 0);
+	/* this call now finishes out the nodemap
+	 * even if one or more nodes die */
+	ret = dlm_do_assert_master(dlm, res->lockname.name,
+				   res->lockname.len, 
+				   nodemap, flags);
+	if (ret < 0) {
+		/* no need to restart, we are done */
+		mlog_errno(ret);
+	}
 
 	dlm_lockres_put(res);
 
@@ -1944,10 +2157,10 @@
 void dlm_clean_master_list(dlm_ctxt *dlm, u8 dead_node)
 {
 	struct list_head *iter, *iter2;
-	int bit;
 	dlm_master_list_entry *mle;
 	dlm_lock_resource *res;
 
+	mlog_entry("dlm=%s, dead node=%u\n", dlm->name, dead_node);
 top:
 	assert_spin_locked(&dlm->spinlock);
 
@@ -1966,15 +2179,38 @@
 		if (mle->type == DLM_MLE_MASTER)
 			continue;
 
-		bit = find_next_bit(mle->maybe_map, O2NM_MAX_NODES, 0);
 
 		/* BLOCK mles are initiated by other nodes.
 		 * need to clean up if the dead node would have
 		 * been the master. */
-		if (mle->type == DLM_MLE_BLOCK &&
-		    bit != dead_node)
+		if (mle->type == DLM_MLE_BLOCK) {
+			int bit;
+
+			spin_lock(&mle->spinlock);
+			bit = find_next_bit(mle->maybe_map, O2NM_MAX_NODES, 0);
+			if (bit != dead_node) {
+				mlog(0, "mle found, but dead node %u would "
+				     "not have been master\n", dead_node);
+				spin_unlock(&mle->spinlock);
+			} else {
+				/* must drop the refcount by one since the
+				 * assert_master will never arrive.  this 
+				 * may result in the mle being unlinked and
+				 * freed, but there may still be a process
+				 * waiting in the dlmlock path which is fine. */
+				mlog(ML_ERROR, "node %u was expected master\n",
+				     dead_node);
+				atomic_set(&mle->woken, 1);
+				spin_unlock(&mle->spinlock);
+				wake_up(&mle->wq);
+				/* final put will take care of list removal */
+				__dlm_put_mle(mle);
+			}
 			continue;
+		}
 
+		/* everything else is a MIGRATION mle */
+
 		/* the rule for MIGRATION mles is that the master
 		 * becomes UNKNOWN if *either* the original or
 		 * the new master dies.  all UNKNOWN lockreses
@@ -1984,51 +2220,51 @@
 		 * this lockres, or if he needs to take over 
 		 * mastery.  either way, this node should expect
 		 * another message to resolve this. */
-		if (mle->type == DLM_MLE_MIGRATION &&
-		    mle->master != dead_node &&
+		if (mle->master != dead_node &&
 		    mle->new_master != dead_node)
 			continue;
 
 		/* if we have reached this point, this mle needs to
 		 * be removed from the list and freed. */
 
-		/* unlinking list_head while in list_for_each_safe */
+		/* remove from the list early.  NOTE: unlinking 
+		 * list_head while in list_for_each_safe */
+		spin_lock(&mle->spinlock);
 		list_del_init(&mle->list);
 		atomic_set(&mle->woken, 1);
+		spin_unlock(&mle->spinlock);
 		wake_up(&mle->wq);
 				
-		if (mle->type == DLM_MLE_MIGRATION) {
-			mlog(0, "node %u died during migration from "
-			     "%u to %u!\n", dead_node, 
-			     mle->master, mle->new_master);
-			/* if there is a lockres associated with this
-		 	 * mle, find it and set its owner to UNKNOWN */
-			res = __dlm_lookup_lockres(dlm, mle->u.name.name, 
-						mle->u.name.len);
-			if (res) {
-				/* unfortunately if we hit this rare case, our 
-			 	 * lock ordering is messed.  we need to drop
-			 	 * the master lock so that we can take the
-			  	 * lockres lock, meaning that we will have to
-				 * restart from the head of list. */
-				spin_unlock(&dlm->master_lock);
-			
-				/* move lockres onto recovery list */
-				spin_lock(&res->spinlock);
-				dlm_set_lockres_owner(dlm, res, 
-					      	DLM_LOCK_RES_OWNER_UNKNOWN);
-				dlm_move_lockres_to_recovery_list(dlm, res);
-				spin_unlock(&res->spinlock);
-				dlm_lockres_put(res);
+		mlog(0, "node %u died during migration from "
+		     "%u to %u!\n", dead_node, 
+		     mle->master, mle->new_master);
+		/* if there is a lockres associated with this
+	 	 * mle, find it and set its owner to UNKNOWN */
+		res = __dlm_lookup_lockres(dlm, mle->u.name.name, 
+					mle->u.name.len);
+		if (res) {
+			/* unfortunately if we hit this rare case, our 
+		 	 * lock ordering is messed.  we need to drop
+		 	 * the master lock so that we can take the
+		  	 * lockres lock, meaning that we will have to
+			 * restart from the head of list. */
+			spin_unlock(&dlm->master_lock);
+		
+			/* move lockres onto recovery list */
+			spin_lock(&res->spinlock);
+			dlm_set_lockres_owner(dlm, res, 
+				      	DLM_LOCK_RES_OWNER_UNKNOWN);
+			dlm_move_lockres_to_recovery_list(dlm, res);
+			spin_unlock(&res->spinlock);
+			dlm_lockres_put(res);
 
-				/* dump the mle */
-				spin_lock(&dlm->master_lock);
-				__dlm_put_mle(mle);
-				spin_unlock(&dlm->master_lock);
+			/* dump the mle */
+			spin_lock(&dlm->master_lock);
+			__dlm_put_mle(mle);
+			spin_unlock(&dlm->master_lock);
 
-				/* restart */
-				goto top;
-			}
+			/* restart */
+			goto top;
 		}
 		
 		/* this may be the last reference */
@@ -2057,21 +2293,16 @@
 		goto leave;
 	}
 
-retry:
 	mlog(0, "doing assert master to all except the original node\n");
+	/* this call now finishes out the nodemap
+	 * even if one or more nodes die */
 	ret = dlm_do_assert_master(dlm, res->lockname.name, 
 				   res->lockname.len, iter.node_map,
 				   DLM_ASSERT_MASTER_FINISH_MIGRATION);
 	if (ret < 0) {
+		/* no longer need to retry.  all living nodes contacted. */
 		mlog_errno(ret);
-
-		/* maybe we can be saved by updating the domain map */
-		spin_lock(&dlm->spinlock);
-		dlm_node_iter_init(dlm->domain_map, &iter);
-		clear_bit(old_master, iter.node_map);
-		clear_bit(dlm->node_num, iter.node_map);
-		spin_unlock(&dlm->spinlock);
-		goto retry;
+		ret = 0;
 	}
 
 	memset(iter.node_map, 0, sizeof(iter.node_map));
@@ -2085,6 +2316,7 @@
 		     "with %d.\n", ret);
 		/* the only nonzero status here would be because of
 		 * a dead original node.  we're done. */
+		ret = 0;
 	}
 
 	/* all done, set the owner, clear the flag */

Modified: trunk/fs/ocfs2/dlm/dlmrecovery.c
===================================================================
--- trunk/fs/ocfs2/dlm/dlmrecovery.c	2005-06-10 02:28:10 UTC (rev 2383)
+++ trunk/fs/ocfs2/dlm/dlmrecovery.c	2005-06-10 06:25:08 UTC (rev 2384)
@@ -1456,6 +1456,7 @@
 			spin_lock(&res->spinlock);
 			res->owner = new_master;
 			res->state &= ~DLM_LOCK_RES_RECOVERING;
+			__dlm_kick_thread(dlm, res);
 			spin_unlock(&res->spinlock);
 			wake_up(&res->wq);
 		}

Modified: trunk/fs/ocfs2/dlm/dlmthread.c
===================================================================
--- trunk/fs/ocfs2/dlm/dlmthread.c	2005-06-10 02:28:10 UTC (rev 2383)
+++ trunk/fs/ocfs2/dlm/dlmthread.c	2005-06-10 06:25:08 UTC (rev 2384)
@@ -414,28 +414,39 @@
 	return;
 }
 
-/* must have NO locks when calling this */
+/* must have NO locks when calling this with res !=NULL * */
 void dlm_kick_thread(dlm_ctxt *dlm, dlm_lock_resource *res)
 {
 	mlog_entry("dlm=%p, res=%p\n", dlm, res);
 	if (res) {
 		spin_lock(&dlm->spinlock);
 		spin_lock(&res->spinlock);
+		__dlm_kick_thread(dlm, res);
+		spin_unlock(&res->spinlock);
+		spin_unlock(&dlm->spinlock);
+	} else
+		wake_up(&dlm->dlm_thread_wq);
+}
 
+void __dlm_kick_thread(dlm_ctxt *dlm, dlm_lock_resource *res)
+{
+	mlog_entry("dlm=%p, res=%p\n", dlm, res);
+	if (res) {
+		assert_spin_locked(&dlm->spinlock);
+		assert_spin_locked(&res->spinlock);
+
 		/* don't shuffle secondary queues */
 		if ((res->owner == dlm->node_num) &&
 		    !(res->state & DLM_LOCK_RES_DIRTY)) {
 			list_add_tail(&res->dirty, &dlm->dirty_list);
 			res->state |= DLM_LOCK_RES_DIRTY;
 		}
-
-		spin_unlock(&res->spinlock);
-		spin_unlock(&dlm->spinlock);
 	}
 
 	wake_up(&dlm->dlm_thread_wq);
 }
 
+
 /* Launch the NM thread for the mounted volume */
 int dlm_launch_thread(dlm_ctxt *dlm)
 {



More information about the Ocfs2-commits mailing list