[Ocfs2-commits] mfasheh commits r1975 - trunk/fs/ocfs2/dlm

svn-commits at oss.oracle.com svn-commits at oss.oracle.com
Tue Mar 15 14:34:48 CST 2005


Author: mfasheh
Signed-off-by: khackel
Date: 2005-03-15 14:34:47 -0600 (Tue, 15 Mar 2005)
New Revision: 1975

Modified:
   trunk/fs/ocfs2/dlm/dlmast.c
   trunk/fs/ocfs2/dlm/dlmconvert.c
   trunk/fs/ocfs2/dlm/dlmlock.c
   trunk/fs/ocfs2/dlm/dlmmod.c
   trunk/fs/ocfs2/dlm/dlmmod.h
   trunk/fs/ocfs2/dlm/dlmthread.c
   trunk/fs/ocfs2/dlm/dlmunlock.c
Log:
* fix a bug in dlm with proxy ast's being called from network
  handlers. Unfortunately this introduces another (smaller) bug wrt
  bast firing which should be fixed in a future commit.

Most of this patch was written by Kurt Hackel with some fixes /
cleanups from myself.

Signed-off-by: khackel



Modified: trunk/fs/ocfs2/dlm/dlmast.c
===================================================================
--- trunk/fs/ocfs2/dlm/dlmast.c	2005-03-15 20:21:14 UTC (rev 1974)
+++ trunk/fs/ocfs2/dlm/dlmast.c	2005-03-15 20:34:47 UTC (rev 1975)
@@ -50,26 +50,61 @@
 #include "dlmcommon.h"
 #include "dlmmod.h"
 
-static int dlm_send_proxy_ast(dlm_ctxt *dlm, dlm_lock_resource *res, 
-			      dlm_lock *lock, int type, int blocked_type);
 
-int dlm_do_ast(dlm_ctxt *dlm, dlm_lock_resource *res, dlm_lock *lock)
+static void dlm_update_lvb(dlm_ctxt *dlm, dlm_lock_resource *res,
+			   dlm_lock *lock);
+
+void __dlm_queue_ast(dlm_ctxt *dlm, dlm_lock *lock)
 {
-	int ret;
-	dlm_astlockfunc_t *fn;
-	dlm_lockstatus *lksb;
+	dlmprintk0("\n");
 
+	DLM_ASSERT(dlm);
+	DLM_ASSERT(lock);
+
+	assert_spin_locked(&dlm->spinlock);
+	DLM_ASSERT(list_empty(&lock->ast_list));
+	list_add_tail(&lock->ast_list, &dlm->pending_asts);
+}
+
+void dlm_queue_ast(dlm_ctxt *dlm, dlm_lock *lock)
+{
 	dlmprintk0("\n");
 
+	DLM_ASSERT(dlm);
 	DLM_ASSERT(lock);
-	DLM_ASSERT(res);
-	DLM_ASSERT(lock->lksb);
 
-	lksb = lock->lksb;
-	fn = lock->ast;
+	spin_lock(&dlm->spinlock);
+	DLM_ASSERT(list_empty(&lock->ast_list));
+	list_add_tail(&lock->ast_list, &dlm->pending_asts);
+	spin_unlock(&dlm->spinlock);
+}
 
