[Ocfs2-commits] khackel commits r2417 - trunk/fs/ocfs2/dlm

svn-commits at oss.oracle.com svn-commits at oss.oracle.com
Wed Jun 22 00:51:19 CDT 2005


Author: khackel
Signed-off-by: mfasheh
Date: 2005-06-22 00:51:17 -0500 (Wed, 22 Jun 2005)
New Revision: 2417

Modified:
   trunk/fs/ocfs2/dlm/dlmast.c
   trunk/fs/ocfs2/dlm/dlmcommon.h
   trunk/fs/ocfs2/dlm/dlmconvert.c
   trunk/fs/ocfs2/dlm/dlmdebug.c
   trunk/fs/ocfs2/dlm/dlmdomain.c
   trunk/fs/ocfs2/dlm/dlmlock.c
   trunk/fs/ocfs2/dlm/dlmmaster.c
   trunk/fs/ocfs2/dlm/dlmrecovery.c
   trunk/fs/ocfs2/dlm/dlmthread.c
   trunk/fs/ocfs2/dlm/dlmunlock.c
Log:
* big recovery commit
* fixes bug 406, where killing a node in a 4 node cluster would cause
  recovery to hang on the surviving nodes
* further fixes bug 379, lock mastery during node death failed to recover
* replace recovery rwsem with waitqueue.  recovery thread should never
  block for anything.  block new callers of dlmlock() on recovery and
  deal with rollback of callers already in progress.  adds
  dlm_begin_recovery/dlm_end_recovery/dlm_wait_for_recovery.
* do not block dlm_thread on recovery because other nodes need to get
  to the superblock lock and other locks right away.  this seems to
  result in a delay in the final ast sent, and I will have to prune
  these entries from the ast/bast lists if the new tcp changes do not
  detect the node death quickly enough.
* split out the dlm_kick_thread function.  can now dirty a lockres
  while locked, and can dirty a thread without immediately kicking
  the dlm_thread (to batch up many dirty lockreses).
* allow for any locally mastered lockres to be dirtied, under the
  assumption that its migrating/recovering/in-progress state will
  be cleared within a short time
* added 4 pending flags to dlm_lock to more adequately track the in-
  progress states of lock resources.  this helps revert (for convert
  and lock) or commit (cancel, unlock) the pending action on the lock
  when the master dies.  this occurs before sending the lock state to
  the new master node.
* return correct error from dlmconvert_remote when caller attempts to
  convert an already-converting lock
* filter network errors in several net message paths and handle them
  appropriately instead of returning generic error code (caller could
  not distinguish between error types)
* added debug code to dump one lockres only
* properly implements restarting of lock mastery when node death
  occurs.  added logic to avoid extra messaging in every possible
  scenario.
* replace one instance of lockres list++ iteration with a new
  dlm_list_idx_to_ptr function.  need to change all other cases of
  similar usage eventually.
* in dlm_move_lockres_to_recovery_list, do commit or rollback of
  pending convert, lock, cancel and unlock calls
* in dlm_finish_local_lockres_recovery, use proper call to change owner
  and do not kick dlm_thread yet.  wait until all lockreses have
  been run to kick the thread, avoiding a hang.
* in dlm_free_dead_locks, make sure to dirty the lockres.

Signed-off-by: mfasheh



Modified: trunk/fs/ocfs2/dlm/dlmast.c
===================================================================
--- trunk/fs/ocfs2/dlm/dlmast.c	2005-06-21 22:03:49 UTC (rev 2416)
+++ trunk/fs/ocfs2/dlm/dlmast.c	2005-06-22 05:51:17 UTC (rev 2417)
@@ -306,8 +306,6 @@
 	DLM_ASSERT(res->owner != dlm->node_num);
 
 	mlog(0, "lockres %.*s\n", res->lockname.len, res->lockname.name);
-	if (!dlm_is_recovery_lock(past->name, past->namelen))
-		down_read(&dlm->recovery_sem);
 
 	spin_lock(&res->spinlock);
 	if (res->state & DLM_LOCK_RES_RECOVERING) {
@@ -348,8 +346,6 @@
 	ret = DLM_NORMAL;
 unlock_out:
 	spin_unlock(&res->spinlock);
-	if (!dlm_is_recovery_lock(past->name, past->namelen))
-		up_read(&dlm->recovery_sem);
 	goto leave;
 
 do_ast:
@@ -382,9 +378,6 @@
 	else
 		dlm_do_local_bast(dlm, res, lock, past->blocked_type);
 
-	if (!dlm_is_recovery_lock(past->name, past->namelen))
-		up_read(&dlm->recovery_sem);
-
 leave:
 
 	if (res)

Modified: trunk/fs/ocfs2/dlm/dlmcommon.h
===================================================================
--- trunk/fs/ocfs2/dlm/dlmcommon.h	2005-06-21 22:03:49 UTC (rev 2416)
+++ trunk/fs/ocfs2/dlm/dlmcommon.h	2005-06-22 05:51:17 UTC (rev 2417)
@@ -66,6 +66,7 @@
 	return 0;
 }
 
+#define DLM_RECO_STATE_ACTIVE  0x0001
 
 typedef struct _dlm_recovery_ctxt
 {
@@ -74,7 +75,9 @@
 	struct list_head node_data;
 	u8  new_master;
 	u8  dead_node;
+	u16 state;
 	unsigned long node_map[BITS_TO_LONGS(O2NM_MAX_NODES)];
+	wait_queue_head_t event;
 } dlm_recovery_ctxt;
 
 typedef enum _dlm_ctxt_state {
@@ -271,7 +274,11 @@
 	void *astdata;
 	dlm_lockstatus *lksb;
 	unsigned ast_pending:1,
-		 bast_pending:1;
+		 bast_pending:1,
+		 convert_pending:1,
+		 lock_pending:1,
+		 cancel_pending:1,
+		 unlock_pending:1;
 };
 
 
@@ -284,6 +291,29 @@
 #define DLM_LKSB_UNUSED5           0x40
 #define DLM_LKSB_UNUSED6           0x80
 
