[Ocfs2-commits] khackel commits r1921 - branches/dlm-reco-mig/fs/ocfs2/dlm

svn-commits at oss.oracle.com svn-commits at oss.oracle.com
Mon Feb 28 18:06:49 CST 2005


Author: khackel
Date: 2005-02-28 18:06:48 -0600 (Mon, 28 Feb 2005)
New Revision: 1921

Modified:
   branches/dlm-reco-mig/fs/ocfs2/dlm/dlmmaster.c
Log:
* split dlm_get_lock_resource into two functions by introducing 
  dlm_wait_for_lock_mastery
* start using dlm_node_iter, where appropriate
* NOTE: dlm_restart_lock_mastery is left undefined. However, this
  process was not defined before either; it is just pulled out into
  its own function now to make it clearer.


Modified: branches/dlm-reco-mig/fs/ocfs2/dlm/dlmmaster.c
===================================================================
--- branches/dlm-reco-mig/fs/ocfs2/dlm/dlmmaster.c	2005-02-28 23:13:15 UTC (rev 1920)
+++ branches/dlm-reco-mig/fs/ocfs2/dlm/dlmmaster.c	2005-03-01 00:06:48 UTC (rev 1921)
@@ -80,6 +80,10 @@
 			     const char *name,
 			     unsigned int namelen);
 
+static int dlm_wait_for_lock_mastery(dlm_ctxt *dlm, dlm_lock_resource *res, 
+				     dlm_master_list_entry *mle);
+static int dlm_restart_lock_mastery(dlm_ctxt *dlm, dlm_lock_resource *res,
+				    dlm_master_list_entry *mle);
 
 
 /*
@@ -289,7 +293,7 @@
 static void dlm_lockres_release(struct kref *kref)
 {
 	dlm_lock_resource *res;
-
+	
 	BUG_ON(!kref);
 
 	res = container_of(kref, dlm_lock_resource, refs);
@@ -405,8 +409,8 @@
 	dlm_lock_resource *tmpres=NULL, *res=NULL;
 	dlm_master_list_entry *mle = NULL, *tmpmle = NULL;
 	int blocked = 0;
-	int map_changed = 0, restart = 0, assert = 0;
-	int ret, start, bit;
+	int ret, nodenum;
+	dlm_node_iter iter;
 	unsigned int namelen;
 
 	BUG_ON(!lockid);
@@ -465,6 +469,8 @@
 
 	/* check master list to see if another node has started mastering it */
 	spin_lock(&dlm->master_lock);
