[Ocfs2-commits] mfasheh commits r2068 - trunk/fs/ocfs2/dlm

svn-commits at oss.oracle.com svn-commits at oss.oracle.com
Tue Mar 29 13:08:10 CST 2005


Author: mfasheh
Signed-off-by: zab
Date: 2005-03-29 13:08:08 -0600 (Tue, 29 Mar 2005)
New Revision: 2068

Modified:
   trunk/fs/ocfs2/dlm/dlmunlock.c
Log:
* Teach dlmunlock how to schedule a bit before re sending an unlock request
  that's currently busy.

* Do proper locking around all lockres values (specifically owner) during
  dlmunlock.

* the dlmunlock network request handler didn't know how to double check if    
  it was the current master of the lock. Now it can check that and tell the
  requesting node to back off and retry. For lock resources that don't exist 
  any more on that node we return the same value as they may have been
  migrated away.

* dlm_unlock_lock_handler was forgetting to drop the resource spinlock in   
  several error paths.

Signed-off-by: zab



Modified: trunk/fs/ocfs2/dlm/dlmunlock.c
===================================================================
--- trunk/fs/ocfs2/dlm/dlmunlock.c	2005-03-29 02:47:56 UTC (rev 2067)
+++ trunk/fs/ocfs2/dlm/dlmunlock.c	2005-03-29 19:08:08 UTC (rev 2068)
@@ -38,8 +38,8 @@
 #include <linux/socket.h>
 #include <linux/inet.h>
 #include <linux/spinlock.h>
+#include <linux/delay.h>
 
-
 #include "cluster/heartbeat.h"
 #include "cluster/nodemanager.h"
 #include "cluster/tcp.h"
@@ -65,7 +65,8 @@
 						 dlm_lock_resource *res, 
 						 dlm_lock *lock, 
 						 dlm_lockstatus *lksb, 
-						 int flags);
+						 int flags,
+						 u8 owner);
 
 
 
@@ -83,6 +84,7 @@
 	dlm_status status;
 	int actions = 0;
 	int in_use;
+        u8 owner;
 
 	dlmprintk("master_node = %d, valblk = %d\n", master_node,
 		  flags & LKM_VALBLK);
@@ -148,11 +150,12 @@
 	}
 
 	if (!master_node) {
+		owner = res->owner;
 		/* drop locks and send message */
 		spin_unlock(&lock->spinlock);
 		spin_unlock(&res->spinlock);
-		status = dlm_send_remote_unlock_request(dlm, res, lock, 
-							lksb, flags);
+		status = dlm_send_remote_unlock_request(dlm, res, lock, lksb,
+							flags, owner);
 		spin_lock(&res->spinlock);
 		spin_lock(&lock->spinlock);
 	}
@@ -216,7 +219,8 @@
 						 dlm_lock_resource *res, 
 						 dlm_lock *lock, 
 						 dlm_lockstatus *lksb, 
-						 int flags)
+						 int flags,
+						 u8 owner)
 {
 	dlm_unlock_lock unlock;
 	int tmpret;
@@ -246,7 +250,7 @@
 
 	dlm_unlock_lock_to_net(&unlock);
 	tmpret = net_send_message_iov(DLM_UNLOCK_LOCK_MSG, dlm->key, 
-				      iov, iovlen, res->owner, &status);
+				      iov, iovlen, owner, &status);
 	if (tmpret >= 0) {
 		// successfully sent and received
 		if (status == DLM_CANCELGRANT)
@@ -315,25 +319,40 @@
 
 	dlmprintk("lvb: %s\n", flags & LKM_PUT_LVB ? "put lvb" : "none");
 
-	status = DLM_IVLOCKID;
 	res = dlm_lookup_lock(dlm, unlock->name, unlock->namelen);
-	if (!res)
+	if (!res) {
+		/* We assume here that a no lock resource simply means
+		 * it was migrated away and destroyed before the other
+		 * node could detect it. */
+		dlmprintk0("returning DLM_FORWARD -- res no longer exists\n");
+		status = DLM_FORWARD;
 		goto not_found;
+	}
 
 	queue=&res->granted;
 	found = 0;
 	spin_lock(&res->spinlock);
 	if (res->state & DLM_LOCK_RES_RECOVERING) {
+		spin_unlock(&res->spinlock);
 		dlmprintk0("returning DLM_RECOVERING\n");
 		status = DLM_RECOVERING;
 		goto leave;
 	}
+
 	if (res->state & DLM_LOCK_RES_MIGRATING) {
+		spin_unlock(&res->spinlock);
 		dlmprintk0("returning DLM_MIGRATING\n");
 		status = DLM_MIGRATING;
 		goto leave;
 	}
 
+	if (res->owner != dlm->node_num) {
+		spin_unlock(&res->spinlock);
+		dlmprintk0("returning DLM_FORWARD -- not master\n");
+		status = DLM_FORWARD;
+		goto leave;
+	}
+
 	for (i=0; i<3; i++) {
 		list_for_each(iter, queue) {
 			lock = list_entry(iter, dlm_lock, list);
@@ -349,8 +368,10 @@
 		queue++;
 	}
 	spin_unlock(&res->spinlock);
-	if (!found)
+	if (!found) {
+		status = DLM_IVLOCKID;
 		goto not_found;
+	}
 
 	/* lock was found on queue */
 	lksb = lock->lksb;
@@ -457,7 +478,7 @@
 	dlm_status status;
 	dlm_lock_resource *res;
 	dlm_lock *lock = NULL;
-	int call_ast = 0;
+	int call_ast, is_master;
 
 	dlmprintk0("\n");
 
@@ -481,10 +502,15 @@
 	DLM_ASSERT(lock);
 	DLM_ASSERT(res);
 retry:
+	call_ast = 0;
 	/* need to retry up here because owner may have changed */
 	dlmprintk("lock=%p res=%p\n", lock, res);
 
-	if (res->owner == dlm->node_num) {
+	spin_lock(&res->spinlock);
+	is_master = (res->owner == dlm->node_num);
+	spin_unlock(&res->spinlock);
+
+	if (is_master) {
 		status = dlmunlock_master(dlm, res, lock, lksb, flags, 
 					  &call_ast);
 		dlmprintk("done calling dlmunlock_master: returned %d, "
@@ -499,10 +525,19 @@
 	if (status == DLM_RECOVERING ||
 	    status == DLM_MIGRATING ||
 	    status == DLM_FORWARD) {
+		/* We want to go away for a tiny bit to allow recovery
+		 * / migration to complete on this resource. I don't
+		 * know of any wait queue we could sleep on as this
+		 * may be happening on another node. Perhaps the
+		 * proper solution is to queue up requests on the
+		 * other end? */
+		msleep(50);
+
 		dlmprintk0("retrying unlock due to pending recovery/"
 			   "migration/in-progress\n");
 		goto retry;
 	}
+
 	if (call_ast) {
 		dlmprintk("calling unlockast(%p, %d)\n",
 			  data, lksb->status);



More information about the Ocfs2-commits mailing list