[Ocfs2-commits] khackel commits r2129 - trunk/fs/ocfs2/dlm

svn-commits at oss.oracle.com svn-commits at oss.oracle.com
Fri Apr 8 19:16:37 CDT 2005


Author: khackel
Signed-off-by: mfasheh
Date: 2005-04-08 19:16:35 -0500 (Fri, 08 Apr 2005)
New Revision: 2129

Modified:
   trunk/fs/ocfs2/dlm/dlmast.c
   trunk/fs/ocfs2/dlm/dlmcommon.h
   trunk/fs/ocfs2/dlm/dlmconvert.c
   trunk/fs/ocfs2/dlm/dlmdomain.c
   trunk/fs/ocfs2/dlm/dlmlock.c
   trunk/fs/ocfs2/dlm/dlmmaster.c
   trunk/fs/ocfs2/dlm/dlmrecovery.c
   trunk/fs/ocfs2/dlm/dlmthread.c
   trunk/fs/ocfs2/dlm/dlmunlock.c
Log:
* make lockres migration dependent upon AST flushing (no migration
  until all ASTs and BASTs are flushed)
* added refcounting for dlm_lock
* ASTs and BASTs now use a reserve/claim/release system
* rework dlm_flush_asts (and added dlm->ast_lock) to do work without
  holding the dlm lock
* rework dlm_thread dirty_list running to do work without
  holding the dlm lock
* handle res->state flags more uniformly in several paths
* uniformly attach and detach lockres and lock structures
* unlock is no longer unsafe

Signed-off-by: mfasheh



Modified: trunk/fs/ocfs2/dlm/dlmast.c
===================================================================
--- trunk/fs/ocfs2/dlm/dlmast.c	2005-04-08 21:25:16 UTC (rev 2128)
+++ trunk/fs/ocfs2/dlm/dlmast.c	2005-04-09 00:16:35 UTC (rev 2129)
@@ -58,11 +58,13 @@
 	DLM_ASSERT(dlm);
 	DLM_ASSERT(lock);
 
-	assert_spin_locked(&dlm->spinlock);
+	assert_spin_locked(&dlm->ast_lock);
 	DLM_ASSERT(list_empty(&lock->ast_list));
 	if (lock->ast_pending)
 		dlmprintk0("lock has an ast getting flushed right now\n");
 
+	/* putting lock on list, add a ref */
+	dlm_lock_get(lock);
 	spin_lock(&lock->spinlock);
 	list_add_tail(&lock->ast_list, &dlm->pending_asts);
 	lock->ast_pending = 1;
@@ -76,9 +78,9 @@
 	DLM_ASSERT(dlm);
 	DLM_ASSERT(lock);
 
-	spin_lock(&dlm->spinlock);
+	spin_lock(&dlm->ast_lock);
 	__dlm_queue_ast(dlm, lock);
-	spin_unlock(&dlm->spinlock);
+	spin_unlock(&dlm->ast_lock);
 }
 
 
@@ -88,19 +90,32 @@
 
 	DLM_ASSERT(dlm);
 	DLM_ASSERT(lock);
-	assert_spin_locked(&dlm->spinlock);
+	assert_spin_locked(&dlm->ast_lock);
 
 	DLM_ASSERT(list_empty(&lock->bast_list));
 	if (lock->bast_pending)
 		dlmprintk0("lock has a bast getting flushed right now\n");
 
+	/* putting lock on list, add a ref */
+	dlm_lock_get(lock);
 	spin_lock(&lock->spinlock);
 	list_add_tail(&lock->bast_list, &dlm->pending_basts);
 	lock->bast_pending = 1;
 	spin_unlock(&lock->spinlock);
 }
 
