[Ocfs2-commits] khackel commits r1696 - in trunk: cluster src

svn-commits at oss.oracle.com svn-commits at oss.oracle.com
Tue Dec 7 19:34:06 CST 2004


Author: khackel
Date: 2004-12-07 19:34:04 -0600 (Tue, 07 Dec 2004)
New Revision: 1696

Modified:
   trunk/cluster/dlmmod.c
   trunk/cluster/dlmmod.h
   trunk/cluster/heartbeat.c
   trunk/cluster/tcp.c
   trunk/cluster/tcp.h
   trunk/src/vote.c
Log:
Fixes several bugs in ocfs, tcp and dlm.  THIS IS THE FIRST REVISION THAT MOUNTS ON TWO NODES.  Fear not, it will die in i_revalidate as soon as you ls ;-)

Modified: trunk/cluster/dlmmod.c
===================================================================
--- trunk/cluster/dlmmod.c	2004-12-07 20:17:58 UTC (rev 1695)
+++ trunk/cluster/dlmmod.c	2004-12-08 01:34:04 UTC (rev 1696)
@@ -98,6 +98,7 @@
 
 typedef struct _dlm_create_lock
 {
+	u32 flags;   // TODO: reduce the size of this
 	u16 node_idx;
 	s8 requested_type;
 	u8 namelen;
@@ -107,6 +108,7 @@
 
 typedef struct _dlm_convert_lock
 {
+	u32 flags;   // TODO: reduce the size of this
 	u16 node_idx;
 	s8 requested_type;
 	u8 namelen;
@@ -116,7 +118,7 @@
 
 typedef struct _dlm_unlock_lock
 {
-	u32 flags;
+	u32 flags;   // TODO: reduce the size of this
 	u16 node_idx;
 	u8 namelen;
 	u8 name[NM_MAX_NAME_LEN];
@@ -425,6 +427,8 @@
 	spin_unlock(&res->spinlock);
 	dlm_kick_thread(dlm, res);
 	if (!got_it && (flags & LKM_NOQUEUE)) {
+		dlmprintk("did not get NOQUEUE lock %*s at level %d\n", 
+			  res->lockname.len, res->lockname.name, lock->type);
 		return DLM_NOTQUEUED;
 	}
 	return DLM_NORMAL;
@@ -494,7 +498,6 @@
 	dlm_status status = DLM_NORMAL;
 	struct list_head *iter;
 	dlm_lock *tmplock=NULL;
-	int remote_in_place = 0;
 
 	dlmprintk("type=%d, convert_type=%d, new convert_type=%d\n", lock->type, lock->convert_type, type);
 
@@ -543,11 +546,8 @@
 	/* fall thru to grant */
 
 grant:
-	if (lock->node != dlm->group_index) {
-		dlmprintk0("no in-place convert for nonlocal locks :(  see if this helps...\n");
-		remote_in_place = 1;
-		goto switch_queues;
-	}
+	if (lock->node != dlm->group_index)
+		dlmprintk0("doing in-place convert for nonlocal lock\n");
 
 	/* immediately grant the new lock type */
 	//printk("doing in-place %sconvert from %d to %d\n", 
@@ -575,6 +575,8 @@
 
 switch_queues:
 	if (flags & LKM_NOQUEUE) {
+		dlmprintk("failed to convert NOQUEUE lock %*s from %d to %d...\n",
+			  res->lockname.len, res->lockname.name, lock->type, type);
 		spin_unlock(&lock->spinlock);
 		spin_unlock(&res->spinlock);
 		return DLM_NOTQUEUED;
@@ -582,11 +584,7 @@
 
 	lock->convert_type = type;
 	list_del(&lock->list);
-	/* make sure the remote in-place convert gets handled right away */
-	if (remote_in_place)
-		list_add(&lock->list, &res->converting);
-	else
-		list_add_tail(&lock->list, &res->converting);
+	list_add_tail(&lock->list, &res->converting);
 	
 	spin_unlock(&lock->spinlock);
 	spin_unlock(&res->spinlock);
@@ -652,6 +650,8 @@
 	dlm_lock_resource *res;
 	dlm_lock *lock = NULL;
 	int call_ast = 0;
+	
+	dlmprintk0("\n");
 
 	if (!lksb)
 		return DLM_BADARGS;
@@ -670,6 +670,9 @@
 	lock = lksb->lockid;
 	res = lock->lockres;
 
+	spin_lock(&res->spinlock);
+	spin_lock(&lock->spinlock);
+
 	status = dlmunlock_local(dlm, res, lock, lksb, flags, &call_ast);
 	if (call_ast)
 		(*unlockast)(data, lksb->status);
@@ -682,6 +685,8 @@
 	dlm_status status;
 	int free_lock = 0, remote_ready = 0;
 	int local = 0, remove = 0, regrant = 0;
+	
+	dlmprintk0("\n");
 
 	/* according to spec and opendlm code
 	 *  flags & LKM_CANCEL != 0: must be converting or blocked
@@ -692,14 +697,16 @@
 	*call_ast = 0;
 
 recheck:
-	spin_lock(&res->spinlock);
-	spin_lock(&lock->spinlock);
 
 	local = (res->owner == dlm->group_index);
 
+	dlmprintk0("checking flags...\n");
+
 	if (flags & LKM_CANCEL) {
+		dlmprintk0("cancel request\n");
 		/* cancel request */
 		if (dlm_lock_on_list(&res->blocked, lock)) {
+			dlmprintk0("on blocked list\n");
 			/* cancel this outright */
 			lksb->status = DLM_NORMAL;
 			status = DLM_NORMAL;
@@ -708,6 +715,7 @@
 			remove = 1;
 			regrant = 0;
 		} else if (dlm_lock_on_list(&res->converting, lock)) {
+			dlmprintk0("on converting list\n");
 			/* cancel the request, put back on granted */
 			lksb->status = DLM_NORMAL;
 			status = DLM_NORMAL;
@@ -716,6 +724,7 @@
 			remove = 1;
 			regrant = 1;
 		} else if (dlm_lock_on_list(&res->granted, lock)) {
+			dlmprintk0("on granted list\n");
 			/* too late, already granted.  DLM_CANCELGRANT */
 			lksb->status = DLM_CANCELGRANT;
 			status = DLM_NORMAL;
@@ -725,6 +734,7 @@
 			regrant = 0;
 		} else {
 			/* err. um. eek! */
+			dlmprintk0("on NO list!\n");
 			printk("lock to cancel is not on any list!  bug!\n");
 			lksb->status = DLM_IVLOCKID;
 			status = DLM_IVLOCKID;
@@ -734,8 +744,11 @@
 			regrant = 0;
 		}
 	} else {
+		dlmprintk0("unlock request\n");
+
 		/* unlock request */
 		if (!dlm_lock_on_list(&res->granted, lock)) {
+			dlmprintk0("not on granted list\n");
 			lksb->status = DLM_DENIED;
 			status = DLM_DENIED;
 			free_lock = 0;
@@ -743,6 +756,7 @@
 			remove = 0;
 			regrant = 0;
 		} else {
+			dlmprintk0("on granted list\n");
 			/* unlock granted lock */
 			lksb->status = DLM_NORMAL;
 			status = DLM_NORMAL;
@@ -753,10 +767,14 @@
 		}
 	}
 
+	dlmprintk0("checking local/remote\n");
+
 	if (!local) {
+		dlmprintk0("nonlocal\n");
 		/* safe since nothing can change on this 
-		 * seconndary queue without lockres lock */
+		 * secondary queue without lockres lock */
 		spin_unlock(&lock->spinlock);
+		dlmprintk0("unlocked lock spinlock\n");
 
 		/* if there was an outstanding change on the
 		 * lockres, conditions could have changed */
@@ -765,38 +783,56 @@
 			__dlm_wait_on_lockres(res);
 			res->state |= DLM_LOCK_RES_IN_PROGRESS;
 			remote_ready = 1;
-			spin_unlock(&res->spinlock);
+			dlmprintk0("unlocked lockres spinlock, not ready, rechecking!\n");
+			spin_lock(&lock->spinlock);
 			goto recheck;
 		}
 
 		if (res->state & DLM_LOCK_RES_RECOVERING) {
 			/* !!!!! */
+			dlmprintk0("unlocking lock spinlock\n");
 			spin_unlock(&res->spinlock);
+			dlmprintk0("lockres is recovering!\n");
 			return DLM_RECOVERING;
 		} else {
+			dlmprintk0("unlocking lockres spinlock\n");
 			spin_unlock(&res->spinlock);
 			status = dlm_send_remote_unlock_request(dlm, res, lock, lksb, flags);
+			dlmprintk0("locking lockres spinlock\n");
 			spin_lock(&res->spinlock);
 			res->state &= ~DLM_LOCK_RES_IN_PROGRESS;
 		}
+		dlmprintk0("locking lock spinlock\n");
 		spin_lock(&lock->spinlock);
 	}
 
-	if (remove)
+	if (remove) {
+		dlmprintk0("removing lock from list\n");
 		list_del(&lock->list);
-	if (regrant)
+	}
+	if (regrant) {
+		dlmprintk0("moving lock to granted list\n");
 		list_add_tail(&lock->list, &res->granted);
+	}
 
+	dlmprintk0("unlocking lock spinlock\n");
 	spin_unlock(&lock->spinlock);
+	dlmprintk0("unlocking lockres spinlock\n");
 	spin_unlock(&res->spinlock);
+	
+	dlmprintk0("done with locks\n");
 
 	if (free_lock) {
+		dlmprintk0("need to free lock\n");
+
 #warning this must change to proper refcounting
 		/* TODO: refcounting... tho for now this will work because 
 		 * the middle layer is keeping track of everything */
 		kfree(lock);
 		lksb->lockid = NULL;
+		dlmprintk0("done freeing\n");
 	}
+	dlmprintk("aha done with everything, returning %d\n", status);
 	return status;
 }
 	
@@ -925,6 +961,8 @@
 	dlm_lock_resource *tmpres=NULL;
 	struct list_head *bucket;
 	
+	dlmprintk0("\n");
+
 	bucket = &(dlm->resources[lockname->hash & DLM_HASH_MASK]);
 
 	/* check for pre-existing lock */
@@ -1435,6 +1473,7 @@
 	create.requested_type = lock->type;
 	create.cookie = lock->cookie;
 	create.namelen = res->lockname.len;
+	create.flags = flags;
 	strncpy(create.name, res->lockname.name, create.namelen);
 
 	ret = DLM_NOLOCKMGR;
@@ -1489,8 +1528,16 @@
 	if (res) {
 		spin_lock(&res->spinlock);
 		newlock->lockres = res;
-		status = dlmlock_local(dlm, res, newlock, 0);
+		status = dlmlock_local(dlm, res, newlock, create->flags);
 		spin_unlock(&res->spinlock);
+
+		if (create->flags & LKM_NOQUEUE &&
+		    status == DLM_NOTQUEUED) {
+			dlmprintk("failed to get NOQUEUE lock %*s at level %d...\n",
+				  res->lockname.len, res->lockname.name, newlock->type);
+			/* never added to blocked queue, just delete */
+			kfree(newlock);
+		}
 	}
 
 	return status;
@@ -1512,6 +1559,7 @@
 	convert.requested_type = type;
 	convert.cookie = lock->cookie;
 	convert.namelen = res->lockname.len;
+	convert.flags = flags;
 	strncpy(convert.name, res->lockname.name, convert.namelen);
 
 	ret = DLM_NOLOCKMGR;
@@ -1565,7 +1613,7 @@
 			    lock->node == convert->node_idx) {
 				found = 1;
 	rdtsc(u5.hilo[0], u5.hilo[1]);
-				status = dlmconvert_local(dlm, res, lock, 0, convert->requested_type);
+				status = dlmconvert_local(dlm, res, lock, convert->flags, convert->requested_type);
 	rdtsc(u6.hilo[0], u6.hilo[1]);
 				break;
 			}