+
+void __dlm_queue_bast(dlm_ctxt *dlm, dlm_lock *lock)
+{
+	dlmprintk0("\n");
+
+	DLM_ASSERT(dlm);
+	DLM_ASSERT(lock);
+	assert_spin_locked(&dlm->spinlock);
+
+	DLM_ASSERT(list_empty(&lock->bast_list));
+
+	list_add_tail(&lock->bast_list, &dlm->pending_basts);
+}
+
+
+static void dlm_update_lvb(dlm_ctxt *dlm, dlm_lock_resource *res,
+			   dlm_lock *lock)
+{
+	dlm_lockstatus *lksb = lock->lksb;
+	DLM_ASSERT(lksb);
+
+	/* only updates if this node masters the lockres */
 	if (res->owner == dlm->group_index) {
-		/* this node is the lockres master */
+
+		spin_lock(&res->spinlock);
+		/* check the lksb flags for the direction */
 		if (lksb->flags & DLM_LKSB_GET_LVB) {
 			dlmprintk("getting lvb from lockres for %s node\n",
 				  lock->ml.node == dlm->group_index ? "master" :
@@ -81,54 +116,73 @@
 				  "remote");
 			memcpy(res->lvb, lksb->lvb, DLM_LVB_LEN);
 		}
+		spin_unlock(&res->spinlock);
 	}
 
-	ret = 0;
-	if (lock->ml.node != dlm->group_index) {
-		/* lock request came from another node
-		 * go do the ast over there */
-		ret = dlm_send_proxy_ast(dlm, res, lock, DLM_AST, 0);
-	} else {
-		DLM_ASSERT(fn);
-		(*fn)(lock->astdata);
-	}
-
 	/* reset any lvb flags on the lksb */
 	lksb->flags &= ~(DLM_LKSB_PUT_LVB|DLM_LKSB_GET_LVB);
-	return ret;
 }
 
+void dlm_do_local_ast(dlm_ctxt *dlm, dlm_lock_resource *res, dlm_lock *lock)
+{
+	dlm_astlockfunc_t *fn;
+	dlm_lockstatus *lksb;
 
-int dlm_do_bast(dlm_ctxt *dlm, dlm_lock_resource *res, 
-		dlm_lock *lock, int blocked_type)
+	dlmprintk0("\n");
+
+	DLM_ASSERT(lock);
+	DLM_ASSERT(res);
+	lksb = lock->lksb;
+	DLM_ASSERT(lksb);
+	fn = lock->ast;
+	DLM_ASSERT(fn);
+	DLM_ASSERT(lock->ml.node == dlm->group_index);
+
+	dlm_update_lvb(dlm, res, lock);
+	(*fn)(lock->astdata);
+}
+
+
+int dlm_do_remote_ast(dlm_ctxt *dlm, dlm_lock_resource *res, dlm_lock *lock)
 {
 	int ret;
-	dlm_bastlockfunc_t *fn = lock->bast;
+	dlm_lockstatus *lksb;
+	int lksbflags = 0;
 
 	dlmprintk0("\n");
 
-	if (lock->ml.node != dlm->group_index) {
-		ret = dlm_send_proxy_ast(dlm, res, lock, 
-					 DLM_BAST, blocked_type);
-		goto leave;
-	}
+	DLM_ASSERT(lock);
+	DLM_ASSERT(res);
+	lksb = lock->lksb;
+	DLM_ASSERT(lksb);
+	DLM_ASSERT(lock->ml.node != dlm->group_index);
 
-	if (!fn) {
-		dlmprintk("eek! lock has no bast %.*s!  cookie=%llu\n", 
-		       res->lockname.len, res->lockname.name, lock->ml.cookie);
-		ret = -EINVAL;
-		goto leave;
-	}
-	(*fn)(lock->astdata, blocked_type);
 	ret = 0;
-leave:
+	dlm_update_lvb(dlm, res, lock);
+		
+	/* lock request came from another node
+	 * go do the ast over there */
+	ret = dlm_send_proxy_ast(dlm, res, lock, lksbflags);
 	return ret;
 }
 
+void dlm_do_local_bast(dlm_ctxt *dlm, dlm_lock_resource *res, 
+		       dlm_lock *lock, int blocked_type)
+{
+	dlm_bastlockfunc_t *fn = lock->bast;
+	dlmprintk0("\n");
 
+	DLM_ASSERT(lock->ml.node == dlm->group_index);
+	DLM_ASSERT(fn);
+	
+	(*fn)(lock->astdata, blocked_type);
+}
+
+
+
 int dlm_proxy_ast_handler(net_msg *msg, u32 len, void *data)
 {
-	int ret, status;
+	int ret;
 	unsigned int locklen;
 	dlm_ctxt *dlm = data;
 	dlm_lock_resource *res = NULL;
@@ -185,6 +239,9 @@
 		goto leave;
 	}
 
+	/* cannot get a proxy ast message if this node owns it */
+	DLM_ASSERT(res->owner != dlm->group_index);
+
 	dlmprintk("lockres %.*s\n", res->lockname.len, res->lockname.name);
 	if (!dlm_is_recovery_lock(past->name, past->namelen))
 		down_read(&dlm->recovery_sem);