+
+enum dlm_lockres_list {
+	DLM_GRANTED_LIST = 0,
+	DLM_CONVERTING_LIST,
+	DLM_BLOCKED_LIST
+};
+
+static inline struct list_head * dlm_list_idx_to_ptr(dlm_lock_resource *res,
+						     enum dlm_lockres_list idx)
+{
+	struct list_head *ret = NULL;
+	if (idx == DLM_GRANTED_LIST)
+		ret = &res->granted;
+	else if (idx == DLM_CONVERTING_LIST)
+		ret = &res->converting;
+	else if (idx == DLM_BLOCKED_LIST)
+		ret = &res->blocked;
+	else
+		BUG();
+	return ret;
+}
+
+
 enum dlm_mle_type {
 	DLM_MLE_BLOCK,
 	DLM_MLE_MASTER,
@@ -824,7 +854,12 @@
 int dlm_convert_lock_handler(o2net_msg *msg, u32 len, void *data);
 int dlm_proxy_ast_handler(o2net_msg *msg, u32 len, void *data);
 
+void dlm_revert_pending_convert(dlm_lock_resource *res, dlm_lock *lock);
+void dlm_revert_pending_lock(dlm_lock_resource *res, dlm_lock *lock);
+
 int dlm_unlock_lock_handler(o2net_msg *msg, u32 len, void *data);
+void dlm_commit_pending_cancel(dlm_lock_resource *res, dlm_lock *lock);
+void dlm_commit_pending_unlock(dlm_lock_resource *res, dlm_lock *lock);
 
 void dlm_shuffle_lists(dlm_ctxt *dlm, dlm_lock_resource *res);
 int dlm_launch_thread(dlm_ctxt *dlm);
@@ -833,6 +868,7 @@
 int dlm_flush_lockres_asts(dlm_ctxt *dlm, dlm_lock_resource *res);
 int dlm_launch_recovery_thread(dlm_ctxt *dlm);
 void dlm_complete_recovery_thread(dlm_ctxt *dlm);
+void dlm_wait_for_recovery(dlm_ctxt *dlm);
 
 void dlm_get(dlm_ctxt *dlm);
 void dlm_put(dlm_ctxt *dlm);
@@ -854,6 +890,7 @@
 				    const char *name,
 				    unsigned int len);
 
+int dlm_is_host_down(int errno);
 void dlm_change_lockres_owner(dlm_ctxt *dlm, dlm_lock_resource *res, u8 owner);
 dlm_lock_resource * dlm_get_lock_resource(dlm_ctxt *dlm, 
 					  const char *lockid,
@@ -887,9 +924,13 @@
 				      0, flags);
 }
 
+void dlm_print_one_lock_resource(dlm_lock_resource *res);
+void __dlm_print_one_lock_resource(dlm_lock_resource *res);
+
 u8 dlm_nm_this_node(dlm_ctxt *dlm);
 void dlm_kick_thread(dlm_ctxt *dlm, dlm_lock_resource *res);
 void __dlm_kick_thread(dlm_ctxt *dlm, dlm_lock_resource *res);
+void __dlm_dirty_lockres(dlm_ctxt *dlm, dlm_lock_resource *res);
 	
 
 int dlm_nm_init(dlm_ctxt *dlm);

Modified: trunk/fs/ocfs2/dlm/dlmconvert.c
===================================================================
--- trunk/fs/ocfs2/dlm/dlmconvert.c	2005-06-21 22:03:49 UTC (rev 2416)
+++ trunk/fs/ocfs2/dlm/dlmconvert.c	2005-06-22 05:51:17 UTC (rev 2417)
@@ -234,6 +234,15 @@
 	return status;
 }
 
+void dlm_revert_pending_convert(dlm_lock_resource *res, dlm_lock *lock)
+{
+	/* do not alter lock refcount.  switching lists. */
+	list_del_init(&lock->list);
+	list_add_tail(&lock->list, &res->granted);
+	lock->ml.convert_type = LKM_IVMODE;
+	lock->lksb->flags &= ~(DLM_LKSB_GET_LVB|DLM_LKSB_PUT_LVB);
+}
+
 /* messages the master site to do lock conversion
  * locking:
  *   caller needs:  none
@@ -251,6 +260,9 @@
 
 	spin_lock(&res->spinlock);
 	if (res->state & DLM_LOCK_RES_RECOVERING) {
+		mlog(0, "bailing out early since res is RECOVERING "
+		     "on secondary queue\n");
+		/* __dlm_print_one_lock_resource(res); */
 		status = DLM_RECOVERING;
 		goto bail;
 	}
@@ -263,11 +275,14 @@
 	/* do not alter lock refcount.  switching lists. */
 	list_del_init(&lock->list);
 	list_add_tail(&lock->list, &res->converting);
+	lock->convert_pending = 1;
 	if (lock->ml.convert_type != LKM_IVMODE) {
+		__dlm_print_one_lock_resource(res);
 		mlog(ML_ERROR, "converting a remote lock that is already "
-		     "converting!\n");
-		/* TODO: return correct error */
-		BUG();
+		     "converting! (cookie=%llu, conv=%d)\n",
+		     lock->ml.cookie, lock->ml.convert_type);
+		status = DLM_DENIED;
+		goto denied;
 	}
 	lock->ml.convert_type = type;
 
@@ -291,16 +306,12 @@
 	status = dlm_send_remote_convert_request(dlm, res, lock, flags, type);
 	
 	spin_lock(&res->spinlock);
+denied:
 	res->state &= ~DLM_LOCK_RES_IN_PROGRESS;
-
+	lock->convert_pending = 0;
 	/* if it failed, move it back to granted queue */
-	if (status != DLM_NORMAL) {
-		/* do not alter lock refcount.  switching lists. */
-		list_del_init(&lock->list);
-		list_add_tail(&lock->list, &res->granted);
-		lock->ml.convert_type = LKM_IVMODE;
-		lock->lksb->flags &= ~(DLM_LKSB_GET_LVB|DLM_LKSB_PUT_LVB);
-	}
+	if (status != DLM_NORMAL)
+		dlm_revert_pending_convert(res, lock);
 bail:
 	spin_unlock(&res->spinlock);
 
@@ -367,7 +378,13 @@
 		}
 	} else {
 		mlog_errno(tmpret);
-		ret = dlm_err_to_dlm_status(tmpret);
+		if (dlm_is_host_down(tmpret)) {
+			ret = DLM_RECOVERING;
+			mlog(0, "node %u died so returning DLM_RECOVERING "
+			     "from convert message!\n", res->owner);
+		} else {
+			ret = dlm_err_to_dlm_status(tmpret);
+		}
 	}
 
 	return ret;

Modified: trunk/fs/ocfs2/dlm/dlmdebug.c
===================================================================
--- trunk/fs/ocfs2/dlm/dlmdebug.c	2005-06-21 22:03:49 UTC (rev 2416)
+++ trunk/fs/ocfs2/dlm/dlmdebug.c	2005-06-22 05:51:17 UTC (rev 2417)
@@ -51,7 +51,13 @@
 static void dlm_dump_purge_list(dlm_ctxt *dlm);
 static int dlm_dump_all_purge_lists(const char __user *data, unsigned int len);
 static int dlm_trigger_migration(const char __user *data, unsigned int len);
+static int dlm_dump_one_lock_resource(const char __user *data, 
+				       unsigned int len);
 
