[Ocfs2-commits] khackel commits r2782 - branches/ocfs2-1.2/fs/ocfs2/dlm

svn-commits at oss.oracle.com
Wed Mar 1 20:09:30 CST 2006


Author: khackel
Date: 2006-03-01 20:09:29 -0600 (Wed, 01 Mar 2006)
New Revision: 2782

Modified:
   branches/ocfs2-1.2/fs/ocfs2/dlm/dlmmaster.c
Log:
* fixes hangs in lock mastery caused by refcounting on the mle
  structure (it failed to get cleaned up by the eventual master)
* whenever we call dlm_do_master_request, make sure we always end up
  contacting the master.  this is the only way the master can know
  that this node was involved in mastery, and then clear any mles
  we may have created.
* make sure to dispatch an assert_master task even in the case where
  the lockres is not yet updated (but mle->master is this node)
* in dlm_assert_master_handler, if we were mastering the lock
  simultaneously with the eventual master (a MASTER mle), inform
  the master whether we contacted any other nodes so that it can
  clear those mles (a standalone sketch of this re-assert handshake
  follows below)

Signed-off-by: mfasheh
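
For reference, a minimal user-space sketch of the re-assert handshake
this patch introduces.  MAX_NODES, contacted_others and the handler
below are made-up stand-ins for the real dlm structures; only the
positive-EAGAIN return convention and the caller's 'again:' retry
loop mirror the actual dlmmaster.c code.

#include <errno.h>
#include <stdio.h>

#define MAX_NODES 8

/* nodes that raced for mastery and contacted other nodes while
 * doing so; node 2 plays the losing candidate from the log above */
static int contacted_others[MAX_NODES] = { 0, 0, 1, 0, 0, 0, 0, 0 };

/* stand-in for dlm_assert_master_handler(): a node that contacted
 * others answers the assert with the positive value EAGAIN so the
 * master re-broadcasts, which cleans the mles on those nodes */
static int assert_master_handler(int node)
{
	if (contacted_others[node]) {
		contacted_others[node] = 0;
		return EAGAIN;	/* positive: "please assert again" */
	}
	return 0;		/* mle cleaned, nothing more to do */
}

int main(void)
{
	const int master = 7;
	int reassert, passes = 0;

	do {	/* mirrors the 'again:' loop in dlm_do_assert_master() */
		reassert = 0;
		passes++;
		for (int to = 0; to < MAX_NODES; to++) {
			if (to != master &&
			    assert_master_handler(to) == EAGAIN)
				reassert = 1;
		}
	} while (reassert);

	printf("asserted %d time(s)\n", passes);	/* prints 2 */
	return 0;
}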



Modified: branches/ocfs2-1.2/fs/ocfs2/dlm/dlmmaster.c
===================================================================
--- branches/ocfs2-1.2/fs/ocfs2/dlm/dlmmaster.c	2006-03-02 01:10:17 UTC (rev 2781)
+++ branches/ocfs2-1.2/fs/ocfs2/dlm/dlmmaster.c	2006-03-02 02:09:29 UTC (rev 2782)
@@ -786,7 +786,15 @@
 			mlog_errno(ret);
 		if (mle->master != O2NM_MAX_NODES) {
 			/* found a master ! */
-			break;
+			if (mle->master <= nodenum)
+				break;
+			/* if our master request has not reached the master
+			 * yet, keep going until it does.  this is how the
+			 * master will know that asserts are needed back to
+			 * the lower nodes. */
+			mlog(0, "%s:%.*s: requests only up to %u but master "
+			     "is %u, keep going\n", dlm->name, namelen,
+			     lockid, nodenum, mle->master);
 		}
 	}
 
@@ -854,7 +862,19 @@
 	/* check if another node has already become the owner */
 	spin_lock(&res->spinlock);
 	if (res->owner != DLM_LOCK_RES_OWNER_UNKNOWN) {
+		mlog(0, "%s:%.*s: owner is suddenly %u\n", dlm->name,
+		     res->lockname.len, res->lockname.name, res->owner);
 		spin_unlock(&res->spinlock);
+		/* this will cause the master to re-assert across
+		 * the whole cluster, freeing up mles */
+		ret = dlm_do_master_request(mle, res->owner);
+		if (ret < 0) {
+			/* give recovery a chance to run */
+			mlog(ML_ERROR, "link to %u went down?: %d\n", res->owner, ret);
+			msleep(500);
+			goto recheck;
+		}
+		ret = 0;
 		goto leave;
 	}
 	spin_unlock(&res->spinlock);
@@ -1238,13 +1258,14 @@
 {
 	u8 response = DLM_MASTER_RESP_MAYBE;
 	struct dlm_ctxt *dlm = data;
-	struct dlm_lock_resource *res;
+	struct dlm_lock_resource *res = NULL;
 	struct dlm_master_request *request = (struct dlm_master_request *) msg->buf;
 	struct dlm_master_list_entry *mle = NULL, *tmpmle = NULL;
 	char *name;
 	unsigned int namelen;
 	int found, ret;
 	int set_maybe;
+	int dispatch_assert = 0;
 
 	if (!dlm_grab(dlm))
 		return DLM_MASTER_RESP_NO;
@@ -1281,7 +1302,6 @@
 		}
 
 		if (res->owner == dlm->node_num) {
-			u32 flags = DLM_ASSERT_MASTER_MLE_CLEANUP;
 			spin_unlock(&res->spinlock);
 			// mlog(0, "this node is the master\n");
 			response = DLM_MASTER_RESP_YES;
@@ -1294,16 +1314,7 @@
 			 * caused all nodes up to this one to
 			 * create mles.  this node now needs to
 			 * go back and clean those up. */
-			mlog(0, "%u is the owner of %.*s, cleaning everyone else\n",
-			     dlm->node_num, res->lockname.len, res->lockname.name);
-			ret = dlm_dispatch_assert_master(dlm, res, 1,
-							 request->node_idx,
-							 flags);
-			if (ret < 0) {
-				mlog(ML_ERROR, "failed to dispatch assert "
-				     "master work\n");
-				response = DLM_MASTER_RESP_ERROR;
-			}
+			dispatch_assert = 1;
 			goto send_response;
 		} else if (res->owner != DLM_LOCK_RES_OWNER_UNKNOWN) {
 			spin_unlock(&res->spinlock);
@@ -1351,9 +1362,13 @@
 			}
 		} else if (tmpmle->master != DLM_LOCK_RES_OWNER_UNKNOWN) {
 			set_maybe = 0;
-			if (tmpmle->master == dlm->node_num)
+			if (tmpmle->master == dlm->node_num) {
 				response = DLM_MASTER_RESP_YES;
-			else
+				/* this node will be the owner.
+				 * go back and clean the mles on any
+				 * other nodes */
+				dispatch_assert = 1;
+			} else
 				response = DLM_MASTER_RESP_NO;
 		} else {
 			// mlog(0, "this node is attempting to "
@@ -1392,8 +1407,8 @@
 			mle = (struct dlm_master_list_entry *)
 				kmem_cache_alloc(dlm_mle_cache, GFP_KERNEL);
 			if (!mle) {
-				// bad bad bad... this sucks.
 				response = DLM_MASTER_RESP_ERROR;
+				mlog_errno(-ENOMEM);
 				goto send_response;
 			}
 			spin_lock(&dlm->spinlock);
@@ -1412,25 +1427,19 @@
 		// mlog(0, "mle was found\n");
 		set_maybe = 1;
 		spin_lock(&tmpmle->spinlock);
+		if (tmpmle->master == dlm->node_num) {
+			mlog(ML_ERROR, "no lockres, but an mle with this node as master!\n");
+			BUG();
+		}
 		if (tmpmle->type == DLM_MLE_BLOCK)
 			response = DLM_MASTER_RESP_NO;
 		else if (tmpmle->type == DLM_MLE_MIGRATION) {
 			mlog(0, "migration mle was found (%u->%u)\n",
 			     tmpmle->master, tmpmle->new_master);
-			if (tmpmle->master == dlm->node_num) {
-				mlog(ML_ERROR, "no lockres, but migration mle "
-				     "says that this node is master!\n");
-				BUG();
-			}
 			/* real master can respond on its own */
 			response = DLM_MASTER_RESP_NO;
-		} else {
-			if (tmpmle->master == dlm->node_num) {
-				response = DLM_MASTER_RESP_YES;
-				set_maybe = 0;
-			} else
-				response = DLM_MASTER_RESP_MAYBE;
-		}
+		} else
+			response = DLM_MASTER_RESP_MAYBE;
 		if (set_maybe)
 			set_bit(request->node_idx, tmpmle->maybe_map);
 		spin_unlock(&tmpmle->spinlock);
@@ -1443,6 +1452,24 @@
 		dlm_put_mle(tmpmle);
 	}
 send_response:
+
+	if (dispatch_assert) {
+		if (response != DLM_MASTER_RESP_YES)
+			mlog(ML_ERROR, "invalid response %d\n", response);
+		if (!res) {
+			mlog(ML_ERROR, "bad lockres while trying to assert!\n");
+			BUG();
+		}
+		mlog(0, "%u is the owner of %.*s, cleaning everyone else\n",
+			     dlm->node_num, res->lockname.len, res->lockname.name);
+		ret = dlm_dispatch_assert_master(dlm, res, 0, request->node_idx, 
+						 DLM_ASSERT_MASTER_MLE_CLEANUP);
+		if (ret < 0) {
+			mlog(ML_ERROR, "failed to dispatch assert master work\n");
+			response = DLM_MASTER_RESP_ERROR;
+		}
+	}
+
 	dlm_put(dlm);
 	return response;
 }
@@ -1465,8 +1492,11 @@
 	int to, tmpret;
 	struct dlm_node_iter iter;
 	int ret = 0;
+	int reassert;
 
 	BUG_ON(namelen > O2NM_MAX_NAME_LEN);
+again:
+	reassert = 0;
 
 	/* note that if this nodemap is empty, it returns 0 */
 	dlm_node_iter_init(nodemap, &iter);
@@ -1498,9 +1528,17 @@
 			     "got %d.\n", namelen, lockname, to, r);
 			dlm_dump_lock_resources(dlm);
 			BUG();
+		} else if (r == EAGAIN) {
+			mlog(0, "%.*s: node %u create mles on other "
+			     "nodes and requests a re-assert\n", 
+			     namelen, lockname, to);
+			reassert = 1;
 		}
 	}
 
+	if (reassert)
+		goto again;
+
 	return ret;
 }
 
@@ -1522,6 +1560,8 @@
 	char *name;
 	unsigned int namelen;
 	u32 flags;
+	int master_request = 0;
+	int ret = 0;
 
 	if (!dlm_grab(dlm))
 		return 0;
@@ -1636,11 +1676,22 @@
 	// mlog(0, "woo!  got an assert_master from node %u!\n",
 	// 	     assert->node_idx);
 	if (mle) {
-		int extra_ref;
+		int extra_ref = 0;
+		int nn = -1;
 		
 		spin_lock(&mle->spinlock);
-		extra_ref = !!(mle->type == DLM_MLE_BLOCK
-			       || mle->type == DLM_MLE_MIGRATION);
+		if (mle->type == DLM_MLE_BLOCK || mle->type == DLM_MLE_MIGRATION)
+			extra_ref = 1;
+		else {
+			/* MASTER mle: if any bits set in the response map
+			 * then the calling node needs to re-assert to clear
+			 * up nodes that this node contacted */
+			while ((nn = find_next_bit(mle->response_map, O2NM_MAX_NODES,
+						   nn + 1)) < O2NM_MAX_NODES) {
+				if (nn != dlm->node_num && nn != assert->node_idx)
+					master_request = 1;
+			}
+		}
 		mle->master = assert->node_idx;
 		atomic_set(&mle->woken, 1);
 		wake_up(&mle->wq);
@@ -1671,10 +1722,15 @@
 	}
 
 done:
+	ret = 0;
 	if (res)
 		dlm_lockres_put(res);
 	dlm_put(dlm);
-	return 0;
+	if (master_request) {
+		mlog(0, "need to tell master to reassert\n");
+		ret = EAGAIN;  /* positive; negative would shoot down the node */
+	}
+	return ret;
 
 kill:
 	/* kill the caller! */
@@ -1707,6 +1763,10 @@
 	item->u.am.request_from = request_from;
 	item->u.am.flags = flags;
 
+	if (ignore_higher)
+		mlog(0, "IGNORE HIGHER: %.*s\n", res->lockname.len,
+		     res->lockname.name);
+
 	spin_lock(&dlm->work_lock);
 	list_add_tail(&item->list, &dlm->work_list);
 	spin_unlock(&dlm->work_lock);
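
For clarity, a user-space rendering of the response_map scan added to
dlm_assert_master_handler().  The open-coded test_bit() and the node
numbers are illustrative stand-ins; the plain 0..O2NM_MAX_NODES loop
is equivalent to the find_next_bit() walk in the hunk above.

#include <limits.h>
#include <stdio.h>

#define O2NM_MAX_NODES	255
#define BITS_PER_LONG	(CHAR_BIT * sizeof(unsigned long))
#define MAP_LONGS	((O2NM_MAX_NODES + BITS_PER_LONG - 1) / BITS_PER_LONG)

/* minimal stand-in for the kernel's test_bit() */
static int test_bit(int nr, const unsigned long *map)
{
	return (map[nr / BITS_PER_LONG] >> (nr % BITS_PER_LONG)) & 1;
}

int main(void)
{
	unsigned long response_map[MAP_LONGS] = { 0 };
	int this_node = 3, asserting_node = 7, master_request = 0;

	/* pretend nodes 3, 5 and 7 answered our master requests */
	response_map[0] |= (1UL << 3) | (1UL << 5) | (1UL << 7);

	/* any responder other than ourselves and the asserting master
	 * still holds an mle, so ask the master to re-assert */
	for (int nn = 0; nn < O2NM_MAX_NODES; nn++) {
		if (test_bit(nn, response_map) &&
		    nn != this_node && nn != asserting_node)
			master_request = 1;	/* node 5 triggers this */
	}

	printf("master_request = %d\n", master_request);
	return 0;
}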



