[Ocfs2-commits] mfasheh commits r1891 - trunk/fs/ocfs2/dlm

svn-commits at oss.oracle.com svn-commits at oss.oracle.com
Thu Feb 17 18:13:18 CST 2005


Author: mfasheh
Date: 2005-02-17 18:13:17 -0600 (Thu, 17 Feb 2005)
New Revision: 1891

Modified:
   trunk/fs/ocfs2/dlm/dlmast.c
   trunk/fs/ocfs2/dlm/dlmconvert.c
   trunk/fs/ocfs2/dlm/dlmlock.c
   trunk/fs/ocfs2/dlm/dlmmaster.c
   trunk/fs/ocfs2/dlm/dlmmod.c
   trunk/fs/ocfs2/dlm/dlmmod.h
   trunk/fs/ocfs2/dlm/dlmrecovery.c
Log:
* ref counting for dlm_ctxt structures. This doesn't give us umount yet
  because we need to do lock migration.
   - the kref API changed mid-2.6, so I'll have some compat glue for this ASAP.

* fix a bug where the dlm_ctxt was being overwritten due to bad arguments to
  hb_fill_node_map

* fix a bad flag check in dlm_create_lock_handler

* do a little bit of cleanup in dlm_get_lock_resource. "a little bit" --
  it's still a beast.



Modified: trunk/fs/ocfs2/dlm/dlmast.c
===================================================================
--- trunk/fs/ocfs2/dlm/dlmast.c	2005-02-18 00:12:37 UTC (rev 1890)
+++ trunk/fs/ocfs2/dlm/dlmast.c	2005-02-18 00:13:17 UTC (rev 1891)
@@ -56,7 +56,6 @@
 int dlm_do_ast(dlm_ctxt *dlm, dlm_lock_resource *res, dlm_lock *lock)
 {
 	int ret;
-
 	dlm_astlockfunc_t *fn;
 	dlm_lockstatus *lksb;
 
@@ -105,7 +104,7 @@
 {
 	int ret;
 	dlm_bastlockfunc_t *fn = lock->bast;
-	
+
 	dlmprintk0("\n");
 
 	if (lock->node != dlm->group_index) {
@@ -140,6 +139,9 @@
 	u64 cookie;
 	u32 flags;
 
+	if (!dlm_grab(dlm))
+		return DLM_REJECTED;
+
 	dlm_proxy_ast_to_host(past);
 	lockname.name = past->name;
 	lockname.len = past->namelen;
@@ -157,7 +159,7 @@
 		  (flags & LKM_GET_LVB ? "get lvb" : "none"));
 
 	lockname.hash = full_name_hash(lockname.name, lockname.len);
-	
+
 	dlmprintk("type=%d, blocked_type=%d\n", past->type, past->blocked_type);
 
 	if (past->type != DLM_AST && 
@@ -214,7 +216,7 @@
 		up_read(&dlm->recovery_sem);
 	ret = DLM_NORMAL;
 	goto leave;
-		
+
 do_ast:
 	ret = DLM_NORMAL;
 	if (past->type == DLM_AST) {
@@ -228,7 +230,7 @@
 		} else {
 			// should already be there....
 		}
-		
+
 		lock->lksb->status = DLM_NORMAL;
 
 		/* if we requested the lvb, fetch it into our lksb now */
@@ -255,6 +257,8 @@
 		up_read(&dlm->recovery_sem);
 
 leave:
+
+	dlm_put(dlm);
 	return ret;
 }
 
@@ -283,6 +287,7 @@
 	iov[0].iov_len = sizeof(dlm_proxy_ast);
 	iov[0].iov_base = &past;
 	if (lock->lksb->flags & DLM_LKSB_GET_LVB) {
+		dlmprintk("sending LKM_GET_LVB flag\n");
 		past.flags |= LKM_GET_LVB;
 		iov[1].iov_len = DLM_LVB_LEN;
 		iov[1].iov_base = lock->lksb->lvb;
@@ -302,5 +307,3 @@
 			  current->pid, ret);
 	return ret;
 }
-
-

Modified: trunk/fs/ocfs2/dlm/dlmconvert.c
===================================================================
--- trunk/fs/ocfs2/dlm/dlmconvert.c	2005-02-18 00:12:37 UTC (rev 1890)
+++ trunk/fs/ocfs2/dlm/dlmconvert.c	2005-02-18 00:13:17 UTC (rev 1891)
@@ -379,6 +379,9 @@
 	int call_ast = 0, kick_thread = 0;
 	int found = 0;
 
+	if (!dlm_grab(dlm))
+		return DLM_REJECTED;
+
 	dlm_convert_lock_to_host(cnv);
 	lockname.name = cnv->name;
 	lockname.len = cnv->namelen;
@@ -448,5 +451,7 @@
 	if (kick_thread)
 		dlm_kick_thread(dlm, res);
 
+	dlm_put(dlm);
+
 	return status;
 }

