[Ocfs2-commits] khackel commits r2055 - trunk/fs/ocfs2/dlm

svn-commits at oss.oracle.com svn-commits at oss.oracle.com
Fri Mar 25 15:36:28 CST 2005


Author: khackel
Signed-off-by: mfasheh
Date: 2005-03-25 15:36:27 -0600 (Fri, 25 Mar 2005)
New Revision: 2055

Modified:
   trunk/fs/ocfs2/dlm/dlmcommon.h
   trunk/fs/ocfs2/dlm/dlmlock.c
   trunk/fs/ocfs2/dlm/dlmmaster.c
   trunk/fs/ocfs2/dlm/dlmrecovery.c
   trunk/fs/ocfs2/dlm/dlmthread.c
Log:
* use dlm_new_lock/dlm_init_lock to standardize how callers can
  create and init dlm_lock structures
* add dlm_dispatch_assert_master to allow assert_master calls to
  be dispatched to keventd for a few places that cannot do network
  messages of their own
* small cleanup in path that frees locks owned by dead nodes
* prepares for dlm_lock refcounting in next patch

Signed-off-by: mfasheh



Modified: trunk/fs/ocfs2/dlm/dlmcommon.h
===================================================================
--- trunk/fs/ocfs2/dlm/dlmcommon.h	2005-03-25 20:09:38 UTC (rev 2054)
+++ trunk/fs/ocfs2/dlm/dlmcommon.h	2005-03-25 21:36:27 UTC (rev 2055)
@@ -165,6 +165,14 @@
 	u8 real_master;
 } dlm_mig_lockres_priv;
 
+typedef struct _dlm_assert_master_priv
+{
+	dlm_lock_resource *lockres;
+	u8 request_from;
+	unsigned ignore_higher:1;
+} dlm_assert_master_priv;
+
+
 struct _dlm_work_item 
 {
 	struct list_head list;
@@ -174,6 +182,7 @@
 	union {
 		dlm_request_all_locks_priv ral;
 		dlm_mig_lockres_priv ml;
+		dlm_assert_master_priv am;
 	} u;
 };
 
@@ -764,6 +773,8 @@
 }
 
 
+dlm_lock * dlm_new_lock(int type, u8 node, u64 cookie, dlm_lockstatus *lksb);
+
 int dlm_create_lock_handler(net_msg *msg, u32 len, void *data);
 int dlm_convert_lock_handler(net_msg *msg, u32 len, void *data);
 int dlm_proxy_ast_handler(net_msg *msg, u32 len, void *data);
@@ -773,6 +784,7 @@
 void dlm_shuffle_lists(dlm_ctxt *dlm, dlm_lock_resource *res);
 int dlm_launch_thread(dlm_ctxt *dlm);
 void dlm_complete_thread(dlm_ctxt *dlm);
+void dlm_flush_asts(dlm_ctxt *dlm);
 int dlm_launch_recovery_thread(dlm_ctxt *dlm);
 void dlm_complete_recovery_thread(dlm_ctxt *dlm);
 
@@ -858,7 +870,11 @@
 int dlm_begin_reco_handler(net_msg *msg, u32 len, void *data);
 int dlm_finalize_reco_handler(net_msg *msg, u32 len, void *data);
 
+int dlm_dispatch_assert_master(dlm_ctxt *dlm, dlm_lock_resource *res,
+			       int ignore_higher, u8 request_from);
+void dlm_assert_master_worker(dlm_work_item *item, void *data);
 
+
 int dlm_send_one_lockres(dlm_ctxt *dlm, dlm_lock_resource *res, 
 			 dlm_migratable_lockres *mres, 
 			 u8 send_to, u8 flags);
@@ -878,7 +894,6 @@
 					  DLM_LOCK_RES_MIGRATING));
 }
 
-void dlm_init_lock(dlm_lock *newlock, int type, u8 node, u64 cookie);
 
 void dlm_mle_node_down(dlm_ctxt *dlm, dlm_master_list_entry *mle,
 		       struct nm_node *node, int idx);

Modified: trunk/fs/ocfs2/dlm/dlmlock.c
===================================================================
--- trunk/fs/ocfs2/dlm/dlmlock.c	2005-03-25 20:09:38 UTC (rev 2054)
+++ trunk/fs/ocfs2/dlm/dlmlock.c	2005-03-25 21:36:27 UTC (rev 2055)
@@ -55,6 +55,7 @@
 static dlm_status dlm_send_remote_lock_request(dlm_ctxt *dlm, 
 					       dlm_lock_resource *res, 
 					       dlm_lock *lock, int flags);