+void dlm_queue_bast(dlm_ctxt *dlm, dlm_lock *lock)
+{
+	dlmprintk0("\n");
 
+	DLM_ASSERT(dlm);
+	DLM_ASSERT(lock);
+
+	spin_lock(&dlm->ast_lock);
+	__dlm_queue_bast(dlm, lock);
+	spin_unlock(&dlm->ast_lock);
+}
+
 static void dlm_update_lvb(dlm_ctxt *dlm, dlm_lock_resource *res,
 			   dlm_lock *lock)
 {
@@ -302,6 +317,7 @@
 do_ast:
 	ret = DLM_NORMAL;
 	if (past->type == DLM_AST) {
+		/* do not alter lock refcount.  switching lists. */
 		list_del_init(&lock->list);
 		list_add_tail(&lock->list, &res->granted);
 		dlmprintk("ast: adding to granted list... type=%d, "

Modified: trunk/fs/ocfs2/dlm/dlmcommon.h
===================================================================
--- trunk/fs/ocfs2/dlm/dlmcommon.h	2005-04-08 21:25:16 UTC (rev 2128)
+++ trunk/fs/ocfs2/dlm/dlmcommon.h	2005-04-09 00:16:35 UTC (rev 2129)
@@ -105,6 +105,7 @@
 	struct list_head pending_basts;
 	unsigned int purge_count;
 	spinlock_t spinlock;
+	spinlock_t ast_lock;
 	struct rw_semaphore recovery_sem;
 	char *name;
 	u8 node_num;
@@ -239,6 +240,8 @@
 	struct list_head purge;
 	unsigned long    last_used;
 
+	unsigned migration_pending:1;
+	atomic_t asts_reserved;
 	spinlock_t spinlock;
 	wait_queue_head_t wq;
 	u8  owner;              //node which owns the lock resource, or unknown
@@ -272,6 +275,7 @@
 	struct list_head bast_list;
 	dlm_lock_resource *lockres;
 	spinlock_t spinlock;
+	struct kref lock_refs;
 
 	// ast and bast must be callable while holding a spinlock!
 	dlm_astlockfunc_t *ast;     
@@ -772,9 +776,32 @@
 		dlm_migratable_lock_to_host(&(mr->ml[i]));
 }
 
+static inline dlm_status __dlm_lockres_state_to_status(dlm_lock_resource *res)
+{
+	dlm_status status = DLM_NORMAL;
 
+	assert_spin_locked(&res->spinlock);
+
+	if (res->state & DLM_LOCK_RES_RECOVERING) {
+		dlmprintk0("returning DLM_RECOVERING\n");
+		status = DLM_RECOVERING;
+	} else if (res->state & DLM_LOCK_RES_MIGRATING) {
+		dlmprintk0("returning DLM_MIGRATING\n");
+		status = DLM_MIGRATING;
+	} else if (res->state & DLM_LOCK_RES_IN_PROGRESS) {
+		dlmprintk0("returning DLM_FORWARD\n");
+		status = DLM_FORWARD;
+	}
+	return status;
+}
+
 dlm_lock * dlm_new_lock(int type, u8 node, u64 cookie, dlm_lockstatus *lksb);
+void dlm_lock_get(dlm_lock *lock);
+void dlm_lock_put(dlm_lock *lock);
 
+void dlm_lock_detach_lockres(dlm_lock *lock);
+void dlm_lock_attach_lockres(dlm_lock *lock, dlm_lock_resource *res);
+	
 int dlm_create_lock_handler(net_msg *msg, u32 len, void *data);
 int dlm_convert_lock_handler(net_msg *msg, u32 len, void *data);
 int dlm_proxy_ast_handler(net_msg *msg, u32 len, void *data);
@@ -785,6 +812,7 @@
 int dlm_launch_thread(dlm_ctxt *dlm);
 void dlm_complete_thread(dlm_ctxt *dlm);
 void dlm_flush_asts(dlm_ctxt *dlm);
+int dlm_flush_lockres_asts(dlm_ctxt *dlm, dlm_lock_resource *res);
 int dlm_launch_recovery_thread(dlm_ctxt *dlm);
 void dlm_complete_recovery_thread(dlm_ctxt *dlm);
 
@@ -819,6 +847,7 @@
 void __dlm_queue_ast(dlm_ctxt *dlm, dlm_lock *lock);
 void dlm_queue_ast(dlm_ctxt *dlm, dlm_lock *lock);
 void __dlm_queue_bast(dlm_ctxt *dlm, dlm_lock *lock);
+void dlm_queue_bast(dlm_ctxt *dlm, dlm_lock *lock);
 void dlm_do_local_ast(dlm_ctxt *dlm, dlm_lock_resource *res, dlm_lock *lock);
 int dlm_do_remote_ast(dlm_ctxt *dlm, dlm_lock_resource *res, dlm_lock *lock);
 void dlm_do_local_bast(dlm_ctxt *dlm, dlm_lock_resource *res, 
@@ -852,6 +881,8 @@
 
 int dlm_migrate_lockres(dlm_ctxt *dlm, dlm_lock_resource *res, u8 target);
 int dlm_finish_migration(dlm_ctxt *dlm, dlm_lock_resource *res, u8 old_master);
+void dlm_lockres_release_ast(dlm_lock_resource *res);
+void __dlm_lockres_reserve_ast(dlm_lock_resource *res);
 
 int dlm_master_request_handler(net_msg *msg, u32 len, void *data);
 int dlm_assert_master_handler(net_msg *msg, u32 len, void *data);
@@ -878,6 +909,7 @@
 
 /* will exit holding res->spinlock, but may drop in function */
 void __dlm_wait_on_lockres_flags(dlm_lock_resource *res, int flags);
+void __dlm_wait_on_lockres_flags_set(dlm_lock_resource *res, int flags);
 
 /* will exit holding res->spinlock, but may drop in function */
 static inline void __dlm_wait_on_lockres(dlm_lock_resource *res)

Modified: trunk/fs/ocfs2/dlm/dlmconvert.c
===================================================================
--- trunk/fs/ocfs2/dlm/dlmconvert.c	2005-04-08 21:25:16 UTC (rev 2128)
+++ trunk/fs/ocfs2/dlm/dlmconvert.c	2005-04-09 00:16:35 UTC (rev 2129)
@@ -79,6 +79,7 @@
 	spin_lock(&res->spinlock);
 	/* we are not in a network handler, this is fine */
 	__dlm_wait_on_lockres(res);
+	__dlm_lockres_reserve_ast(res);
 	res->state |= DLM_LOCK_RES_IN_PROGRESS;
 
 	status = __dlmconvert_master(dlm, res, lock, flags, type, 
@@ -88,8 +89,11 @@
 	spin_unlock(&res->spinlock);
 	wake_up(&res->wq);
 
+	/* either queue the ast or release it */
 	if (call_ast)
 		dlm_queue_ast(dlm, lock);
+	else
+		dlm_lockres_release_ast(res);
 	
 	if (kick_thread)
 		dlm_kick_thread(dlm, res);
@@ -216,6 +220,7 @@
 		  res->lockname.name);
 
 	lock->ml.convert_type = type;
+	/* do not alter lock refcount.  switching lists. */
 	list_del_init(&lock->list);
 	list_add_tail(&lock->list, &res->converting);
 
@@ -252,6 +257,7 @@
 	res->state |= DLM_LOCK_RES_IN_PROGRESS;
 
 	/* move lock to local convert queue */
+	/* do not alter lock refcount.  switching lists. */
 	list_del_init(&lock->list);
 	list_add_tail(&lock->list, &res->converting);
 	if (lock->ml.convert_type != LKM_IVMODE) {
@@ -288,6 +294,7 @@
 
 	/* if it failed, move it back to granted queue */
 	if (status != DLM_NORMAL) {
+		/* do not alter lock refcount.  switching lists. */
 		list_del_init(&lock->list);
 		list_add_tail(&lock->list, &res->granted);
 		lock->ml.convert_type = LKM_IVMODE;
@@ -389,7 +396,6 @@
 	dlm_status status = DLM_NORMAL;
 	u32 flags;
 	int call_ast = 0, kick_thread = 0;
-	int found = 0;
 
 	if (!dlm_grab(dlm))
 		return DLM_REJECTED;
@@ -421,28 +427,17 @@
 		goto leave;
 
 	spin_lock(&res->spinlock);
-	if (res->state & DLM_LOCK_RES_RECOVERING) {
-		spin_unlock(&res->spinlock);
-		dlmprintk0("returning DLM_RECOVERING\n");
-		status = DLM_RECOVERING;
-		goto leave;
-	}
-	if (res->state & DLM_LOCK_RES_MIGRATING) {
-		spin_unlock(&res->spinlock);
-		dlmprintk0("returning DLM_MIGRATING\n");
-		status = DLM_MIGRATING;
-		goto leave;
-	}
 	list_for_each(iter, &res->granted) {
 		lock = list_entry(iter, dlm_lock, list);
 		if (lock->ml.cookie == cnv->cookie &&
 		    lock->ml.node == cnv->node_idx) {
-			found = 1;
+			dlm_lock_get(lock);
 			break;
 		}
+		lock = NULL;
 	}
 	spin_unlock(&res->spinlock);
-	if (!found)
+	if (!lock)
 		goto leave;
 
 	/* found the lock */
@@ -461,9 +456,9 @@
 	}
 
 	spin_lock(&res->spinlock);
-	if (res->state & DLM_LOCK_RES_IN_PROGRESS) {
-		status = DLM_FORWARD;
-	} else {
+	status = __dlm_lockres_state_to_status(res);
+	if (status == DLM_NORMAL) {
+		__dlm_lockres_reserve_ast(res);
 		res->state |= DLM_LOCK_RES_IN_PROGRESS;
 		status = __dlmconvert_master(dlm, res, lock, flags, 
 					     cnv->requested_type,
@@ -480,9 +475,15 @@
 	if (!lock)
 		dlmprintk("did not find lock to convert on "
 			  "grant queue! cookie=%llu\n", cnv->cookie);
+	else
+		dlm_lock_put(lock);
 
+	/* either queue the ast or release it */
 	if (call_ast)
 		dlm_queue_ast(dlm, lock);
+	else 
+		dlm_lockres_release_ast(res);
+
 	if (kick_thread)
 		dlm_kick_thread(dlm, res);
 

Modified: trunk/fs/ocfs2/dlm/dlmdomain.c
===================================================================
--- trunk/fs/ocfs2/dlm/dlmdomain.c	2005-04-08 21:25:16 UTC (rev 2128)
+++ trunk/fs/ocfs2/dlm/dlmdomain.c	2005-04-09 00:16:35 UTC (rev 2129)
@@ -47,7 +47,13 @@
 /*
  *
  * spinlock lock ordering: if multiple locks are needed, obey this ordering:
- *    dlm_domain_lock -> dlm_ctxt -> dlm_lock_resource -> dlm_lock
+ *    dlm_domain_lock 
+ *    dlm_ctxt->spinlock
+ *    dlm_lock_resource->spinlock
+ *    dlm_ctxt->master_lock
+ *    dlm_ctxt->ast_lock
+ *    dlm_master_list_entry->spinlock
+ *    dlm_lock->spinlock
  *
  */
 
@@ -1153,6 +1159,7 @@
 
 	spin_lock_init(&dlm->spinlock);
 	spin_lock_init(&dlm->master_lock);
+	spin_lock_init(&dlm->ast_lock);
 	INIT_LIST_HEAD(&dlm->list);
 	INIT_LIST_HEAD(&dlm->dirty_list);
 	INIT_LIST_HEAD(&dlm->reco.resources);

Modified: trunk/fs/ocfs2/dlm/dlmlock.c
===================================================================
--- trunk/fs/ocfs2/dlm/dlmlock.c	2005-04-08 21:25:16 UTC (rev 2128)
+++ trunk/fs/ocfs2/dlm/dlmlock.c	2005-04-09 00:16:35 UTC (rev 2129)
@@ -56,6 +56,7 @@
 					       dlm_lock_resource *res, 
 					       dlm_lock *lock, int flags);
 static void dlm_init_lock(dlm_lock *newlock, int type, u8 node, u64 cookie);
+static void dlm_lock_release(struct kref *kref);
 
 /* Tell us whether we can grant a new lock request.
  * locking:
@@ -97,7 +98,7 @@
 static dlm_status dlmlock_master(dlm_ctxt *dlm, dlm_lock_resource *res, 
 				 dlm_lock *lock, int flags)
 {
-	int call_ast = 0;
+	int call_ast = 0, kick_thread = 0;
 	dlm_status status = DLM_NORMAL;
 
 	DLM_ASSERT(lock);
@@ -108,13 +109,24 @@
 	dlmprintk("type=%d\n", lock->ml.type);
 
 	spin_lock(&res->spinlock);
+	/* if called from dlm_create_lock_handler, need to 
+	 * ensure it will not sleep in dlm_wait_on_lockres */
+	status = __dlm_lockres_state_to_status(res);
+	if (status != DLM_NORMAL && 
+	    lock->ml.node != dlm->node_num) {
+		/* erf.  state changed after lock was dropped. */
+		spin_unlock(&res->spinlock);
+		return status;
+	}
 	__dlm_wait_on_lockres(res);
+	__dlm_lockres_reserve_ast(res);
 
 	if (dlm_can_grant_new_lock(res, lock)) {
 		dlmprintk("I can grant this lock right away\n");
 		/* got it right away */
 		lock->lksb->status = DLM_NORMAL;
 		status = DLM_NORMAL;
+		dlm_lock_get(lock);
 		list_add_tail(&lock->list, &res->granted);
 
 		/* for the recovery lock, we can't allow the ast 
@@ -123,25 +135,33 @@
 		 * with LKM_NOQUEUE so we do not need the ast in
 		 * this special case */ 
 		if (!dlm_is_recovery_lock(res->lockname.name,
-					  res->lockname.len))
+					  res->lockname.len)) {
+			kick_thread = 1;
 			call_ast = 1;
+		}
 	} else {
 		/* for NOQUEUE request, unless we get the
 		 * lock right away, return DLM_NOTQUEUED */
 		if (flags & LKM_NOQUEUE)
 			status = DLM_NOTQUEUED;
-		else
+		else {
+			dlm_lock_get(lock);
 			list_add_tail(&lock->list, &res->blocked);
+			kick_thread = 1;
+		}
 	}
 
 	spin_unlock(&res->spinlock);
 	wake_up(&res->wq);
 
+	/* either queue the ast or release it */
 	if (call_ast)
 		dlm_queue_ast(dlm, lock);
+	else 
+		dlm_lockres_release_ast(res);
 
 	dlm_lockres_calc_usage(dlm, res);
-	if (status == DLM_NORMAL)
+	if (kick_thread)
 		dlm_kick_thread(dlm, res);
 
 	return status;
@@ -170,6 +190,7 @@
 	res->state |= DLM_LOCK_RES_IN_PROGRESS;
 	
 	/* add lock to local (secondary) queue */
+	dlm_lock_get(lock);
 	list_add_tail(&lock->list, &res->blocked);
 	spin_unlock(&res->spinlock);
 
@@ -183,6 +204,7 @@
 		/* remove from local queue if it failed */
 		list_del_init(&lock->list);
 		lock->lksb->flags &= ~DLM_LKSB_GET_LVB;
+		dlm_lock_put(lock);
 	}
 	spin_unlock(&res->spinlock);
 
@@ -233,7 +255,68 @@
 	return ret;
 }
 
+void dlm_lock_get(dlm_lock *lock)
+{
+	kref_get(&lock->lock_refs);
+}
 
+void dlm_lock_put(dlm_lock *lock)
+{
+	kref_put(&lock->lock_refs, dlm_lock_release);
+}
+
+static void dlm_lock_release(struct kref *kref)
+{
+	dlm_lock *lock;
+	dlm_lockstatus *lksb;
+
+	DLM_ASSERT(kref);
+	lock = container_of(kref, dlm_lock, lock_refs);
+
+	lksb = lock->lksb;
+	DLM_ASSERT(lksb);
+	DLM_ASSERT(lksb->lockid == lock);
+	DLM_ASSERT(list_empty(&lock->list));
+	DLM_ASSERT(list_empty(&lock->ast_list));
+	DLM_ASSERT(list_empty(&lock->bast_list));
+	DLM_ASSERT(!lock->ast_pending);
+	DLM_ASSERT(!lock->bast_pending);
+
+	dlm_lock_detach_lockres(lock);
+	
+	if (lksb->flags & DLM_LKSB_KERNEL_ALLOCATED) {
+		dlmprintk0("freeing kernel-allocated lksb\n");
+		kfree(lksb);
+	} else {
+		dlmprintk0("clearing lockid pointer on user-allocated lksb\n");
+		lksb->lockid = NULL;
+	}
+	kfree(lock);
+}
+
+/* associate a lock with its lockres, getting a ref on the lockres */
+void dlm_lock_attach_lockres(dlm_lock *lock, dlm_lock_resource *res)
+{
+	DLM_ASSERT(lock);
+	DLM_ASSERT(res);
+	dlm_lockres_get(res);
+	lock->lockres = res;
+}
+
+/* drop ref on lockres, if there is still one associated with lock */
+void dlm_lock_detach_lockres(dlm_lock *lock)
+{
+	dlm_lock_resource *res;
+	
+	DLM_ASSERT(lock);
+	res = lock->lockres;
+	if (res) {
+		lock->lockres = NULL;
+		dlmprintk0("removing lock's lockres reference\n");
+		dlm_lockres_put(res);
+	}
+}
+
 static void dlm_init_lock(dlm_lock *newlock, int type, u8 node, u64 cookie)
 {
 	INIT_LIST_HEAD(&newlock->list);
@@ -250,6 +333,7 @@
 	newlock->ml.cookie = cookie;
 	newlock->ast_pending = 0;
 	newlock->bast_pending = 0;
+	kref_init(&newlock->lock_refs, dlm_lock_release);
 }
 
 dlm_lock * dlm_new_lock(int type, u8 node, u64 cookie, dlm_lockstatus *lksb)
@@ -334,29 +418,21 @@
 		goto leave;
 
 	spin_lock(&res->spinlock);
-	if (res->state & DLM_LOCK_RES_RECOVERING) {
-		dlmprintk0("returning DLM_RECOVERING\n");
-		status = DLM_RECOVERING;
-		spin_unlock(&res->spinlock);
+	status = __dlm_lockres_state_to_status(res);
+	spin_unlock(&res->spinlock);
+		
+	if (status != DLM_NORMAL) {
+		dlmprintk("lockres recovering/migrating/in-progress\n");
 		goto leave;
 	}
-	if (res->state & DLM_LOCK_RES_MIGRATING) {
-		dlmprintk0("returning DLM_MIGRATING\n");
-		status = DLM_MIGRATING;
-		spin_unlock(&res->spinlock);
-		goto leave;
-	}
-	spin_unlock(&res->spinlock);
 
-	newlock->lockres = res;
+	dlm_lock_attach_lockres(newlock, res);
+	
 	status = dlmlock_master(dlm, res, newlock, create->flags);
 leave:
-	if (status != DLM_NORMAL) {
+	if (status != DLM_NORMAL)
 		if (newlock)
-			kfree(newlock);
-		if (lksb)
-			kfree(lksb);
-	}
+			dlm_lock_put(newlock);
 
 	if (res)
 		dlm_lockres_put(res);
@@ -366,6 +442,7 @@
 	return status;
 }
 
+
 /* fetch next node-local (u8 nodenum + u56 cookie) into u64 */
 static inline void dlm_get_next_cookie(u8 node_num, u64 *cookie)
 {
@@ -392,6 +469,9 @@
 	dlm_lock *lock = NULL;
 	int convert = 0, recovery = 0;
 
+	/* yes this function is a mess.  
+	 * TODO: clean this up.  lots of common code in the 
+	 *       lock and convert paths, especially in the retry blocks */
 	if (!lksb)
 		return DLM_BADARGS;
 
@@ -405,8 +485,8 @@
 	convert = (flags & LKM_CONVERT);
 	recovery = (flags & LKM_RECOVERY);
 
-	if (recovery && (!dlm_is_recovery_lock(name, strlen(name)) ||
-		 convert) ) {
+	if (recovery && 
+	    (!dlm_is_recovery_lock(name, strlen(name)) || convert) ) {
 		goto error;
 	}
 	if (convert && (flags & LKM_LOCAL)) {
@@ -418,9 +498,18 @@
 		/* CONVERT request */
 
 		/* if converting, must pass in a valid dlm_lock */
-		if (!lksb->lockid || !lksb->lockid->lockres)
+		lock = lksb->lockid;
+		if (!lock) {
+			dlmerror0("NULL lock pointer in convert request\n");
 			goto error;
-		lock = lksb->lockid;
+		}
+		
+		res = lock->lockres;
+		if (!res) {
+			dlmerror0("NULL lockres pointer in convert request\n");
+			goto error;
+		}
+		dlm_lockres_get(res);
 
 		/* XXX: for ocfs2 purposes, the ast/bast/astdata/lksb are 
 	 	 * static after the original lock call.  convert requests will 
@@ -437,8 +526,6 @@
 				  lock->bast, lock->astdata);
 			goto error;
 		}
-		res = lock->lockres;
-		dlm_lockres_get(res);
 retry_convert:
 		down_read(&dlm->recovery_sem);
 
@@ -455,6 +542,7 @@
 			dlmprintk0("retrying convert with migration/"
 				   "recovery/in-progress\n");
 			up_read(&dlm->recovery_sem);
+			yield();
 			goto retry_convert;
 		}
 	} else {
@@ -487,7 +575,7 @@
 		dlmprintk("type=%d, flags = 0x%x\n", mode, flags);
 		dlmprintk("creating lock: lock=%p res=%p\n", lock, res);
 
-		lock->lockres = res;
+		dlm_lock_attach_lockres(lock, res);
 		lock->ast = ast;
 		lock->bast = bast;
 		lock->astdata = data;
@@ -511,10 +599,12 @@
 		else 
 			status = dlmlock_remote(dlm, res, lock, flags);
 
-		if (status == DLM_RECOVERING || status == DLM_MIGRATING) {
-			dlmprintk0("retrying lock with migration or "
-				   "recovery in progress\n");
+		if (status == DLM_RECOVERING || status == DLM_MIGRATING ||
+		    status == DLM_FORWARD) {
+			dlmprintk0("retrying lock with migration/"
+				   "recovery/in progress\n");
 			up_read(&dlm->recovery_sem);
+			yield();
 			down_read(&dlm->recovery_sem);
 			goto retry_lock;
 		}
@@ -531,14 +621,14 @@
 
 error:
 	if (status != DLM_NORMAL) {
-		if (lock && !convert) {
-			kfree(lock);
-			lksb->lockid = NULL;
-		}
+		if (lock && !convert)
+			dlm_lock_put(lock);
 		// this is kind of unnecessary
 		lksb->status = status;
 	}
 
+	/* put lockres ref from the convert path 
+	 * or from dlm_get_lock_resource */
 	if (res)
 		dlm_lockres_put(res);
 

Modified: trunk/fs/ocfs2/dlm/dlmmaster.c
===================================================================
--- trunk/fs/ocfs2/dlm/dlmmaster.c	2005-04-08 21:25:16 UTC (rev 2128)
+++ trunk/fs/ocfs2/dlm/dlmmaster.c	2005-04-09 00:16:35 UTC (rev 2129)
@@ -132,7 +132,7 @@
 				     dlm_master_list_entry *mle);
 static int dlm_restart_lock_mastery(dlm_ctxt *dlm, dlm_lock_resource *res,
 				    dlm_master_list_entry *mle);
-static void dlm_add_migration_mle(dlm_ctxt *dlm, 
+static int dlm_add_migration_mle(dlm_ctxt *dlm, 
 				 dlm_lock_resource *res, 
 				 dlm_master_list_entry *mle, 
 				 dlm_master_list_entry **oldmle, 
@@ -141,8 +141,10 @@
 
 static u8 dlm_pick_migration_target(dlm_ctxt *dlm, dlm_lock_resource *res);
 static void dlm_remove_nonlocal_locks(dlm_ctxt *dlm, dlm_lock_resource *res);
+static void dlm_mark_lockres_migrating(dlm_ctxt *dlm, dlm_lock_resource *res);
 
 
+
 /*
  * MASTER LIST FUNCTIONS
  */
@@ -473,6 +475,8 @@
 	INIT_LIST_HEAD(&res->dirty);
 	INIT_LIST_HEAD(&res->recovering);
 	INIT_LIST_HEAD(&res->purge);
+	atomic_set(&res->asts_reserved, 0);
+	res->migration_pending = 0;
 
 	kref_init(&res->refs, dlm_lockres_release);
 
@@ -1269,13 +1273,6 @@
 	return 0;
 }
 
-int dlm_flush_lockres_asts(dlm_ctxt *dlm, dlm_lock_resource *res)
-{
-	dlm_flush_asts(dlm);
-	/* still need to implement dlm_flush_lockres_asts */
-	return 0;
-}
-
 int dlm_dispatch_assert_master(dlm_ctxt *dlm, dlm_lock_resource *res, 
 			       int ignore_higher, u8 request_from)
 {
@@ -1386,12 +1383,20 @@
 
 	dlmprintk("migrating %.*s to %u\n", namelen, name, target);
 
+	/* 
+	 * ensure this lockres is a proper candidate for migration 
+	 */
 	spin_lock(&res->spinlock);
 	if (res->owner == DLM_LOCK_RES_OWNER_UNKNOWN) {
 		dlmprintk0("cannot migrate lockres with unknown owner!\n");
 		spin_unlock(&res->spinlock);
 		goto leave;
 	}
+	if (res->owner != dlm->node_num) {
+		dlmprintk0("cannot migrate lockres this node doesn't own!\n");
+		spin_unlock(&res->spinlock);
+		goto leave;
+	}
 	dlmprintk0("checking queues...\n");
 	queue = &res->granted;
 	for (i=0; i<3; i++) {
@@ -1415,13 +1420,18 @@
 	dlmprintk0("all locks on this lockres are nonlocal.  continuing\n");
 	spin_unlock(&res->spinlock);
 
+	/* no work to do */
 	if (empty) {
 		dlmprintk0("no locks were found on this lockres! done!\n");
 		ret = 0;
 		goto leave;
 	}
 
-	/* preallocate.. if this fails, abort */
+	/*
+	 * preallocate up front
+	 * if this fails, abort
+	 */
+
 	ret = -ENOMEM;
 	mres = (dlm_migratable_lockres *) __get_free_page(GFP_KERNEL);
 	if (!mres) {
@@ -1436,6 +1446,10 @@
 	}
 	ret = 0;
 
+	/*
+	 * find a node to migrate the lockres to
+	 */
+
 	dlmprintk0("picking a migration node\n");
 	spin_lock(&dlm->spinlock);
 	/* pick a new node */
@@ -1453,34 +1467,33 @@
 
 	if (ret) {
 		spin_unlock(&dlm->spinlock);
-		goto not_live;
+		goto fail;
 	}
 
 	dlmprintk("continuing with target = %u\n", target);
 
-	/* clear any existing master requests and
-	 * add the migration mle to the list */	
+	/* 
+	 * clear any existing master requests and
+	 * add the migration mle to the list 
+	 */	
 	spin_lock(&dlm->master_lock);
-	dlm_add_migration_mle(dlm, res, mle, &oldmle, name, namelen, 
-			      target, dlm->node_num);
-	mle_added = 1;
+	ret = dlm_add_migration_mle(dlm, res, mle, &oldmle, name, 
+				    namelen, target, dlm->node_num);
 	spin_unlock(&dlm->master_lock);
 	spin_unlock(&dlm->spinlock);
 
-	/* set migrating flag on lockres */
-	spin_lock(&res->spinlock);
-	res->state |= DLM_LOCK_RES_MIGRATING;
-	spin_unlock(&res->spinlock);
-
-	/* flush the last of the pending asts */
-	ret = dlm_flush_lockres_asts(dlm, res);
-	if (ret < 0) {
-		spin_lock(&res->spinlock);
-		res->state &= ~DLM_LOCK_RES_MIGRATING;
-		spin_unlock(&res->spinlock);
+	if (ret == -EEXIST) {
+		dlmprintk0("another process is already migrating it\n");
+		goto fail;
 	}
+	mle_added = 1;
 
-not_live:
+	/*
+	 * set the MIGRATING flag and flush asts
+	 */
+	dlm_mark_lockres_migrating(dlm, res);
+
+fail:	
 	if (oldmle) {
 		/* master is known, detach if not already detached */
 		dlm_mle_detach_hb_events(dlm, oldmle);
@@ -1503,6 +1516,11 @@
 	 * the lockres
 	 */
 
+
+	/* get an extra reference on the mle.
+	 * otherwise the assert_master from the new
+	 * master will destroy this. */
+	dlm_get_mle(mle);
 	
 	/* notify new node and send all lock state */
 	/* call send_one_lockres with migration flag.
@@ -1517,6 +1535,7 @@
 		/* migration failed, detach and clean up mle */
 		dlm_mle_detach_hb_events(dlm, mle);
 		dlm_put_mle(mle);
+		dlm_put_mle(mle);
 		goto leave;
 	}
 
@@ -1548,6 +1567,7 @@
 			/* migration failed, detach and clean up mle */
 			dlm_mle_detach_hb_events(dlm, mle);
 			dlm_put_mle(mle);
+			dlm_put_mle(mle);
 			goto leave;
 		}
 		/* TODO: if node died: stop, clean up, return error */
@@ -1578,6 +1598,39 @@
 EXPORT_SYMBOL(dlm_migrate_lockres);
 
 
+static void dlm_mark_lockres_migrating(dlm_ctxt *dlm, dlm_lock_resource *res)
+{
+	/* need to set MIGRATING flag on lockres.  this is done by 
+	 * ensuring that all asts have been flushed for this lockres. */
+	spin_lock(&res->spinlock);
+	DLM_ASSERT(!(res->migration_pending));
+	res->migration_pending = 1;
+	/* strategy is to reserve an extra ast then release 
+	 * it below, letting the release do all of the work */
+	__dlm_lockres_reserve_ast(res);
+	spin_unlock(&res->spinlock);
+
+	/* now flush all the pending asts.. hang out for a bit */
+	dlm_flush_lockres_asts(dlm, res);
+	dlm_lockres_release_ast(res);
+
+	/* if the extra ref we just put was the final one, this 
+	 * will pass thru immediately.  otherwise, we need to wait
+	 * for the last ast to finish. */	
+	spin_lock(&res->spinlock);
+	__dlm_wait_on_lockres_flags_set(res, DLM_LOCK_RES_MIGRATING);
+	spin_unlock(&res->spinlock);
+
+	/* 
+	 * at this point:
+	 *
+	 *   o the DLM_LOCK_RES_MIGRATING flag is set
+	 *   o there are no pending asts on this lockres
+	 *   o all processes trying to reserve an ast on this 
+	 *     lockres must wait for the MIGRATING flag to clear
+	 */
+}
+
 /* last step in the migration process.
  * original master calls this to free all of the dlm_lock
  * structures that used to be for other nodes. */
@@ -1597,13 +1650,15 @@
 			lock = list_entry (iter, dlm_lock, list);
 			DLM_ASSERT(lock);
 			if (lock->ml.node != dlm->node_num) {
-				dlmprintk("freeing lock for node %u\n",
+				dlmprintk("putting lock for node %u\n",
 					  lock->ml.node);
+				/* be extra careful */
+				DLM_ASSERT(list_empty(&lock->ast_list));
+				DLM_ASSERT(list_empty(&lock->bast_list));
+				DLM_ASSERT(!lock->ast_pending);
+				DLM_ASSERT(!lock->bast_pending);
 				list_del_init(&lock->list);
-				dlm_lockres_put(lock->lockres);
-				DLM_ASSERT(lock->lksb);
-				kfree(lock->lksb);
-				kfree(lock);
+				dlm_lock_put(lock);
 			}
 		}	
 		queue++;
@@ -1757,8 +1812,11 @@
 		spin_unlock(&res->spinlock);
 	}
 
-	dlm_add_migration_mle(dlm, res, mle, &oldmle, name, namelen, 
-			      migrate->new_master, migrate->master);
+	/* ignore status.  only nonzero status would BUG. */
+	ret = dlm_add_migration_mle(dlm, res, mle, &oldmle, 
+				    name, namelen, 
+				    migrate->new_master, 
+				    migrate->master);
 
 unlock:
 	spin_unlock(&dlm->master_lock);
@@ -1784,7 +1842,7 @@
  * the list after setting it's master field, and then add
  * the new migration mle.  this way we can hold with the rule
  * of having only one mle for a given lock name at all times. */
-static void dlm_add_migration_mle(dlm_ctxt *dlm, 
+static int dlm_add_migration_mle(dlm_ctxt *dlm, 
 				 dlm_lock_resource *res, 
 				 dlm_master_list_entry *mle, 
 				 dlm_master_list_entry **oldmle, 
@@ -1792,6 +1850,7 @@
 				 u8 new_master, u8 master)
 {
 	int found;
+	int ret = 0;
 
 	*oldmle = NULL;
 
@@ -1800,18 +1859,38 @@
 	assert_spin_locked(&dlm->spinlock);
 	assert_spin_locked(&dlm->master_lock);
 
+	/* caller is responsible for any ref taken here on oldmle */
 	found = dlm_find_mle(dlm, oldmle, (char *)name, namelen);
 	if (found) {
 		dlm_master_list_entry *tmp = *oldmle;
-		/* this is essentially what assert_master does */
 		spin_lock(&tmp->spinlock);
-		tmp->master = master;
-		atomic_set(&tmp->woken, 1);
-		wake_up(&tmp->wq);
-		/* remove it from the list so that only one
-		 * mle will be found */
-		list_del_init(&tmp->list);
-		INIT_LIST_HEAD(&tmp->list);
+		if (tmp->type == DLM_MLE_MIGRATION) {
+			if (master == dlm->node_num) {
+				/* ah another process raced me to it */
+				dlmprintk("tried to migrate %.*s, but some "
+					  "process beat me to it\n",
+					  namelen, name);
+				ret = -EEXIST;
+			} else {
+				/* bad.  2 NODES are trying to migrate! */
+				dlmerror("migration error.  mle: master=%u "
+					 "new_master=%u // request: "
+					 "master=%u new_master=%u // "
+					 "lockres=%.*s\n",
+					 tmp->master, tmp->new_master,
+					 master, new_master, 
+					 namelen, name);
+				BUG();
+			}
+		} else {
+			/* this is essentially what assert_master does */
+			tmp->master = master;
+			atomic_set(&tmp->woken, 1);
+			wake_up(&tmp->wq);
+			/* remove it from the list so that only one
+			 * mle will be found */
+			list_del_init(&tmp->list);
+		}
 		spin_unlock(&tmp->spinlock);
 	}
 
@@ -1822,6 +1901,8 @@
 	/* do this for consistency with other mle types */
 	set_bit(new_master, mle->maybe_map);
 	list_add(&mle->list, &dlm->master_list);
+
+	return ret;
 }
 
 
@@ -1979,3 +2060,50 @@
 leave:
 	return ret;
 }
+
+/*
+ * LOCKRES AST REFCOUNT
+ * this is integral to migration
+ */
+
+/* for future intent to call an ast, reserve one ahead of time.
+ * this should be called only after waiting on the lockres
+ * with dlm_wait_on_lockres, and while still holding the
+ * spinlock after the call. */
+void __dlm_lockres_reserve_ast(dlm_lock_resource *res)
+{
+	assert_spin_locked(&res->spinlock);
+	DLM_ASSERT(!(res->state & DLM_LOCK_RES_MIGRATING));
+
+	atomic_inc(&res->asts_reserved);
+}
+
+/* 
+ * used to drop the reserved ast, either because it went unused, 
+ * or because the ast/bast was actually called.
+ *
+ * also, if there is a pending migration on this lockres, 
+ * and this was the last pending ast on the lockres, 
+ * atomically set the MIGRATING flag before we drop the lock.  
+ * this is how we ensure that migration can proceed with no 
+ * asts in progress.  note that it is ok if the state of the 
+ * queues is such that a lock should be granted in the future
+ * or that a bast should be fired, because the new master will
+ * shuffle the lists on this lockres as soon as it is migrated.
+ */
+void dlm_lockres_release_ast(dlm_lock_resource *res)
+{
+	if (!atomic_dec_and_lock(&res->asts_reserved, &res->spinlock))
+		return;
+
+	if (!res->migration_pending) {
+		spin_unlock(&res->spinlock);
+		return;
+	}
+
+	DLM_ASSERT(!(res->state & DLM_LOCK_RES_MIGRATING));
+	res->migration_pending = 0;
+	res->state |= DLM_LOCK_RES_MIGRATING;
+	spin_unlock(&res->spinlock);
+	wake_up(&res->wq);
+}

Modified: trunk/fs/ocfs2/dlm/dlmrecovery.c
===================================================================
--- trunk/fs/ocfs2/dlm/dlmrecovery.c	2005-04-08 21:25:16 UTC (rev 2128)
+++ trunk/fs/ocfs2/dlm/dlmrecovery.c	2005-04-09 00:16:35 UTC (rev 2129)
@@ -1028,6 +1028,7 @@
 					 "%.*s, but marked as recovering!\n",
 					 mres->lockname_len, mres->lockname);
 				ret = -EFAULT;
+				spin_unlock(&res->spinlock);
 				goto leave;
 			}
 			res->state |= DLM_LOCK_RES_MIGRATING;
@@ -1369,6 +1370,7 @@
 			 * to match the master here */
 				
 			/* move the lock to its proper place */
+			/* do not alter lock refcount.  switching lists. */
 			list_del_init(&lock->list);
 			list_add_tail(&lock->list, queue);
 			spin_unlock(&res->spinlock);
@@ -1384,8 +1386,7 @@
 			goto leave;
 		}
 		lksb = newlock->lksb;
-		dlm_lockres_get(res);
-		newlock->lockres = res;
+		dlm_lock_attach_lockres(newlock, res);
 		
 		if (ml->convert_type != LKM_IVMODE) {
 			DLM_ASSERT(queue == &res->converting);
@@ -1417,6 +1418,7 @@
 		 * preserved relative to locks from other nodes.
 		 */
 		spin_lock(&res->spinlock);
+		dlm_lock_get(newlock);
 		list_add_tail(&newlock->list, queue);
 		spin_unlock(&res->spinlock);
 	}
@@ -1427,9 +1429,7 @@
 		dlmprintk("error occurred while processing recovery "
 			  "data! %d\n", ret);
 		if (newlock)
-			kfree(newlock);
-		if (lksb)
-			kfree(lksb);
+			dlm_lock_put(newlock);
 	}
 	dlmprintk("returning %d\n", ret);
 	return ret;
@@ -1483,21 +1483,21 @@
 		lock = list_entry (iter, dlm_lock, list);
 		if (lock->ml.node == dead_node) {
 			list_del_init(&lock->list);
-			kfree(lock);
+			dlm_lock_put(lock);
 		}
 	}
 	list_for_each_safe(iter, tmpiter, &res->converting) {
 		lock = list_entry (iter, dlm_lock, list);
 		if (lock->ml.node == dead_node) {
 			list_del_init(&lock->list);
-			kfree(lock);
+			dlm_lock_put(lock);
 		}
 	}
 	list_for_each_safe(iter, tmpiter, &res->blocked) {
 		lock = list_entry (iter, dlm_lock, list);
 		if (lock->ml.node == dead_node) {
 			list_del_init(&lock->list);
-			kfree(lock);
+			dlm_lock_put(lock);
 		}
 	}
 }

