[Ocfs2-commits] khackel commits r1916 - branches/dlm-reco-mig/fs/ocfs2/dlm

svn-commits at oss.oracle.com svn-commits at oss.oracle.com
Mon Feb 28 14:00:17 CST 2005


Author: khackel
Date: 2005-02-28 14:00:15 -0600 (Mon, 28 Feb 2005)
New Revision: 1916

Modified:
   branches/dlm-reco-mig/fs/ocfs2/dlm/dlmmaster.c
   branches/dlm-reco-mig/fs/ocfs2/dlm/dlmmod.c
   branches/dlm-reco-mig/fs/ocfs2/dlm/dlmmod.h
Log:
* move the master_list and master_lock onto the dlm_ctxt structure
* move dlm_num_resources onto the dlm_ctxt, and split into three separate
  counters (local, remote and unknown)
* introduce dlm_set_lockres_owner as the standard method for setting the
  lockres owner (which also manages the counters above)


Modified: branches/dlm-reco-mig/fs/ocfs2/dlm/dlmmaster.c
===================================================================
--- branches/dlm-reco-mig/fs/ocfs2/dlm/dlmmaster.c	2005-02-28 19:24:54 UTC (rev 1915)
+++ branches/dlm-reco-mig/fs/ocfs2/dlm/dlmmaster.c	2005-02-28 20:00:15 UTC (rev 1916)
@@ -50,12 +50,7 @@
 #include "dlmcommon.h"
 #include "dlmmod.h"
 
-spinlock_t dlm_master_lock = SPIN_LOCK_UNLOCKED;
-LIST_HEAD(dlm_master_list);
 
-/* gives a really vague idea of the system load */
-atomic_t dlm_num_resources = ATOMIC_INIT(0);
-
 static int dlm_init_mle(dlm_master_list_entry *mle,
 			enum dlm_mle_type type,
 			dlm_ctxt *dlm,
@@ -70,13 +65,50 @@
 		     int idx, void *data);
 static void dlm_mle_node_down(struct inode *group, struct inode *node, 
 		       int idx, void *data);
+static dlm_lock_resource *dlm_new_lockres(dlm_ctxt *dlm,
+					  const char *name,
+					  unsigned int namelen);
+static void dlm_init_lockres(dlm_ctxt *dlm,
+			     dlm_lock_resource *res,
+			     const char *name,
+			     unsigned int namelen);
 