+static void dlm_init_lock(dlm_lock *newlock, int type, u8 node, u64 cookie);
 
 /* Tell us whether we can grant a new lock request.
  * locking:
@@ -167,6 +168,7 @@
 	/* will exit this call with spinlock held */
 	__dlm_wait_on_lockres(res);
 	res->state |= DLM_LOCK_RES_IN_PROGRESS;
+	
 	/* add lock to local (secondary) queue */
 	list_add_tail(&lock->list, &res->blocked);
 	spin_unlock(&res->spinlock);
@@ -231,9 +233,9 @@
 	return ret;
 }
 
-void dlm_init_lock(dlm_lock *newlock, int type, u8 node, u64 cookie)
+
+static void dlm_init_lock(dlm_lock *newlock, int type, u8 node, u64 cookie)
 {
-	memset(newlock, 0, sizeof(dlm_lock));
 	INIT_LIST_HEAD(&newlock->list);
 	INIT_LIST_HEAD(&newlock->ast_list);
 	INIT_LIST_HEAD(&newlock->bast_list);
@@ -250,6 +252,33 @@
 	newlock->bast_pending = 0;
 }
 
+dlm_lock * dlm_new_lock(int type, u8 node, u64 cookie, dlm_lockstatus *lksb)
+{
+	dlm_lock *lock;
+	
+	lock = kmalloc(sizeof(dlm_lock), GFP_KERNEL);
+	if (!lock)
+		return NULL;
+	
+	memset(lock, 0, sizeof(dlm_lock));
+
+	if (!lksb) {
+		lksb = kmalloc(sizeof(dlm_lockstatus), GFP_KERNEL);
+		if (!lksb) {
+			kfree(lock);
+			return NULL;
+		}
+		/* memset only if kernel-allocated */
+		memset(lksb, 0, sizeof(dlm_lockstatus));
+		lksb->flags |= DLM_LKSB_KERNEL_ALLOCATED;
+	}
+
+	dlm_init_lock(lock, type, node, cookie);
+	lock->lksb = lksb;
+	lksb->lockid = lock;
+	return lock;
+}
+
 /* handler for lock creation net message
  * locking:
  *   caller needs:  none
@@ -286,23 +315,14 @@
 		goto leave;
 
 	status = DLM_SYSERR;
-	newlock = kmalloc(sizeof(dlm_lock), GFP_KERNEL);
+	newlock = dlm_new_lock(create->requested_type, 
+			       create->node_idx, 
+			       create->cookie, NULL);
 	if (!newlock)
 		goto leave;
 
-	lksb = kmalloc(sizeof(dlm_lockstatus), GFP_KERNEL);
-	if (!lksb)
-		goto leave;
+	lksb = newlock->lksb;
 
-	memset(lksb, 0, sizeof(dlm_lockstatus));
-
-	dlm_init_lock(newlock, create->requested_type, 
-		      create->node_idx, create->cookie);
-
-	newlock->lksb = lksb;
-	lksb->lockid = newlock;
-	lksb->flags |= DLM_LKSB_KERNEL_ALLOCATED;
-
 	if (create->flags & LKM_GET_LVB) {
 		lksb->flags |= DLM_LKSB_GET_LVB;
 		dlmprintk("set DLM_LKSB_GET_LVB flag\n");
@@ -327,7 +347,7 @@
 		goto leave;
 	}
 	spin_unlock(&res->spinlock);
-	
+
 	newlock->lockres = res;
 	status = dlmlock_master(dlm, res, newlock, create->flags);
 leave:
@@ -437,6 +457,8 @@
 			goto retry_convert;
 		}
 	} else {
+		u64 tmpcookie;
+
 		/* LOCK request */
 		status = DLM_BADARGS;
 		if (!name)
@@ -446,12 +468,11 @@
 		if (strlen(name) > DLM_LOCKID_NAME_MAX)
 			goto error;
 
-		lock = kmalloc(sizeof(dlm_lock), GFP_KERNEL);  /* dlm_lock */
+		dlm_get_next_cookie(lock->ml.node, &tmpcookie);
+		lock = dlm_new_lock(mode, dlm->node_num, tmpcookie, lksb);
 		if (!lock)
 			goto error;
 