+
+	/* if we found a block, wait for lock to be mastered by another node */
 	blocked = dlm_find_mle(dlm, &tmpmle, (char *)lockid, namelen);
 	if (blocked) {
 		if (tmpmle->type == DLM_MLE_MASTER) {
@@ -505,185 +511,168 @@
 	}
 
 	ret = -EINVAL;
-	start = 0;
-	while (1) {
-		bit = find_next_bit (mle->vote_map, NM_MAX_NODES, start);
-		if (bit >= NM_MAX_NODES) {
-			dlmprintk0("no more nodes\n");
-			break;
-		}
-
-		ret = dlm_do_master_request(mle, bit);
+	dlm_node_iter_init(mle->vote_map, &iter);
+	while ((nodenum = dlm_node_iter_next(&iter)) >= 0) {
+		ret = dlm_do_master_request(mle, nodenum);
 		if (ret < 0) {
-			// TODO
-			//dlmprintk("dlm_do_master_request returned %d\n", ret);
+			dlmprintk("dlm_do_master_request returned %d\n", ret);
 		}
 		if (mle->master != NM_MAX_NODES) {
-			// found a master!
+			/* found a master ! */
 			break;
 		}
-		start = bit+1;
 	}
 
 wait:
-	while (1) {
-		spin_lock(&res->spinlock);
-		if (res->owner != DLM_LOCK_RES_OWNER_UNKNOWN) {
-			// another node has become the owner
-			spin_unlock(&res->spinlock);
-			break;
+	/* keep going until the response map includes all nodes */
+	ret = -EAGAIN;
+	while (ret == -EAGAIN) {
+		ret = dlm_wait_for_lock_mastery(dlm, res, mle);
+		if (ret == -EINVAL) {
+			dlmprintk0("some error occurred.  restarting "
+				   "lock mastery!\n");
+			/* TODO: figure out how restart this */
+			BUG();
 		}
-		spin_unlock(&res->spinlock);
+	}
+	if (ret == 0)
+		dlmprintk("lockres mastered by %u\n", res->owner);
 
-		spin_lock(&mle->spinlock);
-		if (mle->master != NM_MAX_NODES) {
-			u8 m = mle->master;
-			// dlmprintk("node %u is the master!\n", m);
-			spin_unlock(&mle->spinlock);
 
-			spin_lock(&res->spinlock);
-			dlm_set_lockres_owner(dlm, res, m, 0);
-			spin_unlock(&res->spinlock);
-			break;
-		}
-		restart = 0;
-		map_changed = (memcmp(mle->vote_map, mle->node_map, 
-				      sizeof(mle->vote_map)) != 0);
-		if (memcmp(mle->vote_map, mle->response_map, 
-			   sizeof(mle->vote_map)) == 0) {
-			// dlmprintk("every node has responded...\n");
-			if (map_changed) {
-				dlmprintk0("eek! got all original nodes, but "
-					   "nodemap changed while collecting "
-					   "responses\n");
-				restart = 1;
-			}
+	dlm_put_mle(mle);
 
-			if (mle->error) {
-				dlmprintk0("ugh.  some node hit an error "
-					   "(-ENOMEM).  try the whole thing "
-					   "again\n"); 
-				mle->error = 0;
-				/* TODO: treat this just like the dead node 
-				 * case below, cleanup and start over, but 
-				 * keep the error node around */
-				restart = 1;
-			}
+wake_waiters:
+	spin_lock(&res->spinlock);
+	res->state &= ~DLM_LOCK_RES_IN_PROGRESS;
+	spin_unlock(&res->spinlock);
+	wake_up(&res->wq);
 
-			bit = find_next_bit(mle->maybe_map, NM_MAX_NODES, 0);
-			if (bit >= NM_MAX_NODES) {
-				/* No other nodes are in-progress. Those nodes 
-				 * should all be locking out this lockid until 
-				 * I assert. They should have put a dummy entry
-				 * on dlm->master_list. Need to assert myself as
-				 * the master. */ 
-				// dlmprintk0("I am the only node in-progress!"
-				// "  asserting myself as master\n");
-				assert = 1;
-			} else {
-				/* other nodes are in-progress */
-				if (map_changed && 
-				    !test_bit(bit, mle->node_map)) {
-					/* TODO: need to copy the node_map into
-					 * the vote_map, zero everything out 
-					 * and start over */
-					dlmprintk("need to handle this case.  "
-						  "winning node %u just "
-						  "died\n", bit);
-					restart = 1;
-				}
+	return res;
+}
 
-				if (bit > dlm->group_index) {
-					// dlmprintk("next in-progress node "
-					// "(%u) is higher than me (%u)\n",
-					//        bit, dlm->group_index);
 
-					/* Nodes not in-progress should be 
-					 * locking out this lockid until I 
-					 * assert. In-progress nodes should 
-					 * match me up with their lowest 
-					 * maybe_map bit. Need to assert myself
-					 * as the master */
-					// dlmprintk("I am the lowest node!  "
-					// "asserting myself as master\n");
-					assert = 1;
-				} else {
-					/* Need to sit around and wait for 
-					 * assert. My lowest maybe_map bit 
-					 * should be the one to assert. Just 
-					 * fall through and sleep. Should be 
-					 * woken by the handler. */
-					// dlmprintk("sleeping while waiting "
-					// "for %u to assert himself as "
-					// "master\n", bit);
-				}
-			}
-		} else {
-			if (map_changed) {
-				/* TODO: need to handle this */
-				dlmprintk0("eek! nodemap changed while "
-					   "collecting responses\n");
-				restart = 1;
-			}
-			// dlmprintk0("still waiting for all nodes to "
-			// "respond...\n");
-		}
+static int dlm_wait_for_lock_mastery(dlm_ctxt *dlm, dlm_lock_resource *res, 
+				     dlm_master_list_entry *mle)
+{
+	u8 m;
+	int ret = 0, tmpret, bit;
+	int map_changed = 0, voting_done = 0;
+	int assert = 0, sleep;
 
-		if (restart && assert)
-			assert = 0;
+	/* check if another node has already become the owner */
+	spin_lock(&res->spinlock);
+	if (res->owner != DLM_LOCK_RES_OWNER_UNKNOWN) {
+		spin_unlock(&res->spinlock);
+		goto leave;
+	}
+	spin_unlock(&res->spinlock);
 
-		/* make sure to tell any other nodes that i am mastering this */
-		if (assert)
-			mle->master = dlm->group_index;
-
+	spin_lock(&mle->spinlock);
+	m = mle->master;
+	map_changed = (memcmp(mle->vote_map, mle->node_map, 
+			      sizeof(mle->vote_map)) != 0);
+	voting_done = (memcmp(mle->vote_map, mle->response_map,
+			     sizeof(mle->vote_map)) == 0);
+	
+	/* restart if we hit any errors */
+	if (mle->error || map_changed) {
+		if (mle->error) {
+			dlmprintk("another node got error %d, restarting\n",
+			  	mle->error);
+			mle->error = 0;
+		}
+		if (map_changed)
+			dlmprintk0("node map changed, restarting\n");
 		spin_unlock(&mle->spinlock);
-		
-		if (assert) {
-			ret = dlm_do_assert_master(mle);
-			// dlmprintk("assert returned %d!\n", ret);
-			if (ret == 0) {
-				spin_lock(&res->spinlock);
-				dlm_set_lockres_owner(dlm, res,
-						      dlm->group_index, 0);
-				spin_unlock(&res->spinlock);
-				// dlmprintk("wooo!  i am the owner.  phew!\n");
-				break;
-			} else 
-				restart = 1;
+		tmpret = dlm_restart_lock_mastery(dlm, res, mle);
+		if (tmpret < 0)
+			dlmprintk("dlm_restart_lock_mastery returned %d!\n",
+				  tmpret);
+		ret = -EINVAL;
+		goto leave;
+	}
+
+	if (m != NM_MAX_NODES) {
+		/* another node has done an assert!
+		 * all done! */
+		sleep = 0;
+	} else {
+		sleep = 1;
+		/* have all nodes responded? */
+		if (voting_done) {
+			bit = find_next_bit(mle->maybe_map, NM_MAX_NODES, 0);
+			if (dlm->group_index <= bit) {
+				/* my node number is lowest.
+			 	 * now tell other nodes that I am 
+				 * mastering this. */
+				mle->master = dlm->group_index;
+				assert = 1;
+				sleep = 0;
+			}
+			/* if voting is done, but we have not received
+			 * an assert master yet, we must sleep */
 		}
-		if (restart) {
-			dlmprintk0("something happened such that the master "
-				   "process needs to be restarted!\n");
-			/* TODO: clear it all out and start over */
-		}
+	}
 
+	spin_unlock(&mle->spinlock);
+
+	/* sleep if we haven't finished voting yet */
+	if (sleep) {
 		atomic_set(&mle->woken, 0);
 		ret = wait_event_interruptible_timeout(mle->wq, 
 					(atomic_read(&mle->woken) == 1), 
 					msecs_to_jiffies(5000));
-		if (ret == 0) {
+
+		if (ret >= 0 && !atomic_read(&mle->woken)) {
 			dlmprintk("timed out during lock mastery: "
 				  "vote_map=%0lx, response_map=%0lx\n",
 				  mle->vote_map[0], mle->response_map[0]);
-			continue;
 		}
+		/* unless we are aborting, need to recheck and 
+		 * maybe sleep again */
+		if (ret != -EINTR)
+			ret = -EAGAIN;
+		goto leave;
+	}
+
+	ret = 0;   /* done */	
+	if (assert) {
+		m = dlm->group_index;
+		ret = dlm_do_assert_master(mle);
+		if (ret) {
+			dlmprintk("dlm_do_assert_master returned %d!\n",
+				  ret);
+			tmpret = dlm_restart_lock_mastery(dlm, res, mle);
+			if (tmpret < 0)
+				dlmprintk("dlm_restart_lock_mastery returned "
+					  "%d!\n", tmpret);
+			ret = -EINVAL;
+			goto leave;
+		}
 		if (ret < 0) {
 			dlmprintk0("interrupted during lock mastery!\n");
-			break;
+			goto leave;
 		}
 	}
-	dlm_put_mle(mle);
 
-wake_waiters:
+	/* set the lockres owner */
 	spin_lock(&res->spinlock);
-	res->state &= ~DLM_LOCK_RES_IN_PROGRESS;
+	dlm_set_lockres_owner(dlm, res, m, 0);
 	spin_unlock(&res->spinlock);
-	wake_up(&res->wq);
 
-	return res;
+leave:
+	return ret;
 }
 
+static int dlm_restart_lock_mastery(dlm_ctxt *dlm, dlm_lock_resource *res,
+				    dlm_master_list_entry *mle)
+{
+	dlmprintk0("something happened such that the whole "
+		   "master process needs to be restarted!\n");
+	return 0;
+}
 
+
 /*
  * DLM_MASTER_REQUEST_MSG
  */
@@ -1060,16 +1049,12 @@
 	struct inode *inode = NULL;
 	dlm_ctxt *dlm = mle->dlm;
 	dlm_assert_master assert;
-	int to, start = 0, ret = 0, tmpret;
+	int to, ret = 0, tmpret;
+	dlm_node_iter iter;
 
-	while (1) {
-		to = find_next_bit (mle->vote_map, NM_MAX_NODES, start);
-		if (to >= NM_MAX_NODES) {
-			// dlmprintk0("no more nodes\n");
-			break;
-		}
+	dlm_node_iter_init(mle->vote_map, &iter);
+	while ((to = dlm_node_iter_next(&iter)) >= 0) {
 		// dlmprintk("sending assert master to %d\n", to);
-
 		memset(&assert, 0, sizeof(assert));
 		assert.node_idx = dlm->group_index;
 		if (mle->type == DLM_MLE_BLOCK) {
@@ -1101,7 +1086,6 @@
 			ret = tmpret;
 			break;
 		}
-		start = to+1;
 	}
 
 	return ret;



More information about the Ocfs2-commits mailing list