+static int dlm_parse_domain_and_lockres(char *buf, unsigned int len,
+					dlm_ctxt **dlm,
+					dlm_lock_resource **res);
+
 typedef int (dlm_debug_func_t)(const char __user *data, unsigned int len);
 
 typedef struct _dlm_debug_funcs
@@ -62,6 +68,7 @@
 
 static dlm_debug_funcs dlm_debug_map[] = {
 	{ 'r', dlm_dump_all_lock_resources },
+	{ 'R', dlm_dump_one_lock_resource },
 	{ 'm', dlm_dump_all_mles },
 	{ 'p', dlm_dump_all_purge_lists  },
 	{ 'M', dlm_trigger_migration },
@@ -133,15 +140,82 @@
 	return len;
 }
 
-static void dlm_print_one_lock_resource(dlm_lock_resource *res)
+static int dlm_dump_one_lock_resource(const char __user *data, 
+				       unsigned int len)
 {
+	dlm_ctxt *dlm;
+	dlm_lock_resource *res;
+	char *buf = NULL;
+	int ret = -EINVAL;
+	int tmpret;
+
+	if (len >= PAGE_SIZE-1) {
+		mlog(ML_ERROR, "user passed too much data: %d bytes\n", len);
+		goto leave;
+	}
+	if (len < 5) {
+		mlog(ML_ERROR, "user passed too little data: %d bytes\n", len);
+		goto leave;
+	}
+	buf = kmalloc(len+1, GFP_KERNEL);
+	if (!buf) {
+		mlog(ML_ERROR, "could not alloc %d bytes\n", len+1);
+		ret = -ENOMEM;
+		goto leave;
+	}
+	if (strncpy_from_user(buf, data, len) < len) {
+		mlog(ML_ERROR, "failed to get all user data.  done.\n");
+		goto leave;
+	}
+	buf[len]='\0';
+	mlog(0, "got this data from user: %s\n", buf);
+
+	if (*buf != 'R') {
+		mlog(0, "bad data\n");
+		goto leave;
+	}
+
+	tmpret = dlm_parse_domain_and_lockres(buf, len, &dlm, &res);
+	if (tmpret < 0) {
+		mlog(0, "bad data\n");
+		goto leave;
+	}
+
+	mlog(ML_NOTICE, "dlm_ctxt: %s, node=%u, key=%u\n",
+		dlm->name, dlm->node_num, dlm->key);
+	
+	dlm_print_one_lock_resource(res);
+	dlm_lockres_put(res);
+	dlm_put(dlm);
+	ret = len;
+
+leave:
+	if (buf)
+		kfree(buf);
+	return ret;
+}
+
+
+void dlm_print_one_lock_resource(dlm_lock_resource *res)
+{
+	mlog(ML_NOTICE, "lockres: %.*s, owner=%u, state=%u\n", 
+	       res->lockname.len, res->lockname.name, 
+	       res->owner, res->state);
+	spin_lock(&res->spinlock);
+	__dlm_print_one_lock_resource(res);
+	spin_unlock(&res->spinlock);
+}
+
+void __dlm_print_one_lock_resource(dlm_lock_resource *res)
+{
 	struct list_head *iter2;
 	dlm_lock *lock;
 
+	assert_spin_locked(&res->spinlock);
+
 	mlog(ML_NOTICE, "lockres: %.*s, owner=%u, state=%u\n", 
 	       res->lockname.len, res->lockname.name, 
 	       res->owner, res->state);
-	spin_lock(&res->spinlock);
 	mlog(ML_NOTICE, "  granted queue: \n");
 	list_for_each(iter2, &res->granted) {
 		lock = list_entry(iter2, dlm_lock, list);
@@ -172,9 +246,9 @@
 		       lock->ml.cookie);
 		spin_unlock(&lock->spinlock);
 	}
-	spin_unlock(&res->spinlock);
 }
 
+
 void dlm_print_one_lock(dlm_lock *lockid)
 {
 	dlm_print_one_lock_resource(lockid->lockres);
@@ -240,42 +314,19 @@
 	return len;
 }
 