Modified: trunk/cluster/dlmmod.h
===================================================================
--- trunk/cluster/dlmmod.h	2004-12-07 20:17:58 UTC (rev 1695)
+++ trunk/cluster/dlmmod.h	2004-12-08 01:34:04 UTC (rev 1696)
@@ -120,48 +120,48 @@
 }
 
 typedef enum _dlm_status {
-	DLM_NORMAL,               /* request in progress */
-	DLM_GRANTED,              /* request granted */
-	DLM_DENIED,               /* request denied */
-	DLM_DENIED_NOLOCKS,       /* request denied, out of system resources */
-	DLM_WORKING,              /* async request in progress */
-	DLM_BLOCKED,              /* lock request blocked */
-	DLM_BLOCKED_ORPHAN,       /* lock request blocked by a orphan lock*/
-	DLM_DENIED_GRACE_PERIOD,  /* topological change in progress */
-	DLM_SYSERR,               /* system error */
-	DLM_NOSUPPORT,            /* unsupported */
-	DLM_CANCELGRANT,          /* can't cancel convert: already granted */
-	DLM_IVLOCKID,             /* bad lockid */
-	DLM_SYNC,                 /* synchronous request granted */
-	DLM_BADTYPE,              /* bad resource type */
-	DLM_BADRESOURCE,          /* bad resource handle */
-	DLM_MAXHANDLES,           /* no more resource handles */
-	DLM_NOCLINFO,             /* can't contact cluster manager */
-	DLM_NOLOCKMGR,            /* can't contact lock manager */
-	DLM_NOPURGED,             /* can't contact purge daemon */
-	DLM_BADARGS,              /* bad api args */
-	DLM_VOID,                 /* no status */
-	DLM_NOTQUEUED,            /* NOQUEUE was specified and request failed */
-	DLM_IVBUFLEN,             /* invalid resource name length */
-	DLM_CVTUNGRANT,           /* attempted to convert ungranted lock */
-	DLM_BADPARAM,             /* invalid lock mode specified */
-	DLM_VALNOTVALID,          /* value block has been invalidated */
-	DLM_REJECTED,             /* request rejected, unrecognized client */
-	DLM_ABORT,                /* blocked lock request cancelled */
-	DLM_CANCEL,               /* conversion request cancelled */
-	DLM_IVRESHANDLE,          /* invalid resource handle */
-	DLM_DEADLOCK,             /* deadlock recovery refused this request */
-	DLM_DENIED_NOASTS,        /* failed to allocate AST */
-	DLM_FORWARD,              /* request must wait for primary's response */
-	DLM_TIMEOUT,              /* timeout value for lock has expired */
-	DLM_IVGROUPID,            /* invalid group specification */
-	DLM_VERS_CONFLICT,        /* version conflicts prevent request handling */
-	DLM_BAD_DEVICE_PATH,      /* Locks device does not exist or path wrong */
-	DLM_NO_DEVICE_PERMISSION, /* Client has insufficient pers for device */
-	DLM_NO_CONTROL_DEVICE,    /* Cannot set options on opened device */
-	DLM_MAXSTATS,             /* upper limit for return code validation */
+	DLM_NORMAL,               /*  0: request in progress */
+	DLM_GRANTED,              /*  1: request granted */
+	DLM_DENIED,               /*  2: request denied */
+	DLM_DENIED_NOLOCKS,       /*  3: request denied, out of system resources */
+	DLM_WORKING,              /*  4: async request in progress */
+	DLM_BLOCKED,              /*  5: lock request blocked */
+	DLM_BLOCKED_ORPHAN,       /*  6: lock request blocked by a orphan lock*/
+	DLM_DENIED_GRACE_PERIOD,  /*  7: topological change in progress */
+	DLM_SYSERR,               /*  8: system error */
+	DLM_NOSUPPORT,            /*  9: unsupported */
+	DLM_CANCELGRANT,          /* 10: can't cancel convert: already granted */
+	DLM_IVLOCKID,             /* 11: bad lockid */
+	DLM_SYNC,                 /* 12: synchronous request granted */
+	DLM_BADTYPE,              /* 13: bad resource type */
+	DLM_BADRESOURCE,          /* 14: bad resource handle */
+	DLM_MAXHANDLES,           /* 15: no more resource handles */
+	DLM_NOCLINFO,             /* 16: can't contact cluster manager */
+	DLM_NOLOCKMGR,            /* 17: can't contact lock manager */
+	DLM_NOPURGED,             /* 18: can't contact purge daemon */
+	DLM_BADARGS,              /* 19: bad api args */
+	DLM_VOID,                 /* 20: no status */
+	DLM_NOTQUEUED,            /* 21: NOQUEUE was specified and request failed */
+	DLM_IVBUFLEN,             /* 22: invalid resource name length */
+	DLM_CVTUNGRANT,           /* 23: attempted to convert ungranted lock */
+	DLM_BADPARAM,             /* 24: invalid lock mode specified */
+	DLM_VALNOTVALID,          /* 25: value block has been invalidated */
+	DLM_REJECTED,             /* 26: request rejected, unrecognized client */
+	DLM_ABORT,                /* 27: blocked lock request cancelled */
+	DLM_CANCEL,               /* 28: conversion request cancelled */
+	DLM_IVRESHANDLE,          /* 29: invalid resource handle */
+	DLM_DEADLOCK,             /* 30: deadlock recovery refused this request */
+	DLM_DENIED_NOASTS,        /* 31: failed to allocate AST */
+	DLM_FORWARD,              /* 32: request must wait for primary's response */
+	DLM_TIMEOUT,              /* 33: timeout value for lock has expired */
+	DLM_IVGROUPID,            /* 34: invalid group specification */
+	DLM_VERS_CONFLICT,        /* 35: version conflicts prevent request handling */
+	DLM_BAD_DEVICE_PATH,      /* 36: Locks device does not exist or path wrong */
+	DLM_NO_DEVICE_PERMISSION, /* 37: Client has insufficient pers for device */
+	DLM_NO_CONTROL_DEVICE,    /* 38: Cannot set options on opened device */
+	DLM_MAXSTATS,             /* 39: upper limit for return code validation */
 	
-	DLM_RECOVERING            /* our lame addition to allow caller to fail a lock 
+	DLM_RECOVERING            /* 40: our lame addition to allow caller to fail a lock 
 				     request if it is being recovered */
 } dlm_status;
 