@@ -242,21 +299,14 @@
 			DLM_ASSERT(lock->lksb->flags & DLM_LKSB_GET_LVB);
 			memcpy(lock->lksb->lvb, past->lvb, DLM_LVB_LEN);
 		}
-		status = dlm_do_ast(dlm, res, lock);
-		dlmprintk("ast done: now... type=%d, convert_type=%d\n",
-			  lock->ml.type, lock->ml.convert_type);
-	} else {
-		dlmprintk("bast: before... type=%d, convert_type=%d\n",
-			  lock->ml.type, lock->ml.convert_type);
-		status = dlm_do_bast(dlm, res, lock, past->blocked_type);
-		dlmprintk("bast: after... type=%d, convert_type=%d\n",
-			  lock->ml.type, lock->ml.convert_type);
 	}
+	spin_unlock(&res->spinlock);
+	
+	if (past->type == DLM_AST)
+		dlm_do_local_ast(dlm, res, lock);
+	else
+		dlm_do_local_bast(dlm, res, lock, past->blocked_type);
 
-	if (status < 0)
-		dlmprintk("eeek: ast/bast returned %d\n", status);
-
-	spin_unlock(&res->spinlock);
 	if (!dlm_is_recovery_lock(past->name, past->namelen))
 		up_read(&dlm->recovery_sem);
 
@@ -269,8 +319,11 @@
 	return ret;
 }
 
-static int dlm_send_proxy_ast(dlm_ctxt *dlm, dlm_lock_resource *res, 
-			      dlm_lock *lock, int type, int blocked_type)
+
+
+int dlm_send_proxy_ast_msg(dlm_ctxt *dlm, dlm_lock_resource *res, 
+			   dlm_lock *lock, int msg_type, 
+			   int blocked_type, int flags)
 {
 	int ret = 0;
 	dlm_proxy_ast past;
@@ -280,11 +333,11 @@
 
 	dlmprintk("res %.*s, to=%u, type=%d, blocked_type=%d\n",
 		  res->lockname.len, res->lockname.name, lock->ml.node, 
-		  type, blocked_type);
+		  msg_type, blocked_type);
 
 	memset(&past, 0, sizeof(dlm_proxy_ast));
 	past.node_idx = dlm->group_index;
-	past.type = type;
+	past.type = msg_type;
 	past.blocked_type = blocked_type;
 	past.namelen = res->lockname.len;
 	strncpy(past.name, res->lockname.name, past.namelen);
@@ -292,8 +345,8 @@
 
 	iov[0].iov_len = sizeof(dlm_proxy_ast);
 	iov[0].iov_base = &past;