-static int dlm_trigger_migration(const char __user *data, unsigned int len)
+static int dlm_parse_domain_and_lockres(char *buf, unsigned int len,
+					dlm_ctxt **dlm,
+					dlm_lock_resource **res)
 {
-	dlm_lock_resource *res;
-	dlm_ctxt *dlm;
 	char *resname;
 	char *domainname;
-	char *tmp, *buf = NULL;
+	char *tmp;
 	int ret = -EINVAL;
-	int tmpret;
 
-	if (len >= PAGE_SIZE-1) {
-		mlog(ML_ERROR, "user passed too much data: %d bytes\n", len);
-		goto leave;
-	}
-	if (len < 5) {
-		mlog(ML_ERROR, "user passed too little data: %d bytes\n", len);
-		goto leave;
-	}
-	buf = kmalloc(len+1, GFP_KERNEL);
-	if (!buf) {
-		mlog(ML_ERROR, "could not alloc %d bytes\n", len+1);
-		ret = -ENOMEM;
-		goto leave;
-	}
-	if (strncpy_from_user(buf, data, len) < len) {
-		mlog(ML_ERROR, "failed to get all user data.  done.\n");
-		goto leave;
-	}
-	buf[len]='\0';
-	mlog(0, "got this data from user: %s\n", buf);
+	*dlm = NULL;
+	*res = NULL;
 
 	tmp = buf;
-	if (*tmp != 'M') {
-		mlog(0, "bad data\n");
-		goto leave;
-	}
 	tmp++;
 	if (*tmp != ' ') {
 		mlog(0, "bad data\n");
@@ -310,27 +361,74 @@
 	mlog(0, "now looking up domain %s, lockres %s\n",
 	       domainname, resname);
 	spin_lock(&dlm_domain_lock);
-	dlm = __dlm_lookup_domain(domainname);
+	*dlm = __dlm_lookup_domain(domainname);
 	spin_unlock(&dlm_domain_lock);
 
-	if (!dlm_grab(dlm)) {
+	if (!dlm_grab(*dlm)) {
 		mlog(ML_ERROR, "bad dlm!\n");
+		*dlm = NULL;
 		goto leave;
 	}
 
-	res = dlm_lookup_lockres(dlm, resname, strlen(resname));
-	if (!res) {
+	*res = dlm_lookup_lockres(*dlm, resname, strlen(resname));
+	if (!*res) {
 		mlog(ML_ERROR, "bad lockres!\n");
-		dlm_put(dlm);
+		dlm_put(*dlm);
+		*dlm = NULL;
 		goto leave;
 	}
 
-	mlog(0, "found dlm=%p, lockres=%p\n", dlm, res);
+	mlog(0, "found dlm=%p, lockres=%p\n", *dlm, *res);
+	ret = 0;
+
+leave:
+	return ret;
+}
+
+static int dlm_trigger_migration(const char __user *data, unsigned int len)
+{
+	dlm_lock_resource *res;
+	dlm_ctxt *dlm;
+	char *buf = NULL;
+	int ret = -EINVAL;
+	int tmpret;
+
+	if (len >= PAGE_SIZE-1) {
+		mlog(ML_ERROR, "user passed too much data: %d bytes\n", len);
+		goto leave;
+	}
+	if (len < 5) {
+		mlog(ML_ERROR, "user passed too little data: %d bytes\n", len);
+		goto leave;
+	}
+	buf = kmalloc(len+1, GFP_KERNEL);
+	if (!buf) {
+		mlog(ML_ERROR, "could not alloc %d bytes\n", len+1);
+		ret = -ENOMEM;
+		goto leave;
+	}
+	if (strncpy_from_user(buf, data, len) < len) {
+		mlog(ML_ERROR, "failed to get all user data.  done.\n");
+		goto leave;
+	}
+	buf[len]='\0';
+	mlog(0, "got this data from user: %s\n", buf);
+
+	if (*buf != 'M') {
+		mlog(0, "bad data\n");
+		goto leave;
+	}
+
+	tmpret = dlm_parse_domain_and_lockres(buf, len, &dlm, &res);
+	if (tmpret < 0) {
+		mlog(0, "bad data\n");
+		goto leave;
+	}
 	tmpret = dlm_migrate_lockres(dlm, res, O2NM_MAX_NODES);
 	mlog(0, "dlm_migrate_lockres returned %d\n", tmpret);
 	if (tmpret < 0)
-		mlog(ML_ERROR, "failed to migrate %s: %d\n", 
-		     resname, tmpret);
+		mlog(ML_ERROR, "failed to migrate %.*s: %d\n", 
+		     res->lockname.len, res->lockname.name, tmpret);
 	dlm_lockres_put(res);
 	dlm_put(dlm);
 	ret = len;

Modified: trunk/fs/ocfs2/dlm/dlmdomain.c
===================================================================
--- trunk/fs/ocfs2/dlm/dlmdomain.c	2005-06-21 22:03:49 UTC (rev 2416)
+++ trunk/fs/ocfs2/dlm/dlmdomain.c	2005-06-22 05:51:17 UTC (rev 2417)
@@ -1216,6 +1216,7 @@
 	INIT_LIST_HEAD(&dlm->reco.node_data);
 	INIT_LIST_HEAD(&dlm->purge_list);
 	INIT_LIST_HEAD(&dlm->dlm_domain_handlers);
+	dlm->reco.state = 0;
 
 	INIT_LIST_HEAD(&dlm->pending_asts);
 	INIT_LIST_HEAD(&dlm->pending_basts);
@@ -1231,6 +1232,7 @@
 	dlm->dlm_reco_thread_task = NULL;
 	init_waitqueue_head(&dlm->dlm_thread_wq);
 	init_waitqueue_head(&dlm->dlm_reco_thread_wq);
+	init_waitqueue_head(&dlm->reco.event);
 	INIT_LIST_HEAD(&dlm->master_list);
 	INIT_LIST_HEAD(&dlm->mle_hb_events);
 	init_rwsem(&dlm->recovery_sem);

Modified: trunk/fs/ocfs2/dlm/dlmlock.c
===================================================================
--- trunk/fs/ocfs2/dlm/dlmlock.c	2005-06-21 22:03:49 UTC (rev 2416)
+++ trunk/fs/ocfs2/dlm/dlmlock.c	2005-06-22 05:51:17 UTC (rev 2417)
@@ -38,6 +38,7 @@
 #include <linux/socket.h>
 #include <linux/inet.h>
 #include <linux/spinlock.h>
+#include <linux/delay.h>
 
 
 #include "cluster/heartbeat.h"
@@ -165,6 +166,14 @@
 	return status;
 }
 
+void dlm_revert_pending_lock(dlm_lock_resource *res, dlm_lock *lock)
+{
+	/* remove from local queue if it failed */
+	list_del_init(&lock->list);
+	lock->lksb->flags &= ~DLM_LKSB_GET_LVB;
+}
+
+
 /* 
  * locking:
  *   caller needs:  none
@@ -190,6 +199,7 @@
 	/* add lock to local (secondary) queue */
 	dlm_lock_get(lock);
 	list_add_tail(&lock->list, &res->blocked);
+	lock->lock_pending = 1;
 	spin_unlock(&res->spinlock);
 
 	/* spec seems to say that you will get DLM_NORMAL when the lock 
@@ -198,10 +208,9 @@
 
 	spin_lock(&res->spinlock);
 	res->state &= ~DLM_LOCK_RES_IN_PROGRESS;
+	lock->lock_pending = 0;
 	if (status != DLM_NORMAL) {
-		/* remove from local queue if it failed */
-		list_del_init(&lock->list);
-		lock->lksb->flags &= ~DLM_LKSB_GET_LVB;
+		dlm_revert_pending_lock(res, lock);
 		dlm_lock_put(lock);
 	}
 	spin_unlock(&res->spinlock);
@@ -246,7 +255,13 @@
 		ret = status;  // this is already a dlm_status
 	} else {
 		mlog_errno(tmpret);
-		ret = dlm_err_to_dlm_status(tmpret);
+		if (dlm_is_host_down(tmpret)) {
+			ret = DLM_RECOVERING;
+			mlog(0, "node %u died so returning DLM_RECOVERING "
+			     "from lock message!\n", res->owner);
+		} else {
+			ret = dlm_err_to_dlm_status(tmpret);
+		}
 	}
 
 	return ret;
@@ -328,6 +343,11 @@
 	newlock->ml.cookie = cookie;
 	newlock->ast_pending = 0;
 	newlock->bast_pending = 0;
+	newlock->convert_pending = 0;
+	newlock->lock_pending = 0;
+	newlock->unlock_pending = 0;
+	newlock->cancel_pending = 0;
+
 	kref_init(&newlock->lock_refs, dlm_lock_release);
 }
 
@@ -522,7 +542,7 @@
 			goto error;
 		}
 retry_convert:
-		down_read(&dlm->recovery_sem);
+		dlm_wait_for_recovery(dlm);
 
 		if (res->owner == dlm->node_num)
 			status = dlmconvert_master(dlm, res, lock, flags, mode);
@@ -536,8 +556,7 @@
 			 * no waiting will be necessary */
 			mlog(0, "retrying convert with migration/recovery/"
 			     "in-progress\n");
