[Ocfs2-commits] smushran commits r2849 - branches/ocfs2-1.2/fs/ocfs2/dlm

svn-commits@oss.oracle.com svn-commits at oss.oracle.com
Wed Apr 19 16:46:50 CDT 2006


Author: smushran
Signed-off-by: mfasheh
Signed-off-by: khackel
Date: 2006-04-19 16:46:49 -0500 (Wed, 19 Apr 2006)
New Revision: 2849

Modified:
   branches/ocfs2-1.2/fs/ocfs2/dlm/dlmmaster.c
Log:
DLM: Fixes mle ref counting while inuse
Signed-off-by: mfasheh
Signed-off-by: khackel

Modified: branches/ocfs2-1.2/fs/ocfs2/dlm/dlmmaster.c
===================================================================
--- branches/ocfs2-1.2/fs/ocfs2/dlm/dlmmaster.c	2006-04-19 21:43:08 UTC (rev 2848)
+++ branches/ocfs2-1.2/fs/ocfs2/dlm/dlmmaster.c	2006-04-19 21:46:49 UTC (rev 2849)
@@ -73,6 +73,7 @@
 	wait_queue_head_t wq;
 	atomic_t woken;
 	struct kref mle_refs;
+	int inuse;
 	unsigned long maybe_map[BITS_TO_LONGS(O2NM_MAX_NODES)];
 	unsigned long vote_map[BITS_TO_LONGS(O2NM_MAX_NODES)];
 	unsigned long response_map[BITS_TO_LONGS(O2NM_MAX_NODES)];
@@ -333,6 +334,31 @@
 	spin_unlock(&dlm->spinlock);
 }
 