+static inline void dlm_set_lockres_owner(dlm_ctxt *dlm, dlm_lock_resource *res,
+					 u8 owner, int init)
+{
+	if (!init) {	/* init != 0: res->owner is not read, nothing to un-count */
+		if (owner == res->owner)
+			return;	/* owner unchanged: leave the counters alone */
+
+		if (res->owner == dlm->group_index)	/* un-count the old owner */
+			atomic_dec(&dlm->local_resources);
+		else if (res->owner == DLM_LOCK_RES_OWNER_UNKNOWN)
+			atomic_dec(&dlm->unknown_resources);
+		else
+			atomic_dec(&dlm->remote_resources);
+	}
+
+	if (owner == dlm->group_index)	/* count the lockres under its new owner */
+		atomic_inc(&dlm->local_resources);
+	else if (owner == DLM_LOCK_RES_OWNER_UNKNOWN)
+		atomic_inc(&dlm->unknown_resources);
+	else
+		atomic_inc(&dlm->remote_resources);
+
+	res->owner = owner;
+}
+
 /* remove from list and free */
 static void dlm_put_mle(dlm_master_list_entry *mle)
 {
-	if (atomic_dec_and_lock(&mle->refcnt, &dlm_master_lock)) {
+	dlm_ctxt *dlm;
+	DLM_ASSERT(mle);
+	DLM_ASSERT(mle->dlm);
+	dlm = mle->dlm;
+
+	if (atomic_dec_and_lock(&mle->refcnt, &dlm->master_lock)) {
 		list_del(&mle->list);
-		spin_unlock(&dlm_master_lock);
+		spin_unlock(&dlm->master_lock);
 		hb_unregister_callback(&mle->mle_hb_down);
 		hb_unregister_callback(&mle->mle_hb_up);
 		kfree(mle);
@@ -219,7 +251,8 @@
 	spin_unlock(&dlm->spinlock);
 }
 
-static void dlm_init_lockres(dlm_lock_resource *res,
+static void dlm_init_lockres(dlm_ctxt *dlm,
+			     dlm_lock_resource *res,
 			     const char *name,
 			     unsigned int namelen)
 {
@@ -246,13 +279,14 @@
 
 	kref_init(&res->refs, dlm_lockres_release);
 
-	res->owner = DLM_LOCK_RES_OWNER_UNKNOWN;
+	dlm_set_lockres_owner(dlm, res, DLM_LOCK_RES_OWNER_UNKNOWN, 1);
 	res->state = DLM_LOCK_RES_IN_PROGRESS;
 
 	memset(res->lvb, 0, DLM_LVB_LEN);
 }
 
-static dlm_lock_resource *dlm_new_lockres(const char *name,
+static dlm_lock_resource *dlm_new_lockres(dlm_ctxt *dlm,
+					  const char *name,
 					  unsigned int namelen)
 {
 	dlm_lock_resource *res;
@@ -267,7 +301,7 @@
 		return NULL;
 	}
 
-	dlm_init_lockres(res, name, namelen);
+	dlm_init_lockres(dlm, res, name, namelen);
 	return res;
 }
 
@@ -279,7 +313,7 @@
  * if not, allocate enough for the lockres and for
  * the temporary structure used in doing the mastering.
  * 
- * also, do a lookup in the dlm_master_list to see
+ * also, do a lookup in the dlm->master_list to see
  * if another node has begun mastering the same lock.
  * if so, there should be a block entry in there
  * for this name, and we should *not* attempt to master
@@ -332,7 +366,7 @@
 		if (!mle)
 			return NULL;
 
-		res = dlm_new_lockres(lockid, namelen);
+		res = dlm_new_lockres(dlm, lockid, namelen);
 		if (!res) {
 			kfree(mle);
 			return NULL;
@@ -346,17 +380,16 @@
 	if (flags & LKM_LOCAL) {
 		/* caller knows it's safe to assume it's not mastered elsewhere
 		 * DONE!  return right away */
+		dlm_set_lockres_owner(dlm, res, dlm->group_index, 0);
 		__dlm_insert_lock(dlm, res);
-		res->owner = dlm->group_index;
-		atomic_inc(&dlm_num_resources);
 		spin_unlock(&dlm->spinlock);
 		/* lockres still marked IN_PROGRESS */
 		goto wake_waiters;
 	}
 
 	/* check master list to see if another node has started mastering it */
-	spin_lock(&dlm_master_lock);
-	list_for_each(iter, &dlm_master_list) {
+	spin_lock(&dlm->master_lock);
+	list_for_each(iter, &dlm->master_list) {
 		tmpmle = list_entry(iter, dlm_master_list_entry, list);
 		if (!dlm_mle_equal(dlm, tmpmle, lockid, namelen))
 			continue;
@@ -378,23 +411,22 @@
 			dlmprintk0("bug! failed to register hb callbacks\n");
 			BUG();
 		}
-		list_add(&mle->list, &dlm_master_list);
+		list_add(&mle->list, &dlm->master_list);
 	}
-	spin_unlock(&dlm_master_lock);
+	spin_unlock(&dlm->master_lock);
 
 	/* at this point there is either a DLM_MLE_BLOCK or a
 	 * DLM_MLE_MASTER on the master list, so it's safe to add the
 	 * lockres to the hashtable.  anyone who finds the lock will
 	 * still have to wait on the IN_PROGRESS.  also, any new nodes
 	 * that try to join at this point will have to wait until my
-	 * dlm_master_lock list is empty, so they cannot possibly do
+	 * dlm->master_list is empty, so they cannot possibly do
 	 * any master requests yet... TODO ?? should i have a special
 	 * type of mle just for joining nodes ??  ?? could allow them
 	 * to come in and put their mle on the list and sleep ?? */
 
 	/* finally add the lockres to its hash bucket */
 	__dlm_insert_lock(dlm, res);
-	atomic_inc(&dlm_num_resources);
 	spin_unlock(&dlm->spinlock);
 
 	if (blocked) {
@@ -442,7 +474,7 @@
 			spin_unlock(&mle->spinlock);
 
 			spin_lock(&res->spinlock);
-			res->owner = m;
+			dlm_set_lockres_owner(dlm, res, m, 0);
 			spin_unlock(&res->spinlock);
 			break;
 		}
@@ -475,7 +507,7 @@
 				/* No other nodes are in-progress. Those nodes 
 				 * should all be locking out this lockid until 
 				 * I assert. They should have put a dummy entry
-				 * on dlm_master_list. Need to assert myself as
+				 * on dlm->master_list. Need to assert myself as
 				 * the master. */ 
 				// dlmprintk0("I am the only node in-progress!"
 				// "  asserting myself as master\n");
@@ -543,7 +575,8 @@
 			// dlmprintk("assert returned %d!\n", ret);
 			if (ret == 0) {
 				spin_lock(&res->spinlock);
-				res->owner = dlm->group_index;
+				dlm_set_lockres_owner(dlm, res,
+						      dlm->group_index, 0);
 				spin_unlock(&res->spinlock);
 				// dlmprintk("wooo!  i am the owner.  phew!\n");
 				break;
@@ -587,7 +620,7 @@
  * dlm->spinlock
  * res->spinlock
  * mle->spinlock
- * dlm_master_list
+ * dlm->master_lock
  *
  * if possible, TRIM THIS DOWN!!!
  */
@@ -650,8 +683,8 @@
 
 		// dlmprintk0("lockres is in progress...\n");
 		found = 0;
-		spin_lock(&dlm_master_lock);
-		list_for_each(iter, &dlm_master_list) {
+		spin_lock(&dlm->master_lock);
+		list_for_each(iter, &dlm->master_list) {
 			tmpmle = list_entry(iter, dlm_master_list_entry, list);
 			if (!dlm_mle_equal(dlm, tmpmle, name, namelen))
 				continue;
@@ -670,7 +703,7 @@
 			set_bit(request->node_idx, tmpmle->maybe_map);
 			spin_unlock(&tmpmle->spinlock);
 
-			spin_unlock(&dlm_master_lock);
+			spin_unlock(&dlm->master_lock);
 			spin_unlock(&res->spinlock);
 
 			dlm_put_mle(tmpmle);
@@ -678,7 +711,7 @@
 				kfree(mle);
 			goto send_response;
 		}
-		spin_unlock(&dlm_master_lock);
+		spin_unlock(&dlm->master_lock);
 		spin_unlock(&res->spinlock);
 		dlmprintk0("bug bug bug!!!  no mle found for this lock!\n");
 		BUG();
@@ -691,8 +724,8 @@
 	 * otherwise, add an MLE_BLOCK, return NO 
 	 */
 	found = 0;
-	spin_lock(&dlm_master_lock);
-	list_for_each(iter, &dlm_master_list) {
+	spin_lock(&dlm->master_lock);
+	list_for_each(iter, &dlm->master_list) {
 		tmpmle = list_entry(iter, dlm_master_list_entry, list);
 		if (!dlm_mle_equal(dlm, tmpmle, name, namelen))
 			continue;
@@ -705,7 +738,7 @@
 		/* this lockid has never been seen on this node yet */
 		// dlmprintk0("no mle found\n");
 		if (!mle) {
-			spin_unlock(&dlm_master_lock);
+			spin_unlock(&dlm->master_lock);
 			spin_unlock(&dlm->spinlock);
 
 			mle = kmalloc(sizeof(dlm_master_list_entry) + 
@@ -728,7 +761,7 @@
 		// dlmprintk0("this is second time thru, already allocated, "
 		// "add the block.\n");
 		set_bit(request->node_idx, mle->maybe_map);
-		list_add(&mle->list, &dlm_master_list);
+		list_add(&mle->list, &dlm->master_list);
 		response = DLM_MASTER_RESP_NO;
 	} else {
 		// dlmprintk0("mle was found\n");
@@ -741,7 +774,7 @@
 		spin_unlock(&tmpmle->spinlock);
 		dlm_put_mle(tmpmle);
 	}
-	spin_unlock(&dlm_master_lock);
+	spin_unlock(&dlm->master_lock);
 	spin_unlock(&dlm->spinlock);
 
 send_response:
@@ -749,14 +782,14 @@
 	return response;
 }
 
-/* NOTE: when doing node recovery, run the dlm_master_list looking for the 
+/* NOTE: when doing node recovery, run the dlm->master_list looking for the 
  * dead node in any maybe_map... clear that bit, and if now empty, clear the 
  * whole thing */
 
 /*
  * locks that can be taken here:
  * mle->spinlock
- * dlm_master_list
+ * dlm->master_lock
  *
  */
 int dlm_master_request_resp_handler(net_msg *msg, u32 len, void *data)
@@ -777,8 +810,8 @@
 		goto done;
 	}
 
-	spin_lock(&dlm_master_lock);
-	list_for_each(iter, &dlm_master_list) {
+	spin_lock(&dlm->master_lock);
+	list_for_each(iter, &dlm->master_list) {
 		mle = list_entry(iter, dlm_master_list_entry, list);
 		if (!dlm_mle_equal(dlm, mle, resp->name, resp->namelen)) {
 			mle = NULL;
@@ -836,7 +869,7 @@
 		spin_unlock(&mle->spinlock);
 		break;
 	}
-	spin_unlock(&dlm_master_lock);
+	spin_unlock(&dlm->master_lock);
 
 	if (found)
 		dlm_put_mle(mle);
@@ -854,7 +887,7 @@
  * dlm->spinlock
  * res->spinlock
  * mle->spinlock
- * dlm_master_list
+ * dlm->master_lock
  *
  * if possible, TRIM THIS DOWN!!!
  */
@@ -884,8 +917,8 @@
 	spin_lock(&dlm->spinlock);
 
 	/* find the MLE */
-	spin_lock(&dlm_master_lock);
-	list_for_each(iter, &dlm_master_list) {
+	spin_lock(&dlm->master_lock);
+	list_for_each(iter, &dlm->master_list) {
 		mle = list_entry(iter, dlm_master_list_entry, list);
 		if (dlm_mle_equal(dlm, mle, name, namelen)) {
 			dlm_get_mle(mle);
@@ -897,7 +930,7 @@
 		dlmprintk("EEEEEEK!  just got an assert_master from %u, but no "
 			  "MLE for it!\n",
 		       assert->node_idx);
-		spin_unlock(&dlm_master_lock);
+		spin_unlock(&dlm->master_lock);
 		goto check_lockres;
 	}
 	bit = find_next_bit (mle->maybe_map, NM_MAX_NODES, 0);
@@ -911,7 +944,7 @@
 			  "asserting!\n", bit, assert->node_idx);
 		BUG();
 	}
-	spin_unlock(&dlm_master_lock);
+	spin_unlock(&dlm->master_lock);
 
 	/* ok everything checks out with the MLE
 	 * now check to see if there is a lockres */

Modified: branches/dlm-reco-mig/fs/ocfs2/dlm/dlmmod.c
===================================================================
--- branches/dlm-reco-mig/fs/ocfs2/dlm/dlmmod.c	2005-02-28 19:24:54 UTC (rev 1915)
+++ branches/dlm-reco-mig/fs/ocfs2/dlm/dlmmod.c	2005-02-28 20:00:15 UTC (rev 1916)
@@ -78,15 +78,9 @@
 
 
 
-/* ----------------------------------------------------------------- */
 
-extern spinlock_t dlm_master_lock;
-extern struct list_head dlm_master_list;
-/* ----------------------------------------------------------------- */
 
 
-
-
 /*
  * dlm_driver_entry()
  *
@@ -646,10 +640,12 @@
 	dlm->key = key;
 
 	spin_lock_init(&dlm->spinlock);
+	spin_lock_init(&dlm->master_lock);
 	INIT_LIST_HEAD(&dlm->list);
 	INIT_LIST_HEAD(&dlm->dirty_list);
 	INIT_LIST_HEAD(&dlm->reco.resources);
 	INIT_LIST_HEAD(&dlm->reco.received);
+	INIT_LIST_HEAD(&dlm->master_list);
 	util_thread_info_init(&dlm->thread);
 	util_thread_info_init(&dlm->reco.thread);
 	init_rwsem(&dlm->recovery_sem);
@@ -662,6 +658,9 @@
 	dlm->reco.dead_node = NM_INVALID_SLOT_NUM;
 	dlm->reco.sending_node = NM_INVALID_SLOT_NUM;
 	dlm->reco.next_seq = 0;
+	atomic_set(&dlm->local_resources, 0);
+	atomic_set(&dlm->remote_resources, 0);
+	atomic_set(&dlm->unknown_resources, 0);
 
 	kref_init(&dlm->dlm_refs, dlm_ctxt_release);
 	dlm->dlm_state = DLM_CTXT_NEW;

Modified: branches/dlm-reco-mig/fs/ocfs2/dlm/dlmmod.h
===================================================================
--- branches/dlm-reco-mig/fs/ocfs2/dlm/dlmmod.h	2005-02-28 19:24:54 UTC (rev 1915)
+++ branches/dlm-reco-mig/fs/ocfs2/dlm/dlmmod.h	2005-02-28 20:00:15 UTC (rev 1916)
@@ -216,6 +216,14 @@
 	unsigned long node_map[BITS_TO_LONGS(NM_MAX_NODES)];
 	unsigned long recovery_map[BITS_TO_LONGS(NM_MAX_NODES)];
 	dlm_recovery_ctxt reco;
+	spinlock_t master_lock;
+	struct list_head master_list;
+
+	/* these give a really vague idea of the system load */
+	atomic_t local_resources;
+	atomic_t remote_resources;
+	atomic_t unknown_resources;
+
 	/* dlm_refs and dlm_state are protected by dlm_domain_lock */
 	struct kref dlm_refs;
 	dlm_ctxt_state dlm_state;



More information about the Ocfs2-commits mailing list