-			up_read(&dlm->recovery_sem);
-			yield();
+			msleep(100);
 			goto retry_convert;
 		}
 	} else {
@@ -558,13 +577,13 @@
 			goto error;
 
 		if (!recovery)
-			down_read(&dlm->recovery_sem);
+			dlm_wait_for_recovery(dlm);
 
 		/* find or create the lock resource */
 		res = dlm_get_lock_resource(dlm, name, flags);
 		if (!res) {
 			status = DLM_IVLOCKID;
-			goto up_error;
+			goto error;
 		}
 
 		mlog(0, "type=%d, flags = 0x%x\n", mode, flags);
@@ -598,22 +617,17 @@
 		    status == DLM_FORWARD) {
 			mlog(0, "retrying lock with migration/"
 			     "recovery/in progress\n");
-			up_read(&dlm->recovery_sem);
-			yield();
-			down_read(&dlm->recovery_sem);
+			msleep(100);
+			dlm_wait_for_recovery(dlm);
 			goto retry_lock;
 		}
 
 		if (status != DLM_NORMAL) {
 			lock->lksb->flags &= ~DLM_LKSB_GET_LVB;
-			goto up_error;
+			goto error;
 		}
 	}
 
-up_error:
-	if (!recovery)
-		up_read(&dlm->recovery_sem);
-
 error:
 	if (status != DLM_NORMAL) {
 		if (lock && !convert)

Modified: trunk/fs/ocfs2/dlm/dlmmaster.c
===================================================================
--- trunk/fs/ocfs2/dlm/dlmmaster.c	2005-06-21 22:03:49 UTC (rev 2416)
+++ trunk/fs/ocfs2/dlm/dlmmaster.c	2005-06-22 05:51:17 UTC (rev 2417)
@@ -150,7 +150,7 @@
 static void dlm_mark_lockres_migrating(dlm_ctxt *dlm, dlm_lock_resource *res);
 
 
-static int dlm_is_host_down(int errno)
+int dlm_is_host_down(int errno)
 {
 	switch (errno) {
 		case -EBADF:
@@ -731,6 +731,8 @@
 }
 
 
+#define DLM_MASTERY_TIMEOUT_MS   5000
+
 static int dlm_wait_for_lock_mastery(dlm_ctxt *dlm, dlm_lock_resource *res, 
 				     dlm_master_list_entry *mle,
 				     int blocked)
@@ -798,8 +800,18 @@
 
 	/* sleep if we haven't finished voting yet */
 	if (sleep) {
+		unsigned long timeo = msecs_to_jiffies(DLM_MASTERY_TIMEOUT_MS);
+
+		/*
+		if (atomic_read(&mle->mle_refs.refcount) < 2)
+			mlog(ML_ERROR, "mle (%p) refs=%d, name=%.*s\n", mle, 
+			atomic_read(&mle->mle_refs.refcount), 
+			res->lockname.len, res->lockname.name);
+		*/
 		atomic_set(&mle->woken, 0);
-		wait_event(mle->wq, (atomic_read(&mle->woken) == 1));
+		(void)wait_event_timeout(mle->wq, 
+					 (atomic_read(&mle->woken) == 1), 
+					 timeo);
 		if (res->owner == O2NM_MAX_NODES) {
 			mlog(0, "waiting again\n");
 			goto recheck;
@@ -931,7 +943,11 @@
 		} else {
 			mlog(ML_ERROR, "node down! %d\n", node);
 		
-			/* if the node wasn't involved in mastery skip it */
+			/* if the node wasn't involved in mastery skip it,
+			 * but clear it out from the maps so that it will
+			 * not affect mastery of this lockres */
+			clear_bit(node, mle->response_map);
+			clear_bit(node, mle->vote_map);
 			if (!test_bit(node, mle->maybe_map))
 				goto next;
 
@@ -941,11 +957,9 @@
 			if (blocked) {
 				int lowest = find_next_bit(mle->maybe_map, 
 						       O2NM_MAX_NODES, 0);
-		
+
 				/* act like it was never there */
 				clear_bit(node, mle->maybe_map);
-				clear_bit(node, mle->vote_map);
-				clear_bit(node, mle->response_map);
 
 			       	if (node != lowest)
 					goto next;
@@ -982,14 +996,13 @@
 				goto next;
 			}
 
+			clear_bit(node, mle->maybe_map);
 			if (node > dlm->node_num)
 				goto next;
 
 			mlog(0, "dead node in map!\n");
 			/* yuck. go back and re-contact all nodes 
 			 * in the vote_map, removing this node. */
-			clear_bit(node, mle->maybe_map);
-			clear_bit(node, mle->vote_map);
 			memset(mle->response_map, 0, 
 			       sizeof(mle->response_map));
 		}

Modified: trunk/fs/ocfs2/dlm/dlmrecovery.c
===================================================================
--- trunk/fs/ocfs2/dlm/dlmrecovery.c	2005-06-21 22:03:49 UTC (rev 2416)
+++ trunk/fs/ocfs2/dlm/dlmrecovery.c	2005-06-22 05:51:17 UTC (rev 2417)
@@ -249,8 +249,45 @@
 	mlog(0, "quitting DLM recovery thread\n");
 	return 0;
 }
+	
+/* callers of the top-level api calls (dlmlock/dlmunlock) should
+ * block on the dlm->reco.event when recovery is in progress.  
+ * the dlm recovery thread will set this state when it begins
+ * recovering a dead node (as the new master or not) and clear
+ * the state and wake as soon as all affected lock resources have
+ * been marked with the RECOVERY flag */
+static int dlm_in_recovery(dlm_ctxt *dlm)
+{
+	int in_recovery;
+	spin_lock(&dlm->spinlock);
+	in_recovery = !!(dlm->reco.state & DLM_RECO_STATE_ACTIVE);
+	spin_unlock(&dlm->spinlock);
+	return in_recovery;
+}
 
 
+void dlm_wait_for_recovery(dlm_ctxt *dlm)
+{
+	wait_event(dlm->reco.event, !dlm_in_recovery(dlm));
+}
+
+static void dlm_begin_recovery(dlm_ctxt *dlm)
+{
+	spin_lock(&dlm->spinlock);
+	BUG_ON(dlm->reco.state & DLM_RECO_STATE_ACTIVE);
+	dlm->reco.state |= DLM_RECO_STATE_ACTIVE;
+	spin_unlock(&dlm->spinlock);
+}
+
+static void dlm_end_recovery(dlm_ctxt *dlm)
+{
+	spin_lock(&dlm->spinlock);
+	BUG_ON(!(dlm->reco.state & DLM_RECO_STATE_ACTIVE));
+	dlm->reco.state &= ~DLM_RECO_STATE_ACTIVE;
+	spin_unlock(&dlm->spinlock);
+	wake_up(&dlm->reco.event);
+}
+
 int dlm_do_recovery(dlm_ctxt *dlm)
 {
 	int status = 0;
@@ -294,7 +331,7 @@
 
 	/* take write barrier */
 	/* (stops the list reshuffling thread, proxy ast handling) */
-	down_write(&dlm->recovery_sem);
+	dlm_begin_recovery(dlm);
 
 	if (dlm->reco.new_master == dlm->node_num)
 		goto master_here;
@@ -307,21 +344,22 @@
 			goto master_here;
 		}
 		mlog(0, "another node will master this recovery session.\n");
-	} else {
-		mlog(0, "new_master=%u, this node=%u, dead_node=%u\n", 
-		     dlm->reco.new_master, dlm->node_num, dlm->reco.dead_node);
 	}
+	mlog(0, "dlm=%s, new_master=%u, this node=%u, dead_node=%u\n", 
+	     dlm->name, dlm->reco.new_master, 
+	     dlm->node_num, dlm->reco.dead_node);
 
 	/* it is safe to start everything back up here
 	 * because all of the dead node's lock resources
 	 * have been marked as in-recovery */
-	up_write(&dlm->recovery_sem);
+	dlm_end_recovery(dlm);
 
 	/* sleep out in main dlm_recovery_thread loop. */
 	return 0;
 
 master_here:
-	mlog(0, "mastering recovery of %u here!\n", dlm->reco.dead_node);
+	mlog(0, "mastering recovery of %s:%u here(this=%u)!\n", 
+	     dlm->name, dlm->reco.dead_node, dlm->node_num);
 
 	status = dlm_remaster_locks(dlm, dlm->reco.dead_node);
 	if (status < 0) {
@@ -331,7 +369,7 @@
 		/* success!  see if any other nodes need recovery */
 		dlm_reset_recovery(dlm);
 	}
-	up_write(&dlm->recovery_sem);
+	dlm_end_recovery(dlm);
 
 	/* continue and look for another dead node */
 	return -EAGAIN;
@@ -474,8 +512,15 @@
 							  dlm->node_num);
 			spin_unlock(&dlm->spinlock);
 			mlog(0, "should be done with recovery!\n");