-		lksb->lockid = lock;
-
 		if (!recovery)
 			down_read(&dlm->recovery_sem);
 
@@ -465,26 +486,11 @@
 		dlmprintk("type=%d, flags = 0x%x\n", mode, flags);
 		dlmprintk("creating lock: lock=%p res=%p\n", lock, res);
 
-#warning move this into dlm_init_lock
-		memset(lock, 0, sizeof(dlm_lock));
-		INIT_LIST_HEAD(&lock->list);
-		INIT_LIST_HEAD(&lock->ast_list);
-		INIT_LIST_HEAD(&lock->bast_list);
-		spin_lock_init(&lock->spinlock);
 		lock->lockres = res;
-		lock->ml.type = mode;
-		lock->ml.convert_type = LKM_IVMODE;
-		lock->ml.highest_blocked = LKM_IVMODE;
-		lock->ml.node = dlm->node_num;
 		lock->ast = ast;
 		lock->bast = bast;
 		lock->astdata = data;
-		lock->lksb = lksb;
-		lock->ast_pending = 0;
-		lock->bast_pending = 0;
 
-		dlm_get_next_cookie(lock->ml.node, &lock->ml.cookie);
-
 retry_lock:
 		if (flags & LKM_VALBLK) {
 			dlmprintk("LKM_VALBLK passed by caller\n");

Modified: trunk/fs/ocfs2/dlm/dlmmaster.c
===================================================================
--- trunk/fs/ocfs2/dlm/dlmmaster.c	2005-03-25 20:09:38 UTC (rev 2054)
+++ trunk/fs/ocfs2/dlm/dlmmaster.c	2005-03-25 21:36:27 UTC (rev 2055)
@@ -921,8 +921,7 @@
 	dlm_master_list_entry *mle = NULL, *tmpmle = NULL;
 	char *name;
 	unsigned int namelen;
-	int found, ret, bit;
-	unsigned long nodemap[BITS_TO_LONGS(NM_MAX_NODES)];
+	int found, ret;
 
 	if (!dlm_grab(dlm))
 		return DLM_MASTER_RESP_NO;
@@ -945,17 +944,6 @@
 	spin_lock(&dlm->spinlock);
 	res = __dlm_lookup_lock(dlm, name, namelen);
 	if (res) {
-		/* while we still have the dlm->spinlock,
-		 * save off the node map and clear out 
-		 * all nodes from this node forward, and
-		 * the node that called us */
-		memcpy(nodemap, dlm->domain_map, sizeof(nodemap));
-		clear_bit(request->node_idx, nodemap);
-		clear_bit(dlm->node_num, nodemap);
-		while ((bit = find_next_bit(nodemap, NM_MAX_NODES,
-				    dlm->node_num)) < NM_MAX_NODES) {
-			clear_bit(bit, nodemap);
-		}
 		spin_unlock(&dlm->spinlock);
 
 		/* take care of the easy cases up front */
@@ -983,10 +971,13 @@
 			 * caused all nodes up to this one to 
 			 * create mles.  this node now needs to
 			 * go back and clean those up. */
-#warning this needs to move to the work queue
-			ret = dlm_do_assert_master(dlm, res->lockname.name,
-						   res->lockname.len,
-						   nodemap);
+			ret = dlm_dispatch_assert_master(dlm, res, 1, 
+							 request->node_idx);
+			if (ret < 0) {
+				dlmerror0("failed to dispatch assert "
+					  "master work\n");
+				response = DLM_MASTER_RESP_ERROR;
+			}
 			goto send_response;
 		} else if (res->owner != DLM_LOCK_RES_OWNER_UNKNOWN) {
 			spin_unlock(&res->spinlock);
@@ -1299,10 +1290,99 @@
 
 int dlm_flush_lockres_asts(dlm_ctxt *dlm, dlm_lock_resource *res)
 {
-#warning need to implement dlm_flush_lockres_asts
+	dlm_flush_asts(dlm);
+	/* still need to implement dlm_flush_lockres_asts */
 	return 0;
 }
 
+int dlm_dispatch_assert_master(dlm_ctxt *dlm, dlm_lock_resource *res, 
+			       int ignore_higher, u8 request_from)
+{
+	dlm_work_item *item;
+	item = (dlm_work_item *)kmalloc(sizeof(dlm_work_item), GFP_KERNEL);
+	if (!item)
+		return -ENOMEM;
+
+
+	/* queue up work for dlm_assert_master_worker */
+	memset(item, 0, sizeof(dlm_work_item));
+	dlm_grab(dlm);  /* get an extra ref for the work item */
+	dlm_init_work_item(dlm, item, dlm_assert_master_worker, NULL);
+	item->u.am.lockres = res; /* already have a ref */
+	/* can optionally ignore node numbers higher than this node */
+	item->u.am.ignore_higher = ignore_higher;
+	item->u.am.request_from = request_from;
+	
+	spin_lock(&dlm->work_lock);
+	list_add_tail(&item->list, &dlm->work_list);
+	spin_unlock(&dlm->work_lock);
+
+	schedule_work(&dlm->dispatched_work);
+	return 0;
+}
+
+void dlm_assert_master_worker(dlm_work_item *item, void *data)
+{
+	dlm_ctxt *dlm = data;
+	int ret = 0;
+	dlm_lock_resource *res;
+	unsigned long nodemap[BITS_TO_LONGS(NM_MAX_NODES)];
+	int ignore_higher;
+	int bit;
+	u8 request_from;
+
+	DLM_ASSERT(item);
+	dlm = item->dlm;
+	DLM_ASSERT(dlm);
+
+	res = item->u.am.lockres;
+	DLM_ASSERT(res);
+
+	ignore_higher = item->u.am.ignore_higher;
+	request_from = item->u.am.request_from;
+
+	do {
+		spin_lock(&dlm->spinlock);
+		memcpy(nodemap, dlm->domain_map, sizeof(nodemap));
+		spin_unlock(&dlm->spinlock);
+
+		clear_bit(dlm->node_num, nodemap);
+		if (ignore_higher) {
+			/* if is this just to clear up mles for nodes below 
+			 * this node, do not send the message to the original
+			 * caller or any node number higher than this */
+			clear_bit(request_from, nodemap);
+			bit = dlm->node_num;
+			while (1) {
+				bit = find_next_bit(nodemap, NM_MAX_NODES, 
+						    bit+1);
+			       	if (bit >= NM_MAX_NODES)
+					break;
+				clear_bit(bit, nodemap);
+			}
+		}
+
+		ret = dlm_do_assert_master(dlm, res->lockname.name,
+					   res->lockname.len, 
+					   nodemap);
+		if (ret < 0) {
+			/* no choice but to try again.
+			 * maybe a node died. */ 
+			dlmerror("assert master returned %d!\n", ret);
+		}
+	} while (ret < 0);
+
+	dlm_lockres_put(dlm, res);
+
+	dlmprintk0("finished with dlm_assert_master_worker\n");
+}
+
+
+/*
+ * DLM_MIGRATE_LOCKRES
+ */
+
+
 int dlm_migrate_lockres(dlm_ctxt *dlm, dlm_lock_resource *res, u8 target)
 {
 	dlm_master_list_entry *mle = NULL;

Modified: trunk/fs/ocfs2/dlm/dlmrecovery.c
===================================================================
--- trunk/fs/ocfs2/dlm/dlmrecovery.c	2005-03-25 20:09:38 UTC (rev 2054)
+++ trunk/fs/ocfs2/dlm/dlmrecovery.c	2005-03-25 21:36:27 UTC (rev 2055)
@@ -1265,8 +1265,13 @@
 		spin_lock(&res->spinlock);
 		master = res->owner;
 		if (master == dlm->node_num) {
-#warning need to broadcast here that i own this
-			dlmprintk0("need to broadcast here that i own this\n");
+			int ret = dlm_dispatch_assert_master(dlm, res, 0, 0);
+			if (ret < 0) {
+				dlmerror0("could not allocate enough memory "
+					  "to send assert_master message!\n");
+				/* retry!? */
+				BUG();
+			}
 		}
 		spin_unlock(&res->spinlock);
 	}
@@ -1373,22 +1378,13 @@
 		}
 
 		/* lock is for another node. */
-		newlock = kmalloc(sizeof(dlm_lock), GFP_KERNEL);
-		lksb = kmalloc(sizeof(dlm_lockstatus), GFP_KERNEL);
-		if (!newlock || !lksb) {
+		newlock = dlm_new_lock(ml->type, ml->node, ml->cookie, NULL);
+		if (!newlock) {
 			ret = -ENOMEM;
 			goto leave;
 		}
-
-		memset(newlock, 0, sizeof(dlm_lock));
-		memset(lksb, 0, sizeof(dlm_lockstatus));
-
-		dlm_init_lock(newlock, ml->type, ml->node, ml->cookie);
-		newlock->lksb = lksb;
-		__dlm_lockres_get(res);
-		newlock->lockres = res;
-		lksb->lockid = newlock;
-		lksb->flags |= DLM_LKSB_KERNEL_ALLOCATED;
+		lksb = newlock->lksb;
+		newlock->lockres = dlm_lockres_grab(dlm, res);
 		
 		if (ml->convert_type != LKM_IVMODE) {
 			DLM_ASSERT(queue == &res->converting);
@@ -1473,15 +1469,42 @@
 	}
 }
 
+static void dlm_free_dead_locks(dlm_ctxt *dlm, dlm_lock_resource *res,
+				u8 dead_node)
+{
+	struct list_head *iter, *tmpiter;
+	dlm_lock *lock;
 
+	assert_spin_locked(&res->spinlock);
 
+	/* TODO: check pending_asts, pending_basts here */
+	list_for_each_safe(iter, tmpiter, &res->granted) {
+		lock = list_entry (iter, dlm_lock, list);
+		if (lock->ml.node == dead_node) {
+			list_del_init(&lock->list);
+			kfree(lock);
+		}
+	}
+	list_for_each_safe(iter, tmpiter, &res->converting) {
+		lock = list_entry (iter, dlm_lock, list);
+		if (lock->ml.node == dead_node) {
+			list_del_init(&lock->list);
+			kfree(lock);
+		}
+	}
+	list_for_each_safe(iter, tmpiter, &res->blocked) {
+		lock = list_entry (iter, dlm_lock, list);
+		if (lock->ml.node == dead_node) {
+			list_del_init(&lock->list);
+			kfree(lock);
+		}
+	}
+}
 
-#warning may need to change kfree to put_lock and refcounting here
 static void dlm_do_local_recovery_cleanup(dlm_ctxt *dlm, u8 dead_node)
 {
-	struct list_head *iter, *iter2, *tmpiter;
+	struct list_head *iter;
 	dlm_lock_resource *res;
-	dlm_lock *lock;
 	int i;
 	struct list_head *bucket;
 
@@ -1511,31 +1534,10 @@
 						 res->lockname.len))
 				continue;
 			spin_lock(&res->spinlock);
