[Ocfs2-commits] khackel commits r1718 - trunk/cluster

svn-commits at oss.oracle.com
Thu Dec 23 15:48:19 CST 2004


Author: khackel
Date: 2004-12-23 15:48:17 -0600 (Thu, 23 Dec 2004)
New Revision: 1718

Modified:
   trunk/cluster/dlmmaster.c
   trunk/cluster/dlmmod.c
   trunk/cluster/dlmmod.h
   trunk/cluster/dlmthread.c
   trunk/cluster/nodemanager.c
   trunk/cluster/nodemanager.h
   trunk/cluster/tcp.c
   trunk/cluster/tcp.h
Log:
* made the lksb consistent on both the lock-requesting node and the master node;
  a placeholder lksb is now created on the master for the requesting node when
  the lock is created; it is used to copy the status and/or lvb in/out over
  the network
* made lksb->status updates consistent with this; there are fewer special cases now
* added the lvb to the convert, unlock and proxy ast structures; these are now
  variable-length network messages (they may carry an extra 64 bytes for the lvb)
* added a flags field to the lksb; it is not to be used by the caller; these flags
  record whether an lvb update needs to take place on this lock (DLM_LKSB_PUT_LVB
  or DLM_LKSB_GET_LVB) and whether the lksb was allocated by the kernel or by the
  caller of dlmlock
* added a couple of new LKM_* flags and reserved values for the remaining bits
* in tcp, made it possible to send a multipart message by passing an array of
  net_data pointer/length pairs to net_send_message_arr (sketched just below)
* fixed a dumb bug with variable-length net messages
* fixed a dumb bug with net status messages and EINTR
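
A quick sketch of the new multipart send path, for readers skimming the log:
each piece of an outgoing message is described by a net_data {bytes, ptr}
pair, and net_send_message_arr() concatenates the pieces into a single
net_msg payload, so the optional 64-byte lvb can ride along with the
fixed-size request without a second round trip.  The wrapper name below is
hypothetical; the real call sites are dlm_send_remote_unlock_request() and
dlm_send_remote_convert_request() in the dlmmod.c hunks further down.

/* Hypothetical wrapper, for illustration only: send a fixed-size unlock
 * request and, optionally, the 64-byte lvb in the same packet. */
static int send_unlock_with_lvb(dlm_ctxt *dlm, struct inode *inode,
				dlm_unlock_lock *unlock, char *lvb, int *status)
{
	net_data nd[2];
	int arrsz = 1, msgsz = sizeof(dlm_unlock_lock);

	nd[0].ptr = unlock;		/* fixed-length request, always first */
	nd[0].bytes = msgsz;

	if (lvb) {			/* optional trailing lvb */
		nd[1].ptr = lvb;
		nd[1].bytes = DLM_LVB_LEN;
		arrsz++;
		msgsz += DLM_LVB_LEN;
	}

	/* the DLM_UNLOCK_LOCK_MSG handler is registered with NET_HND_VAR_LEN
	 * and a max length of DLM_UNLOCK_LOCK_MAX_LEN, so either size passes
	 * the length check on the receiving node */
	return net_send_message_arr(DLM_UNLOCK_LOCK_MSG, dlm->key,
				    arrsz, nd, msgsz, inode, status);
}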



Modified: trunk/cluster/dlmmaster.c
===================================================================
--- trunk/cluster/dlmmaster.c	2004-12-23 21:40:45 UTC (rev 1717)
+++ trunk/cluster/dlmmaster.c	2004-12-23 21:48:17 UTC (rev 1718)
@@ -414,6 +414,15 @@
 
 		atomic_set(&mle->woken, 0);
 		ret = util_wait_atomic_eq(&mle->wq, &mle->woken, 1, 5000);
+		if (ret == -EINTR) {
+			dlmprintk0("interrupted during lock mastery!\n");
+			break;
+		}
+		if (ret == -ETIMEDOUT) {
+			dlmprintk("timed out during lock mastery: vote_map=%08x, response_map=%08x\n",
+				  mle->vote_map[0], mle->response_map[0]);
+			continue;
+		}
 	}
 	dlm_put_mle(mle);
 

Modified: trunk/cluster/dlmmod.c
===================================================================
--- trunk/cluster/dlmmod.c	2004-12-23 21:40:45 UTC (rev 1717)
+++ trunk/cluster/dlmmod.c	2004-12-23 21:48:17 UTC (rev 1718)
@@ -114,7 +114,9 @@
 	u8 namelen;
 	u8 name[NM_MAX_NAME_LEN];
 	u64 cookie;
+	s8 lvb[0];
 } dlm_convert_lock;
+#define DLM_CONVERT_LOCK_MAX_LEN  (sizeof(dlm_convert_lock) + DLM_LVB_LEN)
 
 typedef struct _dlm_unlock_lock
 {
@@ -123,18 +125,25 @@
 	u8 namelen;
 	u8 name[NM_MAX_NAME_LEN];
 	u64 cookie;
+	s8 lvb[0];
 } dlm_unlock_lock;
+#define DLM_UNLOCK_LOCK_MAX_LEN  (sizeof(dlm_unlock_lock) + DLM_LVB_LEN)
 
+
 typedef struct _dlm_proxy_ast
 {
+	u32 flags;   // TODO: reduce the size of this
 	u16 node_idx;
 	u8 type;
 	u8 blocked_type;
 	u8 namelen;
 	u8 name[NM_MAX_NAME_LEN];
 	u64 cookie;
+	s8 lvb[0];
 } dlm_proxy_ast;
+#define DLM_PROXY_AST_MAX_LEN  (sizeof(dlm_proxy_ast) + DLM_LVB_LEN)
 
+
 int dlm_create_lock_handler(net_msg *msg, u32 len, void *data);
 int dlm_convert_lock_handler(net_msg *msg, u32 len, void *data);
 int dlm_proxy_ast_handler(net_msg *msg, u32 len, void *data);
@@ -192,7 +201,6 @@
 	return;
 }				/* dlm_driver_exit */
 
-
 dlm_status dlmlock(dlm_ctxt *dlm, int mode, dlm_lockstatus *lksb, int flags, char *name, 
 		   dlm_astlockfunc_t *ast, void *data, dlm_bastlockfunc_t *bast)
 {
@@ -281,16 +289,7 @@
 
 		if (!recovery)		
 			down_read(&dlm->recovery_sem);
-{
-	union {
-		u64 q;
-		u32 hilo[2];
-	} u1, u2;
-	rdtsc(u1.hilo[0], u1.hilo[1]);
 		res = dlm_get_lock_resource(dlm, &q, flags);
-	rdtsc(u2.hilo[0], u2.hilo[1]);
-	dlmprintk("dlm_get_lock_resource took %llu cycles\n", u2.q-u1.q);
-}
 		if (!res) {
 			status = DLM_IVLOCKID;
 			goto up_error;
@@ -338,6 +337,7 @@
 	INIT_LIST_HEAD(&tmplock->ast_list);
 	spin_lock_init(&tmplock->spinlock);
 	tmplock->lockres = res;
+dlmprintk("creating lock: lock=%p res=%p\n", tmplock, res);
 	tmplock->type = type;
 	tmplock->convert_type = LKM_IVMODE;
 	tmplock->highest_blocked = LKM_IVMODE;
@@ -384,14 +384,11 @@
 	dlm_lock *tmplock;
 	int got_it = 0;
 
-	BUG_ON(!lock);
-	BUG_ON(!res);
-	BUG_ON(!dlm);
+	DLM_ASSERT(lock);
+	DLM_ASSERT(res);
+	DLM_ASSERT(dlm);
+	DLM_ASSERT(lock->lksb);
 
-	if (lock->node == dlm->group_index) {
-		BUG_ON(!lock->lksb);
-	}
-
 	dlmprintk("type=%d\n", lock->type);
 
 	list_for_each(iter, &res->granted) {
@@ -412,10 +409,7 @@
 
 	/* got it right away */
 
-	/* if it is a remote request, proxy 
-	 * handler will set the lksb status */
-	if (lock->node == dlm->group_index)
-		lock->lksb->status = DLM_NORMAL;
+	lock->lksb->status = DLM_NORMAL;
 
 	list_add_tail(&lock->list, &res->granted);
 
@@ -475,24 +469,28 @@
 {
 	dlm_status status;
 
-{
-	union {
-		u64 q;
-		u32 hilo[2];
-	} u1, u2;
-	rdtsc(u1.hilo[0], u1.hilo[1]);
-
 	if (res->owner == dlm->group_index)
 		status = dlmconvert_local(dlm, res, lock, flags, type);
 	else 
 		status = dlmconvert_remote(dlm, res, lock, flags, type);
 
-	rdtsc(u2.hilo[0], u2.hilo[1]);
-	dlmprintk("dlmconvert took %llu cycles\n", u2.q-u1.q);
-}
 	return status;
 }
 
+static inline const char * dlm_lock_mode_name(int mode)
+{
+	switch (mode) {
+		case LKM_EXMODE:
+			return "EX";
+		case LKM_PRMODE:
+			return "PR";
+		case LKM_NLMODE:
+			return "NL";
+	}
+	return "UNKNOWN";
+}
+
+
 /* must be already holding lockres->spinlock */
 dlm_status dlmconvert_local(dlm_ctxt *dlm, dlm_lock_resource *res, dlm_lock *lock, int flags, int type)
 {
@@ -519,8 +517,32 @@
 		spin_unlock(&res->spinlock);
 		return DLM_DENIED;
 	}
+
+	if (flags & LKM_VALBLK) {
+		switch (lock->type) {
+			case LKM_EXMODE:
+				/* EX + LKM_VALBLK + convert == set lvb */
+				dlmprintk("will set lvb: converting %s->%s\n",
+					dlm_lock_mode_name(lock->type), dlm_lock_mode_name(type));
+				lock->lksb->flags |= DLM_LKSB_PUT_LVB;
+				break;
+			case LKM_PRMODE:
+			case LKM_NLMODE:
+				/* refetch if new level is not NL */
+				if (type > LKM_NLMODE) {
+					dlmprintk("will fetch new value into lvb: converting %s->%s\n",
+						dlm_lock_mode_name(lock->type), dlm_lock_mode_name(type));
+					lock->lksb->flags |= DLM_LKSB_GET_LVB;
+				} else {
+					dlmprintk("will NOT fetch new value into lvb: converting %s->%s\n",
+						dlm_lock_mode_name(lock->type), dlm_lock_mode_name(type));
+					flags &= ~(LKM_VALBLK);
+				}
+				break;
+		}
+	}
+
 	
-	
 	/* in-place downconvert? */
 	if (type <= lock->type)
 		goto grant;
@@ -547,20 +569,16 @@
 	/* fall thru to grant */
 
 grant:
-	if (lock->node != dlm->group_index)
+	/* immediately grant the new lock type */
+
+	lock->lksb->status = DLM_NORMAL;
+	if (lock->node == dlm->group_index)
 		dlmprintk0("doing in-place convert for nonlocal lock\n");
 
-	/* immediately grant the new lock type */
-	//dlmprintk("doing in-place %sconvert from %d to %d\n", 
-	//       type > lock->type ? "up" : "down", lock->type, type);
+
 	lock->type = type;
 	status = DLM_NORMAL;
 
-	/* if it is a remote request, proxy 
-	 * handler will set the lksb status */
-	if (lock->node == dlm->group_index)
-		lock->lksb->status = DLM_NORMAL;
-
 	if (dlm_do_ast(dlm, res, lock) < 0)
 		dlmprintk0("eek\n");
 
@@ -619,6 +637,21 @@
 		BUG();
 	}
 	lock->convert_type = type;
+
+	if (flags & LKM_VALBLK) {
+		if (lock->type == LKM_EXMODE) {
+			flags |= LKM_PUT_LVB;
+			lock->lksb->flags |= DLM_LKSB_PUT_LVB;
+		} else {
+			if (lock->convert_type == LKM_NLMODE) {
+				dlmprintk0("erm, no point in specifying LKM_VALBLK if converting to NL\n");
+				flags &= ~LKM_VALBLK;
+			} else {
+				flags |= LKM_GET_LVB;
+				lock->lksb->flags |= DLM_LKSB_GET_LVB;
+			}
+		}
+	}
 	spin_unlock(&res->spinlock);
 
 	/* spec seems to say that you will get DLM_NORMAL when the lock 
@@ -633,6 +666,7 @@
 		list_del(&lock->list);
 		list_add_tail(&lock->list, &res->granted);
 		lock->convert_type = LKM_IVMODE;
+		lock->lksb->flags &= ~(DLM_LKSB_GET_LVB|DLM_LKSB_PUT_LVB);
 	}
 bail:
 	spin_unlock(&res->spinlock);
@@ -671,6 +705,10 @@
 
 	lock = lksb->lockid;
 	res = lock->lockres;
+	
+	DLM_ASSERT(lock);
+	DLM_ASSERT(res);
+dlmprintk("lock=%p res=%p\n", lock, res);
 
 	spin_lock(&res->spinlock);
 	spin_lock(&lock->spinlock);
@@ -766,11 +804,16 @@
 			*call_ast = 1;
 			remove = 1;
 			regrant = 0;
+			
+			/* make the final update to the lvb */
+			if (local && lksb->flags & DLM_LKSB_PUT_LVB)
+				memcpy(res->lvb, lksb->lvb, DLM_LVB_LEN);
 		}
 	}
 
 	dlmprintk0("checking local/remote\n");
 
+	/* lockres mastered locally or remote? */
 	if (!local) {
 		dlmprintk0("nonlocal\n");
 		/* safe since nothing can change on this 
@@ -809,6 +852,7 @@
 		wake_up(&res->wq);
 	}
 
+
 	if (remove) {
 		dlmprintk0("removing lock from list\n");
 		list_del(&lock->list);
@@ -835,6 +879,11 @@
 		lksb->lockid = NULL;
 		dlmprintk0("done freeing\n");
 	}
+
+	/* if cancel or unlock succeeded, lvb work is done */
+	if (status == DLM_NORMAL)
+		lksb->flags &= ~(DLM_LKSB_PUT_LVB|DLM_LKSB_GET_LVB);
+
 	dlmprintk("aha done with everything, returning %d\n", status);
 	return status;
 }
@@ -848,6 +897,10 @@
 	dlm_status ret;
 	int status = 0;
 
+	int arrsz = 1, msgsz = sizeof(dlm_unlock_lock);
+	net_data nd[2];
+
+
 	dlmprintk0("\n");
 
 	memset(&unlock, 0, sizeof(unlock));
@@ -857,11 +910,22 @@
 	unlock.namelen = res->lockname.len;
 	strncpy(unlock.name, res->lockname.name, unlock.namelen);
 
+	nd[0].bytes = msgsz;
+	nd[0].ptr = &unlock;
+
+	if (flags & LKM_PUT_LVB) {
+		/* extra data to send if we are updating lvb */
+		nd[1].bytes = DLM_LVB_LEN;
+		nd[1].ptr = lock->lksb->lvb;
+		arrsz++;
+		msgsz += DLM_LVB_LEN;
+	}
+
 	ret = DLM_NOLOCKMGR;
 	lksb->status = DLM_NOLOCKMGR;
 	inode = nm_get_group_node_by_index(dlm->group, res->owner);
 	if (inode) {
-		tmpret = net_send_message(DLM_UNLOCK_LOCK_MSG, dlm->key, &unlock, sizeof(unlock), inode, &status);
+		tmpret = net_send_message_arr(DLM_UNLOCK_LOCK_MSG, dlm->key, arrsz, nd, msgsz, inode, &status);
 		if (tmpret >= 0) {
 			// successfully sent and received
 			if (status == DLM_CANCELGRANT)
@@ -889,12 +953,25 @@
 	dlm_lock *lock;
 	dlm_status status = DLM_NORMAL;
 	int found = 0;
-	dlm_lockstatus lksb;
+	dlm_lockstatus *lksb = NULL;
 	int ignore;
 	struct qstr lockname = { .name=unlock->name, .len=unlock->namelen };
+	u32 flags = unlock->flags;
 
-	dlmprintk0("\n");
+	if (flags & LKM_GET_LVB) {
+		dlmprintk0("bad args!  GET_LVB specified on unlock!\n");
+		return DLM_BADARGS;
+	}
 
+	if ((flags & (LKM_PUT_LVB|LKM_CANCEL)) == 
+	        (LKM_PUT_LVB|LKM_CANCEL)) {
+		dlmprintk0("bad args!  cannot modify lvb on a CANCEL request!\n");
+		return DLM_BADARGS;
+	}
+
+
+	dlmprintk("lvb: %s\n", flags & LKM_PUT_LVB ? "put lvb" : "none");
+
 	lockname.hash = full_name_hash(lockname.name, lockname.len);
 
 	status = DLM_IVLOCKID;
@@ -908,8 +985,15 @@
 			if (lock->cookie == unlock->cookie &&
 			    lock->node == unlock->node_idx) {
 				found = 1;
+				lksb = lock->lksb;
 				/* unlockast only called on originating node */
-				status = dlmunlock_local(dlm, res, lock, &lksb, unlock->flags, &ignore);
+				if (flags & LKM_PUT_LVB) {
+					lksb->flags |= DLM_LKSB_PUT_LVB;
+					memcpy(&lksb->lvb[0], &unlock->lvb[0], DLM_LVB_LEN);
+				}
+				status = dlmunlock_local(dlm, res, lock, lksb, flags, &ignore);
+				if (flags & LKM_PUT_LVB)
+					lksb->flags &= ~DLM_LKSB_PUT_LVB;
 				break;
 			}
 		}
@@ -925,7 +1009,7 @@
 	if (!found)
 		dlmprintk("failed to find lock to unlock!  cookie=%llu\n", unlock->cookie);
 	else
-		status = lksb.status;
+		status = lksb->status;
 
 	return status;
 }
@@ -1178,30 +1262,34 @@
 	if (tmpret)
 		goto error;
 	netbuf += L1_CACHE_ALIGN(sizeof(dlm_create_lock));
-	tmpret = net_register_handler(DLM_CONVERT_LOCK_MSG, key, 0, 
-				      sizeof(dlm_convert_lock), 
+	tmpret = net_register_handler(DLM_CONVERT_LOCK_MSG, key, 
+				      NET_HND_VAR_LEN, 
+				      DLM_CONVERT_LOCK_MAX_LEN,
 				      dlm_convert_lock_handler,
 				      dlm, netbuf);
 	if (tmpret)
 		goto error;
-	netbuf += L1_CACHE_ALIGN(sizeof(dlm_convert_lock));
+	netbuf += L1_CACHE_ALIGN(DLM_CONVERT_LOCK_MAX_LEN);
 
-	tmpret = net_register_handler(DLM_UNLOCK_LOCK_MSG, key, 0,
-				      sizeof(dlm_unlock_lock),
+	tmpret = net_register_handler(DLM_UNLOCK_LOCK_MSG, key, 
+				      NET_HND_VAR_LEN,
+				      DLM_UNLOCK_LOCK_MAX_LEN,
 				      dlm_unlock_lock_handler,
 				      dlm, netbuf);
 	if (tmpret)
 		goto error;
-	netbuf += L1_CACHE_ALIGN(sizeof(dlm_unlock_lock));
+	netbuf += L1_CACHE_ALIGN(DLM_UNLOCK_LOCK_MAX_LEN);
 				
-	tmpret = net_register_handler(DLM_PROXY_AST_MSG, key, 0, 
-				      sizeof(dlm_proxy_ast), 
+	tmpret = net_register_handler(DLM_PROXY_AST_MSG, key, 
+				      NET_HND_VAR_LEN,
+				      DLM_PROXY_AST_MAX_LEN,
 				      dlm_proxy_ast_handler,
 				      dlm, netbuf);
 	if (tmpret)
 		goto error;
-	netbuf += L1_CACHE_ALIGN(sizeof(dlm_proxy_ast));
-// dlmprintk("netbuf=%p net_buf=%p diff=%d\n", netbuf, dlm->net_buf, ((char *)netbuf - (char *)dlm->net_buf));   // currently 768
+	netbuf += L1_CACHE_ALIGN(DLM_PROXY_AST_MAX_LEN);
+
+dlmprintk("netbuf=%p net_buf=%p diff=%d\n", netbuf, dlm->net_buf, ((char *)netbuf - (char *)dlm->net_buf));   // currently 960
 	
 	tmpret = dlm_launch_thread(dlm);
 	if (tmpret == 0)
@@ -1292,20 +1380,48 @@
 
 int dlm_do_ast(dlm_ctxt *dlm, dlm_lock_resource *res, dlm_lock *lock)
 {
-	dlm_astlockfunc_t *fn = lock->ast;
+	int ret;
 
+	dlm_astlockfunc_t *fn;
+	dlm_lockstatus *lksb;
+
 	dlmprintk0("\n");
 
+	DLM_ASSERT(lock);
+	DLM_ASSERT(res);
+	DLM_ASSERT(lock->lksb);
+
+	lksb = lock->lksb;
+	fn = lock->ast;
+
+	if (res->owner == dlm->group_index) {
+		/* this node is the lockres master */
+		if (lksb->flags & DLM_LKSB_GET_LVB) {
+			dlmprintk("getting lvb from lockres for %s node\n",
+				  lock->node == dlm->group_index ? "master" :
+				  "remote");
+			memcpy(lksb->lvb, res->lvb, DLM_LVB_LEN);
+		} else if (lksb->flags & DLM_LKSB_PUT_LVB) {
+			dlmprintk("setting lvb from lockres for %s node\n",
+				  lock->node == dlm->group_index ? "master" :
+				  "remote");
+			memcpy(res->lvb, lksb->lvb, DLM_LVB_LEN);
+		}
+	}
+
+	ret = 0;
 	if (lock->node != dlm->group_index) {
-		return dlm_send_proxy_ast(dlm, res, lock, DLM_AST, 0);
+		/* lock request came from another node
+		 * go do the ast over there */
+		ret = dlm_send_proxy_ast(dlm, res, lock, DLM_AST, 0);
+	} else {
+		DLM_ASSERT(fn);
+		(*fn)(lock->astdata);
 	}
-	if (!fn) {
-		dlmprintk("eek! lock has no ast %*s!  cookie=%llu\n", 
-		       res->lockname.len, res->lockname.name, lock->cookie);
-		return -EINVAL;
-	}
-	(*fn)(lock->astdata);
-	return 0;
+
+	/* reset any lvb flags on the lksb */
+	lksb->flags &= ~(DLM_LKSB_PUT_LVB|DLM_LKSB_GET_LVB);
+	return ret;
 }
 
 
@@ -1328,14 +1444,17 @@
 	return 0;
 }
 
+
 int dlm_send_proxy_ast(dlm_ctxt *dlm, dlm_lock_resource *res, dlm_lock *lock, int type, int blocked_type)
 {
-	int ret = 0;
+	int ret = 0, arrsz = 1, msgsz = sizeof(dlm_proxy_ast);
 	dlm_proxy_ast past;
 	struct inode *inode = NULL;
+	net_data nd[2];
 	
 	dlmprintk("to=%u, type=%d, blocked_type=%d\n", lock->node, type, blocked_type);
 
+	memset(&past, 0, sizeof(dlm_proxy_ast));
 	past.node_idx = dlm->group_index;
 	past.type = type;
 	past.blocked_type = blocked_type;
@@ -1343,10 +1462,20 @@
 	strncpy(past.name, res->lockname.name, past.namelen);
 	past.cookie = lock->cookie;
 
+	nd[0].bytes = msgsz;
+	nd[0].ptr = &past;
+	if (lock->lksb->flags & DLM_LKSB_GET_LVB) {
+		past.flags |= LKM_GET_LVB;
+		nd[1].bytes = DLM_LVB_LEN;
+		nd[1].ptr = lock->lksb->lvb;
+		arrsz++;
+		msgsz += DLM_LVB_LEN;
+	}
+
 	ret = -EINVAL;
 	inode = nm_get_group_node_by_index(dlm->group, lock->node);
 	if (inode) {
-		ret = net_send_message(DLM_PROXY_AST_MSG, dlm->key, &past, sizeof(past), inode, NULL);
+		ret = net_send_message_arr(DLM_PROXY_AST_MSG, dlm->key, arrsz, nd, msgsz, inode, NULL);
 		iput(inode);
 	}
 	if (ret < 0) {
@@ -1365,7 +1494,17 @@
 	struct qstr lockname = { .name=past->name, .len=past->namelen };
 	struct list_head *iter, *head=NULL;
 	u64 cookie = past->cookie;
+	u32 flags = past->flags;
 
+	if ((flags & (LKM_PUT_LVB|LKM_GET_LVB)) ==
+	     (LKM_PUT_LVB|LKM_GET_LVB)) {
+		dlmprintk("both PUT and GET lvb specified\n");
+		return DLM_BADARGS;
+	}
+
+	dlmprintk("lvb: %s\n", flags & LKM_PUT_LVB ? "put lvb" : 
+		  (flags & LKM_GET_LVB ? "get lvb" : "none"));
+
 	lockname.hash = full_name_hash(lockname.name, lockname.len);
 	
 	dlmprintk("type=%d, blocked_type=%d\n", past->type, past->blocked_type);
@@ -1431,6 +1570,11 @@
 		
 		lock->lksb->status = DLM_NORMAL;
 
+		/* if we requested the lvb, fetch it into our lksb now */
+		if (flags & LKM_GET_LVB) {
+			DLM_ASSERT(lock->lksb->flags & DLM_LKSB_GET_LVB);
+			memcpy(lock->lksb->lvb, past->lvb, DLM_LVB_LEN);
+		}
 		status = dlm_do_ast(dlm, res, lock);
 		dlmprintk("ast done: now... type=%d, convert_type=%d\n",
 			  lock->type, lock->convert_type);
@@ -1505,6 +1649,7 @@
 	dlm_create_lock *create = (dlm_create_lock *)msg->buf;
 	dlm_lock_resource *res;
 	dlm_lock *newlock;
+	dlm_lockstatus *lksb;
 	dlm_status status = DLM_NORMAL;
 	struct qstr lockname = { .name=create->name, .len=create->namelen };
 	
@@ -1516,6 +1661,12 @@
 	if (!newlock)
 		return DLM_SYSERR;
 	
+	lksb = kmalloc(sizeof(dlm_lockstatus), GFP_KERNEL);
+	if (!lksb) {
+		kfree(newlock);
+		return DLM_SYSERR;
+	}
+		
 	memset(newlock, 0, sizeof(dlm_lock));
 	INIT_LIST_HEAD(&newlock->list);
 	INIT_LIST_HEAD(&newlock->ast_list);
@@ -1529,6 +1680,11 @@
 	newlock->astdata = NULL;
 	newlock->cookie = create->cookie;
 
+	memset(lksb, 0, sizeof(dlm_lockstatus));
+	newlock->lksb = lksb;
+	lksb->lockid = newlock;
+	lksb->flags |= DLM_LKSB_KERNEL_ALLOCATED;
+
 	status = DLM_IVLOCKID;
 	res = dlm_lookup_lock(dlm, &lockname);
 	if (res) {
@@ -1557,21 +1713,34 @@
 	int tmpret;
 	dlm_status ret;
 	int status = 0;
+	int arrsz = 1, msgsz = sizeof(dlm_convert_lock);
+	net_data nd[2];
 
 	dlmprintk0("\n");
 
-	memset(&convert, 0, sizeof(convert));
+	memset(&convert, 0, sizeof(dlm_convert_lock));
 	convert.node_idx = dlm->group_index;
 	convert.requested_type = type;
 	convert.cookie = lock->cookie;
 	convert.namelen = res->lockname.len;
 	convert.flags = flags;
 	strncpy(convert.name, res->lockname.name, convert.namelen);
+	
+	nd[0].bytes = msgsz;
+	nd[0].ptr = &convert;
 
+	if (flags & LKM_PUT_LVB) {
+		/* extra data to send if we are updating lvb */
+		nd[1].bytes = DLM_LVB_LEN;
+		nd[1].ptr = lock->lksb->lvb;
+		arrsz++;
+		msgsz += DLM_LVB_LEN;
+	}
+
 	ret = DLM_NOLOCKMGR;
 	inode = nm_get_group_node_by_index(dlm->group, res->owner);
 	if (inode) {
-		tmpret = net_send_message(DLM_CONVERT_LOCK_MSG, dlm->key, &convert, sizeof(convert), inode, &status);
+		tmpret = net_send_message_arr(DLM_CONVERT_LOCK_MSG, dlm->key, arrsz, nd, msgsz, inode, &status);
 		if (tmpret >= 0) {
 			// successfully sent and received
 			ret = status;  // this is already a dlm_status
@@ -1585,6 +1754,8 @@
 	return ret;
 }
 
+
+
 int dlm_convert_lock_handler(net_msg *msg, u32 len, void *data)
 {
 	dlm_ctxt *dlm = data;
@@ -1592,35 +1763,45 @@
 	dlm_lock_resource *res;
 	struct list_head *iter;
 	dlm_lock *lock;
+	dlm_lockstatus *lksb;
 	dlm_status status = DLM_NORMAL;
 	int found = 0;
 	struct qstr lockname = { .name=convert->name, .len=convert->namelen };
-	union {
-		u64 q;
-		u32 hilo[2];
-	} u1, u2, u3, u4, u5, u6, u7;
+	u32 flags = convert->flags;
 
+	if ((flags & (LKM_PUT_LVB|LKM_GET_LVB)) ==
+	     (LKM_PUT_LVB|LKM_GET_LVB)) {
+		dlmprintk("both PUT and GET lvb specified\n");
+		return DLM_BADARGS;
+	}
 
-	dlmprintk0("\n");
-	rdtsc(u1.hilo[0], u1.hilo[1]);
-
+	dlmprintk("lvb: %s\n", flags & LKM_PUT_LVB ? "put lvb" : 
+		  (flags & LKM_GET_LVB ? "get lvb" : "none"));
+	
 	lockname.hash = full_name_hash(lockname.name, lockname.len);
-	rdtsc(u2.hilo[0], u2.hilo[1]);
 
 	status = DLM_IVLOCKID;
 	res = dlm_lookup_lock(dlm, &lockname);
-	rdtsc(u3.hilo[0], u3.hilo[1]);
 	if (res) {
 		spin_lock(&res->spinlock);
-	rdtsc(u4.hilo[0], u4.hilo[1]);
 		list_for_each(iter, &res->granted) {
 			lock = list_entry(iter, dlm_lock, list);
 			if (lock->cookie == convert->cookie &&
 			    lock->node == convert->node_idx) {
 				found = 1;
-	rdtsc(u5.hilo[0], u5.hilo[1]);
-				status = dlmconvert_local(dlm, res, lock, convert->flags, convert->requested_type);
-	rdtsc(u6.hilo[0], u6.hilo[1]);
+				lksb = lock->lksb;
+				if (flags & LKM_PUT_LVB) {
+					DLM_ASSERT(!(lksb->flags & (DLM_LKSB_PUT_LVB|DLM_LKSB_GET_LVB)));
+					lksb->flags |= DLM_LKSB_PUT_LVB;
+					memcpy(&lksb->lvb[0], &convert->lvb[0], DLM_LVB_LEN);
+				} else if (flags & LKM_GET_LVB) {
+					DLM_ASSERT(!(lksb->flags & (DLM_LKSB_PUT_LVB|DLM_LKSB_GET_LVB)));
+					lksb->flags |= DLM_LKSB_GET_LVB;
+				}
+				status = dlmconvert_local(dlm, res, lock, flags, convert->requested_type);
+				if (status != DLM_NORMAL) {
+					lksb->flags &= ~(DLM_LKSB_GET_LVB|DLM_LKSB_PUT_LVB);
+				}
 				break;
 			}
 		}
@@ -1629,9 +1810,6 @@
 	if (!found)
 		dlmprintk("failed to find lock to convert on grant queue!  cookie=%llu\n", convert->cookie);
 
-	rdtsc(u7.hilo[0], u7.hilo[1]);
-	dlmprintk("1-2:%llu 2-3:%llu 3-4:%llu 4-5:%llu 5-6:%llu 6-7:%llu\n",
-		  u2.q-u1.q, u3.q-u2.q, u4.q-u3.q, u5.q-u4.q, u6.q-u5.q, u7.q-u6.q);
 	return status;
 }
 

Modified: trunk/cluster/dlmmod.h
===================================================================
--- trunk/cluster/dlmmod.h	2004-12-23 21:40:45 UTC (rev 1717)
+++ trunk/cluster/dlmmod.h	2004-12-23 21:48:17 UTC (rev 1718)
@@ -74,36 +74,42 @@
 #define LKM_MODEMASK    0xff
 
 
-/* TODO: Flags which OCFS2 will require: 
- *       - LKM_LOCAL
- *       - LKM_VALBLK
- *       - LKM_NOQUEUE
- *       - LKM_CONVERT
- *       - LKM_CANCEL   */
-#define LKM_ORPHAN      0x10            /* this lock is orphanable */    /* unsupported */
-#define LKM_PARENTABLE  0x20            /* this lock was orphaned */    /* unsupported */
-#define LKM_BLOCK       0x40            /* blocking lock request */    /* unsupported */
-#define LKM_LOCAL       0x80            /* local lock request */    
-#define LKM_VALBLK      0x100           /* lock value block request */
-#define LKM_NOQUEUE     0x200           /* non blocking request */
-#define LKM_CONVERT     0x400           /* conversion request */
-#define LKM_NODLCKWT    0x800           /* this lock wont deadlock */    /* unsupported */
-#define LKM_UNLOCK      0x1000          /* deallocate this lock */
-#define LKM_CANCEL      0x2000          /* cancel conversion request */
-#define LKM_DEQALL      0x4000          /* remove all locks held by proc */    /* unsupported */
-#define LKM_INVVALBLK   0x8000          /* invalidate lock value block */
-#define LKM_SYNCSTS     0x10000         /* return synchronous status if poss */    /* unsupported */
-#define LKM_TIMEOUT     0x20000         /* lock request contains timeout */    /* unsupported */
-#define LKM_SNGLDLCK    0x40000         /* request can self-deadlock */    /* unsupported */
-#define LKM_FINDLOCAL   0x80000         /* find local lock request */    /* unsupported */
-#define LKM_PROC_OWNED  0x100000        /* owned by process, not group */    /* unsupported */
-#define LKM_XID         0x200000        /* use transaction id for deadlock */    /* unsupported */
-#define LKM_XID_CONFLICT 0x400000       /* do not allow lock inheritance */    /* unsupported */
-#define LKM_FORCE       0x800000        /* force unlock flag */
-#define LKM_REVVALBLK   0x1000000       /* temporary solution: re-validate lock value block */    /* unsupported */
+/* reserved: flags used by the "real" dlm, only a few are supported by this dlm */
+#define LKM_ORPHAN       0x00000010  /* this lock is orphanable */    /* unsupported */
+#define LKM_PARENTABLE   0x00000020  /* this lock was orphaned */    /* unsupported */
+#define LKM_BLOCK        0x00000040  /* blocking lock request */    /* unsupported */
+#define LKM_LOCAL        0x00000080  /* local lock request */    
+#define LKM_VALBLK       0x00000100  /* lock value block request */
+#define LKM_NOQUEUE      0x00000200  /* non blocking request */
+#define LKM_CONVERT      0x00000400  /* conversion request */
+#define LKM_NODLCKWT     0x00000800  /* this lock wont deadlock */    /* unsupported */
+#define LKM_UNLOCK       0x00001000  /* deallocate this lock */
+#define LKM_CANCEL       0x00002000  /* cancel conversion request */
+#define LKM_DEQALL       0x00004000  /* remove all locks held by proc */    /* unsupported */
+#define LKM_INVVALBLK    0x00008000  /* invalidate lock value block */
+#define LKM_SYNCSTS      0x00010000  /* return synchronous status if poss */    /* unsupported */
+#define LKM_TIMEOUT      0x00020000  /* lock request contains timeout */    /* unsupported */
+#define LKM_SNGLDLCK     0x00040000  /* request can self-deadlock */    /* unsupported */
+#define LKM_FINDLOCAL    0x00080000  /* find local lock request */    /* unsupported */
+#define LKM_PROC_OWNED   0x00100000  /* owned by process, not group */    /* unsupported */
+#define LKM_XID          0x00200000  /* use transaction id for deadlock */    /* unsupported */
+#define LKM_XID_CONFLICT 0x00400000  /* do not allow lock inheritance */    /* unsupported */
+#define LKM_FORCE        0x00800000  /* force unlock flag */
+#define LKM_REVVALBLK    0x01000000  /* temporary solution: re-validate lock value block */    /* unsupported */
+/* unused */
+#define LKM_UNUSED1      0x00000001  /* unused */
+#define LKM_UNUSED2      0x00000002  /* unused */
+#define LKM_UNUSED3      0x00000004  /* unused */
+#define LKM_UNUSED4      0x00000008  /* unused */
+#define LKM_UNUSED5      0x02000000  /* unused */
+#define LKM_UNUSED6      0x04000000  /* unused */
+#define LKM_UNUSED7      0x08000000  /* unused */
+#define LKM_UNUSED8      0x10000000  /* unused */
+/* ocfs2 extensions: internal only; should never be used by caller */
+#define LKM_PUT_LVB      0x20000000  /* extension: lvb is being passed, should be applied to lockres */
+#define LKM_GET_LVB      0x40000000  /* extension: lvb should be copied from lockres when lock granted */
+#define LKM_RECOVERY     0x80000000  /* extension: flag for recovery lock, used to avoid recovery rwsem */
 
-#define LKM_RECOVERY    0x80000000      /* extension: flag for recovery lock, used to avoid recovery rwsem */
-
 #define LKM_VALID_FLAGS (LKM_VALBLK | LKM_CONVERT | LKM_UNLOCK | \
 			 LKM_CANCEL | LKM_INVVALBLK | LKM_FORCE | \
 			 LKM_RECOVERY | LKM_LOCAL | LKM_NOQUEUE)
@@ -247,8 +253,18 @@
 } dlm_lock;
 
 
+#define DLM_LKSB_KERNEL_ALLOCATED  0x01  // allocated on master node on behalf of remote node
+#define DLM_LKSB_PUT_LVB           0x02
+#define DLM_LKSB_GET_LVB           0x04
+#define DLM_LKSB_UNUSED2           0x08
+#define DLM_LKSB_UNUSED3           0x10
+#define DLM_LKSB_UNUSED4           0x20
+#define DLM_LKSB_UNUSED5           0x40
+#define DLM_LKSB_UNUSED6           0x80
+
 struct _dlm_lockstatus {
-	dlm_status status;
+	dlm_status status;  // can we just change this to a u8 or u16?
+	u32 flags;           
 	dlm_lock *lockid;
 	char lvb[DLM_LVB_LEN];
 };

Modified: trunk/cluster/dlmthread.c
===================================================================
--- trunk/cluster/dlmthread.c	2004-12-23 21:40:45 UTC (rev 1717)
+++ trunk/cluster/dlmthread.c	2004-12-23 21:48:17 UTC (rev 1718)
@@ -136,14 +136,9 @@
 		list_del(&target->list);
 		list_add_tail(&target->list, &res->granted);
 
-		if (target->node == dlm->group_index) {
-			DLM_ASSERT(target->lksb);
+		DLM_ASSERT(target->lksb);
+		target->lksb->status = DLM_NORMAL;
 
-			target->lksb->status = DLM_NORMAL;
-		} else {
-			dlmprintk0("nonlocal lock, not setting DLM_NORMAL in lksb\n");
-		}
-
 		spin_unlock(&target->spinlock);
 
 		if (dlm_do_ast(dlm, res, target) < 0)
@@ -197,14 +192,9 @@
 		list_del(&target->list);
 		list_add_tail(&target->list, &res->granted);
 
-		if (target->node == dlm->group_index) {
-			DLM_ASSERT(target->lksb);
+		DLM_ASSERT(target->lksb);
+		target->lksb->status = DLM_NORMAL;
 		
-			target->lksb->status = DLM_NORMAL;
-		} else {
-			dlmprintk0("nonlocal lock, not setting DLM_NORMAL in lksb\n");
-		}
-		
 		spin_unlock(&target->spinlock);
 
 		if (dlm_do_ast(dlm, res, target) < 0)

Modified: trunk/cluster/nodemanager.c
===================================================================
--- trunk/cluster/nodemanager.c	2004-12-23 21:40:45 UTC (rev 1717)
+++ trunk/cluster/nodemanager.c	2004-12-23 21:48:17 UTC (rev 1718)
@@ -1077,7 +1077,11 @@
 {
 	struct dentry *dentry = NULL;
 	struct inode *inode = NULL;
-	
+
+	NM_ASSERT(node_name);
+	NM_ASSERT(single_sb);
+	NM_ASSERT(single_sb->s_root);
+
 	dentry = lookup_one_len(node_name, single_sb->s_root, strlen(node_name));
 	if (!IS_ERR(dentry)) {
 		inode = dentry->d_inode;

Modified: trunk/cluster/nodemanager.h
===================================================================
--- trunk/cluster/nodemanager.h	2004-12-23 21:40:45 UTC (rev 1717)
+++ trunk/cluster/nodemanager.h	2004-12-23 21:48:17 UTC (rev 1718)
@@ -28,6 +28,7 @@
 #ifndef CLUSTER_NODEMANAGER_H
 #define CLUSTER_NODEMANAGER_H
 
+#define NM_ASSERT(x)       ({  if (!(x)) { printk("nm: assert failed! %s:%d\n", __FILE__, __LINE__); BUG(); } })
 
 
 struct _nm_ctxt

Modified: trunk/cluster/tcp.c
===================================================================
--- trunk/cluster/tcp.c	2004-12-23 21:40:45 UTC (rev 1717)
+++ trunk/cluster/tcp.c	2004-12-23 21:48:17 UTC (rev 1718)
@@ -97,6 +97,12 @@
 static struct completion net_recv_complete;
 
 
+static inline void net_abort_status_return(net_status_ctxt *nsc)
+{
+	spin_lock(&net_status_lock);
+	list_del(&nsc->list);
+	spin_unlock(&net_status_lock);
+}
 
 /////////////////////
 static void net_shutdown(void);
@@ -147,7 +153,7 @@
 {
 	atomic_dec(&nmh->refcnt);
 	if (!atomic_read(&nmh->refcnt)) {
-		if (net_handler_in_use(nmh))
+		if (nmh->flags & NET_HND_IN_USE)
 			netprintk0("EEEEK! killing inuse handler! bugbug!\n");
 		kfree(nmh);
 	}
@@ -156,7 +162,7 @@
 static inline void net_put_handler(net_msg_handler *nmh)
 {
 	if (atomic_dec_and_lock(&nmh->refcnt, &net_handler_lock)) {
-		if (net_handler_in_use(nmh))
+		if (nmh->flags & NET_HND_IN_USE)
 			netprintk0("EEEEK! killing inuse handler! bugbug!\n");
 		kfree(nmh);
 		spin_unlock(&net_handler_lock);
@@ -649,7 +655,7 @@
 		netprintk("no such message type: %u/%u\n", msg_type, key);
 		return NULL;
 	}
-	if (net_handler_msg_len_ok(handler, len)) {
+	if (!net_handler_msg_len_ok(handler, len)) {
 		netprintk("len for message type %u incorrect: %u, should be %u\n", 
 		       msg_type, len, handler->max_len);
 		goto done;
@@ -873,7 +879,7 @@
 		return -EINVAL;
 	}
 
-	if (net_handler_msg_len_ok(handler, len)) {
+	if (!net_handler_msg_len_ok(handler, len)) {
 		netprintk("len for message type %u incorrect: %u, should be %u\n", 
 		       msg_type, len, handler->max_len);
 		ret = -EINVAL;
@@ -925,27 +931,167 @@
 		add_wait_queue(net->sock->sk->sk_sleep, &sleep);
 		spin_unlock(&net->sock_lock); 
 	}
+
+	ret = net_send_tcp_msg(inode, NULL, msg, packet_len);
+
+	if (status) {
+		if (ret >= 0) {
+			/* wait on other node's handler */
+			tmpret = util_wait_atomic_eq(&nsc.wq, &nsc.woken, 1, 0);
+			if (tmpret==0) {
+				*status = nsc.status;
+				netprintk("status return requested, status is %d\n", *status);
+			} else {
+				ret = tmpret;
+				net_abort_status_return(&nsc);
+				netprintk0("net_abort_status_return called\n");
+				netprintk("status return requested, and error occurred while waiting=%d\n", ret);
+				*status = ret;
+			}
+			remove_wait_queue(recv_sock->sk->sk_sleep, &sleep);
+		} else {
+			netprintk("status return requested, and error returned from net_send_tcp_msg=%d\n", ret);
+			/* return bad status right away */
+			*status = ret;
+		}
+	} else if (ret < 0) {
+		netprintk("no status return requested, but error returned from net_send_tcp_msg=%d\n", ret);
+	}
+	
+done:
+	if (handler)
+		net_put_handler(handler);
+	if (msg)
+		kfree(msg);
+	return ret;
+}
+
+
+int net_send_message_arr(u32 msg_type, u32 key, int arrlen, net_data *arr, u32 len, struct inode *inode, int *status)
 {
-	union {
-		u64 q;
-		u32 hilo[2];
-	} u1, u2;
-	rdtsc(u1.hilo[0], u1.hilo[1]);
+	int ret = 0, tmpret, i;
+	net_msg *msg = NULL;
+	net_msg_handler *handler = NULL;
+	u32 packet_len;
+	net_status_ctxt nsc;
+	wait_queue_t sleep;
+	nm_node_inode_private *priv = NULL;
+	net_inode_private *net = NULL;
+	char *src, *dest;
 
+	if (!inode || !inode->u.generic_ip) {
+		netprintk0("bad inode, cannot send message\n");
+		return -EINVAL;
+	}
+	if (arrlen <= 0) {
+		netprintk0("bad data array length\n");
+		return -EINVAL;
+	}
+	priv = (nm_node_inode_private *)inode->u.generic_ip;
+	net = &priv->net;
+	spin_lock(&net->sock_lock); 
+	if (!net->sock) {
+		spin_unlock(&net->sock_lock);
+		ret = net_init_tcp_sock(inode);
+		if (!(ret == 0 || ret == -EEXIST)) {
+			netprintk0("failed to create socket!");
+			return -EINVAL;
+		}
+	}
+	spin_unlock(&net->sock_lock); 
+	
 
+	spin_lock(&net_handler_lock);
+	handler = net_lookup_handler(msg_type, key);
+	spin_unlock(&net_handler_lock);
+	
+	if (!handler) {
+		netprintk("no such message type: %u/%u\n", msg_type, key);
+		return -EINVAL;
+	}
+
+	if (!net_handler_msg_len_ok(handler, len)) {
+		netprintk("len for message type %u incorrect: %u, should be %u\n", 
+		       msg_type, len, handler->max_len);
+		ret = -EINVAL;
+		goto done;
+	}
+	packet_len = len + sizeof(net_msg);
+	msg = kmalloc(packet_len, GFP_KERNEL);
+	if (!msg) {
+		netprintk("failed to allocate %u bytes for message!\n", packet_len);
+		ret = -ENOMEM;
+		goto done;
+	}
+	memset(msg, 0, packet_len);
+	msg->magic = NET_MSG_MAGIC;
+	msg->data_len = len;
+	msg->msg_type = msg_type;
+	msg->key = key;
+	spin_lock(&net_msg_num_lock);
+	msg->msg_num = net_msg_num;
+	if (net_msg_num == NET_MSG_NUM_MAX) {
+		netprintk0("eek!  net_msg_num wrapping to 1 now...\n");
+		net_msg_num = 1;
+	}
+	spin_unlock(&net_msg_num_lock);
+	if (len > 0) {
+		int tmplen = len;
+		dest = &(msg->buf[0]);
+		for (i=0; i<arrlen; i++) {
+			src = arr[i].ptr;
+			if (arr[i].bytes > tmplen) {
+				netprintk0("data array is too large!\n");
+				kfree(msg);
+				return -EINVAL;
+			}
+			memcpy(dest, src, arr[i].bytes);
+			tmplen -= arr[i].bytes;
+			dest += arr[i].bytes;
+		}
+	}
+
+	/* does the caller want to wait for a simple status? */
+	if (status) {
+		msg->status = 1;
+
+		INIT_LIST_HEAD(&nsc.list);
+		init_waitqueue_head(&nsc.wq);
+		atomic_set(&nsc.woken, 0);
+		nsc.msg_num = msg->msg_num;
+		nsc.status = 0;
+		spin_lock(&net_status_lock);
+		list_add(&nsc.list, &net_status_list);
+		spin_unlock(&net_status_lock);
+
+		init_waitqueue_entry(&sleep, current);
+		spin_lock(&net->sock_lock);
+		if (!net->sock) {
+			spin_unlock(&net->sock_lock);
+			netprintk0("caller wanted status return but socket went away!\n");
+			kfree(msg);
+			return -EINVAL;
+		}
+		add_wait_queue(net->sock->sk->sk_sleep, &sleep);
+		spin_unlock(&net->sock_lock); 
+	}
+
 	ret = net_send_tcp_msg(inode, NULL, msg, packet_len);
 
-	rdtsc(u2.hilo[0], u2.hilo[1]);
-	netprintk("net_send_tcp_msg took %llu cycles\n", u2.q-u1.q);
 	if (status) {
 		if (ret >= 0) {
 			/* wait on other node's handler */
-			rdtsc(u1.hilo[0], u1.hilo[1]);
 			tmpret = util_wait_atomic_eq(&nsc.wq, &nsc.woken, 1, 0);
-			rdtsc(u2.hilo[0], u2.hilo[1]);
-			netprintk("waiting on status took %llu cycles\n", u2.q-u1.q);
-			*status = nsc.status;
-			netprintk("status return requested, status is %d\n", *status);
+			if (tmpret==0) {
+				*status = nsc.status;
+				netprintk("status return requested, status is %d\n", *status);
+			} else {
+				ret = tmpret;
+				net_abort_status_return(&nsc);
+				netprintk0("net_abort_status_return called\n");
+				netprintk("status return requested, and error occurred while waiting=%d\n", ret);
+				*status = ret;
+			}
 			remove_wait_queue(recv_sock->sk->sk_sleep, &sleep);
 		} else {
 			netprintk("status return requested, and error returned from net_send_tcp_msg=%d\n", ret);
@@ -955,8 +1101,7 @@
 	} else if (ret < 0) {
 		netprintk("no status return requested, but error returned from net_send_tcp_msg=%d\n", ret);
 	}
-}
-	
+
 done:
 	if (handler)
 		net_put_handler(handler);
@@ -984,10 +1129,6 @@
 	net_msg_handler *hnd = NULL;
 	int err = 0;
 	int tmperr;
-	union {
-		u64 q;
-		u32 hilo[2];
-	} u1, u2, u3, u4;
 
 
 start_over:	
@@ -1031,13 +1172,9 @@
 			  hdr.magic, hdr.msg_type, hdr.key);
 
 		if (hdr.magic == NET_MSG_STATUS_MAGIC) {
-rdtsc(u1.hilo[0], u1.hilo[1]);
 			net_dump_msg(sock, inode);
 			/* special type for returning message status */
-rdtsc(u2.hilo[0], u2.hilo[1]);
 			net_do_status_return(hdr.msg_num, hdr.status);
-rdtsc(u3.hilo[0], u3.hilo[1]);
-netprintk("status return: net_dump_msg took %llu, net_do_status_return took %llu\n", u2.q-u1.q, u3.q-u2.q);
 			err = 0;
 			goto error;
 		} else if (hdr.magic != NET_MSG_MAGIC) {
@@ -1068,11 +1205,7 @@
 			netprintk0("no handler for message.\n");
 			goto error;
 		}
-netprintk0("about to dispatch message\n");		
-rdtsc(u1.hilo[0], u1.hilo[1]);
 		err = net_dispatch_message(inode, sock, &hdr, hnd);
-rdtsc(u2.hilo[0], u2.hilo[1]);
-netprintk("net_dispatch_message took %llu\n", u2.q-u1.q);
 
 		/* if node has requested status return, do it now */
 		if (hdr.status) {
@@ -1083,11 +1216,7 @@
 #endif
 			hdr.status = err;
 			hdr.magic = NET_MSG_STATUS_MAGIC;  // twiddle the magic
-netprintk("about to send return message, status=%d\n", err);
-rdtsc(u3.hilo[0], u3.hilo[1]);
 			tmperr = net_send_tcp_msg(inode, sock, &hdr, sizeof(net_msg));
-rdtsc(u4.hilo[0], u4.hilo[1]);
-netprintk("status return (net_send_tcp_msg) took %llu\n", u4.q-u3.q);
 		} else if (err < 0) {
 			netprintk("dispatch (%u/%u) returned %d\n",
 				  hdr.msg_type, hdr.key, err);
@@ -1100,7 +1229,6 @@
 		spin_lock(&net_list_lock);
 		list_add_tail(&net->list, &net_recv_list);
 		spin_unlock(&net_list_lock);
-netprintk0("all done with this round, starting over\n");		
 		goto start_over;
 
 error:
@@ -1131,6 +1259,8 @@
 }
 
 
+
+
 void net_do_status_return(u64 msg_num, s32 status)
 {
 	net_status_ctxt *nsc;
@@ -1160,7 +1290,7 @@
 	packet_len = len + sizeof(net_msg);
 
 	spin_lock(&hnd->lock);
-	if (net_handler_in_use(hnd)) {
+	if (hnd->flags & NET_HND_IN_USE) {
 		netprintk0("EEEEEK!  handler in use! bugbug\n");
 		spin_unlock(&hnd->lock);
 		return -EINVAL;
@@ -1171,7 +1301,7 @@
 		spin_unlock(&hnd->lock);
 		return -EINVAL;
 	}
-	hnd->flags |= (1 << NET_HND_IN_USE);
+	hnd->flags |= NET_HND_IN_USE;
 	spin_unlock(&hnd->lock);
 
 	memset(hnd->buf, 0, packet_len);
@@ -1184,7 +1314,7 @@
 	}
 	
 	spin_lock(&hnd->lock);
-	hnd->flags &= ~(1 << NET_HND_IN_USE);
+	hnd->flags &= ~NET_HND_IN_USE;
 	spin_unlock(&hnd->lock);
 
 	return ret;

Modified: trunk/cluster/tcp.h
===================================================================
--- trunk/cluster/tcp.h	2004-12-23 21:40:45 UTC (rev 1717)
+++ trunk/cluster/tcp.h	2004-12-23 21:48:17 UTC (rev 1718)
@@ -126,18 +126,15 @@
 	return 0;
 }
 		       
-enum {
-	NET_HND_VAR_LEN = 0,
-	NET_HND_IN_USE,
-};
 
-#define net_handler_variable_len(h)   ((h)->flags & (1 << NET_HND_VAR_LEN))
-#define net_handler_in_use(h)         ((h)->flags & (1 << NET_HND_IN_USE))
+#define NET_HND_VAR_LEN   0x00000001
+#define NET_HND_IN_USE    0x00000002
 
 static inline int net_handler_msg_len_ok(net_msg_handler *handler, u32 len)
 {
-	return (net_handler_variable_len(handler) ? 
-		len > handler->max_len : len != handler->max_len);
+	return ((handler->flags & NET_HND_VAR_LEN) ? 
+		len <= handler->max_len : 
+		len == handler->max_len);
 }
 
 
@@ -196,6 +193,11 @@
 	NET_DRIVER_READY,
 };
 
+typedef struct _net_data
+{
+	int bytes;
+	void *ptr;
+} net_data;
 
 int net_register_handler(u32 msg_type, u32 key, int flags, 
 			 u32 max_len, net_msg_handler_func *func, void *data, void *buf);
@@ -205,6 +207,7 @@
 int net_send_error(struct socket *sock, u32 err_type);
 int net_init_tcp_sock(struct inode *inode);
 int net_send_message(u32 msg_type, u32 key, void *data, u32 len, struct inode *inode, int *status);
+int net_send_message_arr(u32 msg_type, u32 key, int arrlen, net_data *arr, u32 len, struct inode *inode, int *status);
 int net_broadcast_message(u32 msg_type, u32 key, void *data, u32 len, struct inode *group);
 net_msg_handler * net_lookup_handler(u32 msg_type, u32 key);
 

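As a compact summary of the LKM_VALBLK handling added to dlmconvert_local()
and dlmconvert_remote() above: a convert with LKM_VALBLK held at EX publishes
the caller's lvb to the lockres, a convert that ends above NL refetches the
lvb into the caller's lksb, and a convert down to NL drops LKM_VALBLK.  The
helper below is hypothetical and only restates that decision.

/* Hypothetical helper, illustration only: which DLM_LKSB_* flag the
 * convert path sets on the lksb when LKM_VALBLK is requested. */
static int lvb_flag_for_convert(int cur_type, int new_type)
{
	if (cur_type == LKM_EXMODE)
		return DLM_LKSB_PUT_LVB;	/* EX holder publishes its lvb */
	if (new_type > LKM_NLMODE)
		return DLM_LKSB_GET_LVB;	/* refetch on convert above NL */
	return 0;				/* converting to NL: LKM_VALBLK is dropped */
}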