+
+			mlog(0, "finishing recovery of %s at %lu, "
+			     "dead=%u, this=%u, new=%u\n", dlm->name, 
+			     jiffies, dlm->reco.dead_node, 
+			     dlm->node_num, dlm->reco.new_master);
 			destroy = 1;
 			status = ret;
+			/* rescan everything marked dirty along the way */
+			dlm_kick_thread(dlm, NULL);
 			break;
 		}
 		/* wait to be signalled, with periodic timeout
@@ -923,8 +968,8 @@
 				    mig_cookie, flags, res->owner);
 
 	total_locks = 0;
-	queue = &res->granted;
-	for (i=0; i<3; i++) {
+	for (i=DLM_GRANTED_LIST; i<=DLM_BLOCKED_LIST; i++) {
+		queue = dlm_list_idx_to_ptr(res, i);
 		list_for_each(iter, queue) {
 			lock = list_entry (iter, dlm_lock, list);
 
@@ -944,7 +989,6 @@
 				BUG();
 			}
 		}
-		queue++;
 	}
 	/* flush any remaining locks */
 	ret = dlm_send_mig_lockres_msg(dlm, mres, send_to, res, total_locks);
@@ -958,6 +1002,7 @@
 }
 
 
+
 /*
  * this message will contain no more than one page worth of 
  * recovery data, and it will work on only one lockres.
@@ -1433,19 +1478,85 @@
 
 void dlm_move_lockres_to_recovery_list(dlm_ctxt *dlm, dlm_lock_resource *res)
 {
+	int i;
+	struct list_head *queue, *iter, *iter2;
+	dlm_lock *lock;
+	
 	res->state |= DLM_LOCK_RES_RECOVERING;
 	if (!list_empty(&res->recovering))
 		list_del_init(&res->recovering);
 	list_add_tail(&res->recovering, &dlm->reco.resources);
+
+	/* find any pending locks and put them back on proper list */
+	for (i=DLM_BLOCKED_LIST; i>=DLM_GRANTED_LIST; i--) {
+		queue = dlm_list_idx_to_ptr(res, i);
+		list_for_each_safe(iter, iter2, queue) {
+			lock = list_entry (iter, dlm_lock, list);
+			dlm_lock_get(lock);
+			if (lock->convert_pending) {
+				/* move converting lock back to granted */
+				BUG_ON(i != DLM_CONVERTING_LIST);
+				mlog(0, "node died with convert pending "
+				     "on %.*s. move back to granted list.\n",
+				     res->lockname.len, res->lockname.name);
+				dlm_revert_pending_convert(res, lock);
+				lock->convert_pending = 0;
+			} else if (lock->lock_pending) {
+				/* remove pending lock requests completely */
+				BUG_ON(i != DLM_BLOCKED_LIST);
+				mlog(0, "node died with lock pending "
+				     "on %.*s. remove from blocked list and skip.\n",
+				     res->lockname.len, res->lockname.name);
+				/* lock will be floating until ref in
+				 * dlmlock_remote is freed after the network
+				 * call returns.  ok for it to not be on any
+				 * list since no ast can be called
+				 * (the master is dead). */
+				dlm_revert_pending_lock(res, lock);
+				lock->lock_pending = 0;
+			} else if (lock->unlock_pending) {
+				/* if an unlock was in progress, treat as 
+				 * if this had completed successfully
+				 * before sending this lock state to the
+				 * new master.  note that the dlm_unlock
+				 * call is still responsible for calling 
+				 * the unlockast.  that will happen after
+				 * the network call times out.  for now, 
+				 * just move lists to prepare the new
+				 * recovery master.  */
+				BUG_ON(i != DLM_GRANTED_LIST);
+				mlog(0, "node died with unlock pending "
+				     "on %.*s. remove from blocked list and skip.\n",
+				     res->lockname.len, res->lockname.name);
+				dlm_commit_pending_unlock(res, lock);
+				lock->unlock_pending = 0;
+			} else if (lock->cancel_pending) {
+				/* if a cancel was in progress, treat as 
+				 * if this had completed successfully
+				 * before sending this lock state to the
+				 * new master */
+				BUG_ON(i != DLM_CONVERTING_LIST);
+				mlog(0, "node died with cancel pending "
+				     "on %.*s. move back to granted list.\n",
+				     res->lockname.len, res->lockname.name);
+				dlm_commit_pending_cancel(res, lock);
+				lock->cancel_pending = 0;
+			}
+			dlm_lock_put(lock);
+		}
+	}
 }
 