Modified: trunk/cluster/heartbeat.c
===================================================================
--- trunk/cluster/heartbeat.c	2004-12-07 20:17:58 UTC (rev 1695)
+++ trunk/cluster/heartbeat.c	2004-12-08 01:34:04 UTC (rev 1696)
@@ -191,6 +191,11 @@
 			bh = slot->bh;
 			node = slot->inode;
 
+			if (!node) {
+				printk("no inode in slot %d!\n", idx);
+				idx++;
+				continue;
+			}
 			ino = nm_get_node_global_index(node);
 
 			if (ino == nm_this_node(group)) {
@@ -235,6 +240,11 @@
 		while ((slot = nm_iterate_group_disk_slots(group, &idx))) {
 			bh = slot->bh;
 			node = slot->inode;
+			if (!node) {
+				printk("no inode in slot %d!\n", idx);
+				idx++;
+				continue;
+			}
 
 			ino = nm_get_node_global_index(node);
 

Modified: trunk/cluster/tcp.c
===================================================================
--- trunk/cluster/tcp.c	2004-12-07 20:17:58 UTC (rev 1695)
+++ trunk/cluster/tcp.c	2004-12-08 01:34:04 UTC (rev 1696)
@@ -1059,6 +1059,7 @@
 			netprintk0("no handler for message.\n");
 			goto error;
 		}
+printk("about to dispatch message\n");		
 rdtsc(u1.hilo[0], u1.hilo[1]);
 		err = net_dispatch_message(inode, sock, &hdr, hnd);
 rdtsc(u2.hilo[0], u2.hilo[1]);
@@ -1073,6 +1074,7 @@
 #endif
 			hdr.status = err;
 			hdr.magic = NET_MSG_STATUS_MAGIC;  // twiddle the magic
+printk("about to send return message, status=%d\n", err);
 rdtsc(u3.hilo[0], u3.hilo[1]);
 			tmperr = net_send_tcp_msg(inode, sock, &hdr, sizeof(net_msg));
 rdtsc(u4.hilo[0], u4.hilo[1]);
@@ -1089,6 +1091,7 @@
 		spin_lock(&net_list_lock);
 		list_add_tail(&net->list, &net_recv_list);
 		spin_unlock(&net_list_lock);
+printk("all done with this round, starting over\n");		
 		goto start_over;
 
 error:

Modified: trunk/cluster/tcp.h
===================================================================
--- trunk/cluster/tcp.h	2004-12-07 20:17:58 UTC (rev 1695)
+++ trunk/cluster/tcp.h	2004-12-08 01:34:04 UTC (rev 1696)
@@ -112,8 +112,9 @@
 #define NET_MAX_MSG_LEN  (8192)
 	
 
-#define NET_ALREADY_CONNECTED   2
-#define NET_UNKNOWN_HOST        3
+/* RESERVED */
+#define NET_ALREADY_CONNECTED   (0xfff0)
+#define NET_UNKNOWN_HOST        (0xfff1)
 	
 
 static inline int net_is_valid_error_type(u32 err_type)

Modified: trunk/src/vote.c
===================================================================
--- trunk/src/vote.c	2004-12-07 20:17:58 UTC (rev 1695)
+++ trunk/src/vote.c	2004-12-08 01:34:04 UTC (rev 1696)
@@ -473,6 +473,7 @@
 	int status, i, remote_err;
 	ocfs2_net_wait_ctxt *w = NULL;
 	struct inode *remote_node;
+	int dequeued = 0;
 
 	w = ocfs2_new_net_wait_ctxt(osb, response_id);
 	if (!w) {
@@ -486,6 +487,7 @@
 	ocfs2_queue_net_wait_ctxt(osb, w);
 
 	i = ocfs_node_map_iterate(osb, &osb->mounted_map, 0);
+	printk("node map iterate: starting at %d, i am %d\n", i, osb->node_num);
 	while (i != OCFS_INVALID_NODE_NUM) {
 		if (i != osb->node_num) {
 			ocfs_node_map_set_bit(osb, &w->n_node_map, i);
@@ -521,8 +523,11 @@
 				goto bail;
 			}
 		}
+		i++;
 		i = ocfs_node_map_iterate(osb, &osb->mounted_map, i);
+		printk("node map iterate: next is %d, i am %d\n", i, osb->node_num);
 	}
+	printk("done sending, now waiting on responses...\n");
 
 	status = ocfs2_wait_on_vote_responses(osb, w);
 	if (status < 0) {
@@ -532,10 +537,12 @@
 	}
 
 	ocfs2_dequeue_net_wait_ctxt(osb, w);
+	dequeued = 1;
 	status = w->n_response;
 bail:
 	if (w) {
-		ocfs2_dequeue_net_wait_ctxt(osb, w);
+		if (!dequeued)
+			ocfs2_dequeue_net_wait_ctxt(osb, w);
 		kfree(w);
 	}
 
@@ -554,7 +561,9 @@
 
 	OCFS_ASSERT(type == OCFS2_VOTE_REQ_DELETE ||
 		    type == OCFS2_VOTE_REQ_UNLINK ||
-		    type == OCFS2_VOTE_REQ_RENAME);
+		    type == OCFS2_VOTE_REQ_RENAME ||
+		    type == OCFS2_VOTE_REQ_UMOUNT ||
+		    type == OCFS2_VOTE_REQ_MOUNT);
 
 	request = kmalloc(sizeof(*request), GFP_KERNEL);
 	if (!request) {



More information about the Ocfs2-commits mailing list