Modified: trunk/fs/ocfs2/dlm/dlmlock.c
===================================================================
--- trunk/fs/ocfs2/dlm/dlmlock.c	2005-02-18 00:12:37 UTC (rev 1890)
+++ trunk/fs/ocfs2/dlm/dlmlock.c	2005-02-18 00:13:17 UTC (rev 1891)
@@ -108,6 +108,7 @@
 	__dlm_wait_on_lockres(res);
 
 	if (dlm_can_grant_new_lock(res, lock)) {
+		dlmprintk("I can grant this lock right away\n");
 		/* got it right away */
 		lock->lksb->status = DLM_NORMAL;
 		status = DLM_NORMAL;
@@ -145,8 +146,10 @@
 			  dlm_lock *lock, int flags)
 {
 	dlm_status status = DLM_DENIED;
-	
+
 	dlmprintk("type=%d\n", lock->type);
+	dlmprintk("lockres %*s, flags = 0x%x\n", res->lockname.len,
+		  res->lockname.name, flags);
 
 	spin_lock(&res->spinlock);
 	if (res->state & DLM_LOCK_RES_RECOVERING) {
@@ -164,7 +167,7 @@
 	/* spec seems to say that you will get DLM_NORMAL when the lock 
 	 * has been queued, meaning we need to wait for a reply here. */
 	status = dlm_send_remote_lock_request(dlm, res, lock, flags);
-	
+
 	spin_lock(&res->spinlock);
 	res->state &= ~DLM_LOCK_RES_IN_PROGRESS;
 	if (status != DLM_NORMAL) {
@@ -245,11 +248,14 @@
 
 	DLM_ASSERT(dlm);
 
+	dlmprintk0("\n");
+
+	if (!dlm_grab(dlm))
+		return DLM_REJECTED;
+
 	dlm_create_lock_to_host(create);
 	lockname.name = create->name;
 	lockname.len = create->namelen;
-	
-	dlmprintk0("\n");
 
 	lockname.hash = full_name_hash(lockname.name, lockname.len);
 
@@ -257,11 +263,11 @@
 	newlock = kmalloc(sizeof(dlm_lock), GFP_KERNEL);
 	if (!newlock)
 		goto leave;
-	
+
 	lksb = kmalloc(sizeof(dlm_lockstatus), GFP_KERNEL);
 	if (!lksb)
 		goto leave;
-		
+
 	memset(newlock, 0, sizeof(dlm_lock));
 	INIT_LIST_HEAD(&newlock->list);
 	INIT_LIST_HEAD(&newlock->ast_list);
@@ -280,8 +286,10 @@
 	lksb->lockid = newlock;
 	lksb->flags |= DLM_LKSB_KERNEL_ALLOCATED;
 
-	if (create->flags | LKM_GET_LVB)
+	if (create->flags & LKM_GET_LVB) {
 		lksb->flags |= DLM_LKSB_GET_LVB;
+		dlmprintk("set DLM_LKSB_GET_LVB flag\n");
+	}
 
 	status = DLM_IVLOCKID;
 	res = dlm_lookup_lock(dlm, &lockname);
@@ -300,5 +308,7 @@
 			kfree(lksb);
 	}
 
+	dlm_put(dlm);
+
 	return status;
 }

Modified: trunk/fs/ocfs2/dlm/dlmmaster.c
===================================================================
--- trunk/fs/ocfs2/dlm/dlmmaster.c	2005-02-18 00:12:37 UTC (rev 1890)
+++ trunk/fs/ocfs2/dlm/dlmmaster.c	2005-02-18 00:13:17 UTC (rev 1891)
@@ -207,34 +207,41 @@
 	int blocked = 0;
 	int map_changed = 0, restart = 0, assert = 0;
 	int ret, start, bit;
-	
+
 	bucket = &(dlm->resources[lockname->hash & DLM_HASH_MASK]);
 
-	/* pre-allocate a dlm_lock_resource and master stuff */
-	mle = kmalloc(sizeof(dlm_master_list_entry), GFP_KERNEL);
-	res = kmalloc(sizeof(dlm_lock_resource), GFP_KERNEL);
-	if (!mle || !res) {
-		dlmprintk0("could not allocate memory for new lock resource\n");
+lookup:
+	spin_lock(&dlm->spinlock);
+	tmpres = __dlm_lookup_lock(dlm, lockname);
+	if (tmpres) {
+		spin_unlock(&dlm->spinlock);
+
 		if (mle)
 			kfree(mle);
 		if (res)
 			kfree(res);
-		res = NULL;
-		goto leave;
+
+		return tmpres;
 	}
 
-	/* check for pre-existing lock */
-	spin_lock(&dlm->spinlock);
-	tmpres = __dlm_lookup_lock(dlm, lockname);
-	if (tmpres) {
+	if (!res) {
 		spin_unlock(&dlm->spinlock);
-		/* TODO: return error, or return the lockres ?!? */
-		kfree(res);
-		kfree(mle);
-		res = tmpres;
-		goto leave;
+
+		/* nothing found and we need to allocate one. */
+		mle = kmalloc(sizeof(dlm_master_list_entry), GFP_KERNEL);
+		if (!mle)
+			return NULL;
+
+		res = kmalloc(sizeof(dlm_lock_resource), GFP_KERNEL);
+		if (!res) {
+			kfree(mle);
+			return NULL;
+		}
+
+		goto lookup;
 	}
 
+	/* Ok, no lockres found and we have one to insert... */
 	dlm_init_lockres(res, lockname);
 
 	if (flags & LKM_LOCAL) {
@@ -247,7 +254,7 @@
 		/* lockres still marked IN_PROGRESS */
 		goto wake_waiters;
 	}
-		
+
 	/* check master list to see if another node has started mastering it */
 	spin_lock(&dlm_master_lock);
 	list_for_each(iter, &dlm_master_list) {
@@ -256,7 +263,8 @@
 			continue;
 
 		if (tmpmle->type == DLM_MLE_MASTER) {
-			dlmprintk0("eek! master entry for nonexistent lock!\n");
+			dlmprintk0("eek! master entry for nonexistent "
+				   "lock!\n");
 			BUG();
 		}
 		dlm_get_mle(tmpmle);
@@ -275,15 +283,15 @@
 	}
 	spin_unlock(&dlm_master_lock);
 
-	/* at this point there is either a DLM_MLE_BLOCK or a DLM_MLE_MASTER 
-	 * on the master list, so it's safe to add the lockres to the hashtable.
-	 * anyone who finds the lock will still have to wait on the IN_PROGRESS.
-	 * also, any new nodes that try to join at this point will have to wait
-	 * until my dlm_master_lock list is empty, so they cannot possibly 
-	 * do any master requests yet... TODO
-	 * ?? should i have a special type of mle just for joining nodes ?? 
-	 * ?? could allow them to come in and put their mle 
-	 *    on the list and sleep ?? */
+	/* at this point there is either a DLM_MLE_BLOCK or a
+	 * DLM_MLE_MASTER on the master list, so it's safe to add the
+	 * lockres to the hashtable.  anyone who finds the lock will
+	 * still have to wait on the IN_PROGRESS.  also, any new nodes
+	 * that try to join at this point will have to wait until my
+	 * dlm_master_lock list is empty, so they cannot possibly do
+	 * any master requests yet... TODO ?? should i have a special
+	 * type of mle just for joining nodes ??  ?? could allow them
+	 * to come in and put their mle on the list and sleep ?? */
 
 	/* finally add the lockres to its hash bucket */
 	list_add_tail(&res->list, bucket);
@@ -472,13 +480,12 @@
 	spin_unlock(&res->spinlock);
 	wake_up(&res->wq);
 
-leave:
 	return res;
 }
-	
 
 
 
+
 /*
  * locks that can be taken here:
  * dlm->spinlock
@@ -499,6 +506,9 @@
 	int found;
 	struct list_head *iter;
 
+	if (!dlm_grab(dlm))
+		return DLM_MASTER_RESP_NO;
+
 	dlm_master_request_to_host(request);
 	lockname.name = request->name;
 	lockname.len = request->namelen;
@@ -572,7 +582,7 @@
 		dlmprintk0("bug bug bug!!!  no mle found for this lock!\n");
 		BUG();
 	}
-	
+
 	/* 
 	 * lockres doesn't exist on this node 
 	 * if there is an MLE_BLOCK, return NO 
@@ -596,7 +606,7 @@
 		if (!mle) {
 			spin_unlock(&dlm_master_lock);
 			spin_unlock(&dlm->spinlock);
-	
+
 			mle = kmalloc(sizeof(dlm_master_list_entry) + 
 				      lockname.len, GFP_KERNEL);
 			if (!mle) {
@@ -634,6 +644,7 @@
 	spin_unlock(&dlm->spinlock);
 
 send_response:
+	dlm_put(dlm);
 	//ret = dlm_do_master_request_resp(dlm, &lockname, response, 
 	//				   request->node_idx);
 	//dlmprintk("response returned %d\n", ret);
@@ -660,6 +671,9 @@
 	struct list_head *iter;
 	struct qstr lockname;
 
+	if (!dlm_grab(dlm))
+		return 0;
+
 	dlm_master_request_resp_to_host(resp);
 	lockname.name = resp->name;
 	lockname.len = resp->namelen;
@@ -698,8 +712,8 @@
 					wake = 1;
 				break;
 			case DLM_MASTER_RESP_MAYBE:
-				// dlmprintk("node %u is not the master, but IS"
-				// " in-progress\n", resp->node_idx);
+				//dlmprintk("node %u is not the master, but IS"
+				//" in-progress\n", resp->node_idx);
 				set_bit(resp->node_idx, mle->response_map);
 				set_bit(resp->node_idx, mle->maybe_map);
 				if (memcmp(mle->response_map, mle->vote_map, 
@@ -731,6 +745,8 @@
 	else
 		dlmprintk0("hrrm... got a master resp but found no matching "
 			   "request\n");
+
+	dlm_put(dlm);
 	return 0;
 }
 
@@ -753,6 +769,9 @@
 	struct list_head *iter;
 	struct qstr lockname;
 
+	if (!dlm_grab(dlm))
+		return 0;
+
 	dlm_assert_master_to_host(assert);	
 	lockname.name = assert->name;
 	lockname.len = assert->namelen;
@@ -835,6 +854,8 @@
 		/* if this is the last put, it will be removed from the list */
 		dlm_put_mle(mle);
 	}
+
+	dlm_put(dlm);
 	return 0;
 }
 