+static void dlm_get_mle_inuse(struct dlm_master_list_entry *mle)
+{
+	struct dlm_ctxt *dlm;
+	dlm = mle->dlm;
+
+	assert_spin_locked(&dlm->spinlock);
+	assert_spin_locked(&dlm->master_lock);
+	mle->inuse++;
+	kref_get(&mle->mle_refs);
+}
+
+static void dlm_put_mle_inuse(struct dlm_master_list_entry *mle)
+{
+	struct dlm_ctxt *dlm;
+	dlm = mle->dlm;
+
+	spin_lock(&dlm->spinlock);
+	spin_lock(&dlm->master_lock);
+	mle->inuse--;
+	__dlm_put_mle(mle);
+	spin_unlock(&dlm->master_lock);
+	spin_unlock(&dlm->spinlock);
+
+}
+
 /* remove from list and free */
 static void __dlm_put_mle(struct dlm_master_list_entry *mle)
 {
@@ -386,6 +412,7 @@
 	memset(mle->response_map, 0, sizeof(mle->response_map));
 	mle->master = O2NM_MAX_NODES;
 	mle->new_master = O2NM_MAX_NODES;
+	mle->inuse = 0;
 
 	if (mle->type == DLM_MLE_MASTER) {
 		BUG_ON(!res);
@@ -806,7 +833,7 @@
 	 * if so, the creator of the BLOCK may try to put the last
 	 * ref at this time in the assert master handler, so we
 	 * need an extra one to keep from a bad ptr deref. */
-	dlm_get_mle(mle);
+	dlm_get_mle_inuse(mle);
 	spin_unlock(&dlm->master_lock);
 	spin_unlock(&dlm->spinlock);
 
@@ -895,7 +922,7 @@
 	dlm_mle_detach_hb_events(dlm, mle);
 	dlm_put_mle(mle);
 	/* put the extra ref */
-	dlm_put_mle(mle);
+	dlm_put_mle_inuse(mle);
 
 wake_waiters:
 	spin_lock(&res->spinlock);
@@ -1747,6 +1774,7 @@
 	if (mle) {
 		int extra_ref = 0;
 		int nn = -1;
+		int rr, err = 0;
 		
 		spin_lock(&mle->spinlock);
 		if (mle->type == DLM_MLE_BLOCK || mle->type == DLM_MLE_MIGRATION)
@@ -1766,28 +1794,65 @@
 		wake_up(&mle->wq);
 		spin_unlock(&mle->spinlock);
 
-		if (mle->type == DLM_MLE_MIGRATION && res) {
-			mlog(0, "finishing off migration of lockres %.*s, "
-			     "from %u to %u\n",
-			       res->lockname.len, res->lockname.name,
-			       dlm->node_num, mle->new_master);
+		if (res) {
 			spin_lock(&res->spinlock);
-			res->state &= ~DLM_LOCK_RES_MIGRATING;
-			dlm_change_lockres_owner(dlm, res, mle->new_master);
-			BUG_ON(res->state & DLM_LOCK_RES_DIRTY);
+			if (mle->type == DLM_MLE_MIGRATION) {
+				mlog(0, "finishing off migration of lockres %.*s, "
+			     		"from %u to %u\n",
+			       		res->lockname.len, res->lockname.name,
+			       		dlm->node_num, mle->new_master);
+				res->state &= ~DLM_LOCK_RES_MIGRATING;
+				dlm_change_lockres_owner(dlm, res, mle->new_master);
+				BUG_ON(res->state & DLM_LOCK_RES_DIRTY);
+			} else {
+				dlm_change_lockres_owner(dlm, res, mle->master);
+			}
 			spin_unlock(&res->spinlock);
 		}
-		/* master is known, detach if not already detached */
-		dlm_mle_detach_hb_events(dlm, mle);
-		dlm_put_mle(mle);
-		
+
+		/* master is known, detach if not already detached.
+		 * ensures that only one assert_master call will happen
+		 * on this mle. */
+		spin_lock(&dlm->spinlock);
+		spin_lock(&dlm->master_lock);
+
+		rr = atomic_read(&mle->mle_refs.refcount);
+		if (mle->inuse > 0) {
+			if (extra_ref && rr < 3)
+				err = 1;
+			else if (!extra_ref && rr < 2)
+				err = 1;
+		} else {
+			if (extra_ref && rr < 2)
+				err = 1;
+			else if (!extra_ref && rr < 1)
+				err = 1;
+		}
+		if (err) {
+			mlog(ML_ERROR, "%s:%.*s: got assert master from %u "
+			     "that will mess up this node, refs=%d, extra=%d, "
+			     "inuse=%d\n", dlm->name, namelen, name,
+			     assert->node_idx, rr, extra_ref, mle->inuse);
+			dlm_print_one_mle(mle);
+		}
+		list_del_init(&mle->list);
+		__dlm_mle_detach_hb_events(dlm, mle);
+		__dlm_put_mle(mle);
 		if (extra_ref) {
 			/* the assert master message now balances the extra
 		 	 * ref given by the master / migration request message.
 		 	 * if this is the last put, it will be removed
 		 	 * from the list. */
-			dlm_put_mle(mle);
+			__dlm_put_mle(mle);
 		}
+		spin_unlock(&dlm->master_lock);
+		spin_unlock(&dlm->spinlock);
+	} else if (res) {
+		if (res->owner != assert->node_idx) {
+			mlog(0, "assert_master from %u, but current "
+			     "owner is %u (%.*s), no mle\n", assert->node_idx,
+			     res->owner, namelen, name);
+		}
 	}
 
 done:
@@ -2132,7 +2197,7 @@
 	 * take both dlm->spinlock and dlm->master_lock */
 	spin_lock(&dlm->spinlock);
 	spin_lock(&dlm->master_lock);
-	dlm_get_mle(mle);
+	dlm_get_mle_inuse(mle);
 	spin_unlock(&dlm->master_lock);
 	spin_unlock(&dlm->spinlock);
 
@@ -2149,7 +2214,10 @@
 		/* migration failed, detach and clean up mle */
 		dlm_mle_detach_hb_events(dlm, mle);
 		dlm_put_mle(mle);
-		dlm_put_mle(mle);
+		dlm_put_mle_inuse(mle);
+		spin_lock(&res->spinlock);
+		res->state &= ~DLM_LOCK_RES_MIGRATING;
+		spin_unlock(&res->spinlock);
 		goto leave;
 	}
 
@@ -2190,7 +2258,10 @@
 			/* migration failed, detach and clean up mle */
 			dlm_mle_detach_hb_events(dlm, mle);
 			dlm_put_mle(mle);
-			dlm_put_mle(mle);
+			dlm_put_mle_inuse(mle);
+			spin_lock(&res->spinlock);
+			res->state &= ~DLM_LOCK_RES_MIGRATING;
+			spin_unlock(&res->spinlock);
 			goto leave;
 		}
 		/* TODO: if node died: stop, clean up, return error */
@@ -2206,7 +2277,7 @@
 
 	/* master is known, detach if not already detached */
 	dlm_mle_detach_hb_events(dlm, mle);
-	dlm_put_mle(mle);
+	dlm_put_mle_inuse(mle);
 	ret = 0;
 
 	dlm_lockres_calc_usage(dlm, res);




More information about the Ocfs2-commits mailing list