+
+
 /* removes all recovered locks from the recovery list.
  * sets the res->owner to the new master.
  * unsets the RECOVERY flag and wakes waiters. */
 static void dlm_finish_local_lockres_recovery(dlm_ctxt *dlm, u8 dead_node,
 					      u8 new_master)
 {
-	struct list_head *iter, *iter2;
+	int i;
+	struct list_head *iter, *iter2, *bucket;
 	dlm_lock_resource *res;
 
 	mlog_entry_void();
@@ -1457,14 +1568,47 @@
 		if (res->owner == dead_node) {
 			list_del_init(&res->recovering);
 			spin_lock(&res->spinlock);
-			res->owner = new_master;
+			dlm_change_lockres_owner(dlm, res, new_master);
 			res->state &= ~DLM_LOCK_RES_RECOVERING;
-			__dlm_kick_thread(dlm, res);
+			__dlm_dirty_lockres(dlm, res);
 			spin_unlock(&res->spinlock);
 			wake_up(&res->wq);
 		}
-		
 	}
+	
+	/* this will become unnecessary eventually, but
+	 * for now we need to run the whole hash, clear
+	 * the RECOVERING state and set the owner 
+	 * if necessary */
+	for (i=0; i<DLM_HASH_SIZE; i++) {
+		bucket = &(dlm->resources[i]);
+		list_for_each(iter, bucket) {
+			res = list_entry (iter, dlm_lock_resource, list);
+			if (res->state & DLM_LOCK_RES_RECOVERING) {
+				if (res->owner == dead_node) {
+					mlog(0, "(this=%u) res %.*s owner=%u "
+					     "was not on recovering list, but "
+					     "clearing state anyway\n", 
+					     dlm->node_num, res->lockname.len,
+					     res->lockname.name, new_master);
+				} else if (res->owner == dlm->node_num) {
+					mlog(0, "(this=%u) res %.*s owner=%u "
+					     "was not on recovering list, "
+					     "owner is THIS node, clearing\n",
+					     dlm->node_num, res->lockname.len,
+					     res->lockname.name, new_master);
+				} else 
+					continue;
+
+				spin_lock(&res->spinlock);
+				dlm_change_lockres_owner(dlm, res, new_master);
+				res->state &= ~DLM_LOCK_RES_RECOVERING;
+				__dlm_dirty_lockres(dlm, res);
+				spin_unlock(&res->spinlock);
+				wake_up(&res->wq);
+			}
+		}
+	}
 }
 
 static void dlm_free_dead_locks(dlm_ctxt *dlm, dlm_lock_resource *res,
@@ -1473,6 +1617,7 @@
 	struct list_head *iter, *tmpiter;
 	dlm_lock *lock;
 
+	assert_spin_locked(&dlm->spinlock);
 	assert_spin_locked(&res->spinlock);
 
 	/* TODO: check pending_asts, pending_basts here */
@@ -1497,6 +1642,9 @@
 			dlm_lock_put(lock);
 		}
 	}
+
+	/* do not kick thread yet */
+	__dlm_dirty_lockres(dlm, res);
 }
 
 static void dlm_do_local_recovery_cleanup(dlm_ctxt *dlm, u8 dead_node)
@@ -1639,11 +1787,15 @@
 
 static void dlm_reco_ast(void *astdata)
 {
-	mlog(0, "ast for recovery lock fired!\n");
+	dlm_ctxt *dlm = astdata;
+	mlog(0, "ast for recovery lock fired!, this=%u, dlm=%s\n",
+	     dlm->node_num, dlm->name);
 }
 static void dlm_reco_bast(void *astdata, int blocked_type)
 {
-	mlog(0, "bast for recovery lock fired!\n");
+	dlm_ctxt *dlm = astdata;
+	mlog(0, "bast for recovery lock fired!, this=%u, dlm=%s\n",
+	     dlm->node_num, dlm->name);
 }
 static void dlm_reco_unlock_ast(void *astdata, dlm_status st)
 {
@@ -1657,6 +1809,8 @@
 	dlm_lockstatus lksb;
 	int status = -EINVAL;
 
+	mlog(0, "starting recovery of %s at %lu, dead=%u, this=%u\n",
+	     dlm->name, jiffies, dlm->reco.dead_node, dlm->node_num);
 retry:
 	memset(&lksb, 0, sizeof(lksb));
 
@@ -1664,6 +1818,8 @@
 		      DLM_RECOVERY_LOCK_NAME, dlm_reco_ast, dlm, dlm_reco_bast);
 
 	if (ret == DLM_NORMAL) {
+		mlog(0, "dlm=%s dlmlock says I got it (this=%u)\n",
+		     dlm->name, dlm->node_num);
 		/* I am master, send message to all nodes saying 
 		 * that I am beginning a recovery session */
 		status = dlm_send_begin_reco_message(dlm, 
@@ -1688,6 +1844,8 @@
 			goto retry;
 		}
 	} else if (ret == DLM_NOTQUEUED) {
+		mlog(0, "dlm=%s dlmlock says another node got it (this=%u)\n",
+		     dlm->name, dlm->node_num);		
 		/* another node is master. wait on 
 		 * reco.new_master != O2NM_INVALID_NODE_NUM */
 		status = -EEXIST;
@@ -1792,7 +1950,8 @@
 	int nodenum;
 	int status;
 
-	mlog(0, "finishing recovery for node %u\n", dlm->reco.dead_node);
+	mlog(0, "finishing recovery for node %s:%u\n",
+	     dlm->name, dlm->reco.dead_node);
 
 	spin_lock(&dlm->spinlock);
 	dlm_node_iter_init(dlm->domain_map, &iter);
@@ -1835,8 +1994,18 @@
 	
 	spin_lock(&dlm->spinlock);
 
-	DLM_ASSERT (dlm->reco.new_master == fr->node_idx);
-	DLM_ASSERT (dlm->reco.dead_node == fr->dead_node);
+	if (dlm->reco.new_master != fr->node_idx) {
+		mlog(ML_ERROR, "node %u sent recovery finalize msg, but node "
+		     "%u is supposed to be the new master, dead=%u\n",
+		     fr->node_idx, dlm->reco.new_master, fr->dead_node);
+		BUG();
+	}
+	if (dlm->reco.dead_node != fr->dead_node) {
+		mlog(ML_ERROR, "node %u sent recovery finalize msg for dead "
+		     "node %u, but node %u is supposed to be dead\n",
+		     fr->node_idx, fr->dead_node, dlm->reco.dead_node);
+		BUG();
+	}
 	
 	dlm_finish_local_lockres_recovery(dlm, fr->dead_node, fr->node_idx);
 

Modified: trunk/fs/ocfs2/dlm/dlmthread.c
===================================================================
--- trunk/fs/ocfs2/dlm/dlmthread.c	2005-06-21 22:03:49 UTC (rev 2416)
+++ trunk/fs/ocfs2/dlm/dlmthread.c	2005-06-22 05:51:17 UTC (rev 2417)
@@ -421,32 +421,38 @@
 	if (res) {
 		spin_lock(&dlm->spinlock);
 		spin_lock(&res->spinlock);
-		__dlm_kick_thread(dlm, res);
+		__dlm_dirty_lockres(dlm, res);
 		spin_unlock(&res->spinlock);
 		spin_unlock(&dlm->spinlock);
-	} else
-		wake_up(&dlm->dlm_thread_wq);
+	}
+	wake_up(&dlm->dlm_thread_wq);
 }
 
 void __dlm_kick_thread(dlm_ctxt *dlm, dlm_lock_resource *res)
 {
 	mlog_entry("dlm=%p, res=%p\n", dlm, res);
-	if (res) {
-		assert_spin_locked(&dlm->spinlock);
-		assert_spin_locked(&res->spinlock);
+	if (res)
+		__dlm_dirty_lockres(dlm, res);
 
-		/* don't shuffle secondary queues */
-		if ((res->owner == dlm->node_num) &&
-		    !(res->state & DLM_LOCK_RES_DIRTY)) {
-			list_add_tail(&res->dirty, &dlm->dirty_list);
-			res->state |= DLM_LOCK_RES_DIRTY;
-		}
-	}
-
 	wake_up(&dlm->dlm_thread_wq);
 }
 