Modified: trunk/fs/ocfs2/dlm/dlmmod.c
===================================================================
--- trunk/fs/ocfs2/dlm/dlmmod.c	2005-02-18 00:12:37 UTC (rev 1890)
+++ trunk/fs/ocfs2/dlm/dlmmod.c	2005-02-18 00:13:17 UTC (rev 1891)
@@ -71,6 +71,7 @@
 
 LIST_HEAD(dlm_domains);
 spinlock_t dlm_domain_lock = SPIN_LOCK_UNLOCKED;
+DECLARE_WAIT_QUEUE_HEAD(dlm_domain_events);
 u8 dlm_global_index = NM_MAX_NODES;
 static spinlock_t dlm_cookie_lock = SPIN_LOCK_UNLOCKED;
 static u64 dlm_next_cookie = 1;
@@ -220,7 +221,7 @@
 		status = DLM_BADARGS;
 		if (!name)
 			goto error;
-		
+
 		status = DLM_IVBUFLEN;
 		q.len = strlen(name);
 		if (q.len > DLM_LOCKID_NAME_MAX)
@@ -253,7 +254,7 @@
 			goto up_error;
 		}
 
-		dlmprintk("type=%d\n", mode);
+		dlmprintk("type=%d, flags = 0x%x\n", mode, flags);
 		dlmprintk("creating lock: lock=%p res=%p\n", lock, res);
 
 		memset(lock, 0, sizeof(dlm_lock));
@@ -273,6 +274,8 @@
 		dlm_get_next_cookie(lock->node, &lock->cookie);
 
 		if (flags & LKM_VALBLK) {
+			dlmprintk("LKM_VALBLK passed by caller\n");
+
 			/* LVB requests for non PR, PW or EX locks are
 			 * ignored. */
 			if (mode < LKM_PRMODE)
@@ -379,9 +382,39 @@
 }
 EXPORT_SYMBOL(dlmunlock);
 