-			if (res->owner == dead_node) {
+			if (res->owner == dead_node)
 				dlm_move_lockres_to_recovery_list(dlm, res);
-			} else if (res->owner == dlm->node_num) {
-				list_for_each_safe(iter2, tmpiter, &res->granted) {
-					lock = list_entry (iter2, dlm_lock, list);
-					if (lock->ml.node == dead_node) {
-						list_del_init(&lock->list);
-						kfree(lock);
-					}
-				}
-				list_for_each_safe(iter2, tmpiter, &res->converting) {
-					lock = list_entry (iter2, dlm_lock, list);
-					if (lock->ml.node == dead_node) {
-						list_del_init(&lock->list);
-						kfree(lock);
-					}
-				}
-				list_for_each_safe(iter2, tmpiter, &res->blocked) {
-					lock = list_entry (iter2, dlm_lock, list);
-					if (lock->ml.node == dead_node) {
-						list_del_init(&lock->list);
-						kfree(lock);
-					}
-				}
-			}
+			else if (res->owner == dlm->node_num)
+				dlm_free_dead_locks(dlm, res, dead_node);
 			spin_unlock(&res->spinlock);
 		}
 	}

Modified: trunk/fs/ocfs2/dlm/dlmthread.c
===================================================================
--- trunk/fs/ocfs2/dlm/dlmthread.c	2005-03-25 20:09:38 UTC (rev 2054)
+++ trunk/fs/ocfs2/dlm/dlmthread.c	2005-03-25 21:36:27 UTC (rev 2055)
@@ -423,7 +423,7 @@
 	return empty;
 }
 
-static void dlm_flush_asts(dlm_ctxt *dlm)
+void dlm_flush_asts(dlm_ctxt *dlm)
 {
 	struct list_head *iter, *iter2;
 	dlm_lock *lock;



More information about the Ocfs2-commits mailing list