Modified: trunk/fs/ocfs2/dlm/dlmthread.c
===================================================================
--- trunk/fs/ocfs2/dlm/dlmthread.c	2005-04-08 21:25:16 UTC (rev 2128)
+++ trunk/fs/ocfs2/dlm/dlmthread.c	2005-04-09 00:16:35 UTC (rev 2129)
@@ -58,6 +58,7 @@
 #define dlm_lock_is_remote(dlm, lock)     ((lock)->ml.node != (dlm)->node_num)
 
 /* will exit holding res->spinlock, but may drop in function */
+/* waits until flags are cleared on res->state */
 void __dlm_wait_on_lockres_flags(dlm_lock_resource *res, int flags)
 {
 	DECLARE_WAITQUEUE(wait, current);
@@ -77,6 +78,26 @@
 	current->state = TASK_RUNNING;
 }
 
+/* opposite of the above, waits until ALL of flags are SET on res->state */
+void __dlm_wait_on_lockres_flags_set(dlm_lock_resource *res, int flags)
+{
+	DECLARE_WAITQUEUE(wait, current);
+
+	assert_spin_locked(&res->spinlock);	/* caller holds res->spinlock */
+
+	add_wait_queue(&res->wq, &wait);
+repeat:
+	set_current_state(TASK_UNINTERRUPTIBLE);
+	if ((res->state & flags) != flags) {	/* not all bits set yet */
+		spin_unlock(&res->spinlock);	/* dropped while asleep */
+		schedule();
+		spin_lock(&res->spinlock);	/* retaken before re-check */
+		goto repeat;
+	}
+	remove_wait_queue(&res->wq, &wait);	/* returns with lock held */
+	current->state = TASK_RUNNING;
+}
+
 static int __dlm_lockres_unused(dlm_lock_resource *res)
 {
 	if (list_empty(&res->granted) &&
@@ -236,7 +257,14 @@
 	// dlmprintk("shuffle res %.*s\n", res->lockname.len, 
 	//	  res->lockname.name);
 
-	spin_lock(&res->spinlock);
+	/* because this function is called with the lockres
+	 * spinlock, and because we know that it is not migrating/
+	 * recovering/in-progress, it is fine to reserve asts and
+	 * basts right before queueing them all throughout */
+	assert_spin_locked(&res->spinlock);
+	DLM_ASSERT(!(res->state & (DLM_LOCK_RES_MIGRATING|
+				   DLM_LOCK_RES_RECOVERING|
+				   DLM_LOCK_RES_IN_PROGRESS)));
 
 converting:
 	if (list_empty(&res->converting))
@@ -255,14 +283,18 @@
 		lock = list_entry(iter, dlm_lock, list);
 		if (lock==target)
 			continue;
-		if (!dlm_lock_compatible(lock->ml.type, target->ml.convert_type)) {
+		if (!dlm_lock_compatible(lock->ml.type, 
+					 target->ml.convert_type)) {
 			can_grant = 0;
 			/* queue the BAST if not already */
-			if (lock->ml.highest_blocked == LKM_IVMODE)
-				__dlm_queue_bast(dlm, lock);
+			if (lock->ml.highest_blocked == LKM_IVMODE) {
+				__dlm_lockres_reserve_ast(res);
+				dlm_queue_bast(dlm, lock);
+			}
 			/* update the highest_blocked if needed */
 			if (lock->ml.highest_blocked < target->ml.convert_type)
-				lock->ml.highest_blocked = target->ml.convert_type;
+				lock->ml.highest_blocked = 
+					target->ml.convert_type;
 		}
 	}
 	head = &res->converting;
@@ -270,12 +302,16 @@
 		lock = list_entry(iter, dlm_lock, list);
 		if (lock==target)
 			continue;
-		if (!dlm_lock_compatible(lock->ml.type, target->ml.convert_type)) {
+		if (!dlm_lock_compatible(lock->ml.type, 
+					 target->ml.convert_type)) {
 			can_grant = 0;
-			if (lock->ml.highest_blocked == LKM_IVMODE)
-				__dlm_queue_bast(dlm, lock);
+			if (lock->ml.highest_blocked == LKM_IVMODE) {
+				__dlm_lockres_reserve_ast(res);
+				dlm_queue_bast(dlm, lock);
+			}
 			if (lock->ml.highest_blocked < target->ml.convert_type)
-				lock->ml.highest_blocked = target->ml.convert_type;
+				lock->ml.highest_blocked = 
+					target->ml.convert_type;
 		}
 	}
 	
@@ -299,7 +335,8 @@
 
 		spin_unlock(&target->spinlock);
 
-		__dlm_queue_ast(dlm, target);
+		__dlm_lockres_reserve_ast(res);
+		dlm_queue_ast(dlm, target);
 		/* go back and check for more */
 		goto converting;
 	}
@@ -316,8 +353,10 @@
 			continue;
 		if (!dlm_lock_compatible(lock->ml.type, target->ml.type)) {
 			can_grant = 0;
-			if (lock->ml.highest_blocked == LKM_IVMODE)
-				__dlm_queue_bast(dlm, lock);
+			if (lock->ml.highest_blocked == LKM_IVMODE) {
+				__dlm_lockres_reserve_ast(res);
+				dlm_queue_bast(dlm, lock);
+			}
 			if (lock->ml.highest_blocked < target->ml.type)
 				lock->ml.highest_blocked = target->ml.type;
 		}
@@ -330,8 +369,10 @@
 			continue;
 		if (!dlm_lock_compatible(lock->ml.type, target->ml.type)) {
 			can_grant = 0;
-			if (lock->ml.highest_blocked == LKM_IVMODE)
-				__dlm_queue_bast(dlm, lock);
+			if (lock->ml.highest_blocked == LKM_IVMODE) {
+				__dlm_lockres_reserve_ast(res);
+				dlm_queue_bast(dlm, lock);
+			}
 			if (lock->ml.highest_blocked < target->ml.type)
 				lock->ml.highest_blocked = target->ml.type;
 		}
@@ -356,13 +397,14 @@
 		
 		spin_unlock(&target->spinlock);
 
-		__dlm_queue_ast(dlm, target);
+		__dlm_lockres_reserve_ast(res);
+		dlm_queue_ast(dlm, target);
 		/* go back and check for more */
 		goto converting;
 	}
 
 leave:
-	spin_unlock(&res->spinlock);
+	return;
 }
 
 /* must have NO locks when calling this */
@@ -423,29 +465,37 @@
 	return empty;
 }
 
+
+int dlm_flush_lockres_asts(dlm_ctxt *dlm, dlm_lock_resource *res)
+{
+	dlm_flush_asts(dlm);	/* flushes every pending ast/bast on dlm */
+	/* TODO: implement a true per-lockres flush; res is unused for now */
+	return 0;
+}
+
+
+
 void dlm_flush_asts(dlm_ctxt *dlm)
 {
-	struct list_head *iter, *iter2;
 	dlm_lock *lock;
 	dlm_lock_resource *res;
-	LIST_HEAD(ast_tmp);
-	LIST_HEAD(bast_tmp);
 	u8 hi;
 
-	spin_lock(&dlm->spinlock);
-	list_splice_init(&dlm->pending_asts, &ast_tmp);
-	list_splice_init(&dlm->pending_basts, &bast_tmp);
-	spin_unlock(&dlm->spinlock);
-
-	list_for_each_safe(iter, iter2, &ast_tmp) {
-		lock = list_entry(iter, dlm_lock, ast_list);
+	spin_lock(&dlm->ast_lock);
+	while (!list_empty(&dlm->pending_asts)) {
+		lock = list_entry(dlm->pending_asts.next, 
+				  dlm_lock, ast_list);
+		/* get an extra ref on lock */
+		dlm_lock_get(lock);
 		res = lock->lockres;
 		dlmprintk0("delivering an ast for this lockres\n");
+
 		DLM_ASSERT(lock->ast_pending);
 
-		spin_lock(&lock->spinlock);
+		/* remove from list (including ref) */
 		list_del_init(&lock->ast_list);
-		spin_unlock(&lock->spinlock);
+		dlm_lock_put(lock);
+		spin_unlock(&dlm->ast_lock);
 
 		if (lock->ml.node != dlm->node_num) {
 			if (dlm_do_remote_ast(dlm, res, lock) < 0)
@@ -453,7 +503,8 @@
 		} else
 			dlm_do_local_ast(dlm, res, lock);
 
-		spin_lock(&lock->spinlock);
+		spin_lock(&dlm->ast_lock);
+
 		/* possible that another ast was queued while
 		 * we were delivering the last one */
 		if (!list_empty(&lock->ast_list)) {
@@ -462,11 +513,18 @@
 				   "keep the ast_pending flag set.\n");
 		} else
 			lock->ast_pending = 0;
-		spin_unlock(&lock->spinlock);
+
+		/* drop the extra ref.
+		 * this may drop it completely. */
+		dlm_lock_put(lock);
+		dlm_lockres_release_ast(res);
 	}
-
-	list_for_each_safe(iter, iter2, &bast_tmp) {
-		lock = list_entry(iter, dlm_lock, bast_list);
+	
+	while (!list_empty(&dlm->pending_basts)) {
+		lock = list_entry(dlm->pending_basts.next, 
+				  dlm_lock, bast_list);
+		/* get an extra ref on lock */
+		dlm_lock_get(lock);
 		res = lock->lockres;
 
 		DLM_ASSERT(lock->bast_pending);
@@ -476,19 +534,24 @@
 		DLM_ASSERT(lock->ml.highest_blocked > LKM_IVMODE);
 		hi = lock->ml.highest_blocked;
 		lock->ml.highest_blocked = LKM_IVMODE;
+		spin_unlock(&lock->spinlock);
 
+		/* remove from list (including ref) */
 		list_del_init(&lock->bast_list);
-		spin_unlock(&lock->spinlock);
+		dlm_lock_put(lock);
+		spin_unlock(&dlm->ast_lock);
 
 		dlmprintk("delivering a bast for this lockres "
 			  "(blocked = %d\n", hi);
+		
 		if (lock->ml.node != dlm->node_num) {
 			if (dlm_send_proxy_bast(dlm, res, lock, hi) < 0)
 				dlmprintk0("eeek\n");
 		} else
 			dlm_do_local_bast(dlm, res, lock, hi);
 		
-		spin_lock(&lock->spinlock);
+		spin_lock(&dlm->ast_lock);
+
 		/* possible that another bast was queued while
 		 * we were delivering the last one */
 		if (!list_empty(&lock->bast_list)) {
@@ -497,15 +560,22 @@
 				   "keep the bast_pending flag set.\n");
 		} else
 			lock->bast_pending = 0;
-		spin_unlock(&lock->spinlock);
+		
+		/* drop the extra ref.
+		 * this may drop it completely. */
+		dlm_lock_put(lock);
+		dlm_lockres_release_ast(res);
 	}
+	spin_unlock(&dlm->ast_lock);
 }
 