+dlm_lock_resource * __dlm_lookup_lock(dlm_ctxt *dlm, struct qstr *lockname)
+{
+	struct list_head *iter;
+	dlm_lock_resource *tmpres=NULL;
+	struct list_head *bucket;
 
-static dlm_ctxt * __dlm_lookup_domain(char *domain)
+	dlmprintk0("\n");
+
+	bucket = &(dlm->resources[lockname->hash & DLM_HASH_MASK]);
+
+	/* check for pre-existing lock */
+	list_for_each(iter, bucket) {
+		tmpres = list_entry(iter, dlm_lock_resource, list);
+		if (tmpres->lockname.len == lockname->len &&
+		    strncmp(tmpres->lockname.name, lockname->name, 
+			    lockname->len) == 0)
+			break;
+		tmpres = NULL;
+	}
+	return tmpres;
+}
+
+dlm_lock_resource * dlm_lookup_lock(dlm_ctxt *dlm, struct qstr *lockname)
 {
+	dlm_lock_resource *res;
+	spin_lock(&dlm->spinlock);
+	res = __dlm_lookup_lock(dlm, lockname);
+	spin_unlock(&dlm->spinlock);
+	return res;
+}
+
+static dlm_ctxt * __dlm_lookup_domain(const char *domain)
+{
 	dlm_ctxt *tmp = NULL;
 	struct list_head *iter;
 
@@ -395,111 +428,211 @@
 	return tmp;
 }
 
-dlm_ctxt * dlm_lookup_domain(char *domain)
+/* returns true on one of two conditions:
+ * 1) the domain does not exist
+ * 2) the domain exists and its state is "joined" */
+static int dlm_wait_on_domain_helper(const char *domain)
 {
+	int ret = 0;
 	dlm_ctxt *tmp = NULL;
+
 	spin_lock(&dlm_domain_lock);
+
 	tmp = __dlm_lookup_domain(domain);
+	if (!tmp)
+		ret = 1;
+	else if (tmp->dlm_state == DLM_CTXT_JOINED)
+		ret = 1;
+
 	spin_unlock(&dlm_domain_lock);
-	return tmp;
+	return ret;
 }
 