-	if (lock->lksb->flags & DLM_LKSB_GET_LVB) {
-		dlmprintk("sending LKM_GET_LVB flag\n");
+	if (flags & DLM_LKSB_GET_LVB) {
+		dlmprintk0("returning requested LVB data\n");
 		past.flags |= LKM_GET_LVB;
 		iov[1].iov_len = DLM_LVB_LEN;
 		iov[1].iov_base = lock->lksb->lvb;

Modified: trunk/fs/ocfs2/dlm/dlmconvert.c
===================================================================
--- trunk/fs/ocfs2/dlm/dlmconvert.c	2005-03-15 20:21:14 UTC (rev 1974)
+++ trunk/fs/ocfs2/dlm/dlmconvert.c	2005-03-15 20:34:47 UTC (rev 1975)
@@ -82,10 +82,8 @@
 				     &call_ast, &kick_thread);
 	spin_unlock(&res->spinlock);
 
-#warning fix all ast calling!!!
 	if (call_ast)
-		if (dlm_do_ast(dlm, res, lock) < 0)
-			dlmprintk0("eek\n");
+		dlm_queue_ast(dlm, lock);
 	
 	if (kick_thread)
 		dlm_kick_thread(dlm, res);
@@ -445,10 +443,8 @@
 		dlmprintk("did not find lock to convert on "
 			  "grant queue! cookie=%llu\n", cnv->cookie);
 
-#warning fix all ast calling!!!
 	if (call_ast)
-		if (dlm_do_ast(dlm, res, lock) < 0)
-			dlmprintk0("eek\n");
+		dlm_queue_ast(dlm, lock);
 	if (kick_thread)
 		dlm_kick_thread(dlm, res);
 

Modified: trunk/fs/ocfs2/dlm/dlmlock.c
===================================================================
--- trunk/fs/ocfs2/dlm/dlmlock.c	2005-03-15 20:21:14 UTC (rev 1974)
+++ trunk/fs/ocfs2/dlm/dlmlock.c	2005-03-15 20:34:47 UTC (rev 1975)
@@ -127,8 +127,7 @@
 	wake_up(&res->wq);
 
 	if (call_ast)
-		if (dlm_do_ast(dlm, res, lock) < 0)
-			dlmprintk0("eek\n");
+		dlm_queue_ast(dlm, lock);
 
 	dlm_lockres_calc_usage(dlm, res);
 	dlm_kick_thread(dlm, res);
@@ -239,6 +238,7 @@
 	memset(newlock, 0, sizeof(dlm_lock));
 	INIT_LIST_HEAD(&newlock->list);
 	INIT_LIST_HEAD(&newlock->ast_list);
+	INIT_LIST_HEAD(&newlock->bast_list);
 	spin_lock_init(&newlock->spinlock);
 	newlock->ml.type = type;
 	newlock->ml.convert_type = LKM_IVMODE;

Modified: trunk/fs/ocfs2/dlm/dlmmod.c
===================================================================
--- trunk/fs/ocfs2/dlm/dlmmod.c	2005-03-15 20:21:14 UTC (rev 1974)
+++ trunk/fs/ocfs2/dlm/dlmmod.c	2005-03-15 20:34:47 UTC (rev 1975)
@@ -303,6 +303,7 @@
 		memset(lock, 0, sizeof(dlm_lock));
 		INIT_LIST_HEAD(&lock->list);
 		INIT_LIST_HEAD(&lock->ast_list);
+		INIT_LIST_HEAD(&lock->bast_list);
 		spin_lock_init(&lock->spinlock);
 		lock->lockres = res;
 		lock->ml.type = mode;
@@ -723,6 +724,8 @@
 	spin_lock_init(&dlm->master_lock);
 	INIT_LIST_HEAD(&dlm->list);
 	INIT_LIST_HEAD(&dlm->dirty_list);
+	INIT_LIST_HEAD(&dlm->pending_asts);
+	INIT_LIST_HEAD(&dlm->pending_basts);
 	INIT_LIST_HEAD(&dlm->reco.resources);
 	INIT_LIST_HEAD(&dlm->reco.received);
 	INIT_LIST_HEAD(&dlm->purge_list);

Modified: trunk/fs/ocfs2/dlm/dlmmod.h
===================================================================
--- trunk/fs/ocfs2/dlm/dlmmod.h	2005-03-15 20:21:14 UTC (rev 1974)
+++ trunk/fs/ocfs2/dlm/dlmmod.h	2005-03-15 20:34:47 UTC (rev 1975)
@@ -206,6 +206,8 @@
 	struct list_head *resources;
 	struct list_head dirty_list;
 	struct list_head purge_list;
+	struct list_head pending_asts;
+	struct list_head pending_basts;
 	unsigned int purge_count;
 	spinlock_t spinlock;
 	struct rw_semaphore recovery_sem;
@@ -300,6 +302,7 @@
 
 	struct list_head list;
 	struct list_head ast_list;
+	struct list_head bast_list;
 	dlm_lock_resource *lockres;
 	spinlock_t spinlock;
 
@@ -583,6 +586,7 @@
 void dlm_thread_run_lock_resources(dlm_ctxt *dlm);
 int dlm_launch_thread(dlm_ctxt *dlm);
 void dlm_complete_thread(dlm_ctxt *dlm);
+void dlm_flush_asts(dlm_ctxt *dlm);
 
 dlm_status dlmlock(dlm_ctxt *dlm,
 		   int mode,
@@ -661,8 +665,32 @@
 
 int dlm_lock_owner_broadcast(dlm_ctxt *dlm, dlm_lock_resource *res);
 int dlm_refresh_lock_resource(dlm_ctxt *dlm, dlm_lock_resource *res);
-int dlm_do_ast(dlm_ctxt *dlm, dlm_lock_resource *res, dlm_lock *lock);
-int dlm_do_bast(dlm_ctxt *dlm, dlm_lock_resource *res, dlm_lock *lock, int blocked_type);
+
+
+void __dlm_queue_ast(dlm_ctxt *dlm, dlm_lock *lock);
+void dlm_queue_ast(dlm_ctxt *dlm, dlm_lock *lock);
+void __dlm_queue_bast(dlm_ctxt *dlm, dlm_lock *lock);
+void dlm_do_local_ast(dlm_ctxt *dlm, dlm_lock_resource *res, dlm_lock *lock);
+int dlm_do_remote_ast(dlm_ctxt *dlm, dlm_lock_resource *res, dlm_lock *lock);
+void dlm_do_local_bast(dlm_ctxt *dlm, dlm_lock_resource *res, 
+		       dlm_lock *lock, int blocked_type);
+int dlm_send_proxy_ast_msg(dlm_ctxt *dlm, dlm_lock_resource *res, 
+			   dlm_lock *lock, int msg_type, 
+			   int blocked_type, int flags);
+static inline int dlm_send_proxy_bast(dlm_ctxt *dlm, dlm_lock_resource *res, 
+				      dlm_lock *lock, int blocked_type)
+{
+	return dlm_send_proxy_ast_msg(dlm, res, lock, DLM_BAST,
+				      blocked_type, 0);
+}
+
+static inline int dlm_send_proxy_ast(dlm_ctxt *dlm, dlm_lock_resource *res, 
+				     dlm_lock *lock, int flags)
+{
+	return dlm_send_proxy_ast_msg(dlm, res, lock, DLM_AST,
+				      0, flags);
+}
+
 u8 dlm_nm_this_node(dlm_ctxt *dlm);
 void dlm_kick_thread(dlm_ctxt *dlm, dlm_lock_resource *res);
 

Modified: trunk/fs/ocfs2/dlm/dlmthread.c
===================================================================
--- trunk/fs/ocfs2/dlm/dlmthread.c	2005-03-15 20:21:14 UTC (rev 1974)
+++ trunk/fs/ocfs2/dlm/dlmthread.c	2005-03-15 20:34:47 UTC (rev 1975)
@@ -186,31 +186,14 @@
 {
 	dlm_lock *lock, *target;
 	struct list_head *iter, *tmpiter;
-	LIST_HEAD(bast_list);
 	struct list_head *head;
 	s8 hi;
+	int can_grant = 1;
 
 	dlmprintk("shuffle res %.*s\n", res->lockname.len, res->lockname.name);
 
 	spin_lock(&res->spinlock);
 
-#if 0
-	{
-		int g=0, c=0, b=0;
-		list_for_each(iter, &res->granted) {
-			g++;
-		}
-		list_for_each(iter, &res->converting) {
-			c++;
-		}
-		list_for_each(iter, &res->blocked) {
-			b++;
-		}
-		dlmprintk("(%d) granted: %d, converting: %d, blocked: %d\n", 
-			  current->pid, g, c, b);
-	}
-#endif
-
 converting:
 	if (list_empty(&res->converting))
 		goto blocked;
@@ -229,8 +212,11 @@
 		if (lock==target)
 			continue;
 		if (!dlm_lock_compatible(lock->ml.type, target->ml.convert_type)) {
+			can_grant = 0;
+			/* queue the BAST if not already */
 			if (lock->ml.highest_blocked == LKM_IVMODE)
-				list_add(&lock->ast_list, &bast_list);
+				__dlm_queue_bast(dlm, lock);
+			/* update the highest_blocked if needed */
 			if (lock->ml.highest_blocked < target->ml.convert_type)
 				lock->ml.highest_blocked = target->ml.convert_type;
 		}
@@ -241,15 +227,16 @@
 		if (lock==target)
 			continue;
 		if (!dlm_lock_compatible(lock->ml.type, target->ml.convert_type)) {
+			can_grant = 0;
 			if (lock->ml.highest_blocked == LKM_IVMODE)
-				list_add(&lock->ast_list, &bast_list);
+				__dlm_queue_bast(dlm, lock);
 			if (lock->ml.highest_blocked < target->ml.convert_type)
 				lock->ml.highest_blocked = target->ml.convert_type;
 		}
 	}
 	
 	/* we can convert the lock */
-	if (list_empty(&bast_list)) {
+	if (can_grant) {
 		spin_lock(&target->spinlock);
 		DLM_ASSERT(target->ml.highest_blocked == LKM_IVMODE);	
 		
@@ -268,16 +255,14 @@
 
 		spin_unlock(&target->spinlock);
 
-		if (dlm_do_ast(dlm, res, target) < 0)
-			dlmprintk0("eek\n");
+		__dlm_queue_ast(dlm, target);
 		/* go back and check for more */
 		goto converting;
 	}
 
 blocked:
-	if (list_empty(&res->blocked)) {
-		goto basts;
-	}
+	if (list_empty(&res->blocked))
+		goto leave;
 	target = list_entry(res->blocked.next, dlm_lock, list);
 
 	head = &res->granted;
@@ -286,8 +271,9 @@
 		if (lock==target)
 			continue;
 		if (!dlm_lock_compatible(lock->ml.type, target->ml.type)) {
+			can_grant = 0;
 			if (lock->ml.highest_blocked == LKM_IVMODE)
-				list_add(&lock->ast_list, &bast_list);
+				__dlm_queue_bast(dlm, lock);
 			if (lock->ml.highest_blocked < target->ml.type)
 				lock->ml.highest_blocked = target->ml.type;
 		}
@@ -299,8 +285,9 @@
 		if (lock==target)
 			continue;
 		if (!dlm_lock_compatible(lock->ml.type, target->ml.type)) {
+			can_grant = 0;
 			if (lock->ml.highest_blocked == LKM_IVMODE)
-				list_add(&lock->ast_list, &bast_list);
+				__dlm_queue_bast(dlm, lock);
 			if (lock->ml.highest_blocked < target->ml.type)
 				lock->ml.highest_blocked = target->ml.type;
 		}
@@ -308,7 +295,7 @@
 	
 	/* we can grant the blocked lock (only 
 	 * possible if converting list empty) */
-	if (list_empty(&bast_list)) {
+	if (can_grant) {
 		spin_lock(&target->spinlock);
 		DLM_ASSERT(target->ml.highest_blocked == LKM_IVMODE);
 		
@@ -325,27 +312,12 @@
 		
 		spin_unlock(&target->spinlock);
 
-		if (dlm_do_ast(dlm, res, target) < 0)
-			dlmprintk0("eek\n");
+		__dlm_queue_ast(dlm, target);
 		/* go back and check for more */
 		goto converting;
 	}
 
-basts:
-	list_for_each_safe(iter, tmpiter, &bast_list) {
-		lock = list_entry(iter, dlm_lock, ast_list);
-		spin_lock(&lock->spinlock);
-		DLM_ASSERT(lock->ml.highest_blocked > LKM_IVMODE);
-		hi = lock->ml.highest_blocked;
-		lock->ml.highest_blocked = LKM_IVMODE;
-		list_del(&lock->ast_list);
-		spin_unlock(&lock->spinlock);
-
-		dlmprintk("delivering a bast for this lockres (blocked = %d\n",
-			  hi);
-		if (dlm_do_bast(dlm, res, lock, hi) < 0)
-			dlmprintk0("eeek\n");
-	}
+leave:
 	spin_unlock(&res->spinlock);
 }
 
@@ -406,6 +378,58 @@
 	return empty;
 }
 
+void dlm_flush_asts(dlm_ctxt *dlm)
+{
+	struct list_head *iter, *iter2;
+	dlm_lock *lock;
+	dlm_lock_resource *res;
+	LIST_HEAD(ast_tmp);
+	LIST_HEAD(bast_tmp);
+	u8 hi;
+
+	dlmprintk0("\n");
+
+	spin_lock(&dlm->spinlock);
+	list_splice_init(&dlm->pending_asts, &ast_tmp);
+	list_splice_init(&dlm->pending_basts, &bast_tmp);
+	spin_unlock(&dlm->spinlock);
+
+	list_for_each_safe(iter, iter2, &ast_tmp) {
+		lock = list_entry(iter, dlm_lock, ast_list);
+		res = lock->lockres;
+		dlmprintk0("delivering an ast for this lockres\n");
+
+		list_del_init(&lock->ast_list);
+		if (lock->ml.node != dlm->group_index) {
+			if (dlm_do_remote_ast(dlm, res, lock) < 0)
+				dlmprintk("eek\n");
+		} else
+			dlm_do_local_ast(dlm, res, lock);
+	}
+
+	list_for_each_safe(iter, iter2, &bast_tmp) {
+		lock = list_entry(iter, dlm_lock, bast_list);
+		res = lock->lockres;
+
+		/* get the highest blocked lock, and reset */
+		spin_lock(&lock->spinlock);
+		DLM_ASSERT(lock->ml.highest_blocked > LKM_IVMODE);
+		hi = lock->ml.highest_blocked;
+		lock->ml.highest_blocked = LKM_IVMODE;
+		spin_unlock(&lock->spinlock);
+
+		list_del_init(&lock->bast_list);
+
+		dlmprintk("delivering a bast for this lockres "
+			  "(blocked = %d\n", hi);
+		if (lock->ml.node != dlm->group_index) {
+			if (dlm_send_proxy_bast(dlm, res, lock, hi) < 0)
+				dlmprintk0("eeek\n");
+		} else
+			dlm_do_local_bast(dlm, res, lock, hi);
+	}
+}
+
 #define DLM_THREAD_TIMEOUT_MS (4 * 1000)
 
 static int dlm_thread(void *data)
@@ -439,6 +463,7 @@
 			spin_unlock(&res->spinlock);
 		}
 		spin_unlock(&dlm->spinlock);
+		dlm_flush_asts(dlm);
 		up_read(&dlm->recovery_sem);
 
 		wait_event_interruptible_timeout(dlm->dlm_thread_wq,

Modified: trunk/fs/ocfs2/dlm/dlmunlock.c
===================================================================
--- trunk/fs/ocfs2/dlm/dlmunlock.c	2005-03-15 20:21:14 UTC (rev 1974)
+++ trunk/fs/ocfs2/dlm/dlmunlock.c	2005-03-15 20:34:47 UTC (rev 1975)
@@ -85,7 +85,8 @@
 {
 	dlm_status status;
 	int actions = 0;
-	
+	int in_use;
+
 	dlmprintk("master_node = %d, valblk = %d\n", master_node,
 		  flags & LKM_VALBLK);
 
@@ -94,6 +95,18 @@
 	else
 		DLM_ASSERT(res->owner != dlm->group_index);
 
+	spin_lock(&dlm->spinlock);
+	/* We want to be sure that we're not freeing a lock
+	 * that still has AST's pending... */
+	in_use = !list_empty(&lock->ast_list);
+	spin_unlock(&dlm->spinlock);
+	if (in_use) {
+		dlmprintk("lockres %.*s: Someone is calling dlmunlock while "
+			  "waiting for an ast!", res->lockname.len,
+			  res->lockname.name);
+		return DLM_BADPARAM;
+	}
+
 	spin_lock(&res->spinlock);
 	if (res->state & DLM_LOCK_RES_IN_PROGRESS) {
 		if (!master_node) {
@@ -159,9 +172,10 @@
 	wake_up(&res->wq);
 
 	if (actions & DLM_UNLOCK_FREE_LOCK) {
-#warning this must change to proper refcounting
-		/* TODO: refcounting... tho for now this will work because 
-		 * the middle layer is keeping track of everything */
+#warning this can corrupt memory!
+		/* XXX If this lock has a bast pending, then we've
+		 * just free'd memory that the dlmthread will be
+		 * referencing... BAAAAD! */
 		kfree(lock);
 		lksb->lockid = NULL;
 	}



More information about the Ocfs2-commits mailing list