[Ocfs2-commits] khackel commits r2521 - branches/ocfs2-1.0/fs/ocfs2/dlm

svn-commits at oss.oracle.com
Wed Aug 17 15:42:16 CDT 2005


Author: khackel
Signed-off-by: mfasheh
Date: 2005-08-17 15:42:14 -0500 (Wed, 17 Aug 2005)
New Revision: 2521

Modified:
   branches/ocfs2-1.0/fs/ocfs2/dlm/dlmrecovery.c
Log:
* merge revision 2520 from HEAD:
   - fix a problem with lvb recovery reported in bug 527

Signed-off-by: mfasheh
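
For context, the send-side half of this change enforces a single
invariant while packing locks into the migration message: the lvb
slot may be filled once by an EX holder, or by any number of PR
holders carrying identical lvbs; anything else is the "mismatched
lvbs" BUG() in the first hunk below. A minimal user-space sketch of
that merge rule follows. The names (lvb_merge, LVB_LEN, lock_mode)
are illustrative stand-ins, not the kernel's, and a zero first byte
is treated as "unset" just as the patch treats mres->lvb[0]:

#include <string.h>

#define LVB_LEN 64

enum lock_mode { MODE_NL, MODE_PR, MODE_EX };

/* fold one lock's lvb into the outgoing message lvb.
 * returns 0 on success, -1 on the "mismatched lvbs"
 * condition that the patch turns into a BUG(). */
static int lvb_merge(char *mres_lvb, const char *lock_lvb,
		     enum lock_mode mode)
{
	if (mode != MODE_EX && mode != MODE_PR)
		return 0;  /* NL locks carry no authoritative lvb */
	if (mres_lvb[0]) {
		/* already set: only another PR with an
		 * identical lvb is acceptable */
		if (mode == MODE_EX ||
		    memcmp(mres_lvb, lock_lvb, LVB_LEN))
			return -1;
	}
	memcpy(mres_lvb, lock_lvb, LVB_LEN);
	return 0;
}

int main(void)
{
	char mres_lvb[LVB_LEN] = "", a[LVB_LEN] = "aaa", b[LVB_LEN] = "bbb";

	lvb_merge(mres_lvb, a, MODE_PR);         /* first PR: copied in */
	return lvb_merge(mres_lvb, b, MODE_PR);  /* mismatch: -1        */
}
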



Modified: branches/ocfs2-1.0/fs/ocfs2/dlm/dlmrecovery.c
===================================================================
--- branches/ocfs2-1.0/fs/ocfs2/dlm/dlmrecovery.c	2005-08-17 20:25:55 UTC (rev 2520)
+++ branches/ocfs2-1.0/fs/ocfs2/dlm/dlmrecovery.c	2005-08-17 20:42:14 UTC (rev 2521)
@@ -919,15 +919,17 @@
 	ml->list = queue;
 	if (lock->lksb) {
 		ml->flags = lock->lksb->flags;
-		if (ml->flags & DLM_LKSB_PUT_LVB) {
-			/* NOTE: because we only support NL, PR and EX locks
-			 * there can be only one lock on this lockres with
-			 * this flag, and it must be currently an EX.
-			 * this means this node had a pending LVB change
-			 * when the master died.  we should send his lvb
-			 * over and attach it to the lksb on the other side */
-			BUG_ON(ml->type != LKM_EXMODE);
-			BUG_ON(mres->lvb[0] != 0);
+		/* send our current lvb */
+		if (ml->type == LKM_EXMODE ||
+		    ml->type == LKM_PRMODE) {
+			/* if the lvb is already set, this lock had
+			 * better be a PR and its lvb has to match */
+			if (mres->lvb[0] && (ml->type == LKM_EXMODE ||
+			    memcmp(mres->lvb, lock->lksb->lvb, DLM_LVB_LEN))) {
+				mlog(ML_ERROR, "mismatched lvbs!\n");
+				__dlm_print_one_lock_resource(lock->lockres);
+				BUG();
+			}
 			memcpy(mres->lvb, lock->lksb->lvb, DLM_LVB_LEN);
 		}
 	}
@@ -1439,13 +1441,29 @@
 		}
 		lksb->flags |= (ml->flags &
 				(DLM_LKSB_PUT_LVB|DLM_LKSB_GET_LVB));
-		if (lksb->flags & DLM_LKSB_PUT_LVB) {
-			/* other node was trying to update
-			 * lvb when node died.  recreate the
-			 * lksb with the updated lvb. */
-			memcpy(lksb->lvb, mres->lvb, DLM_LVB_LEN);
+
+		if (mres->lvb[0]) {
+			if (lksb->flags & DLM_LKSB_PUT_LVB) {
+				/* other node was trying to update
+				 * lvb when node died.  recreate the
+				 * lksb with the updated lvb. */
+				memcpy(lksb->lvb, mres->lvb, DLM_LVB_LEN);
+			} else {
+				/* otherwise, the node is sending its 
+				 * most recent valid lvb info */
+				BUG_ON(ml->type != LKM_EXMODE &&
+				       ml->type != LKM_PRMODE);
+				if (res->lvb[0] && (ml->type == LKM_EXMODE ||
+				    memcmp(res->lvb, mres->lvb, DLM_LVB_LEN))) {
+					mlog(ML_ERROR, "received bad lvb!\n");
+					__dlm_print_one_lock_resource(res);
+					BUG();
+				}
+				memcpy(res->lvb, mres->lvb, DLM_LVB_LEN);
+			}
 		}
 
+
 		/* NOTE:
 		 * wrt lock queue ordering and recovery:
 		 *    1. order of locks on granted queue is
@@ -1616,12 +1634,71 @@
 	}
 }
 
+static inline int dlm_lvb_needs_invalidation(struct dlm_lock *lock, int local)
+{
+	if (local) {
+		if (lock->ml.type != LKM_EXMODE &&
+		    lock->ml.type != LKM_PRMODE)
+			return 1;
+	} else if (lock->ml.type == LKM_EXMODE)
+		return 1;
+	return 0;
+}
+
+static void dlm_revalidate_lvb(struct dlm_ctxt *dlm,
+			       struct dlm_lock_resource *res, u8 dead_node)
+{
+	struct list_head *iter, *queue;
+	struct dlm_lock *lock;
+	int blank_lvb = 0, local = 0;
+	int i;
+	u8 search_node;
+
+	assert_spin_locked(&dlm->spinlock);
+	assert_spin_locked(&res->spinlock);
+
+	if (res->owner == dlm->node_num)
+		/* if this node owned the lockres, and if the dead node 
+		 * had an EX when he died, blank out the lvb */
+		search_node = dead_node;
+	else {
+		/* if this is a secondary lockres, and we had no EX or PR
+		 * locks granted, we can no longer trust the lvb */
+		search_node = dlm->node_num;
+		local = 1;  /* check local state for valid lvb */
+	}
+
+	for (i = DLM_GRANTED_LIST; i <= DLM_CONVERTING_LIST; i++) {
+		queue = dlm_list_idx_to_ptr(res, i);
+		list_for_each(iter, queue) {
+			lock = list_entry(iter, struct dlm_lock, list);
+			if (lock->ml.node == search_node) {
+				if (dlm_lvb_needs_invalidation(lock, local)) {
+					/* zero the lksb lvb and lockres lvb */
+					blank_lvb = 1;
+					memset(lock->lksb->lvb, 0, DLM_LVB_LEN);
+				}
+			}
+		}
+	}
+
+	if (blank_lvb) {
+		mlog(0, "clearing %.*s lvb, dead node %u had EX\n",
+		     res->lockname.len, res->lockname.name, dead_node);
+		memset(res->lvb, 0, DLM_LVB_LEN);
+	}
+}
+
 static void dlm_free_dead_locks(struct dlm_ctxt *dlm,
 				struct dlm_lock_resource *res, u8 dead_node)
 {
 	struct list_head *iter, *tmpiter;
 	struct dlm_lock *lock;
 
+	/* this node is the lockres master:
+	 * 1) remove any stale locks for the dead node
+	 * 2) if the dead node had an EX when he died, blank out the lvb 
+	 */
 	assert_spin_locked(&dlm->spinlock);
 	assert_spin_locked(&res->spinlock);
 
@@ -1652,6 +1729,13 @@
 	__dlm_dirty_lockres(dlm, res);
 }
 
+/* if this node is the recovery master, and there are no
+ * locks in either PR or EX mode on a given lockres owned
+ * by this node, the cached lvb can no longer be trusted
+ * and must be zeroed out before recovery proceeds.
+ */
+
+
 static void dlm_do_local_recovery_cleanup(struct dlm_ctxt *dlm, u8 dead_node)
 {
 	struct list_head *iter;
@@ -1684,7 +1768,10 @@
 			if (dlm_is_recovery_lock(res->lockname.name,
 						 res->lockname.len))
 				continue;
+
 			spin_lock(&res->spinlock);
+			/* zero the lvb if necessary */
+			dlm_revalidate_lvb(dlm, res, dead_node);
 			if (res->owner == dead_node)
 				dlm_move_lockres_to_recovery_list(dlm, res);
 			else if (res->owner == dlm->node_num) {