+
 #define DLM_THREAD_TIMEOUT_MS (4 * 1000)
+#define DLM_THREAD_MAX_DIRTY  100
+#define DLM_THREAD_MAX_ASTS   10
 
 static int dlm_thread(void *data)
 {
-	struct list_head *iter, *tmpiter;
 	dlm_lock_resource *res;
 	dlm_ctxt *dlm = data;
 	unsigned long timeout = msecs_to_jiffies(DLM_THREAD_TIMEOUT_MS);
@@ -513,33 +583,91 @@
 	dlmprintk("dlm thread running for %s...\n", dlm->name);
 
 	while (!kthread_should_stop()) {
+		int n = DLM_THREAD_MAX_DIRTY;
 
 		dlm_run_purge_list(dlm);
 
 		down_read(&dlm->recovery_sem);
+
+		/* this will now do the dlm_shuffle_lists
+		 * while the dlm->spinlock is unlocked */
 		spin_lock(&dlm->spinlock);
-		list_for_each_safe(iter, tmpiter, &dlm->dirty_list) {
-			res = list_entry(iter, dlm_lock_resource, dirty);
-
+		while (!list_empty(&dlm->dirty_list)) {
+			int delay = 0;
+			res = list_entry(dlm->dirty_list.next, 
+					 dlm_lock_resource, dirty);
+			
+			/* peel a lockres off, remove it from the list,
+			 * unset the dirty flag and drop the dlm lock */
 			DLM_ASSERT(res);
+			dlm_lockres_get(res);
+			
 			spin_lock(&res->spinlock);
+			res->state &= ~DLM_LOCK_RES_DIRTY;
 			list_del_init(&res->dirty);
-			res->state &= ~DLM_LOCK_RES_DIRTY;
-			BUG_ON(res->owner != dlm->node_num);
 			spin_unlock(&res->spinlock);
+			spin_unlock(&dlm->spinlock);
 
+
+			spin_lock(&res->spinlock);
+			DLM_ASSERT(!(res->state & DLM_LOCK_RES_MIGRATING));
+			DLM_ASSERT(!(res->state & DLM_LOCK_RES_RECOVERING));
+			DLM_ASSERT(res->owner == dlm->node_num);
+			
+			if (res->state & DLM_LOCK_RES_IN_PROGRESS) {
+				/* move it to the tail and keep going */
+				spin_unlock(&res->spinlock);
+				dlmprintk("delaying list shuffling for in-"
+					  "progress lockres %.*s\n",
+					  res->lockname.len, 
+					  res->lockname.name);
+				delay = 1;
+				goto in_progress;
+			}
+
+			/* at this point the lockres is not migrating/
+			 * recovering/in-progress.  we have the lockres
+			 * spinlock and do NOT have the dlm lock.
+			 * safe to reserve/queue asts and run the lists. */
+		
 			dlmprintk("calling dlm_shuffle_lists with "
-				  "dlm=%p, res=%p\n", dlm, res);
+	  			  "dlm=%p, res=%p\n", dlm, res);
+		
+			/* called while holding lockres lock */
 			dlm_shuffle_lists(dlm, res);
+			spin_unlock(&res->spinlock);
 
-			spin_lock(&res->spinlock);
-			__dlm_lockres_calc_usage(dlm, res);
-			spin_unlock(&res->spinlock);
+			dlm_lockres_calc_usage(dlm, res);
+
+in_progress:	
+
+			spin_lock(&dlm->spinlock);
+			/* if the lock was in-progress, stick
+			 * it on the back of the list */
+			if (delay) {
+				spin_lock(&res->spinlock);
+				list_add_tail(&res->dirty, &dlm->dirty_list);
+				res->state |= DLM_LOCK_RES_DIRTY;
+				spin_unlock(&res->spinlock);
+			}
+			dlm_lockres_put(res);
+			
+			/* unlikely, but we may need to give time to
+			 * other tasks */
+			if (!--n) {
+				dlmprintk0("throttling dlm_thread\n");
+				break;
+			}
 		}
+
 		spin_unlock(&dlm->spinlock);
 		dlm_flush_asts(dlm);
 		up_read(&dlm->recovery_sem);
 
+		/* no need to sleep if we know there is more work to do */
+		if (!n)
+			continue;
+
 		wait_event_interruptible_timeout(dlm->dlm_thread_wq,
 						 !dlm_dirty_list_empty(dlm) ||
 						 kthread_should_stop(),

Modified: trunk/fs/ocfs2/dlm/dlmunlock.c
===================================================================
--- trunk/fs/ocfs2/dlm/dlmunlock.c	2005-04-08 21:25:16 UTC (rev 2128)
+++ trunk/fs/ocfs2/dlm/dlmunlock.c	2005-04-09 00:16:35 UTC (rev 2129)
@@ -76,6 +76,7 @@
  *   taken:         res->spinlock and lock->spinlock taken and dropped
  *   held on exit:  none
  * returns: DLM_NORMAL, DLM_NOLOCKMGR, status from network
+ * all callers should have taken an extra ref on lock coming in
  */
 static dlm_status dlmunlock_common(dlm_ctxt *dlm, dlm_lock_resource *res, 
 				   dlm_lock *lock, dlm_lockstatus *lksb, 
@@ -160,10 +161,21 @@
 		spin_lock(&lock->spinlock);
 	}
 
-	if (actions & DLM_UNLOCK_REMOVE_LOCK)
+	/* get an extra ref on lock.  if we are just switching
+	 * lists here, we don't want the lock to go away. */
+	dlm_lock_get(lock);
+
+	if (actions & DLM_UNLOCK_REMOVE_LOCK) {
 		list_del_init(&lock->list);
-	if (actions & DLM_UNLOCK_REGRANT_LOCK)
+		dlm_lock_put(lock);
+	}
+	if (actions & DLM_UNLOCK_REGRANT_LOCK) {
+		dlm_lock_get(lock);
 		list_add_tail(&lock->list, &res->granted);
+	}
+	
+	/* remove the extra ref on lock */
+	dlm_lock_put(lock);
 
 leave:
 	res->state &= ~DLM_LOCK_RES_IN_PROGRESS;
@@ -171,13 +183,15 @@
 	spin_unlock(&res->spinlock);
 	wake_up(&res->wq);
 
+	/* let the caller's final dlm_lock_put handle the actual kfree
+	 * NOTE: this silly block and the FREE_LOCK constant
+	 * can go once the lock refcounting stuff is tested */
 	if (actions & DLM_UNLOCK_FREE_LOCK) {
-#warning this can corrupt memory!
-		/* XXX If this lock has a bast pending, then we've
-		 * just free'd memory that the dlmthread will be
-		 * referencing... BAAAAD! */
-		kfree(lock);
-		lksb->lockid = NULL;
+		/* this should always be coupled with list removal */
+		DLM_ASSERT(actions & DLM_UNLOCK_REMOVE_LOCK);
+		dlmprintk("lock %llu should be gone now! refs=%d\n",
+			  lock->ml.cookie, 
+			  atomic_read(&lock->lock_refs.refcount));
 	}
 	if (actions & DLM_UNLOCK_CALL_AST)
 		*call_ast = 1;
@@ -358,6 +372,7 @@
 			lock = list_entry(iter, dlm_lock, list);
 			if (lock->ml.cookie == unlock->cookie &&
 		    	    lock->ml.node == unlock->node_idx) {
+				dlm_lock_get(lock);
 				found = 1;
 				break;
 			}
@@ -399,6 +414,7 @@
 	else {
 		/* send the lksb->status back to the other node */
 		status = lksb->status;
+		dlm_lock_put(lock);
 	}
 
 leave:
@@ -498,6 +514,8 @@
 
 	lock = lksb->lockid;
 	DLM_ASSERT(lock);
+	dlm_lock_get(lock);
+
 	res = lock->lockres;
 	DLM_ASSERT(res);
 	dlm_lockres_get(res);
@@ -531,6 +549,8 @@
 		 * may be happening on another node. Perhaps the
 		 * proper solution is to queue up requests on the
 		 * other end? */
+
+		/* do we want to yield(); ?? */
 		msleep(50);
 
 		dlmprintk0("retrying unlock due to pending recovery/"
@@ -551,6 +571,7 @@
 
 	dlm_lockres_calc_usage(dlm, res);
 	dlm_lockres_put(res);
+	dlm_lock_put(lock);
 
 	dlmprintk("returning status=%d!\n", status);
 	return status;



More information about the Ocfs2-commits mailing list