-dlm_lock_resource * __dlm_lookup_lock(dlm_ctxt *dlm, struct qstr *lockname)
+static void dlm_free_ctxt_mem(dlm_ctxt *dlm)
 {
+	BUG_ON(!dlm);
+
+	if (dlm->resources)
+		free_page((unsigned long) dlm->resources);
+
+	if (dlm->name)
+		kfree(dlm->name);
+
+	if (dlm->group)
+		iput(dlm->group);
+
+	kfree(dlm);
+}
+
+/* A little strange - this function will be called while holding
+ * dlm_domain_lock and is expected to be holding it on the way out. We
+ * will however drop and reacquire it multiple times */
+static void dlm_ctxt_release(struct kref *kref)
+{
+	dlm_ctxt *dlm;
+
+	BUG_ON(!kref);
+
+	dlm = container_of(kref, dlm_ctxt, dlm_refs);
+
+	BUG_ON(dlm->num_joins);
+	BUG_ON(dlm->dlm_state == DLM_CTXT_JOINED);
+
+	/* we may still be in the list if we hit an error during join. */
+	list_del_init(&dlm->list);
+
+	spin_unlock(&dlm_domain_lock);
+
+	dlmprintk("freeing memory from domain %s\n", dlm->name);
+
+	wake_up(&dlm_domain_events);
+
+	dlm_free_ctxt_mem(dlm);
+
+	spin_lock(&dlm_domain_lock);
+}
+
+void dlm_put(dlm_ctxt *dlm)
+{
+	BUG_ON(!dlm);
+
+	spin_lock(&dlm_domain_lock);
+	kref_put(&dlm->dlm_refs);
+	spin_unlock(&dlm_domain_lock);
+}
+
+static void __dlm_get(dlm_ctxt *dlm)
+{
+	kref_get(&dlm->dlm_refs);
+}
+
+/* given a questionable reference to a dlm object, gets a reference if
+ * it can find it in the list, otherwise returns NULL in which case
+ * you shouldn't trust your pointer. */
+dlm_ctxt *dlm_grab(dlm_ctxt *dlm)
+{
 	struct list_head *iter;
-	dlm_lock_resource *tmpres=NULL;
-	struct list_head *bucket;
-	
-	dlmprintk0("\n");
+	dlm_ctxt *target = NULL;
 
-	bucket = &(dlm->resources[lockname->hash & DLM_HASH_MASK]);
+	spin_lock(&dlm_domain_lock);
 
-	/* check for pre-existing lock */
-	list_for_each(iter, bucket) {
-		tmpres = list_entry(iter, dlm_lock_resource, list);
-		if (tmpres->lockname.len == lockname->len &&
-		    strncmp(tmpres->lockname.name, lockname->name, 
-			    lockname->len) == 0)
+	list_for_each(iter, &dlm_domains) {
+		target = list_entry (iter, dlm_ctxt, list);
+
+		if (target == dlm) {
+			__dlm_get(target);
 			break;
-		tmpres = NULL;
+		}
+
+		target = NULL;
 	}
-	return tmpres;
+
+	spin_unlock(&dlm_domain_lock);
+
+	return target;
 }
 
-dlm_lock_resource * dlm_lookup_lock(dlm_ctxt *dlm, struct qstr *lockname)
+void dlm_get(dlm_ctxt *dlm)
 {
-	dlm_lock_resource *res;
-	spin_lock(&dlm->spinlock);
-	res = __dlm_lookup_lock(dlm, lockname);
-	spin_unlock(&dlm->spinlock);
-	return res;
+	BUG_ON(!dlm);
+
+	spin_lock(&dlm_domain_lock);
+	__dlm_get(dlm);
+	spin_unlock(&dlm_domain_lock);
 }
 
+static void dlm_leave_domain(dlm_ctxt *dlm)
+{
+	spin_lock(&dlm_domain_lock);
 
+	BUG_ON(dlm->dlm_state != DLM_CTXT_JOINED);
+	BUG_ON(!dlm->num_joins);
 
-/*
- * dlm_register_domain: one-time setup per "domain"
- */
-dlm_ctxt * dlm_register_domain(char *domain, char *group_name, u32 key)
+	dlm->num_joins--;
+	if (dlm->num_joins) {
+		spin_unlock(&dlm_domain_lock);
+		return;
+	}
+
+	dlmprintk("shutting down domain %s\n", dlm->name);
+
+	dlm->dlm_state = DLM_CTXT_LEAVING;
+	spin_unlock(&dlm_domain_lock);
+
+	/* TODO: Any network communication involving shutting this guy
+	 * down happens here. */
+
+	hb_unregister_callback(HB_NODE_UP_CB, dlm_hb_node_up_cb, dlm);
+	hb_unregister_callback(HB_NODE_DOWN_CB, dlm_hb_node_down_cb, dlm);
+
+	/* if the network code had any unregister calls, they would be here. */
+
+	if (dlm->thread.task)
+		dlm_complete_thread(dlm);
+
+	/* We've left the domain. Now we can take ourselves out of the
+	 * list and allow the kref stuff to help us free the
+	 * memory. */
+	spin_lock(&dlm_domain_lock);
+	list_del_init(&dlm->list);
+	spin_unlock(&dlm_domain_lock);
+
+	/* Wake up anyone waiting for us to remove this domain */
+	wake_up(&dlm_domain_events);
+}
+
+void dlm_unregister_domain(dlm_ctxt *dlm)
 {
-	dlm_ctxt *tmp = NULL, *dlm = NULL;
-	struct inode *group = NULL;
-	int tmpret, i;
+	BUG_ON(!dlm);
 
-	if (strlen(domain) > NM_MAX_NAME_LEN) {
-		dlmprintk0("domain name length too long\n");
-		goto leave;
-	}
+	dlm_leave_domain(dlm);
+	dlm_put(dlm);
+}
+EXPORT_SYMBOL(dlm_unregister_domain);
 
-	group = nm_get_group_by_name(group_name);
-	if (!group) {
-		dlmprintk("no nm group %s for domain %s!\n", 
-			  group_name, domain);
-		goto leave;
-	}
+static dlm_ctxt *dlm_alloc_ctxt(const char *domain,
+				struct inode *group,
+				u32 key)
+{
+	int i;
+	dlm_ctxt *dlm = NULL;
 
-	/* 
-	 * TODO: should i do some type of dlm-group-join business here?
-	 * I need to have new nodes communicate with other dlm nodes to 
-	 * wait until their master lists are empty before allowing me to
-	 * join.  does this belong here?  or in hb?
-	 * seems like stuff that heartbeat shouldn't care about, cuz we
-	 * would actually be preventing a node that is "UP" from being 
-	 * part of the dlm group.
-	 */ 
-	dlm = dlm_lookup_domain(domain);
-	if (dlm) {
-		/* found a pre-existing domain */
+	/* if for some reason we can't get a reference on the group
+	 * inode (required) then don't even try the rest. */
+	if (!igrab(group))
 		goto leave;
-	}
 
 	dlm = kmalloc(sizeof(dlm_ctxt), GFP_KERNEL);
-	if (dlm == NULL) {
+	if (!dlm) {
 		dlmprintk0("could not allocate dlm_ctxt\n");
 		goto leave;
 	}
 	memset(dlm, 0, sizeof(dlm_ctxt));
+
 	dlm->name = kmalloc(strlen(domain) + 1, GFP_KERNEL);
 	if (dlm->name == NULL) {
+		dlmprintk0("could not allocate dlm domain name\n");
 		kfree(dlm);
 		dlm = NULL;
-		dlmprintk0("could not allocate dlm domain name\n");
 		goto leave;
 	}
+
 	dlm->resources = (struct list_head *) __get_free_page(GFP_KERNEL);
 	if (!dlm->resources) {
+		dlmprintk0("could not allocate dlm hash\n");
 		kfree(dlm->name);
 		kfree(dlm);
 		dlm = NULL;
-		dlmprintk0("could not allocate dlm hash\n");
 		goto leave;
 	}
 	memset(dlm->resources, 0, PAGE_SIZE);
-	
+
 	for (i=0; i<DLM_HASH_SIZE; i++)
 		INIT_LIST_HEAD(&dlm->resources[i]);
 
 	strcpy(dlm->name, domain);
+	dlm->key = key;
+
 	spin_lock_init(&dlm->spinlock);
 	INIT_LIST_HEAD(&dlm->list);
 	INIT_LIST_HEAD(&dlm->dirty_list);
@@ -508,167 +641,231 @@
 	util_thread_info_init(&dlm->thread);
 	util_thread_info_init(&dlm->reco.thread);
 	init_rwsem(&dlm->recovery_sem);
+
+	/* this eats the reference we got above. */
 	dlm->group = group;
 	dlm->group_index = nm_this_node(group);
-	dlm->key = key;
+
 	dlm->reco.new_master = NM_INVALID_SLOT_NUM;
 	dlm->reco.dead_node = NM_INVALID_SLOT_NUM;
 	dlm->reco.sending_node = NM_INVALID_SLOT_NUM;
 	dlm->reco.next_seq = 0;
 
-	spin_lock(&dlm_domain_lock);
-	tmp = __dlm_lookup_domain(domain);
-	if (tmp) {
-		spin_unlock(&dlm_domain_lock);
-		/* found a pre-existing domain */
-		kfree(dlm->name);
-		kfree(dlm);
-		dlm = NULL;
-		goto leave;
-	}
+	kref_init(&dlm->dlm_refs, dlm_ctxt_release);
+	dlm->dlm_state = DLM_CTXT_NEW;
 
-	/* add the new domain */
-	list_add_tail(&dlm->list, &dlm_domains);
-	spin_unlock(&dlm_domain_lock);
+	dlmprintk("context init: refcount %u\n",
+		  atomic_read(&dlm->dlm_refs.refcount));
 
-	tmpret = hb_register_callback(HB_NODE_DOWN_CB, dlm_hb_node_down_cb, dlm,
+leave:
+	return dlm;
+}
+
+static int dlm_join_domain(dlm_ctxt *dlm)
+{
+	int status;
+
+	BUG_ON(!dlm);
+
+	dlmprintk("Join domain %s\n", dlm->name);
+
+	status = hb_register_callback(HB_NODE_DOWN_CB,
+				      dlm_hb_node_down_cb,
+				      dlm,
 				      DLM_HB_NODE_DOWN_PRI);
-	if (tmpret)
-		goto error;
-	tmpret = hb_register_callback(HB_NODE_UP_CB, dlm_hb_node_up_cb, dlm, 
+	if (status)
+		goto bail;
+
+	status = hb_register_callback(HB_NODE_UP_CB,
+				      dlm_hb_node_up_cb,
+				      dlm, 
 				      DLM_HB_NODE_UP_PRI);
-	if (tmpret)
-		goto error;
+	if (status)
+		goto bail;
 
-	/* TODO: need to use hb_fill_node_map to fill a temporary votemap
-	 * then communicate with each of these nodes that I want to come up
-	 * FOR THIS DLM.  there may be many nodes in this group heartbeating
-	 * but they may not care about this particular dlm instance.  once
-	 * everyone has come back with a response that i have been added or 
-	 * that they are not a member I can put together the REAL node map
-	 * for this dlm in dlm->node_map */
-	/* TODO: I guess we can fill this here as a superset of possible nodes
-	 * so that the hb_callbacks above have something to work on in the 
-	 * meantime, then trim out the nodes that are not part of this dlm 
-	 * once we know */
-	/* TODO: I may need to register a special net handler on insmod of dlm.o
-	 * with a key of 0 so that I can respond to requests even if I am not
-	 * part of a dlm group.  this would still leave a gap in time between 
-	 * the start of heartbeating and the insmod dlm.o, unless I change the 
-	 * module loading stuff in clusterbo to include dlm.o (which would work
+	/* TODO: need to use hb_fill_node_map to fill a temporary
+	 * votemap then communicate with each of these nodes that I
+	 * want to come up FOR THIS DLM.  there may be many nodes in
+	 * this group heartbeating but they may not care about this
+	 * particular dlm instance.  once everyone has come back with
+	 * a response that i have been added or that they are not a
+	 * member I can put together the REAL node map for this dlm in
+	 * dlm->node_map */
+	/* TODO: I guess we can fill this here as a superset of
+	 * possible nodes so that the hb_callbacks above have
+	 * something to work on in the meantime, then trim out the
+	 * nodes that are not part of this dlm once we know */
+	/* TODO: I may need to register a special net handler on
+	 * insmod of dlm.o with a key of 0 so that I can respond to
+	 * requests even if I am not part of a dlm group.  this would
+	 * still leave a gap in time between the start of heartbeating
+	 * and the insmod dlm.o, unless I change the module loading
+	 * stuff in clusterbo to include dlm.o (which would work
 	 * fine) */
 #warning WRONG WRONG WRONG
-	tmpret = hb_fill_node_map(group, dlm->node_map, NM_MAX_NODES);
-	if (tmpret)
-		goto error;
+	status = hb_fill_node_map(dlm->group, dlm->node_map,
+				  sizeof(dlm->node_map));
+	if (status)
+		goto bail;
 
-	dlmprintk("hb_fill_node_map returned node map:\n");
-	BUG_ON(ARRAY_SIZE(dlm->node_map) & 3); /* better be mult of 4 :) */
-	for(i = 0; i < ARRAY_SIZE(dlm->node_map); i += 4)
-		dlmprintk("%0lx%0lx%0lx%0lx\n",
-			  dlm->node_map[i], dlm->node_map[i + 1],
-			  dlm->node_map[i + 2], dlm->node_map[i + 3]);
-
-#if 0
-	tmpret = net_register_handler("reco-request", 
-		      DLM_NET_RECOVERY_REQUEST_MSG_TYPE, 
-		      key, sizeof(dlm_reco_request),
-		      dlm_recovery_request_handler, dlm);
-	if (tmpret)
-		goto error;
-	tmpret = net_register_handler("reco-lock-arr-req", 
-		      DLM_NET_RECOVERY_LOCK_ARR_REQ_MSG_TYPE, 
-		      key, sizeof(dlm_reco_lock_arr_req),
-		      dlm_recovery_lock_arr_req_handler, dlm);
-	if (tmpret)
-		goto error;
-	tmpret = net_register_handler("reco-response", 
-		      DLM_NET_RECOVERY_RESPONSE_MSG_TYPE, 
-		      key, sizeof(dlm_reco_response),
-		      dlm_recovery_response_handler, dlm);
-	if (tmpret)
-		goto error;
-#endif
-
-	tmpret = net_register_handler(DLM_MASTER_REQUEST_RESP_MSG, key, 0, 
+	status = net_register_handler(DLM_MASTER_REQUEST_RESP_MSG, dlm->key, 0,
 				      sizeof(dlm_master_request_resp), 
 				      dlm_master_request_resp_handler,
 				      dlm);
-	if (tmpret)
-		goto error;
+	if (status)
+		goto bail;
 
-	tmpret = net_register_handler(DLM_MASTER_REQUEST_MSG, key, 0, 
+	status = net_register_handler(DLM_MASTER_REQUEST_MSG, dlm->key, 0, 
 				      sizeof(dlm_master_request), 
 				      dlm_master_request_handler,
 				      dlm);
+	if (status)
+		goto bail;
 
-	if (tmpret)
-		goto error;
-
-	tmpret = net_register_handler(DLM_ASSERT_MASTER_MSG, key, 0, 
+	status = net_register_handler(DLM_ASSERT_MASTER_MSG, dlm->key, 0, 
 				      sizeof(dlm_assert_master), 
 				      dlm_assert_master_handler,
 				      dlm);
-	if (tmpret)
-		goto error;
-	tmpret = net_register_handler(DLM_CREATE_LOCK_MSG, key, 0, 
+	if (status)
+		goto bail;
+
+	status = net_register_handler(DLM_CREATE_LOCK_MSG, dlm->key, 0, 
 				      sizeof(dlm_create_lock), 
 				      dlm_create_lock_handler,
 				      dlm);
-	if (tmpret)
-		goto error;
-	tmpret = net_register_handler(DLM_CONVERT_LOCK_MSG, key, 
+	if (status)
+		goto bail;
+
+	status = net_register_handler(DLM_CONVERT_LOCK_MSG, dlm->key, 
 				      NET_HND_VAR_LEN, 
 				      DLM_CONVERT_LOCK_MAX_LEN,
 				      dlm_convert_lock_handler,
 				      dlm);
-	if (tmpret)
-		goto error;
+	if (status)
+		goto bail;
 
-	tmpret = net_register_handler(DLM_UNLOCK_LOCK_MSG, key, 
+	status = net_register_handler(DLM_UNLOCK_LOCK_MSG, dlm->key, 
 				      NET_HND_VAR_LEN,
 				      DLM_UNLOCK_LOCK_MAX_LEN,
 				      dlm_unlock_lock_handler,
 				      dlm);
-	if (tmpret)
-		goto error;
-				
-	tmpret = net_register_handler(DLM_PROXY_AST_MSG, key, 
+	if (status)
+		goto bail;
+
+	status = net_register_handler(DLM_PROXY_AST_MSG, dlm->key, 
 				      NET_HND_VAR_LEN,
 				      DLM_PROXY_AST_MAX_LEN,
 				      dlm_proxy_ast_handler,
 				      dlm);
-	if (tmpret)
-		goto error;
+	if (status)
+		goto bail;
 
-	tmpret = dlm_launch_thread(dlm);
-	if (tmpret == 0)
-		goto leave;
+	status = dlm_launch_thread(dlm);
+	if (status < 0) {
+		dlmprintk("could not launch dlm thread!\n");
+		goto bail;
+	}
 
-error:	
-	hb_unregister_callback(HB_NODE_UP_CB, dlm_hb_node_up_cb, dlm);
-	hb_unregister_callback(HB_NODE_DOWN_CB, dlm_hb_node_down_cb, dlm);
 	spin_lock(&dlm_domain_lock);
-	list_del(&dlm->list);
+	dlm->num_joins++;
+	dlm->dlm_state = DLM_CTXT_JOINED;
 	spin_unlock(&dlm_domain_lock);
-	free_page((unsigned long)dlm->resources);
-	kfree(dlm->name);
-	kfree(dlm);
+
+	status = 0;
+bail:
+	wake_up(&dlm_domain_events);
+
+	return status;
+}
+
+/*
+ * dlm_register_domain: one-time setup per "domain"
+ */
+dlm_ctxt * dlm_register_domain(const char *domain,
+			       const char *group_name,
+			       u32 key)
+{
+	int ret;
+	dlm_ctxt *dlm = NULL;
+	dlm_ctxt *new_ctxt = NULL;
+	struct inode *group = NULL;
+
+	if (strlen(domain) > NM_MAX_NAME_LEN) {
+		dlmprintk0("domain name length too long\n");
+		goto leave;
+	}
+
+	group = nm_get_group_by_name(group_name);
+	if (!group) {
+		dlmprintk("no nm group %s for domain %s!\n", 
+			  group_name, domain);
+		goto leave;
+	}
+
+	dlmprintk("register called for domain \"%s\"\n", domain);
+
+retry:
 	dlm = NULL;
+	if (signal_pending(current))
+		goto leave;
 
+	spin_lock(&dlm_domain_lock);
+
+	dlm = __dlm_lookup_domain(domain);
+	if (dlm) {
+		if (dlm->dlm_state != DLM_CTXT_JOINED) {
+			spin_unlock(&dlm_domain_lock);
+
+			dlmprintk("This ctxt is not joined yet!\n");
+			wait_event_interruptible(dlm_domain_events,
+						 dlm_wait_on_domain_helper(
+							 domain));
+			goto retry;
+		}
+
+		__dlm_get(dlm);
+		dlm->num_joins++;
+
+		spin_unlock(&dlm_domain_lock);
+		goto leave;
+	}
+
+	/* doesn't exist */
+	if (!new_ctxt) {
+		spin_unlock(&dlm_domain_lock);
+
+		new_ctxt = dlm_alloc_ctxt(domain, group, key);
+		if (new_ctxt)
+			goto retry;
+		goto leave;
+	}
+
+	/* a little variable switch-a-roo here... */
+	dlm = new_ctxt;
+	new_ctxt = NULL;
+
+	/* add the new domain */
+	list_add_tail(&dlm->list, &dlm_domains);
+	spin_unlock(&dlm_domain_lock);
+
+	ret = dlm_join_domain(dlm);
+	if (ret) {
+		dlmprintk("return code %d from join_domain!\n", ret);
+		dlm_put(dlm);
+		dlm = NULL;
+	}
+
 leave:
-	if (!dlm && group)
-	       	iput(group);
+	if (new_ctxt)
+		dlm_free_ctxt_mem(new_ctxt);
+
+	if (group)
+		iput(group);
+
 	return dlm;
 }
 EXPORT_SYMBOL(dlm_register_domain);
 
-void dlm_unregister_domain(dlm_ctxt *dlm)
-{
-	// fill me in please
-}
-EXPORT_SYMBOL(dlm_unregister_domain);
-
 void dlm_init_lockres(dlm_lock_resource *res, struct qstr *lockname)
 {
 	memset(res, 0, sizeof(dlm_lock_resource));

Modified: trunk/fs/ocfs2/dlm/dlmmod.h
===================================================================
--- trunk/fs/ocfs2/dlm/dlmmod.h	2005-02-18 00:12:37 UTC (rev 1890)
+++ trunk/fs/ocfs2/dlm/dlmmod.h	2005-02-18 00:13:17 UTC (rev 1891)
@@ -28,8 +28,8 @@
 #ifndef CLUSTER_DLMMOD_H
 #define CLUSTER_DLMMOD_H
 
+#include <linux/kref.h>
 
-
 #if 0
 #define dlmprintk(x, arg...)
 #define dlmprintk0(x)
@@ -185,6 +185,11 @@
 	unsigned long node_map[BITS_TO_LONGS(NM_MAX_NODES)];
 } dlm_recovery_ctxt;
 
+typedef enum _dlm_ctxt_state {
+	DLM_CTXT_NEW = 0,
+	DLM_CTXT_LEAVING,
+	DLM_CTXT_JOINED
+} dlm_ctxt_state;
 
 struct _dlm_ctxt
 {
@@ -201,6 +206,10 @@
 	unsigned long node_map[BITS_TO_LONGS(NM_MAX_NODES)];
 	unsigned long recovery_map[BITS_TO_LONGS(NM_MAX_NODES)];
 	dlm_recovery_ctxt reco;
+	/* dlm_refs and dlm_state are protected by dlm_domain_lock */
+	struct kref dlm_refs;
+	dlm_ctxt_state dlm_state;
+	unsigned int num_joins;
 };
 
 #define DLM_LOCK_RES_UNINITED             0x00000001
@@ -557,9 +566,14 @@
 	return dlmunlock_common(dlm, res, lock, lksb, flags, call_ast, 0);
 }
 
+dlm_ctxt * dlm_register_domain(const char *domain,
+			       const char *group_name,
+			       u32 key);
+void dlm_unregister_domain(dlm_ctxt *dlm);
+void dlm_get(dlm_ctxt *dlm);
+void dlm_put(dlm_ctxt *dlm);
+dlm_ctxt *dlm_grab(dlm_ctxt *dlm);
 
-dlm_ctxt * dlm_register_domain(char *domain, char *group_name, u32 key);
-void dlm_unregister_domain(dlm_ctxt *dlm);
 dlm_lock_resource * dlm_get_lock_resource(dlm_ctxt *dlm, struct qstr *lockname, int flags);
 int dlm_lock_owner_broadcast(dlm_ctxt *dlm, dlm_lock_resource *res);
 int dlm_refresh_lock_resource(dlm_ctxt *dlm, dlm_lock_resource *res);
@@ -572,7 +586,6 @@
 int dlm_heartbeat_init(dlm_ctxt *dlm);
 
 dlm_lock_resource * dlm_lookup_lock(dlm_ctxt *dlm, struct qstr *lockname);
-dlm_ctxt * dlm_lookup_domain(char *domain);
 
 void dlm_hb_node_down_cb(struct inode *group, struct inode *node, int idx, void *data);
 void dlm_hb_node_up_cb(struct inode *group, struct inode *node, int idx, void *data);

Modified: trunk/fs/ocfs2/dlm/dlmrecovery.c
===================================================================
--- trunk/fs/ocfs2/dlm/dlmrecovery.c	2005-02-18 00:12:37 UTC (rev 1890)
+++ trunk/fs/ocfs2/dlm/dlmrecovery.c	2005-02-18 00:13:17 UTC (rev 1891)
@@ -833,11 +833,11 @@
 
 void dlm_hb_node_down_cb(struct inode *group, struct inode *node, int idx, void *data)
 {
-	//int ret;
-	//struct inode *group = ptr1;
-	//struct inode *node = ptr2;
 	dlm_ctxt *dlm = data;
-	
+
+	if (!dlm_grab(dlm))
+		return;
+
 	spin_lock(&dlm->spinlock);
 
 	if (!test_bit(idx, dlm->node_map))
@@ -846,7 +846,7 @@
 		dlmprintk("node %u being removed from nodemap!\n", idx);
 		clear_bit(idx, dlm->node_map);
 	}
-	
+
 	if (test_bit(idx, dlm->recovery_map))
 		dlmprintk("node %u already added to recovery map!\n", idx);
 	else {
@@ -854,14 +854,17 @@
 		dlm_do_local_recovery_cleanup(dlm, idx, 1);
 	}
 	spin_unlock(&dlm->spinlock);
+
+	dlm_put(dlm);
 }
 
 void dlm_hb_node_up_cb(struct inode *group, struct inode *node, int idx, void *data)
 {
-	//struct inode *group = ptr1;
-	//struct inode *node = ptr2;
 	dlm_ctxt *dlm = data;
 
+	if (!dlm_grab(dlm))
+		return;
+
 	spin_lock(&dlm->spinlock);
 
 	if (test_bit(idx, dlm->recovery_map)) {
@@ -876,6 +879,8 @@
 	}
 
 	spin_unlock(&dlm->spinlock);
+
+	dlm_put(dlm);
 }
 
 int __dlm_hb_node_dead(dlm_ctxt *dlm, int node)



More information about the Ocfs2-commits mailing list