+void __dlm_dirty_lockres(dlm_ctxt *dlm, dlm_lock_resource *res)
+{
+	mlog_entry("dlm=%p, res=%p\n", dlm, res);
+	
+	assert_spin_locked(&dlm->spinlock);
+	assert_spin_locked(&res->spinlock);
 
+	/* don't shuffle secondary queues */
+	if ((res->owner == dlm->node_num) &&
+	    !(res->state & DLM_LOCK_RES_DIRTY)) {
+		list_add_tail(&res->dirty, &dlm->dirty_list);
+		res->state |= DLM_LOCK_RES_DIRTY;
+	}
+}
+
+
 /* Launch the NM thread for the mounted volume */
 int dlm_launch_thread(dlm_ctxt *dlm)
 {
@@ -608,8 +614,6 @@
 		 * transition. */
 		dlm_run_purge_list(dlm, dlm_shutting_down(dlm));
 
-		down_read(&dlm->recovery_sem);
-
 		/* We really don't want to hold dlm->spinlock while 
 		 * calling dlm_shuffle_lists on each lockres that
 		 * needs to have its queues adjusted and AST/BASTs 
@@ -638,16 +642,20 @@
 			 * dirty_list in this gap, but that is ok */
 
 			spin_lock(&res->spinlock);
-			DLM_ASSERT(!(res->state & DLM_LOCK_RES_MIGRATING));
-			DLM_ASSERT(!(res->state & DLM_LOCK_RES_RECOVERING));
 			DLM_ASSERT(res->owner == dlm->node_num);
 			
-			if (res->state & DLM_LOCK_RES_IN_PROGRESS) {
+			/* it is now ok to move lockreses in these states
+			 * to the dirty list, assuming that they will only be
+			 * dirty for a short while. */
+			if (res->state & (DLM_LOCK_RES_IN_PROGRESS |
+					  DLM_LOCK_RES_MIGRATING |
+					  DLM_LOCK_RES_RECOVERING)) {
 				/* move it to the tail and keep going */
 				spin_unlock(&res->spinlock);
 				mlog(0, "delaying list shuffling for in-"
-				     "progress lockres %.*s\n",
-				     res->lockname.len, res->lockname.name);
+				     "progress lockres %.*s, state=%d\n",
+				     res->lockname.len, res->lockname.name,
+				     res->state);
 				delay = 1;
 				goto in_progress;
 			}
@@ -689,7 +697,6 @@
 
 		spin_unlock(&dlm->spinlock);
 		dlm_flush_asts(dlm);
-		up_read(&dlm->recovery_sem);
 
 		/* yield and continue right away if there is more work to do */
 		if (!n) {

Modified: trunk/fs/ocfs2/dlm/dlmunlock.c
===================================================================
--- trunk/fs/ocfs2/dlm/dlmunlock.c	2005-06-21 22:03:49 UTC (rev 2416)
+++ trunk/fs/ocfs2/dlm/dlmunlock.c	2005-06-22 05:51:17 UTC (rev 2417)
@@ -164,12 +164,21 @@
 	if (!master_node) {
 		owner = res->owner;
 		/* drop locks and send message */
+		if (flags & LKM_CANCEL)
+			lock->cancel_pending = 1;
+		else
+			lock->unlock_pending = 1;
 		spin_unlock(&lock->spinlock);
 		spin_unlock(&res->spinlock);
 		status = dlm_send_remote_unlock_request(dlm, res, lock, lksb,
 							flags, owner);
 		spin_lock(&res->spinlock);
 		spin_lock(&lock->spinlock);
+		if (flags & LKM_CANCEL)
+			lock->cancel_pending = 0;
+		else
+			lock->unlock_pending = 0;
+
 	}
 
 	/* get an extra ref on lock.  if we are just switching 
@@ -221,6 +230,21 @@
 	return status;
 }
 
+void dlm_commit_pending_unlock(dlm_lock_resource *res, dlm_lock *lock)
+{
+	/* leave DLM_LKSB_PUT_LVB on the lksb so any final
+	 * update of the lvb will be sent to the new master */
+	list_del_init(&lock->list);
+}
+
+void dlm_commit_pending_cancel(dlm_lock_resource *res, dlm_lock *lock)
+{
+	list_del_init(&lock->list);
+	list_add_tail(&lock->list, &res->granted);
+	lock->ml.convert_type = LKM_IVMODE;
+}
+
+
 static inline dlm_status dlmunlock_master(dlm_ctxt *dlm,
 					  dlm_lock_resource *res,
 					  dlm_lock *lock,
@@ -295,7 +319,19 @@
 		lksb->status = status;
 	} else {
 		mlog_errno(tmpret);
-		ret = dlm_err_to_dlm_status(tmpret);
+		if (dlm_is_host_down(tmpret)) {
+			/* NOTE: this seems strange, but it is what we want.
+			 * when the master goes down during a cancel or 
+			 * unlock, the recovery code completes the operation
+			 * as if the master had not died, then passes the 
+			 * updated state to the recovery master.  this thread
+			 * just needs to finish out the operation and call
+			 * the unlockast. */
+			ret = DLM_NORMAL;
+		} else {
+			/* something bad.  this will BUG in ocfs2 */
+			ret = dlm_err_to_dlm_status(tmpret);
+		}
 		lksb->status = ret;
 	}
 



More information about the Ocfs2-commits mailing list