[Ocfs2-commits] khackel commits r848 - in trunk/src: . inc

svn-commits at oss.oracle.com svn-commits at oss.oracle.com
Fri Apr 16 20:48:32 CDT 2004


Author: khackel
Date: 2004-04-16 19:48:30 -0500 (Fri, 16 Apr 2004)
New Revision: 848

Modified:
   trunk/src/alloc.c
   trunk/src/dcache.c
   trunk/src/dlm.c
   trunk/src/file.c
   trunk/src/hash.c
   trunk/src/inc/io.h
   trunk/src/inc/journal.h
   trunk/src/inc/ocfs.h
   trunk/src/inc/proto.h
   trunk/src/journal.c
   trunk/src/namei.c
   trunk/src/nm.c
   trunk/src/oin.c
   trunk/src/osb.c
   trunk/src/proc.c
   trunk/src/super.c
   trunk/src/vote.c
Log:
Big change for ocfs_vote_obj and lockres.
Moved all temporary state fields from lockres to the vote obj.
Made a queue of these objects that can be run by the vote processing 
thread, and also a /proc interface to inspect them.
Renamed almost every ocfs_lock_res variable and parameter to "lockres".



Modified: trunk/src/alloc.c
===================================================================
--- trunk/src/alloc.c	2004-04-16 22:38:55 UTC (rev 847)
+++ trunk/src/alloc.c	2004-04-17 00:48:30 UTC (rev 848)
@@ -2723,7 +2723,7 @@
 	static __u32 LargeAllocOffset = 0;
 	static __u32 SmallAllocOffset = 0;
 	bool bLockAcquired = false;
-	ocfs_lock_res *pLockResource = NULL;
+	ocfs_lock_res *lockres = NULL;
         struct buffer_head *bh = NULL;
         ocfs_bitmap_lock *bm_lock = NULL;
 	__u32 bitmapblocks; /* we only care about the valid blocks */
@@ -2744,7 +2744,7 @@
 		/* Get the allocation lock here */
 		status = ocfs_acquire_lock (osb, OCFS_BITMAP_LOCK_OFFSET,
 					    OCFS_DLM_EXCLUSIVE_LOCK, 0, 
-					    &pLockResource, &bh, NULL);
+					    &lockres, &bh, NULL);
 		if (status < 0) {
 			if (status != -EINTR)
 				LOG_ERROR_STATUS (status);
@@ -2885,14 +2885,14 @@
 			tmpstat = ocfs_release_lock(osb, 
 						    OCFS_BITMAP_LOCK_OFFSET,
 						    OCFS_DLM_EXCLUSIVE_LOCK, 
-						    0, pLockResource, bh, 
+						    0, lockres, bh, 
 						    NULL);
 			if (tmpstat < 0)
 				LOG_ERROR_STATUS (tmpstat);
 		}
 		if (bh != NULL)
 			brelse(bh);
-		ocfs_put_lockres (pLockResource);
+		ocfs_put_lockres (lockres);
 	}
 
 	LOG_EXIT_STATUS (status);
@@ -2922,7 +2922,7 @@
 	__u32 foundBit = -1;
 	__u32 blockSize = 0;
 	bool bLockAcquired = false;
-	ocfs_lock_res *pLockResource = NULL;
+	ocfs_lock_res *lockres = NULL;
 	__u32 bm_file = 0;
 	__u32 alloc_file = 0;
 	struct buffer_head *bh = NULL;
@@ -2954,7 +2954,7 @@
 
 	/* Get a lock on the file */
 	status = ocfs_acquire_lock (osb, lockId, OCFS_DLM_EXCLUSIVE_LOCK,
-			     FLAG_FILE_CREATE, &pLockResource, &bh, NULL);
+			     FLAG_FILE_CREATE, &lockres, &bh, NULL);
 	if (status < 0) {
 		LOG_ERROR_STATUS (status);
 		goto leave;
@@ -3119,14 +3119,14 @@
 		ocfs_journal_add_lock(handle, lockId, 
 				      OCFS_DLM_EXCLUSIVE_LOCK, 
 				      FLAG_FILE_CREATE, 
-				      pLockResource, bh, NULL);
+				      lockres, bh, NULL);
 		tmpstat = 0;
 	} else 	if (bLockAcquired) {
 		tmpstat =
 		    ocfs_release_lock (osb, lockId, OCFS_DLM_EXCLUSIVE_LOCK,
-				     FLAG_FILE_CREATE, pLockResource, bh, NULL);
+				     FLAG_FILE_CREATE, lockres, bh, NULL);
 
-		ocfs_put_lockres (pLockResource);
+		ocfs_put_lockres (lockres);
 	}
 
 	if (tmpstat < 0)
@@ -3395,7 +3395,7 @@
 				   bool in_recovery)
 {
 	int status = 0, tmpstat;
-	ocfs_lock_res *bm_lock_res = NULL;
+	ocfs_lock_res *lockres = NULL;
 	__u32 bitmapblocks;
 	struct buffer_head *bh = NULL;
 	int bit_off, left;
@@ -3429,7 +3429,7 @@
 					    OCFS_DLM_EXCLUSIVE_LOCK, 
 					    (in_recovery) ? FLAG_FILE_RECOVERY 
 					    : 0, 
-					    &bm_lock_res, &bh, NULL);
+					    &lockres, &bh, NULL);
 		if (status < 0) {
 			if (status != -EINTR)
 				LOG_ERROR_STATUS (status);
@@ -3491,14 +3491,14 @@
 	if (local_lock) {
 		ocfs_up_sem (&(osb->vol_alloc_lock));
 
-		if (bm_lock_res) {
+		if (lockres) {
 			tmpstat = ocfs_release_lock (osb, 
 						     OCFS_BITMAP_LOCK_OFFSET,
 						     OCFS_DLM_EXCLUSIVE_LOCK, 
-						     0, bm_lock_res, bh, NULL);
+						     0, lockres, bh, NULL);
 			if (tmpstat < 0)
 				LOG_ERROR_STATUS (tmpstat);
-			ocfs_put_lockres (bm_lock_res);
+			ocfs_put_lockres (lockres);
 		}
 		if (bh != NULL)
 			brelse(bh);
@@ -3651,7 +3651,7 @@
 	__u32 tmpwanted;
 	/* main bitmap variables. */
 	struct buffer_head *main_bm_bh = NULL;
-	ocfs_lock_res *bm_lock_res = NULL;
+	ocfs_lock_res *lockres = NULL;
 	void *bitmap;
 
 	LOG_ENTRY_ARGS("(bitswanted = %u)\n", bitswanted);
@@ -3725,7 +3725,7 @@
 		/* Get the allocation lock here */
 		status = ocfs_acquire_lock (osb, OCFS_BITMAP_LOCK_OFFSET,
 					    OCFS_DLM_EXCLUSIVE_LOCK, 0, 
-					    &bm_lock_res, &main_bm_bh, NULL);
+					    &lockres, &main_bm_bh, NULL);
 		if (status < 0) {
 			ocfs_up_sem (&(osb->vol_alloc_lock));
 			if (status != -EINTR)
@@ -3791,10 +3791,10 @@
 		tmpstat = ocfs_release_lock (osb, 
 					     OCFS_BITMAP_LOCK_OFFSET,
 					     OCFS_DLM_EXCLUSIVE_LOCK, 
-					     0, bm_lock_res, main_bm_bh, NULL);
+					     0, lockres, main_bm_bh, NULL);
 		if (tmpstat < 0)
 			LOG_ERROR_STATUS (tmpstat);
-		ocfs_put_lockres (bm_lock_res);
+		ocfs_put_lockres (lockres);
 		brelse(main_bm_bh);
 	}
 

Modified: trunk/src/dcache.c
===================================================================
--- trunk/src/dcache.c	2004-04-16 22:38:55 UTC (rev 847)
+++ trunk/src/dcache.c	2004-04-17 00:48:30 UTC (rev 848)
@@ -81,18 +81,18 @@
 
         /* check for oin */
 	if (inode_data_is_oin (inode)) {
-                ocfs_lock_res *res  = NULL;
+                ocfs_lock_res *lockres  = NULL;
                 ret = 1;  /* with an oin we cannot fail revalidate */
 		oin = GET_INODE_OIN(inode);
 
-                if (ocfs_lookup_sector_node (osb, GET_INODE_FEOFF(inode), &res)==0) {
+                if (ocfs_lookup_sector_node (osb, GET_INODE_FEOFF(inode), &lockres)==0) {
                         /* if I hold cache lock, no revalidate needed */
-                        if (res->lock_type == OCFS_DLM_ENABLE_CACHE_LOCK &&
-                            res->master_node_num == osb->node_num) {
-				ocfs_put_lockres (res);
+                        if (lockres->lock_type == OCFS_DLM_ENABLE_CACHE_LOCK &&
+                            lockres->master_node_num == osb->node_num) {
+				ocfs_put_lockres (lockres);
                                 goto bail;
                         } else
-				ocfs_put_lockres (res);
+				ocfs_put_lockres (lockres);
                 }
                 /* hit the disk */
                 /* TODO: optimize */

Modified: trunk/src/dlm.c
===================================================================
--- trunk/src/dlm.c	2004-04-16 22:38:55 UTC (rev 847)
+++ trunk/src/dlm.c	2004-04-17 00:48:30 UTC (rev 848)
@@ -37,7 +37,6 @@
 
 static inline int ocfs_wait_for_readonly_drop(ocfs_super *osb, ocfs_lock_res *lockres, struct inode *inode);
 
-static int ocfs_send_dlm_request_msg (ocfs_super * osb, __u64 lock_id, __u32 lock_type, __u32 flags, ocfs_lock_res * lockres, __u64 vote_map, struct inode *inode);
 static int ocfs_request_vote (ocfs_super * osb, __u64 lock_id, __u32 lock_type, __u32 flags, __u64 vote_map, __u64 * lock_seq_num, struct inode *inode);
 static int ocfs_wait_for_vote (ocfs_super * osb, __u64 lock_id, __u32 lock_type, __u32 flags, __u64 vote_map, __u32 time_to_wait, __u64 lock_seq_num, ocfs_lock_res * lockres);
 static int ocfs_reset_voting (ocfs_super * osb);
@@ -50,6 +49,7 @@
 int ocfs_disk_release_lock (ocfs_super * osb, __u64 lock_id, __u32 lock_type, __u32 flags, ocfs_lock_res * lockres, struct buffer_head *bh, struct inode *inode);
 static int ocfs_zap_child_buffers_func(struct dentry *dentry, void *data);
 
+
 /*
  * ocfs_disk_request_vote()
  *
@@ -404,7 +404,6 @@
 		lockres->lock_type = DISK_LOCK_FILE_LOCK (fe);
 		lockres->master_node_num = DISK_LOCK_CURRENT_MASTER (fe);
 		lockres->oin_openmap = DISK_LOCK_OIN_MAP (fe);
-		lockres->last_lock_upd = DISK_LOCK_LAST_WRITE (fe);
 		if (lockres->readonly_node != OCFS_INVALID_NODE_NUM) {
 			if (lockres->readonly_node != lockres->master_node_num) {
 				LOG_ERROR_ARGS("(2) readonly node changed! was %d, now master is %d\n",
@@ -427,50 +426,6 @@
 }				/* ocfs_wait_for_lock_release */
 
 
-void ocfs_process_one_vote_reply(ocfs_super *osb, ocfs_vote_reply_ctxt *ctxt, __u32 node_num)
-{
-	int status;
-	int reply_status;
-	int open_handle = 0;
-	__u64 mask = 0;
-
-	if (ctxt->reply_method == DISK_VOTE) {
-		reply_status = ctxt->u.vote->vote[osb->node_num];
-		open_handle = ctxt->u.vote->open_handle;
-	} else {
-		reply_status = ctxt->u.reply->status;
-		open_handle = ctxt->u.reply->h.open_handle;
-	}
-
-	status = 0;
-	mask = 1 << node_num;
-	switch (reply_status) {
-		case FLAG_VOTE_NODE:
-			*(ctxt->got_vote_map) |= mask;
-			if (ctxt->flags & (FLAG_FILE_EXTEND|FLAG_FILE_UPDATE) && 
-			    open_handle)
-				*(ctxt->open_map) |= mask;
-			break;
-		case FLAG_VOTE_OIN_ALREADY_INUSE:
-			*(ctxt->got_vote_map) |= mask;
-			status = -EFAIL;
-			if (ctxt->flags & FLAG_FILE_DELETE)
-				status = -EBUSY;
-			break;
-		case FLAG_VOTE_OIN_UPDATED:
-			status = 0;
-			*(ctxt->got_vote_map) |= mask;
-			break;
-		case FLAG_VOTE_UPDATE_RETRY:
-			status = -EAGAIN;
-			break;
-		case FLAG_VOTE_FILE_DEL:
-			status = -ENOENT;
-			break;
-	}
-	*(ctxt->status) = status;
-}
-
 /*
  * ocfs_get_vote_on_disk()
  *
@@ -686,100 +641,7 @@
 	return status;
 }				/* ocfs_request_vote */
 
-
-
 /*
- * ocfs_init_dlm_msg()
- *
- */
-void ocfs_init_dlm_msg (ocfs_super * osb, ocfs_dlm_msg * dlm_msg, __u32 msg_len)
-{
-	LOG_ENTRY ();
-
-	dlm_msg->magic = OCFS_DLM_MSG_MAGIC;
-	dlm_msg->msg_len = msg_len;
-
-	memcpy (dlm_msg->vol_id, osb->vol_layout.vol_id, MAX_VOL_ID_LENGTH);
-
-	dlm_msg->src_node = osb->node_num;
-
-	LOG_EXIT ();
-	return;
-}				/* ocfs_init_dlm_msg */
-
-
-#define OCFS_DLM_NET_TIMEOUT   (30000)   // 30 seconds
-
-/*
- * ocfs_send_dlm_request_msg()
- *
- */
-static int ocfs_send_dlm_request_msg (ocfs_super * osb, __u64 lock_id, __u32 lock_type, __u32 flags, ocfs_lock_res * lockres, __u64 vote_map, struct inode *inode)
-{
-	int status = 0;
-	ocfs_dlm_msg *dlm_msg = NULL;
-	__u32 msg_len;
-	ocfs_dlm_msg_hdr *req;
-
-	LOG_ENTRY_ARGS ("(osb=0x%08x, id=%u.%u, ty=%u, fl=%u, vm=0x%08x)\n",
-			osb, HILO (lock_id), lock_type, flags, LO(vote_map));
-
-	msg_len = sizeof (ocfs_dlm_msg) - 1 + sizeof (ocfs_dlm_req_master);
-
-	dlm_msg = ocfs_malloc (msg_len);
-	if (dlm_msg == NULL) {
-		LOG_ERROR_STATUS (status = -ENOMEM);
-		goto finally;
-	}
-
-	ocfs_acquire_lockres (lockres);
-	lockres->vote_state = LOCK_STATE_IN_VOTING;
-	lockres->req_vote_map = vote_map;
-	lockres->got_vote_map = 0;
-	lockres->tmp_openmap = 0;
-	spin_lock (&OcfsGlobalCtxt.comm_seq_lock);
-	OcfsGlobalCtxt.comm_seq_num++;
-	lockres->last_upd_seq_num = OcfsGlobalCtxt.comm_seq_num;
-	spin_unlock (&OcfsGlobalCtxt.comm_seq_lock);
-	ocfs_release_lockres (lockres);
-
-	ocfs_init_dlm_msg (osb, dlm_msg, msg_len);
-
-	dlm_msg->msg_type = OCFS_VOTE_REQUEST;
-
-	req = (ocfs_dlm_msg_hdr *) dlm_msg->msg_buf;
-	req->lock_id = lock_id;
-	req->flags = flags;
-	req->lock_seq_num = lockres->last_upd_seq_num;
-	if (inode)
-		req->fe_off = GET_INODE_FEOFF(inode);
-	else
-		req->fe_off = 0;
-#ifdef VERBOSE_LOCKING_TRACE
-	printk("ocfs_send_dlm_request_msg: inode=%p, lockid = %u.%u, "
-	       "fe_off=%u.%u\n", inode, HILO(lock_id), HILO(req->fe_off));
-
-#endif
-	ocfs_send_bcast (osb, vote_map, dlm_msg);
-	status = ocfs_wait (lockres->voted_event,
-			    atomic_read (&lockres->voted_event_woken), 
-			    OCFS_DLM_NET_TIMEOUT);
-	atomic_set (&lockres->voted_event_woken, 0);
-
-	ocfs_compute_dlm_stats (status, lockres->vote_status,
-			       	&(OcfsGlobalCtxt.net_reqst_stats));
-
-	ocfs_compute_dlm_stats (status, lockres->vote_status,
-			       	&(osb->net_reqst_stats));
-
-finally:
-	ocfs_safefree (dlm_msg);
-	LOG_EXIT_STATUS (status);
-	return status;
-}				/* ocfs_send_dlm_request_msg */
-
-
-/*
  * ocfs_acquire_lockres_ex()
  *
  * @lockres: lockres to acquire
@@ -1053,31 +915,16 @@
 	lockres->sector_num = lock_id;
 	lockres->in_use = 0;
 	lockres->lock_state = 0;
-	lockres->vote_state = 0;
-	lockres->in_cache_list = false;
-	lockres->cache_lock_held = false;
 
 	spin_lock_init (&lockres->lock_mutex);
-	init_waitqueue_head (&lockres->voted_event);
-	atomic_set (&lockres->voted_event_woken, 0);
 	atomic_set (&lockres->lr_ref_cnt, 0);
-	atomic_set (&lockres->lr_share_cnt, 0);
 
-	/* For read/write caching */
-	lockres->last_read_time = 0;
-	lockres->last_write_time = 0;
-	lockres->writer_node_num = OCFS_INVALID_NODE_NUM;
-	lockres->reader_node_num = OCFS_INVALID_NODE_NUM;
-
 	lockres->readonly_map = 0ULL;
 	lockres->readonly_node = OCFS_INVALID_NODE_NUM;
-	lockres->readonly_dropping = false;
 
 	lockres->lock_holders = 0;
 	LOG_TRACE_ARGS("lockres->lock_holders = %u\n", lockres->lock_holders);
 
-	INIT_LIST_HEAD(&lockres->cache_list);
-
 	LOG_EXIT ();
 	return;
 }				/* ocfs_init_lockres */
@@ -1234,7 +1081,7 @@
  */
 int
 ocfs_acquire_lock (ocfs_super * osb, __u64 lock_id, __u32 lock_type,
-		   __u32 flags, ocfs_lock_res ** lr, struct buffer_head **bh, struct inode *inode)
+		   __u32 flags, ocfs_lock_res **ret_lockres, struct buffer_head **bh, struct inode *inode)
 {
 	int status = -EFAIL;
 	ocfs_file_entry *disklock = NULL;
@@ -1249,7 +1096,7 @@
 	__u32 extra_lock_flags = 0;
 
 	LOG_ENTRY_ARGS ("(0x%08x, %u.%u, %u, %u, 0x%08x, 0x%08x)\n", osb,
-			HI (lock_id), LO (lock_id), lock_type, flags, lr, bh);
+			HI (lock_id), LO (lock_id), lock_type, flags, ret_lockres, bh);
 	
 	
 	OCFS_ASSERT(lock_type != OCFS_DLM_NO_LOCK);
@@ -1262,9 +1109,9 @@
 
 	flags |= FLAG_ACQUIRE_LOCK;
 
-	status = ocfs_find_update_res (osb, lock_id, lr, b, &updated, 0, inode);
-	if (lr)
-		lockres = *lr;
+	status = ocfs_find_update_res (osb, lock_id, ret_lockres, b, &updated, 0, inode);
+	if (ret_lockres)
+		lockres = *ret_lockres;
 	if (status < 0) {
 		LOG_ERROR_STATUS (status);
 		goto bail;
@@ -1303,6 +1150,9 @@
 
 	/* master node is an invalid node */
 	if (unlikely(lockres->master_node_num >= OCFS_MAXIMUM_NODES && !no_owner)) {
+		LOG_ERROR_ARGS("lockres: master_node=%d, owner=%s, lockid=%u.%u\n",
+			       lockres->master_node_num, no_owner?"no":"yes",
+			       HILO(lockres->sector_num));
 		LOG_ERROR_STATUS (status = -EINVAL);
 		ocfs_release_lockres (lockres);
 		goto finally;
@@ -1387,17 +1237,15 @@
 			
 
 do_lock:
-	flags |= extra_lock_flags;
-
 	LOG_TRACE_ARGS("lockres: master=%d, locktype=%d, flags: %08x\n",
-		       lockres->master_node_num, lockres->lock_type, flags);
+		       lockres->master_node_num, lockres->lock_type, flags|extra_lock_flags);
 	
 #ifdef VERBOSE_LOCKING_TRACE
 	printk("lockres: lockid=%u.%u, this=%d, master=%d, locktype=%d, flags=%08x, ronode=%d, romap=%08x\n",
-		       lockres->sector_num, osb->node_num, lockres->master_node_num, lockres->lock_type, flags,
-		       lockres->readonly_node, lockres->readonly_map);
+		       lockres->sector_num, osb->node_num, lockres->master_node_num, lockres->lock_type, 
+		       flags|extra_lock_flags, lockres->readonly_node, lockres->readonly_map);
 #endif	
-	if (wait_on_recovery && !(flags & FLAG_FILE_RECOVERY)) {
+	if (wait_on_recovery && !((flags|extra_lock_flags) & FLAG_FILE_RECOVERY)) {
 		int waitcnt = 0;
 		LOG_TRACE_ARGS("Waiting on node %u to be recovered\n",
 			       	lockres->master_node_num);
@@ -1409,7 +1257,8 @@
 		}
 	}
 
-	status = new_lock_function(osb, lock_type, flags, lockres, *b, &disk_vote, inode);
+	status = new_lock_function(osb, lock_type, flags|extra_lock_flags, 
+				   lockres, *b, &disk_vote, inode);
 	if (status < 0) {
 		ocfs_release_lockres (lockres);
 		if (status == -EAGAIN) {
@@ -1454,7 +1303,7 @@
 	__u32 votemap = 0;
 	__u32 tempmap = 0;
 	__u32 i;
-	int status = 0;
+	int status = 0, vote_status = 0;
 	int tmpstat;
 	__u64 lockseqno;
 	bool cachelock = false;
@@ -1530,10 +1379,10 @@
 	while (status == -EAGAIN) {
 		if (comm_voting && !disk_vote) {
 			LOG_TRACE_STR ("Network vote");
-			status = ocfs_send_dlm_request_msg (osb, lock_id, lock_type,
-								flags, lockres, votemap, inode);
+			status = ocfs_send_dlm_request_msg (osb, lock_id, lock_type, flags, 
+							    lockres, votemap, inode, &vote_status);
 			if (status >= 0) {
-				status = lockres->vote_status;
+				status = vote_status;
 				if (status >= 0) {
 					goto finally;
 				} else if (status == -EAGAIN) {
@@ -1545,9 +1394,16 @@
 			} else if (status == -ETIMEDOUT) {
 				LOG_TRACE_STR ("Network voting timed out");
 			}
-			lockres->vote_state = 0;
 		}
 
+		LOG_ERROR_ARGS("DISKVOTE!!: lock_type=%u, flags=%08x, offset=%u.%u, inode=%u\n",
+		       lock_type, flags, HILO(lock_id), inode?inode->i_ino:0);
+		LOG_ERROR_ARGS("DISKVOTE!!: this=%d, master=%d, locktype=%d, ronode=%d, romap=%08x\n",
+		       osb->node_num, lockres->master_node_num, lockres->lock_type, 
+		       lockres->readonly_node, lockres->readonly_map);
+
+		ocfs_show_trace(NULL);
+
 		LOG_TRACE_STR ("Disk vote");
 		disk_vote = true;
 		jif = jiffies;
@@ -1639,18 +1495,27 @@
 int ocfs_release_lock (ocfs_super * osb, __u64 lock_id, __u32 lock_type, __u32 flags, ocfs_lock_res * lockres, struct buffer_head *bh, struct inode *inode)
 {
 	int status = 0;
+	int tries = 0;
 
 	LOG_ENTRY_ARGS ("(0x%08x, %u.%u, %u, %u, 0x%08x)\n", osb, HI (lock_id),
 			LO (lock_id), lock_type, flags, lockres);
 
 	flags |= FLAG_RELEASE_LOCK;
 
+again:
 	ocfs_acquire_lockres (lockres);
 
 	if (bh) {
 		/* always get the write lock on the bh */
 		/* make sure to do this AFTER the lockres acquire */
-		OCFS_BH_GET_DATA_WRITE(bh);
+		if (OCFS_BH_GET_DATA_WRITE_TRYLOCK(bh) == NULL) {
+			tries++;
+			ocfs_release_lockres(lockres);
+			LOG_ERROR_ARGS("failed to get bh sem (%lu), attempt %d, trying again\n",
+				       bh->b_blocknr, tries);
+			ocfs_sleep(100);
+			goto again;
+		}
 		OCFS_BH_PUT_DATA(bh);
 	}
 
@@ -1695,28 +1560,13 @@
 }				/* ocfs_release_lock */
 
 /*
- * ocfs_init_dlm()
- *
- */
-int ocfs_init_dlm (void)
-{
-	LOG_ENTRY ();
-
-	OcfsIpcCtxt.init = false;
-	OcfsIpcCtxt.re_init = false;
-
-	LOG_EXIT_STATUS (0);
-	return 0;
-}				/* ocfs_init_dlm */
-
-/*
  * ocfs_break_cache_lock()
  *
  */
 /* TODO: merge down into new lock function */
 static int ocfs_break_cache_lock (ocfs_super * osb, ocfs_lock_res * lockres, struct inode *inode)
 {
-	int status;
+	int status, vote_status = 0;
 	int tmpstat;
 	__u32 votemap;
 	__u64 lockseqno = 0;
@@ -1727,6 +1577,7 @@
 
 	LOG_ENTRY ();
 
+	ocfs_get_lockres(lockres);
 	ocfs_acquire_lockres (lockres);
 
 	/* Ask the node with cache to flush and revert to write thru on this file */
@@ -1751,9 +1602,9 @@
 			status = ocfs_send_dlm_request_msg (osb, lockres->sector_num,
 							lockres->lock_type, 
 							FLAG_ACQUIRE_LOCK|FLAG_FILE_RELEASE_CACHE,
-							lockres, votemap, inode);
+							lockres, votemap, inode, &vote_status);
 			if (status >= 0) {
-				status = lockres->vote_status;
+				status = vote_status;
 				if (status >= 0) {
 					lockres->lock_type = OCFS_DLM_NO_LOCK;
 				} else if (status == -EAGAIN) {
@@ -1765,9 +1616,16 @@
 			} else if (status == -ETIMEDOUT) {
 				LOG_TRACE_STR ("Network voting timed out");
 			}
-			lockres->vote_state = 0;
 		}
-			
+		
+		LOG_ERROR_ARGS("DISKVOTE!!: lock_type=%u, flags=%08x, offset=%u.%u, inode=%u\n",
+		       lockres->lock_type, FLAG_ACQUIRE_LOCK|FLAG_FILE_RELEASE_CACHE, HILO(lockres->sector_num), inode?inode->i_ino:0);
+		LOG_ERROR_ARGS("DISKVOTE!!: this=%d, master=%d, locktype=%d, ronode=%d, romap=%08x\n",
+		       osb->node_num, lockres->master_node_num, lockres->lock_type, 
+		       lockres->readonly_node, lockres->readonly_map);
+
+		ocfs_show_trace(NULL);
+	
 		LOG_TRACE_STR ("Disk vote");
 		disk_vote = true;
 		jif = jiffies;
@@ -1843,6 +1701,7 @@
 	}
 finito:
 	ocfs_release_lockres (lockres);
+	ocfs_put_lockres (lockres);
 	LOG_EXIT_STATUS (status);
 	return (status);
 }				/* ocfs_break_cache_lock */
@@ -1877,7 +1736,7 @@
 /* TODO: merge down into new lock function */
 int ocfs_send_readonly_drop_message(ocfs_super *osb, ocfs_lock_res *lockres, __u64 vote_map, struct inode *inode)
 {
-	int status = 0, tmpstat;
+	int status = 0, tmpstat, vote_status = 0;
 	__u64 lock_id = lockres->sector_num, lockseqnum = 0;
 	bool disk_vote = false;
 
@@ -1885,17 +1744,24 @@
 
 	if (comm_voting) {
 		status = ocfs_send_dlm_request_msg (osb, lock_id, OCFS_DLM_ENABLE_CACHE_LOCK,
-						    FLAG_DROP_READONLY, lockres, vote_map, inode);
+						    FLAG_DROP_READONLY, lockres, vote_map, inode, &vote_status);
 		if (status >= 0) {
-			status = lockres->vote_status;
+			status = vote_status;
 			goto bail;
 		} else if (status == -ETIMEDOUT)
 			LOG_TRACE_STR ("Network voting timed out");
 		else
 			LOG_ERROR_STATUS (status);
-		lockres->vote_state = 0;
 	}
 
+	LOG_ERROR_ARGS("DISKVOTE!!: lock_type=%u, flags=%08x, offset=%u.%u, inode=%u\n",
+	       OCFS_DLM_ENABLE_CACHE_LOCK, FLAG_DROP_READONLY, HILO(lock_id), 0);
+	LOG_ERROR_ARGS("DISKVOTE!!: this=%d, master=%d, locktype=%d, ronode=%d, romap=%08x\n",
+	       osb->node_num, lockres->master_node_num, lockres->lock_type, 
+	       lockres->readonly_node, lockres->readonly_map);
+
+	ocfs_show_trace(NULL);
+
 	disk_vote = true;
 	status = ocfs_request_vote (osb, lock_id, OCFS_DLM_ENABLE_CACHE_LOCK, FLAG_DROP_READONLY, 
 				    vote_map, &lockseqnum, NULL);
@@ -1936,7 +1802,7 @@
 	__u32 lock_type = requested_lock;
 	bool need_to_zap_buffers = false, need_lock_write = true;
 	bool is_readdir = (flags & FLAG_READDIR) ? true : false;
-	int status = 0;
+	int status = 0, vote_status = 0;
 
 	LOG_ENTRY ();
 
@@ -1957,33 +1823,23 @@
 		}
 	}
 
-	/* TODO: take this out when all is ok */
-	if (flags & FLAG_READDIR) {
-		/* only send a message with FLAG_READDIR in it if  
-		 * the recipient already has a cachelock but is not
-		 * currently set as the readonly_node */
-		OCFS_ASSERT(lockres->master_node_num != osb->node_num);
-		OCFS_ASSERT(lockres->master_node_num != OCFS_INVALID_NODE_NUM);
-		OCFS_ASSERT(lockres->readonly_node == OCFS_INVALID_NODE_NUM);
-		OCFS_ASSERT(lockres->lock_type == OCFS_DLM_ENABLE_CACHE_LOCK);
-	}
-
-
 	if (flags & (FLAG_CHANGE_MASTER | FLAG_REMASTER)) {
 		/* on a master change... */
 		need_to_zap_buffers = true; /* need to dump local buffers */
 		need_lock_write = true;     /* and rewrite the lock */
 	} else if (flags & FLAG_ADD_OIN_MAP) {
 		need_lock_write = false;
+		need_to_zap_buffers = false;
 	} else if (flags & FLAG_READDIR) {
 		need_lock_write = false;
 		need_to_zap_buffers = true;
 	} else {
 		fe = (ocfs_file_entry *)OCFS_BH_GET_DATA_READ(bh); /* read */
 		/* may not need to rewrite the lock later if we already have a cachelock */
-		need_lock_write = !(DISK_LOCK_CURRENT_MASTER (fe) == osb->node_num &&
-					DISK_LOCK_FILE_LOCK (fe) == OCFS_DLM_ENABLE_CACHE_LOCK);
+		need_lock_write = (DISK_LOCK_CURRENT_MASTER (fe) != osb->node_num ||
+					DISK_LOCK_FILE_LOCK (fe) != OCFS_DLM_ENABLE_CACHE_LOCK);
 		OCFS_BH_PUT_DATA(bh);
+		need_to_zap_buffers = false; 
 	}
 
 	/* that's why it's called fast path */	
@@ -2012,9 +1868,10 @@
 	/* net voting */
 	if (comm_voting && !*disk_vote) {
 		LOG_TRACE_STR ("Network vote");
-		status = ocfs_send_dlm_request_msg (osb, lock_id, lock_type, flags, lockres, vote_map, inode);
+		status = ocfs_send_dlm_request_msg (osb, lock_id, lock_type, flags, 
+						    lockres, vote_map, inode, &vote_status);
 		if (status >= 0) {
-			status = lockres->vote_status;
+			status = vote_status;
 			if (status >= 0)
 				goto vote_success;
 			else
@@ -2024,11 +1881,17 @@
 		}
 		else
 			LOG_ERROR_STATUS (status);
-		lockres->vote_state = 0;
 	}
 
 
+	LOG_ERROR_ARGS("DISKVOTE!!: req_lock=%u, flags=%08x, offset=%u.%u, inode=%u\n",
+		       requested_lock, flags, HILO(lock_id), inode?inode->i_ino:0);
+	LOG_ERROR_ARGS("DISKVOTE!!: this=%d, master=%d, locktype=%d, ronode=%d, romap=%08x\n",
+		       osb->node_num, lockres->master_node_num, lockres->lock_type, 
+		       lockres->readonly_node, lockres->readonly_map);
 
+	ocfs_show_trace(NULL);
+
 	/* disk voting */
 	LOG_TRACE_STR ("Disk vote");
 	*disk_vote = true;
@@ -2061,11 +1924,6 @@
 	}
 	
 	/* update the lockres */
-#ifdef VERBOSE_LOCKING_TRACE
-	printk("new_lock_function: set lockid=%u.%u, locktype=%d->%d, master=%d->%d\n",
-	       lockres->sector_num, lockres->lock_type, requested_lock,
-	       lockres->master_node_num, osb->node_num);
-#endif
 	lockres->master_node_num = osb->node_num;
 	lockres->lock_type = requested_lock;
 
@@ -2115,13 +1973,17 @@
 
 	LOG_ENTRY();
 	
+	// should not be in map, but remove anyway
+	lockres->readonly_map &= ~(1 << osb->node_num);
+
 	if (lockres->readonly_map != 0ULL) {
 		// if this node is the owner, need to alert all nodes 
 		// in map, set map to 0, ro_node=-1, continue as if normal cache lock
 
 		// if there is a readonly_map, we had better be the owner
 		OCFS_ASSERT(lockres->readonly_node == osb->node_num);
-		if (!lockres->readonly_dropping) {
+	
+		if (!(lockres->lock_state & FLAG_READONLY_DROPPING)) {	
 			ocfs_get_lockres(lockres);
 			status = ocfs_drop_readonly_cache_lock(osb, lockres, inode);
 			if (status < 0) {
@@ -2161,3 +2023,36 @@
 	LOG_EXIT_STATUS(status);
 	return status;
 }
+
+void ocfs_compute_dlm_stats(int status, int vote_status, ocfs_dlm_stats *stats)	
+{
+	atomic_inc (&stats->total);
+	if (status == -ETIMEDOUT)
+		atomic_inc (&stats->etimedout);
+	else {
+		switch (vote_status) {
+			case -EAGAIN:
+			case FLAG_VOTE_UPDATE_RETRY:
+				atomic_inc (&stats->eagain);
+				break;
+			case -ENOENT:
+			case FLAG_VOTE_FILE_DEL:
+				atomic_inc (&stats->enoent);
+				break;
+			case -EBUSY:
+			case -EFAIL:
+			case FLAG_VOTE_OIN_ALREADY_INUSE:
+				atomic_inc (&stats->efail);
+				break;
+			case 0:
+			case FLAG_VOTE_NODE:
+			case FLAG_VOTE_OIN_UPDATED:
+				atomic_inc (&stats->okay);
+				break;
+			default:
+				atomic_inc (&stats->def);
+				break;
+		}
+	}
+}
+

Modified: trunk/src/file.c
===================================================================
--- trunk/src/file.c	2004-04-16 22:38:55 UTC (rev 847)
+++ trunk/src/file.c	2004-04-17 00:48:30 UTC (rev 848)
@@ -447,7 +447,7 @@
 	ocfs_file_entry *fileEntry = NULL;
 	__u64 dirOffset = 0;
 	bool bAcquiredLock = false;
-	ocfs_lock_res *pLockResource = NULL;
+	ocfs_lock_res *lockres = NULL;
 	__u64 lockId = 0;
 	__u32 lockFlags = 0, locktype = 0;
 	bool bCacheLock = false;
@@ -494,7 +494,7 @@
 	ocfs_handle_set_sync(handle, false);
 
 	locktype = bCacheLock ? OCFS_DLM_ENABLE_CACHE_LOCK : OCFS_DLM_EXCLUSIVE_LOCK;
-	status = ocfs_acquire_lock (osb, lockId, locktype, lockFlags, &pLockResource, 
+	status = ocfs_acquire_lock (osb, lockId, locktype, lockFlags, &lockres, 
 				    &bh, inode);
 	if (status < 0) {
 		if (status != -EINTR)
@@ -544,9 +544,8 @@
 		if (status < 0) {
 			ocfs_abort_trans(handle);
 		} else {
-			ocfs_journal_add_lock(handle, lockId, locktype, 
-					      lockFlags, pLockResource, bh, 
-					      inode);
+			ocfs_journal_add_lock(handle, lockId, locktype, lockFlags, 
+					      lockres, bh, inode);
 			bAcquiredLock = false;
 
 			ocfs_commit_trans(handle);
@@ -554,10 +553,10 @@
 	}
 	if (bAcquiredLock) {
 		tmpstat = ocfs_release_lock (osb, lockId, locktype,
-					     lockFlags, pLockResource, bh, inode);
+					     lockFlags, lockres, bh, inode);
 		if (tmpstat < 0)
 			LOG_ERROR_STATUS (tmpstat);
-		ocfs_put_lockres (pLockResource);
+		ocfs_put_lockres (lockres);
 	}
 
 	if (bh != NULL)
@@ -739,7 +738,9 @@
 
 	ocfs_put_lockres (lockres);
 
+#ifndef BH_SEM_LEAK_CHECKING	
 	if (ret < 0)
+#endif
 		ocfs_bh_sem_hash_cleanup_pid(ocfs_getpid());
 
 	LOG_EXIT_LONG (ret);
@@ -828,7 +829,7 @@
 	__u32 lockFlags = 0, locktype = 0;
 	bool bFileLockAcquired = false;
 	bool bAcquiredLock = false;
-	ocfs_lock_res *pLockResource = NULL;
+	ocfs_lock_res *lockres = NULL;
 	bool bCacheLock = false;
         __u64 new_alloc_size;
         __u32 csize = osb->vol_layout.cluster_size;
@@ -873,7 +874,7 @@
 	OCFS_BH_PUT_DATA(bh);
 
 	locktype = bCacheLock ? OCFS_DLM_ENABLE_CACHE_LOCK : OCFS_DLM_EXCLUSIVE_LOCK;
-	status = ocfs_acquire_lock (osb, lockId, locktype, lockFlags, &pLockResource, 
+	status = ocfs_acquire_lock (osb, lockId, locktype, lockFlags, &lockres, 
 				    &bh, inode);
 	if (status < 0) {
 		if (status != -EINTR)
@@ -991,7 +992,7 @@
 
 		locktype = bCacheLock ? OCFS_DLM_ENABLE_CACHE_LOCK : OCFS_DLM_EXCLUSIVE_LOCK;
 		tmpstat = ocfs_release_lock (osb, lockId, locktype,
-					     lockFlags, pLockResource, bh, inode);
+					     lockFlags, lockres, bh, inode);
 		if (tmpstat < 0)
 			LOG_ERROR_STATUS (tmpstat);
 	}
@@ -999,7 +1000,7 @@
 	if (bh != NULL)
 		brelse(bh);
 
-	ocfs_put_lockres (pLockResource);
+	ocfs_put_lockres (lockres);
 
 	if  (status < 0)
 		if (status != -ENOSPC && status != -EINTR)
@@ -1027,7 +1028,7 @@
 	__u32 lockFlags = 0, locktype = 0;
 	bool bFileLockAcquired = false;
 	bool bAcquiredLock = false;
-	ocfs_lock_res *pLockResource = NULL;
+	ocfs_lock_res *lockres = NULL;
 	__u64 actualDiskOffset = 0;
 	__u64 actualLength = 0;
 	bool bCacheLock = false;
@@ -1096,7 +1097,7 @@
 		bh = NULL;
 	
 		locktype = bCacheLock ? OCFS_DLM_ENABLE_CACHE_LOCK : OCFS_DLM_EXCLUSIVE_LOCK;	
-		status = ocfs_acquire_lock (osb, lockId, locktype, lockFlags, &pLockResource, 
+		status = ocfs_acquire_lock (osb, lockId, locktype, lockFlags, &lockres, 
 					    &bh, inode);
 		if (status < 0) {
 			if (status != -EINTR)
@@ -1237,7 +1238,7 @@
 					lockFlags |= FLAG_FILE_UPDATE_OIN;
 
 				ocfs_journal_add_lock(handle, lockId, locktype,
-						      lockFlags, pLockResource,
+						      lockFlags, lockres,
 						      bh, inode);
 				bAcquiredLock = false;
 
@@ -1252,10 +1253,10 @@
 
 		locktype = bCacheLock ? OCFS_DLM_ENABLE_CACHE_LOCK : OCFS_DLM_EXCLUSIVE_LOCK;	
 		tmpstat = ocfs_release_lock (osb, lockId, OCFS_DLM_EXCLUSIVE_LOCK,
-					     lockFlags, pLockResource, bh, inode);
+					     lockFlags, lockres, bh, inode);
 		if (tmpstat < 0)
 			LOG_ERROR_STATUS (tmpstat);
-		ocfs_put_lockres (pLockResource);
+		ocfs_put_lockres (lockres);
 	}
 
 	if (bh != NULL)
@@ -1297,6 +1298,14 @@
 		goto bail;
 	}
 	if (dentry == inode->i_sb->s_root) {
+#ifdef ALLOW_LOCAL_ROOT_INODE_SETATTR
+		if (!(attr->ia_valid & (ATTR_SIZE| ATTR_MODE))) {
+			error = inode_change_ok (inode, attr);
+			if (!error)
+				inode_setattr (inode, attr);
+			goto bail;
+		}
+#endif
 		LOG_ERROR_STR("changes to root inode not allowed");
 		goto bail;
 	}
@@ -1395,7 +1404,10 @@
 	inode_setattr (inode, attr);
 
 bail:
+	
+#ifndef BH_SEM_LEAK_CHECKING
 	if (error < 0)
+#endif
 		ocfs_bh_sem_hash_cleanup_pid(ocfs_getpid());
 
 	LOG_EXIT_LONG (error);

Modified: trunk/src/hash.c
===================================================================
--- trunk/src/hash.c	2004-04-16 22:38:55 UTC (rev 847)
+++ trunk/src/hash.c	2004-04-17 00:48:30 UTC (rev 848)
@@ -47,42 +47,42 @@
  * ocfs_insert_sector_node()
  *
  */
-int ocfs_insert_sector_node (ocfs_super * osb, ocfs_lock_res * lock_res, ocfs_lock_res ** found_lock_res)
+int ocfs_insert_sector_node (ocfs_super * osb, ocfs_lock_res * lockres, ocfs_lock_res ** found_lockres)
 {
 	int status = 0;
 	__u32 tmp;
 
-	LOG_ENTRY_ARGS ("(0x%08x, 0x%08x)\n", osb, lock_res);
+	LOG_ENTRY_ARGS ("(0x%08x, 0x%08x)\n", osb, lockres);
 
 	if (HASHTABLE_DESTROYED (&(osb->root_sect_node))) {
 		LOG_TRACE_STATUS (status = -EFAIL);
 		goto bail;
 	}
 
-	if (lock_res->signature != 0x55AA) {
+	if (lockres->signature != 0x55AA) {
 		LOG_ERROR_STATUS (status = -EFAIL);
 		goto bail;
 	}
 
-	if (!ocfs_hash_add (&(osb->root_sect_node), &(lock_res->sector_num),
-			    sizeof (__u64), lock_res, sizeof (ocfs_lock_res *),
-			    (void **)found_lock_res, &tmp)) {
+	if (!ocfs_hash_add (&(osb->root_sect_node), &(lockres->sector_num),
+			    sizeof (__u64), lockres, sizeof (ocfs_lock_res *),
+			    (void **)found_lockres, &tmp)) {
 		LOG_ERROR_STATUS(status = -EFAIL);
 		goto bail;
 	}
 
-	if (*found_lock_res) {
-		ocfs_get_lockres (*found_lock_res);
+	if (*found_lockres) {
+		ocfs_get_lockres (*found_lockres);
 		LOG_TRACE_ARGS ("isn: fres=0x%08x, ref=%d, lid=%u.%u\n",
-				*found_lock_res,
-				atomic_read (&((*found_lock_res)->lr_ref_cnt)),
-				HILO((*found_lock_res)->sector_num));
+				*found_lockres,
+				atomic_read (&((*found_lockres)->lr_ref_cnt)),
+				HILO((*found_lockres)->sector_num));
 	}
 	else {
-		ocfs_get_lockres (lock_res);
-		LOG_TRACE_ARGS ("isn: lres=0x%08x, ref=%d, lid=%u.%u\n", lock_res,
-				atomic_read (&lock_res->lr_ref_cnt),
-				HILO(lock_res->sector_num));
+		ocfs_get_lockres (lockres);
+		LOG_TRACE_ARGS ("isn: lres=0x%08x, ref=%d, lid=%u.%u\n", lockres,
+				atomic_read (&lockres->lr_ref_cnt),
+				HILO(lockres->sector_num));
 	}	
 
 bail:
@@ -94,13 +94,13 @@
  * ocfs_lookup_sector_node()
  *
  */
-int ocfs_lookup_sector_node (ocfs_super * osb, __u64 lock_id, ocfs_lock_res ** lock_res)
+int ocfs_lookup_sector_node (ocfs_super * osb, __u64 lock_id, ocfs_lock_res ** lockres)
 {
 	int status = 0;
 	__u32 len = 0;
 
 	LOG_ENTRY_ARGS ("(0x%08x, %u.%u, 0x%08x)\n", osb, HI (lock_id),
-			LO (lock_id), lock_res);
+			LO (lock_id), lockres);
 
 	if (HASHTABLE_DESTROYED (&(osb->root_sect_node))) {
 		status = -EFAIL;
@@ -109,21 +109,21 @@
 	}
 
 	if (ocfs_hash_get (&(osb->root_sect_node), &(lock_id), sizeof (__u64),
-			 (void **) lock_res, &len)) {
+			 (void **) lockres, &len)) {
 		if (len != sizeof (ocfs_lock_res *)) {
 			LOG_ERROR_STATUS (status = -EFAIL);
 			goto bail;
 		}
 
-		if ((*lock_res)->signature != 0x55AA) {
+		if ((*lockres)->signature != 0x55AA) {
 			LOG_ERROR_STATUS (status = -EFAIL);
 			goto bail;
 		}
 
-		ocfs_get_lockres (*lock_res);
+		ocfs_get_lockres (*lockres);
 		LOG_TRACE_ARGS ("lsn: lid=%u.%u, lres=0x%08x, ref=%d\n",
-				HILO(lock_id), *lock_res,
-				atomic_read (&((*lock_res)->lr_ref_cnt)));
+				HILO(lock_id), *lockres,
+				atomic_read (&((*lockres)->lr_ref_cnt)));
 	} else
 		status = -ENOENT;		
 
@@ -136,28 +136,28 @@
  * ocfs_remove_sector_node()
  *
  */
-void ocfs_remove_sector_node (ocfs_super * osb, ocfs_lock_res * lock_res)
+void ocfs_remove_sector_node (ocfs_super * osb, ocfs_lock_res * lockres)
 {
-	LOG_ENTRY_ARGS ("(0x%08x, 0x%08x)\n", osb, lock_res);
+	LOG_ENTRY_ARGS ("(0x%08x, 0x%08x)\n", osb, lockres);
 
 	if (HASHTABLE_DESTROYED (&(osb->root_sect_node))) {
 		LOG_TRACE_STATUS (-EFAIL);
 		goto bail;
 	}
 
-	if (lock_res->signature != 0x55AA) {
+	if (lockres->signature != 0x55AA) {
 		LOG_ERROR_STATUS (-EFAIL);
 		goto bail;
 	}
 
-	LOG_TRACE_ARGS ("rsn: lres=0x%08x, ref=%d, lid=%u.%u\n", lock_res,
-			atomic_read (&lock_res->lr_ref_cnt),
-			HILO(lock_res->sector_num));
+	LOG_TRACE_ARGS ("rsn: lres=0x%08x, ref=%d, lid=%u.%u\n", lockres,
+			atomic_read (&lockres->lr_ref_cnt),
+			HILO(lockres->sector_num));
 
-	ocfs_hash_del (&(osb->root_sect_node), &(lock_res->sector_num),
+	ocfs_hash_del (&(osb->root_sect_node), &(lockres->sector_num),
 		       sizeof (__u64));
 
-	ocfs_put_lockres (lock_res);
+	ocfs_put_lockres (lockres);
 
 bail:
 	LOG_EXIT ();

Modified: trunk/src/inc/io.h
===================================================================
--- trunk/src/inc/io.h	2004-04-16 22:38:55 UTC (rev 847)
+++ trunk/src/inc/io.h	2004-04-17 00:48:30 UTC (rev 848)
@@ -129,6 +129,9 @@
 	return kaddr;
 }
 
+
+/* BH_SEM_DEBUG needs these to be macros to get at __LINE__ and such */
+
 #ifdef BH_SEM_DEBUG
 #define OCFS_BH_GET_DATA_WRITE(bh) ({ \
 	unsigned char *kaddr; \
@@ -142,6 +145,18 @@
 	kaddr = (kmap((bh)->b_page)) + ((unsigned long)(bh)->b_data & ~PAGE_MASK); \
 	kaddr; \
 })
+
+#define OCFS_BH_GET_DATA_WRITE_TRYLOCK(bh)  \
+({  \
+	unsigned char *kaddr = NULL;  /* value of the expression; stays NULL if the trylock fails */  \
+	if (ocfs_bh_sem_lock_modify(bh) == OCFS_BH_SEM_WAIT_ON_MODIFY) {  \
+		ocfs_bh_sem_unlock(bh);  /* no "return" here: it would exit the calling function */  \
+	} else {  \
+		kaddr = kmap((bh)->b_page);  \
+		kaddr += ((unsigned long)(bh)->b_data & ~PAGE_MASK);  \
+	}  \
+	kaddr;  \
+})
 #else
 static inline void * OCFS_BH_GET_DATA_WRITE(struct buffer_head *bh)
 {
@@ -164,8 +179,29 @@
 	kaddr += ((unsigned long)(bh)->b_data & ~PAGE_MASK);
 	return kaddr;
 }
+
+static inline void * OCFS_BH_GET_DATA_WRITE_TRYLOCK(struct buffer_head *bh)
+{
+	unsigned char *kaddr;
+
+	if (ocfs_bh_sem_lock_modify(bh) == OCFS_BH_SEM_WAIT_ON_MODIFY) {
+#ifdef VERBOSE_BH_SEM
+		printk("ocfs2: called getdata for write but "
+		       "this process is not the lock "
+		       "holder!\n");
 #endif
+		ocfs_bh_sem_unlock(bh);
+		return NULL;
+	}
+	kaddr = kmap((bh)->b_page);
+	kaddr += ((unsigned long)(bh)->b_data & ~PAGE_MASK);
+	return kaddr;
+}
 
+#endif
+
+
+
 #if LINUX_VERSION_CODE >= KERNEL_VERSION(2,6,0)
 #define BH_GET_DEVICE(bh) ((bh->b_bdev)->bd_dev)
 #else

Modified: trunk/src/inc/journal.h
===================================================================
--- trunk/src/inc/journal.h	2004-04-16 22:38:55 UTC (rev 847)
+++ trunk/src/inc/journal.h	2004-04-17 00:48:30 UTC (rev 848)
@@ -330,7 +330,7 @@
 					struct buffer_head *bh);
 void                 ocfs_journal_add_lock(ocfs_journal_handle *handle, 
 					   __u64 id,  __u32 type, __u32 flags, 
-					   struct _ocfs_lock_res *res, 
+					   struct _ocfs_lock_res *lockres, 
 					   struct buffer_head *bh, 
 					   struct inode *inode);
 

Modified: trunk/src/inc/ocfs.h
===================================================================
--- trunk/src/inc/ocfs.h	2004-04-16 22:38:55 UTC (rev 847)
+++ trunk/src/inc/ocfs.h	2004-04-17 00:48:30 UTC (rev 848)
@@ -417,10 +417,14 @@
 }
 ocfs_rw_mode;
 
-#define  FLAG_ALWAYS_UPDATE_OPEN       0x1
-#define  LOCK_STATE_INIT               0x2
-#define  LOCK_STATE_IN_VOTING          0x4
 
+/* lockres->lock_state flags */
+#define  FLAG_ALWAYS_UPDATE_OPEN       0x00000001
+#define  LOCK_STATE_INIT               0x00000002
+#define  LOCK_STATE_IN_VOTING          0x00000004
+#define  FLAG_READONLY_DROPPING        0x00000008
+
+/* oin->oin_flags flags */
 #define  OCFS_OIN_IN_TEARDOWN                    (0x00000002)
 #define  OCFS_OIN_DIRECTORY                      (0x00000008)
 #define  OCFS_OIN_ROOT_DIRECTORY                 (0x00000010)
@@ -434,18 +438,19 @@
 #define  OCFS_OIN_OPEN_FOR_WRITE                 (0x00200000)
 #define  OCFS_OIN_IN_RECOVER_LIST                (0x00400000)
 
-
+/* osb->osb_flags flags */
 #define  OCFS_OSB_FLAGS_BEING_DISMOUNTED  (0x00000004)
 #define  OCFS_OSB_FLAGS_SHUTDOWN          (0x00000008)
 #define  OCFS_OSB_FLAGS_OSB_INITIALIZED   (0x00000020)
 
+/* OcfsGlobalCtxt.flags flags */
 #define  OCFS_FLAG_GLBL_CTXT_RESOURCE_INITIALIZED (0x00000001)
 #define  OCFS_FLAG_MEM_LISTS_INITIALIZED          (0x00000002)
 #define  OCFS_FLAG_SHUTDOWN_VOL_THREAD            (0x00000004)
 
-// DIRFLAG MASK
-#define  DIR_NODE_FLAG_ROOT           0x1
-#define  DIR_NODE_FLAG_ORPHAN         0x2
+/* ocfs_dir_node->dir_node_flags flags */
+#define  DIR_NODE_FLAG_ROOT           0x01
+#define  DIR_NODE_FLAG_ORPHAN         0x02
 
 /*
 ** Information on Publish sector of each node
@@ -1724,39 +1729,24 @@
 typedef struct _ocfs_io_runs ocfs_io_runs;
 typedef struct _ocfs_lock_res ocfs_lock_res;
 
+/* XXX: fields that can go if we move this to the inode private */
 struct _ocfs_lock_res
 {
-	__u32 signature;
-	__u8 lock_type;		/* Support only Exclusive & Shared */
-	atomic_t lr_share_cnt;	/* Used in case of Shared resources */
-	atomic_t lr_ref_cnt;	/* When 0, freed */
+	__u32 signature;	// XXX
 	__u32 master_node_num;	/* Master Node */
-	__u64 last_upd_seq_num;
-	__u64 last_lock_upd;
-	__u64 sector_num;
-	__u64 oin_openmap;
-	__u64 tmp_openmap;	/* oin_openmap collected over the comm */
-	__u8 in_use;
-	int thread_id;
-	struct list_head cache_list;
-	bool in_cache_list;
-	bool cache_lock_held;
 	__u32 lock_state;
-	__u32 vote_state;		/* Is the lockres being voted on over ipcdlm */
-	spinlock_t lock_mutex;
-	wait_queue_head_t voted_event;
-	atomic_t voted_event_woken;
-	__u64 req_vote_map;
-	__u64 got_vote_map;
-	__u32 vote_status;
-	__u64 last_write_time;
-	__u64 last_read_time;
-	__u32 writer_node_num;
-	__u32 reader_node_num;
 	__u32 lock_holders;
-	bool readonly_dropping;
+	__u8 in_use;
+	__u8 lock_type;
+	int thread_id;		// XXX
+	atomic_t lr_ref_cnt;	/* When 0, freed */  // XXX
+	ocfs_inode *oin;	// XXX
+	spinlock_t lock_mutex;  // XXX
 	__u32 readonly_node;
 	__u64 readonly_map;
+	__u64 oin_openmap;
+	__u64 last_upd_seq_num;
+	__u64 sector_num;	// XXX
 };
 
 struct _ocfs_inode
@@ -2028,6 +2018,8 @@
 	ocfs_dlm_stats dsk_reply_stats;	/* stats of diskdlm vote reponses */
 	char dev_str[20];		/* "major,minor" of the device */
 	struct semaphore vote_sem; /* protects calls to ocfs_process_vote */
+	struct list_head vote_obj_queue;
+	spinlock_t vote_obj_queue_lock;
 };
 
 typedef struct _ocfs_comm_info
@@ -2091,7 +2083,6 @@
 	__u32 dlm_msg_size;
 	__u16 version;
 	bool init;
-	bool re_init;
 	struct socket *send_sock;
 	struct socket *recv_sock;
 	struct completion complete;
@@ -2346,10 +2337,60 @@
 	__u32 dst_node;
 	__u32 msg_type;
 	__u32 check_sum;
-	__u8 msg_buf[1];
-}
-ocfs_dlm_msg;
+	__u8 msg_buf[0];
+} ocfs_dlm_msg;
 
+typedef struct _ocfs_vote_obj
+{
+	struct list_head list;
+	wait_queue_head_t voted_event;
+	atomic_t voted_event_woken;
+	atomic_t refcount;
+	spinlock_t lock;
+	__u32 vote_state;
+	__u32 req_lock_type;
+	__u32 old_lock_type;
+	__u32 vote_status;
+	__u64 req_vote_map;
+	__u64 got_vote_map;
+	__u64 tmp_openmap;
+	__u64 seq_num;
+	pid_t pid;
+	ocfs_dlm_msg m;
+} ocfs_vote_obj;
+
+enum {
+	VOTE_OBJ_STATE_UNSENT,
+	VOTE_OBJ_STATE_SENT,
+	VOTE_OBJ_STATE_PARTIAL_REPLY,
+	VOTE_OBJ_STATE_FULL_REPLY,
+	VOTE_OBJ_STATE_DESTROYING
+};
+
+	
+
+typedef struct _ocfs_vote_obj_lookup_data ocfs_vote_obj_lookup_data;
+
+struct _ocfs_vote_obj_lookup_data
+{
+	union {
+		struct {
+			__u64 seq_num;
+			__u64 lock_id;
+		} s;
+		struct {
+			char *page;
+			int *len;
+			int max;
+		} proc;
+	} u;
+	int (*func) (ocfs_vote_obj *obj, struct _ocfs_vote_obj_lookup_data *data);
+	ocfs_vote_obj **ret;
+};
+
+
+
+
 typedef struct _ocfs_recv_ctxt
 {
 	__s32 msg_len;
@@ -2570,6 +2611,7 @@
 	BUG();
 }
 
+
 struct ocfs_ioc
 {
 	char name[255];		/* "OCFS" */
@@ -2652,37 +2694,6 @@
 #endif				/* !USERSPACE_TOOL */
 
 
-#define ocfs_compute_dlm_stats(__status, __vote_status, __stats)	\
-do {									\
-	atomic_inc (&((__stats)->total));				\
-	if (__status == -ETIMEDOUT)					\
-		atomic_inc (&((__stats)->etimedout));			\
-	else {								\
-		switch (__vote_status) {				\
-		case -EAGAIN:						\
-		case FLAG_VOTE_UPDATE_RETRY:				\
-			atomic_inc (&((__stats)->eagain));		\
-			break;						\
-		case -ENOENT:						\
-		case FLAG_VOTE_FILE_DEL:				\
-			atomic_inc (&((__stats)->enoent));		\
-			break;						\
-		case -EBUSY:						\
-		case -EFAIL:						\
-		case FLAG_VOTE_OIN_ALREADY_INUSE:			\
-			atomic_inc (&((__stats)->efail));		\
-			break;						\
-		case 0:							\
-		case FLAG_VOTE_NODE:					\
-		case FLAG_VOTE_OIN_UPDATED:				\
-			atomic_inc (&((__stats)->okay));		\
-			break;						\
-		default:						\
-			atomic_inc (&((__stats)->def));			\
-			break;						\
-		}							\
-	}								\
-} while (0)
 
 #include "io.h"
 

Modified: trunk/src/inc/proto.h
===================================================================
--- trunk/src/inc/proto.h	2004-04-16 22:38:55 UTC (rev 847)
+++ trunk/src/inc/proto.h	2004-04-17 00:48:30 UTC (rev 848)
@@ -37,21 +37,26 @@
 int ocfs_free_directory_block (ocfs_super * osb, ocfs_file_entry * fe, ocfs_journal_handle *handle, struct inode *inode);
 int ocfs_free_file_extents (ocfs_super * osb, struct buffer_head *fe_bh, ocfs_journal_handle *handle, struct inode *inode);
 
+int ocfs_lookup_obj_for_proc (ocfs_vote_obj *obj, ocfs_vote_obj_lookup_data *data);
+int ocfs_lookup_obj_by_lockid (ocfs_vote_obj *obj, ocfs_vote_obj_lookup_data *data);
+int ocfs_lookup_obj_by_seq (ocfs_vote_obj *obj, ocfs_vote_obj_lookup_data *data);
+int ocfs_lookup_vote_request_obj (ocfs_super *osb, ocfs_vote_obj_lookup_data *data);
 
+
 int ocfs_wait_for_disk_lock_release (ocfs_super * osb, __u64 offset, __u32 time_to_wait, __u32 lock_type);
 int ocfs_disk_reset_voting (ocfs_super * osb, __u64 lock_id, __u32 lock_type);
-void ocfs_init_dlm_msg (ocfs_super * osb, ocfs_dlm_msg * dlm_msg, __u32 msg_len);
+void ocfs_init_dlm_msg (ocfs_super * osb, ocfs_dlm_msg * dlm_msg, __u32 msg_len, __u32 type);
 int ocfs_acquire_lockres_ex (ocfs_lock_res * lockres, __u32 timeout);
 void ocfs_release_lockres (ocfs_lock_res * lockres);
 void ocfs_init_lockres (ocfs_super * osb, ocfs_lock_res * lockres, __u64 lock_id);
 int ocfs_create_update_lock (ocfs_super * osb, __u64 lock_id, __u32 flags, bool new_file, struct inode *inode, ocfs_journal_handle *handle);
 int ocfs_acquire_lock (ocfs_super * osb, __u64 lock_id, __u32 lock_type,
-		   __u32 flags, ocfs_lock_res ** lr, struct buffer_head **bh, struct inode *inode);
+		   __u32 flags, ocfs_lock_res **ret_lockres, struct buffer_head **bh, struct inode *inode);
 int ocfs_release_lock (ocfs_super * osb, __u64 lock_id, __u32 lock_type, __u32 flags, ocfs_lock_res * lockres, struct buffer_head *bh, struct inode *inode);
-int ocfs_init_dlm (void);
 void ocfs_process_one_vote_reply(ocfs_super *osb, ocfs_vote_reply_ctxt *ctxt, __u32 node_num);
 int ocfs_break_cache_lock_zap_buffers(ocfs_super * osb, struct inode * inode);
 int ocfs_send_readonly_drop_message(ocfs_super *osb, ocfs_lock_res *lockres, __u64 vote_map, struct inode *inode);
+void ocfs_compute_dlm_stats(int status, int vote_status, ocfs_dlm_stats *stats);
 
 int ocfs_extend_system_file (ocfs_super * osb, __u32 FileId, __u64 FileSize, struct buffer_head *fe_bh, ocfs_journal_handle *handle, bool zero);
 
@@ -65,9 +70,9 @@
 bool ocfs_add_extent_map_entry (ocfs_super * osb, ocfs_extent_map * Map, __s64 Vbo, __s64 Lbo, __u64 ByteCount);
 
 
-int ocfs_insert_sector_node (ocfs_super * osb, ocfs_lock_res * lock_res, ocfs_lock_res ** found_lock_res);
-int ocfs_lookup_sector_node (ocfs_super * osb, __u64 lock_id, ocfs_lock_res ** lock_res);
-void ocfs_remove_sector_node (ocfs_super * osb, ocfs_lock_res * lock_res);
+int ocfs_insert_sector_node (ocfs_super * osb, ocfs_lock_res * lockres, ocfs_lock_res ** found_lockres);
+int ocfs_lookup_sector_node (ocfs_super * osb, __u64 lock_id, ocfs_lock_res ** lockres);
+void ocfs_remove_sector_node (ocfs_super * osb, ocfs_lock_res * lockres);
 
 
 int ocfs_file_open (struct inode *inode, struct file *file);
@@ -197,6 +202,7 @@
 void ocfs_update_publish_map (ocfs_super * osb, struct buffer_head *bhs[], bool first_time);
 
 
+int ocfs_send_dlm_request_msg (ocfs_super * osb, __u64 lock_id, __u32 lock_type, __u32 flags, ocfs_lock_res * lockres, __u64 vote_map, struct inode *inode, int *vote_status);
 int ocfs_recv_thread (void *unused);
 int ocfs_volume_thread (void *arg);
 int ocfs_init_udp_sock (struct socket **send_sock, struct socket **recv_sock);
@@ -319,4 +325,7 @@
 void ocfs_show_stack(unsigned long * esp);
 void ocfs_show_trace(unsigned long * stack);
 
+void ocfs_put_vote_obj (ocfs_vote_obj *obj);
+void ocfs_get_vote_obj (ocfs_vote_obj *obj);
+
 #endif /* _PROTO_H_ */

Modified: trunk/src/journal.c
===================================================================
--- trunk/src/journal.c	2004-04-16 22:38:55 UTC (rev 847)
+++ trunk/src/journal.c	2004-04-17 00:48:30 UTC (rev 848)
@@ -770,13 +770,13 @@
  * we use the spin_lock here. You really shouldn't be calling this on
  * other transactions anyway... */
 void ocfs_journal_add_lock(ocfs_journal_handle *handle, __u64 id,  __u32 type, 
-			   __u32 flags, struct _ocfs_lock_res *res, 
+			   __u32 flags, struct _ocfs_lock_res *lockres, 
 			   struct buffer_head *bh, struct inode *inode) 
 {
 	ocfs_journal_lock *lock;
 
 	LOG_ENTRY_ARGS("(id=%u.%u, type=%u, flags=%u, res=0x%08x, " 
-		       "bh=0x%08x)\n", HILO(id), type, flags, res, bh);
+		       "bh=0x%08x)\n", HILO(id), type, flags, lockres, bh);
 
 	lock = ocfs_malloc(sizeof(ocfs_journal_lock));
 	if (lock == NULL) {
@@ -789,7 +789,7 @@
 	lock->id    = id;
 	lock->type  = type;
 	lock->flags = flags;
-	lock->res   = res;
+	lock->res   = lockres;
 	lock->bh    = bh;
 	lock->inode = inode;
 
@@ -865,7 +865,7 @@
 	struct inode *inode = NULL; /* the journal inode */
 	journal_t * k_journal = NULL;
 	ocfs_file_entry *fe = NULL;
-	ocfs_lock_res *lock_res = NULL;
+	ocfs_lock_res *lockres = NULL;
 	__u32 cleanup_file_id = 0;
 	__u64 lock_id = 0;
 	ocfs_inode * oin = NULL;
@@ -905,7 +905,7 @@
 
 	/* TODO: Use another type of lock. */
 	status = ocfs_acquire_lock (osb, lock_id, OCFS_DLM_EXCLUSIVE_LOCK,
-				    FLAG_FILE_CREATE, &lock_res, &bh, inode);
+				    FLAG_FILE_CREATE, &lockres, &bh, inode);
 	if (status < 0) {
 		if (status != -EINTR)
 			LOG_ERROR_STR("Could not get lock on journal!");
@@ -973,14 +973,14 @@
 	osb->journal.k_inode = inode;
 	osb->journal.version = OCFS_JOURNAL_CURRENT_VERSION;
 	osb->journal.lockbh = bh;
-	osb->journal.lock_res = lock_res;
+	osb->journal.lock_res = lockres;
 	osb->journal.lock_id = lock_id;
 	atomic_set(&(osb->journal.num_trans), 0);
 	osb->journal.state = OCFS_JOURNAL_LOADED;
 	status = 0;
 done:
 	if (status < 0) {
-		ocfs_put_lockres (lock_res);
+		ocfs_put_lockres (lockres);
 		if (bh != NULL) {
 			if (fe)
 				OCFS_BH_PUT_DATA(bh);
@@ -1467,7 +1467,7 @@
 	int status = -1;
 	__u64 lock_id = 0;
 	__u32 cleanup_file_id = 0;
-	ocfs_lock_res *lock_res = NULL;
+	ocfs_lock_res *lockres = NULL;
 	ocfs_file_entry *fe = NULL;
 	ocfs_inode *oin = NULL;
 	struct inode *inode = NULL;
@@ -1527,7 +1527,7 @@
 	status = ocfs_acquire_lock (osb, lock_id, 
 				    OCFS_DLM_EXCLUSIVE_LOCK,
 				    FLAG_FILE_CREATE|FLAG_FILE_RECOVERY, 
-				    &lock_res, &bh, inode);
+				    &lockres, &bh, inode);
 	if (status < 0) {
 		LOG_TRACE_ARGS("status returned from acquire_lock=%d\n", 
 			       status);
@@ -1645,19 +1645,19 @@
 		up(&(osb->recovery_lock));
 
 	/* drop the lock on this nodes journal */
-	if (lock_res)
+	if (lockres)
 		status = ocfs_release_lock(osb, lock_id, 
 					   OCFS_DLM_EXCLUSIVE_LOCK, 
 					   FLAG_FILE_CREATE|FLAG_FILE_RECOVERY,
-					   lock_res, bh, NULL);
+					   lockres, bh, NULL);
 	if (bh)
 		brelse(bh);
 
 	if (config_bh)
 		brelse(config_bh);
 
-	if (lock_res)
-		ocfs_put_lockres (lock_res);
+	if (lockres)
+		ocfs_put_lockres (lockres);
 
 	atomic_dec(&osb->num_recovery_threads);
 

Modified: trunk/src/namei.c
===================================================================
--- trunk/src/namei.c	2004-04-16 22:38:55 UTC (rev 847)
+++ trunk/src/namei.c	2004-04-17 00:48:30 UTC (rev 848)
@@ -169,7 +169,7 @@
 	__u64 file_off = 0;
 	__u64 dirnode_off;
 	__u32 oinflags;
-	ocfs_lock_res *lock_res = NULL;
+	ocfs_lock_res *lockres = NULL;
 	ocfs_journal_handle *handle = NULL;
 	ocfs_super *osb;
 	ocfs_file_entry *fe = NULL;
@@ -221,7 +221,7 @@
 	status = ocfs_acquire_lock (osb, parent_off, 
 				    OCFS_DLM_ENABLE_CACHE_LOCK,
 				    FLAG_FILE_CREATE | FLAG_DIR, 
-				    &lock_res, &lock_bh, dir);
+				    &lockres, &lock_bh, dir);
 	if (status < 0) {
 		if (status != -EINTR)
 			LOG_ERROR_STATUS (status);
@@ -231,8 +231,7 @@
 	/* Ok, we got the lock -- we'd better add it to our transaction */
 	ocfs_journal_add_lock(handle, parent_off, 
 			      OCFS_DLM_ENABLE_CACHE_LOCK, 
-			      FLAG_FILE_CREATE | FLAG_DIR, lock_res, lock_bh, 
-			      dir);
+			      FLAG_FILE_CREATE | FLAG_DIR, lockres, lock_bh, dir);
 
 	/* do the real work now. */
 	status = ocfs_mknod_locked(osb, dir, dentry, mode, dev, lock_bh, 
@@ -276,18 +275,18 @@
         /*  Insert the OFile on the OIN list */
 	OCFS_I(inode)->chng_seq_num = DISK_LOCK_SEQNUM (fe);
 	if (oin->lock_res != NULL) {
-		ocfs_lock_res *lockres = oin->lock_res;
-		ocfs_acquire_lockres(lockres);
-		lockres->master_node_num = DISK_LOCK_CURRENT_MASTER (fe);
-		lockres->lock_type = DISK_LOCK_FILE_LOCK (fe);
-		if (lockres->readonly_node != OCFS_INVALID_NODE_NUM &&
-	    	    lockres->readonly_node != lockres->master_node_num) {
+		ocfs_lock_res *tmp_lockres = oin->lock_res;
+		ocfs_acquire_lockres(tmp_lockres);
+		tmp_lockres->master_node_num = DISK_LOCK_CURRENT_MASTER (fe);
+		tmp_lockres->lock_type = DISK_LOCK_FILE_LOCK (fe);
+		if (tmp_lockres->readonly_node != OCFS_INVALID_NODE_NUM &&
+	    	    tmp_lockres->readonly_node != tmp_lockres->master_node_num) {
 			LOG_ERROR_ARGS("no longer readonly! ronode=%d, master=%d, lockid=%u.%u\n",
-					lockres->readonly_node, lockres->master_node_num,
-					lockres->sector_num);
-			lockres->readonly_node = OCFS_INVALID_NODE_NUM;
+					tmp_lockres->readonly_node, tmp_lockres->master_node_num,
+					HILO(tmp_lockres->sector_num));	/* "%u.%u" takes the hi/lo halves */
+			tmp_lockres->readonly_node = OCFS_INVALID_NODE_NUM;
 		}
-		ocfs_release_lockres(lockres);
+		ocfs_release_lockres(tmp_lockres);
 	}
 	OCFS_BH_PUT_DATA(new_fe_bh);
 	fe = NULL;
@@ -322,7 +321,9 @@
 	if ((status < 0) && inode)
 		iput(inode);
 
+#ifndef BH_SEM_LEAK_CHECKING
 	if (status < 0)
+#endif
 		ocfs_bh_sem_hash_cleanup_pid(ocfs_getpid());
 
 	LOG_EXIT_STATUS(status);
@@ -672,7 +673,9 @@
 	else
 		retval = status;
 
+#ifndef BH_SEM_LEAK_CHECKING
 	if (retval < 0)
+#endif
 		ocfs_bh_sem_hash_cleanup_pid(ocfs_getpid());
 
 	LOG_EXIT_LONG (retval);
@@ -897,10 +900,10 @@
 	ocfs_bitmap_free_head *free_head = NULL;
 	ocfs_journal_handle *handle = NULL;
 	__u32 dir_lock_flags = FLAG_FILE_CREATE | FLAG_DIR;
-	ocfs_lock_res * old_dir_lock = NULL;
-	ocfs_lock_res * new_dir_lock = NULL;
-	ocfs_lock_res * oldfe_lock = NULL;
-	ocfs_lock_res * newfe_lock = NULL;
+	ocfs_lock_res * old_dir_lockres = NULL;
+	ocfs_lock_res * new_dir_lockres = NULL;
+	ocfs_lock_res * oldfe_lockres = NULL;
+	ocfs_lock_res * newfe_lockres = NULL;
 	struct buffer_head *old_dir_bh = NULL;
 	struct buffer_head *new_dir_bh = NULL;
 	__u32 oldfe_flags = 0;
@@ -1000,10 +1003,10 @@
 	/* if old and new are the same, this'll just do one lock. */
 	status = ocfs_double_lock(osb, handle, 
 				  oldDirOff, OCFS_DLM_EXCLUSIVE_LOCK, 
-				  dir_lock_flags, &old_dir_lock, 
+				  dir_lock_flags, &old_dir_lockres, 
 				  &old_dir_bh, old_dir,
 				  newDirOff, OCFS_DLM_EXCLUSIVE_LOCK, 
-				  dir_lock_flags, &new_dir_lock, 
+				  dir_lock_flags, &new_dir_lockres, 
 				  &new_dir_bh, new_dir);
 	if (status < 0) {
 		LOG_ERROR_STATUS(status);
@@ -1033,7 +1036,7 @@
 		oldfe_flags |= FLAG_FILE_RENAME;
 
 	status = ocfs_acquire_lock(osb, oldfe_lockid, OCFS_DLM_EXCLUSIVE_LOCK,
-				   oldfe_flags, &oldfe_lock, NULL, old_inode);
+				   oldfe_flags, &oldfe_lockres, NULL, old_inode);
 	if (status < 0) {
 		LOG_ERROR_STATUS(status);
 		goto finally;
@@ -1076,7 +1079,7 @@
 
 		status = ocfs_acquire_lock(osb, newfe_lockid, 
 					   OCFS_DLM_EXCLUSIVE_LOCK, 
-					   newfe_flags, &newfe_lock, NULL, 
+					   newfe_flags, &newfe_lockres, NULL, 
 					   new_inode);
 		if (status < 0) {
 			LOG_ERROR_STATUS(status);
@@ -1245,18 +1248,18 @@
 	}
 
 bail:
-	if (oldfe_lock) {
+	if (oldfe_lockres) {
 		ocfs_release_lock(osb, oldfe_lockid, 
 				  OCFS_DLM_EXCLUSIVE_LOCK, 
-				  oldfe_flags, oldfe_lock, NULL, old_inode);
-		ocfs_put_lockres(oldfe_lock);
+				  oldfe_flags, oldfe_lockres, NULL, old_inode);
+		ocfs_put_lockres(oldfe_lockres);
 	}
 
-	if (newfe_lock) {
+	if (newfe_lockres) {
 		ocfs_release_lock(osb, newfe_lockid, 
 				  OCFS_DLM_EXCLUSIVE_LOCK, 
-				  newfe_flags, newfe_lock, NULL, new_inode);
-		ocfs_put_lockres(newfe_lock);
+				  newfe_flags, newfe_lockres, NULL, new_inode);
+		ocfs_put_lockres(newfe_lockres);
 	}
 
 	if (new_inode)
@@ -1283,7 +1286,9 @@
 	if (free_head)
 		ocfs_free_bitmap_free_head(free_head);
 
+#ifndef BH_SEM_LEAK_CHECKING
 	if (status < 0)
+#endif
 		ocfs_bh_sem_hash_cleanup_pid(ocfs_getpid());
 
 	LOG_EXIT_STATUS(status);
@@ -1308,7 +1313,7 @@
 	struct buffer_head *lock_bh = NULL;
 	ocfs_file_entry *fe = NULL;
 	ocfs_journal_handle *handle = NULL;
-	ocfs_lock_res *lock_res = NULL;
+	ocfs_lock_res *lockres = NULL;
 
 
 	LOG_ENTRY_ARGS ("(0x%08x, 0x%08x, symname='%s' actual='%*s')\n", dir, 
@@ -1356,7 +1361,7 @@
 	status = ocfs_acquire_lock (osb, parent_off, 
 				    OCFS_DLM_ENABLE_CACHE_LOCK,
 				    FLAG_FILE_CREATE | FLAG_DIR, 
-				    &lock_res, &lock_bh, dir);
+				    &lockres, &lock_bh, dir);
 	if (status < 0) {
 		if (status != -EINTR)
 			LOG_ERROR_STATUS (status);
@@ -1421,14 +1426,14 @@
 			LOG_ERROR_STATUS(status);
 	}
 
-	if (lock_res != NULL) {
+	if (lockres != NULL) {
 		int tmpstat;
 		tmpstat = ocfs_release_lock (osb, parent_off, OCFS_DLM_ENABLE_CACHE_LOCK,
-		     FLAG_FILE_CREATE | FLAG_DIR, lock_res, lock_bh, dir);
+		     FLAG_FILE_CREATE | FLAG_DIR, lockres, lock_bh, dir);
 		if (tmpstat < 0)
 			LOG_ERROR_STATUS (tmpstat);
 	}
-	ocfs_put_lockres (lock_res);
+	ocfs_put_lockres (lockres);
 
 bail:
 	if (new_fe_bh) {
@@ -1436,8 +1441,10 @@
 			OCFS_BH_PUT_DATA(new_fe_bh);
 		brelse(new_fe_bh);
 	}
-
+	
+#ifndef BH_SEM_LEAK_CHECKING
 	if (status < 0)
+#endif
 		ocfs_bh_sem_hash_cleanup_pid(ocfs_getpid());
 
 
@@ -1623,8 +1630,8 @@
 	__u64 lock_id = 0;
 	struct buffer_head *fe_bh = NULL, *lock_bh = NULL;
 	struct buffer_head *lock_node_bh = NULL; /* parent locknode */
-	ocfs_lock_res *parent_lock = NULL;
-	ocfs_lock_res *file_lock = NULL;
+	ocfs_lock_res *parent_lockres = NULL;
+	ocfs_lock_res *file_lockres = NULL;
 	ocfs_journal_handle *handle = NULL;
 	struct inode *inode = dentry->d_inode;
 
@@ -1677,7 +1684,7 @@
 	/* lock parent directory, yes we use FLAG_FILE_CREATE even
 	 * though we're deleting ;) */
 	status = ocfs_acquire_lock(osb, lock_node_off, OCFS_DLM_EXCLUSIVE_LOCK,
-				   FLAG_FILE_CREATE|FLAG_DIR, &parent_lock, 
+				   FLAG_FILE_CREATE|FLAG_DIR, &parent_lockres, 
 				   &lock_node_bh, parent_inode);
 	if (status < 0) {
 		LOG_ERROR_STATUS(status);
@@ -1707,7 +1714,7 @@
 	OCFS_BH_PUT_DATA(fe_bh);
 
 	status = ocfs_acquire_lock (osb, lock_id, OCFS_DLM_EXCLUSIVE_LOCK,
-			lockFlags, &file_lock, &lock_bh, inode);
+			lockFlags, &file_lockres, &lock_bh, inode);
 	if (status < 0) {
 		if (status != -EINTR)
 			LOG_ERROR_STATUS (status);
@@ -1838,26 +1845,26 @@
         /* NEW: adding a fake release lock for the dead file entry here */
         /* need this to alert dentry-owners on other nodes */
         /* Release the file lock if we acquired it */
-	if (file_lock) {
+	if (file_lockres) {
 		tmpstat = ocfs_release_lock(osb, lock_id, 
 					    OCFS_DLM_EXCLUSIVE_LOCK, 
-					    lockFlags, file_lock, 
+					    lockFlags, file_lockres, 
 					    lock_bh, inode);
 		if (tmpstat < 0)
 			LOG_ERROR_STATUS(tmpstat);
 
-		ocfs_put_lockres (file_lock);
+		ocfs_put_lockres (file_lockres);
 	}
 
-	if (parent_lock) {
+	if (parent_lockres) {
 		tmpstat = ocfs_release_lock(osb, lock_node_off, 
 					    OCFS_DLM_EXCLUSIVE_LOCK,
 					    FLAG_FILE_CREATE|FLAG_DIR, 
-					    parent_lock, lock_node_bh, parent_inode);
+					    parent_lockres, lock_node_bh, parent_inode);
 		if (tmpstat < 0)
 			LOG_ERROR_STATUS(tmpstat);
 
-		ocfs_put_lockres (parent_lock);
+		ocfs_put_lockres (parent_lockres);
 	}
 
 	if (lock_bh != NULL && lock_bh != fe_bh)

Modified: trunk/src/nm.c
===================================================================
--- trunk/src/nm.c	2004-04-16 22:38:55 UTC (rev 847)
+++ trunk/src/nm.c	2004-04-17 00:48:30 UTC (rev 848)
@@ -34,7 +34,7 @@
 struct semaphore recovery_list_sem;
 
 static inline int get_process_vote_action(ocfs_super * osb, ocfs_lock_res *lockres, __u32 node_num, __u32 flags, int status, bool *master_alive, ocfs_inode **oin, struct inode *inode);
-static int ocfs_disk_update_resource (ocfs_super * osb, ocfs_lock_res * lock_res, struct buffer_head **bh, __u32 timeout, struct inode *inode);
+static int ocfs_disk_update_resource (ocfs_super * osb, ocfs_lock_res * lockres, struct buffer_head **bh, __u32 timeout, struct inode *inode);
 static int ocfs_search_commited(ocfs_super *osb, ocfs_lock_res *lockres);
 static int ocfs_schedule_process_vote(ocfs_super *osb, struct buffer_head *bh, int vote_node);
 
@@ -484,7 +484,7 @@
  * ocfs_disk_update_resource()
  *
  * @osb: ocfs super block for the volume
- * @lock_res: lockres to be updated
+ * @lockres: lockres to be updated
  * @file_ent: corresponding file entry
  *
  * Updates the in memory lock resource from the disklock info
@@ -492,46 +492,46 @@
  *
  * Returns 0 if success, < 0 if error.
  */
-static int ocfs_disk_update_resource (ocfs_super * osb, ocfs_lock_res * lock_res, struct buffer_head **bh, __u32 timeout, struct inode *inode)
+static int ocfs_disk_update_resource (ocfs_super * osb, ocfs_lock_res * lockres, struct buffer_head **bh, __u32 timeout, struct inode *inode)
 {
 	int status = 0;
 	ocfs_file_entry *fe;
 
-	LOG_ENTRY_ARGS ("(0x%08x, 0x%08x, 0x%08x)\n", osb, lock_res, bh);
+	LOG_ENTRY_ARGS ("(0x%08x, 0x%08x, 0x%08x)\n", osb, lockres, bh);
 
 	/* Don't sync-read if we already own the lock as it may not
 	 * have hit disk yet. */
-	status = ocfs_read_bh (osb, lock_res->sector_num, bh, 
-			       (lock_res->master_node_num == osb->node_num) ? 
+	status = ocfs_read_bh (osb, lockres->sector_num, bh, 
+			       (lockres->master_node_num == osb->node_num) ? 
 			       OCFS_BH_CACHED : 0, inode);
 	if (status < 0) {
 		LOG_ERROR_STATUS (status);
 		goto finally;
 	}
 
-	status = ocfs_acquire_lockres_ex (lock_res, timeout);
+	status = ocfs_acquire_lockres_ex (lockres, timeout);
 	if (status < 0) {
 		LOG_TRACE_ARGS ("Timedout locking lockres for id: %u.%u\n",
-				HILO (lock_res->sector_num));
+				HILO (lockres->sector_num));
 		goto finally;
 	}
 
 	fe = (ocfs_file_entry *)OCFS_BH_GET_DATA_READ(*bh); /* read */
-	lock_res->lock_type = DISK_LOCK_FILE_LOCK (fe);
-	lock_res->master_node_num = DISK_LOCK_CURRENT_MASTER (fe);
-	lock_res->oin_openmap = DISK_LOCK_OIN_MAP (fe);
+	lockres->lock_type = DISK_LOCK_FILE_LOCK (fe);
+	lockres->master_node_num = DISK_LOCK_CURRENT_MASTER (fe);
+	lockres->oin_openmap = DISK_LOCK_OIN_MAP (fe);
 
-	if (lock_res->readonly_node != OCFS_INVALID_NODE_NUM &&
-    	    lock_res->readonly_node != lock_res->master_node_num) {
+	if (lockres->readonly_node != OCFS_INVALID_NODE_NUM &&
+    	    lockres->readonly_node != lockres->master_node_num) {
 		LOG_ERROR_ARGS("no longer readonly! ronode=%d, master=%d, lockid=%u.%u\n",
-				lock_res->readonly_node, lock_res->master_node_num,
-				lock_res->sector_num);
-		lock_res->readonly_node = OCFS_INVALID_NODE_NUM;
+				lockres->readonly_node, lockres->master_node_num,
+				HILO(lockres->sector_num));	/* "%u.%u" takes the hi/lo halves */
+		lockres->readonly_node = OCFS_INVALID_NODE_NUM;
 	}
 
 	OCFS_BH_PUT_DATA(*bh);
 
-	ocfs_release_lockres (lock_res);
+	ocfs_release_lockres (lockres);
 
 finally:
 	LOG_EXIT_STATUS (status);
@@ -665,6 +665,7 @@
 	int vote_type = INVALID_REQUEST;
 	bool my_node_wins = false;
 	__u64 lockid = lockres ? lockres->sector_num : 0ULL;
+	ocfs_vote_obj_lookup_data data;
 
 	LOG_ENTRY_ARGS("(status=%d, lockid=%u.%u, node_num=%d, flags=%08x)\n", status,
 		       HILO(lockid), node_num, flags);
@@ -683,8 +684,14 @@
 		*master_alive = lockres->master_node_num != OCFS_INVALID_NODE_NUM &&
 			IS_NODE_ALIVE(osb->publ_map, 
 			 lockres->master_node_num, OCFS_MAXIMUM_NODES);
-		my_node_wins = lockres->vote_state & LOCK_STATE_IN_VOTING &&
-			node_num < osb->node_num;
+
+		// if an outstanding vote request is found on this lockid
+		// and this node number is higher, this node wins
+		data.func = ocfs_lookup_obj_by_lockid;
+		data.u.s.lock_id = lockid;
+		data.ret = NULL;
+		if (ocfs_lookup_vote_request_obj (osb, &data) == 0)
+			my_node_wins = (node_num < osb->node_num);
 	}
 
 	if (flags & FLAG_DROP_READONLY) {
@@ -901,21 +908,6 @@
 	 * data or metadata under the lock.
 	 */
 
-#if 0
-/* TODO: REMOVEME! */
-if (flags & FLAG_READDIR) {
-	printk("ocfs_process_vote: READDIR %s request for lockid: %u.%u, action: %s, type: %s\n",
-       		flags & FLAG_RELEASE_LOCK ? "RELEASE" : 
-       		(flags & FLAG_ACQUIRE_LOCK ? "ACQUIRE" : "MODIFY"), lock_id,
-       		process_vote_strings[vote_type], disk_vote ? "disk vote" : "net vote" );
-} else if (vote_type == DROP_READONLY) {
-	printk("ocfs_process_vote: DROP_READONLY %s request for lockid: %u.%u, action: %s, type: %s\n",
-       		flags & FLAG_RELEASE_LOCK ? "RELEASE" : 
-       		(flags & FLAG_ACQUIRE_LOCK ? "ACQUIRE" : "MODIFY"), lock_id,
-       		process_vote_strings[vote_type], disk_vote ? "disk vote" : "net vote" );
-}
-#endif
-
 	if (inode && (vote_type != DELETE_RENAME)) {
 		/* Ok, for all operations where we no longer need
 		 * isem, drop it now. */
@@ -944,11 +936,6 @@
 			LOG_TRACE_STR("UPDATE_INODE");
 			if (inode)
 				ocfs_truncate_inode_pages(inode, 0);
-			/* TODO: figure this out... */
-			if (disk_vote) {
-				if (lockres)
-					ocfs_remove_sector_node (osb, lockres);
-			}
 			vote_response = FLAG_VOTE_OIN_UPDATED;
 			break;
 
@@ -1106,9 +1093,6 @@
 #endif
 			}
 			lockres->readonly_map |= (1 << node_num);
-#ifdef VERBOSE_LOCKING_TRACE
-			printk("READONLY: setting ronode, was=%d, now=%d, master=%d\n", lockres->readonly_node, osb->node_num, lockres->master_node_num);
-#endif
 			lockres->readonly_node = osb->node_num;
 			vote_response = FLAG_VOTE_NODE;
 			status = 0;
@@ -1147,10 +1131,12 @@
 			if (lockres->readonly_map != 0ULL) {
 				// assumption: node asking for vote has already dropped readonly_node
 				lockres->readonly_map &= ~(1 << node_num);
+				// this node's own bit should not be set in readonly_map, but clear it anyway
+				lockres->readonly_map &= ~(1 << osb->node_num);
 				if (lockres->readonly_map != 0ULL) {
 					OCFS_ASSERT(lockres->readonly_node == osb->node_num);
 					status = 0;
-					if (!lockres->readonly_dropping) {
+					if (!(lockres->lock_state & FLAG_READONLY_DROPPING)) {
 						ocfs_get_lockres(lockres);
 						if (ocfs_drop_readonly_cache_lock(osb, lockres, inode) < 0) {
 							LOG_ERROR_STATUS(status = -ENOMEM);
@@ -1182,7 +1168,6 @@
 
 			if (oin != NULL) {
 				lockres->lock_type = OCFS_DLM_NO_LOCK;
-				lockres->cache_lock_held = false;
 			}
 
 			status = ocfs_read_bh(osb, lock_id, &fe_bh, lockflags, inode);
@@ -1191,10 +1176,12 @@
 				break;
 			}
 	
-			fe = (ocfs_file_entry *) OCFS_BH_GET_DATA_WRITE(fe_bh); /* write */
+			fe = (ocfs_file_entry *) OCFS_BH_GET_DATA_READ(fe_bh); /* read */
 			is_dir = IS_VALID_DIR_NODE(fe);
 			is_locked = DISK_LOCK_FILE_LOCK (fe) > OCFS_DLM_NO_LOCK;
 			if (vote_type == CHANGE_MASTER) {
+				OCFS_BH_PUT_DATA(fe_bh);
+				fe = (ocfs_file_entry *) OCFS_BH_GET_DATA_WRITE(fe_bh); /* write */
 				if (oin)
 					DISK_LOCK_OIN_MAP (fe) |= (1 << osb->node_num);
 				DISK_LOCK_CURRENT_MASTER (fe) = node_num;
@@ -1240,7 +1227,7 @@
 				LOG_ERROR_STATUS (status);
 				break;
 			}
-			fe = (ocfs_file_entry *)OCFS_BH_GET_DATA_WRITE(fe_bh); /* write */
+			fe = (ocfs_file_entry *)OCFS_BH_GET_DATA_READ(fe_bh); /* read */
 
 			if ((fe->sync_flags & OCFS_SYNC_FLAG_NAME_DELETED) ||
 			    (!(fe->sync_flags & OCFS_SYNC_FLAG_VALID))) {
@@ -1248,6 +1235,9 @@
 				OCFS_BH_PUT_DATA(fe_bh);
 			} else {
 				__u64 tmpmap;
+			
+				OCFS_BH_PUT_DATA(fe_bh);
+				fe = (ocfs_file_entry *)OCFS_BH_GET_DATA_WRITE(fe_bh); /* write */
 				DISK_LOCK_OIN_MAP (fe) |= (1 << node_num);
 				tmpmap = DISK_LOCK_OIN_MAP (fe);
 				OCFS_BH_PUT_DATA(fe_bh);
@@ -1393,6 +1383,11 @@
 	} else {
 		status = ocfs_send_vote_reply(osb, dlm_msg, vote_response, open_handle);
 	}
+
+	LOG_TRACE_ARGS("vote: lockid=%u.%u, node=%d, seqnum=%u.%u, response=%d, open_handle=%s\n",
+		       HILO(lock_id), node_num, HILO(seq_num), vote_response, 
+		       open_handle?"yes":"no");
+		       
 	if (status < 0)
 		LOG_ERROR_STATUS (status);
 	else {
@@ -1543,13 +1538,13 @@
 		status = -EINVAL;
 		goto leave;
 	}
-	
-	if (lockres->readonly_dropping) {
+
+	if (lockres->lock_state & FLAG_READONLY_DROPPING) {	
 		status = 0;
 		goto leave;
 	}
 
-	lockres->readonly_dropping = true;
+	lockres->lock_state |= FLAG_READONLY_DROPPING;
 	map = lockres->readonly_map;
 	map &= osb->publ_map;      /* remove all dead nodes */
 	
@@ -1560,6 +1555,8 @@
 		// and eliminate them from the map
 
 		/* cannot hold lockres while waiting for vote */
+		/* BUGBUGBUG!!! MUST hold lockres while waiting BUGBUGBUG!!! */
+#warning MUST FIX THIS WRONG WRONG WRONG
 		ocfs_release_lockres(lockres);
 
 		status = ocfs_send_readonly_drop_message(osb, lockres, map, 
@@ -1581,7 +1578,7 @@
 	}
 	if (status >= 0)
 		lockres->readonly_map = 0ULL;
-	lockres->readonly_dropping = false;
+	lockres->lock_state &= ~FLAG_READONLY_DROPPING;
 
 leave:
 	ocfs_release_lockres(lockres);

Modified: trunk/src/oin.c
===================================================================
--- trunk/src/oin.c	2004-04-16 22:38:55 UTC (rev 847)
+++ trunk/src/oin.c	2004-04-17 00:48:30 UTC (rev 848)
@@ -38,7 +38,7 @@
 	int status = 0;
 	struct buffer_head *fe_bh = NULL;
 	ocfs_file_entry *fe = NULL;
-	ocfs_lock_res *pLockRes;
+	ocfs_lock_res *lockres;
         struct list_head *iter;
         struct list_head *temp_iter;
         int disk_len;
@@ -215,25 +215,21 @@
 			OCFS_BH_PUT_DATA(fe_bh);
 			fe = NULL;
 
-			pLockRes = oin->lock_res;
-			ocfs_acquire_lockres(pLockRes);
-			pLockRes->lock_type = DISK_LOCK_FILE_LOCK (&dlock);
-			pLockRes->master_node_num = DISK_LOCK_CURRENT_MASTER (&dlock);
-			pLockRes->oin_openmap = DISK_LOCK_OIN_MAP (&dlock);
-			pLockRes->last_write_time = DISK_LOCK_LAST_WRITE (&dlock);
-			pLockRes->last_read_time = DISK_LOCK_LAST_READ (&dlock);
-			pLockRes->reader_node_num = DISK_LOCK_READER_NODE (&dlock);
-			pLockRes->writer_node_num = DISK_LOCK_WRITER_NODE (&dlock);
+			lockres = oin->lock_res;
+			ocfs_acquire_lockres(lockres);
+			lockres->lock_type = DISK_LOCK_FILE_LOCK (&dlock);
+			lockres->master_node_num = DISK_LOCK_CURRENT_MASTER (&dlock);
+			lockres->oin_openmap = DISK_LOCK_OIN_MAP (&dlock);
 
-			if (pLockRes->readonly_node != OCFS_INVALID_NODE_NUM &&
-    	    		    pLockRes->readonly_node != pLockRes->master_node_num) {
+			if (lockres->readonly_node != OCFS_INVALID_NODE_NUM &&
+    	    		    lockres->readonly_node != lockres->master_node_num) {
 				LOG_ERROR_ARGS("no longer readonly! ronode=%d, master=%d, lockid=%u.%u\n",
-					pLockRes->readonly_node, pLockRes->master_node_num,
-					pLockRes->sector_num);
-				pLockRes->readonly_node = OCFS_INVALID_NODE_NUM;
+					lockres->readonly_node, lockres->master_node_num,
+					lockres->sector_num);
+				lockres->readonly_node = OCFS_INVALID_NODE_NUM;
 			}
 
-			ocfs_release_lockres(pLockRes);
+			ocfs_release_lockres(lockres);
 		}
 
 		status = 0;
@@ -465,7 +461,7 @@
 	int tmpstat;
 	ocfs_vol_disk_hdr *volDiskHdr = NULL;
 	struct buffer_head *hdr_bh = NULL;
-	ocfs_lock_res *LockResource = NULL;
+	ocfs_lock_res *lockres = NULL;
 	struct buffer_head *bh = NULL;
 	__u64 root_off, int_off;
 
@@ -481,13 +477,13 @@
 
 		status = ocfs_acquire_lock (osb, OCFS_VOLUME_LOCK_OFFSET,
 				     OCFS_DLM_EXCLUSIVE_LOCK, FLAG_FILE_CREATE,
-				     &LockResource, &bh, NULL);
+				     &lockres, &bh, NULL);
 		if (status >= 0) {
 			status = ocfs_read_bh(osb, 0, &hdr_bh, 0, NULL);
 			if (status < 0)
 				LOG_ERROR_STATUS (status);
 
-			tmpstat = ocfs_release_lock (osb, OCFS_VOLUME_LOCK_OFFSET, OCFS_DLM_EXCLUSIVE_LOCK, 0, LockResource, bh, NULL);
+			tmpstat = ocfs_release_lock (osb, OCFS_VOLUME_LOCK_OFFSET, OCFS_DLM_EXCLUSIVE_LOCK, 0, lockres, bh, NULL);
 	
 			if (bh)
 				brelse(bh);
@@ -535,7 +531,7 @@
 		brelse(hdr_bh);
 	}
 
-	ocfs_put_lockres (LockResource);
+	ocfs_put_lockres (lockres);
 	LOG_EXIT_STATUS (status);
 	return status;
 }				/* ocfs_create_root_oin */

Modified: trunk/src/osb.c
===================================================================
--- trunk/src/osb.c	2004-04-16 22:38:55 UTC (rev 847)
+++ trunk/src/osb.c	2004-04-17 00:48:30 UTC (rev 848)
@@ -103,7 +103,9 @@
 	atomic_set (&osb->flush_event_woken, 0);
 	atomic_set (&osb->clean_buffer_seq, 1);
 	spin_lock_init (&osb->clean_buffer_lock);
+	spin_lock_init (&osb->vote_obj_queue_lock);
 
+	INIT_LIST_HEAD (&(osb->vote_obj_queue));
 	INIT_LIST_HEAD (&(osb->cache_lock_list));
 	INIT_LIST_HEAD (&(osb->needs_flush_head));
 	for (i=0; i<32; i++) {
@@ -491,7 +493,7 @@
 	int status = 0, tempstat;
 	__u64 bitmapOffset, numClustersAlloc, fileOffset = 0;
 	ocfs_vol_disk_hdr *volDiskHdr = NULL;
-	ocfs_lock_res *LockResource = NULL;
+	ocfs_lock_res *lockres = NULL;
 	bool lock_acq = false;
 	char *buf = NULL, *sect;
 	struct buffer_head *lock_bh = NULL;
@@ -521,7 +523,7 @@
 	/* Acquire volume Lock ...  */
 	status = ocfs_acquire_lock (osb, OCFS_VOLUME_LOCK_OFFSET,
 				    OCFS_DLM_EXCLUSIVE_LOCK, FLAG_FILE_CREATE,
-				    &LockResource, &lock_bh, NULL);
+				    &lockres, &lock_bh, NULL);
 	if (status < 0) {
 		if (status != -EINTR)
 			LOG_ERROR_STATUS (status);
@@ -634,7 +636,7 @@
 	/*  Release Volume Lock */
 	if (lock_acq) {
 		tempstat = ocfs_release_lock (osb, OCFS_VOLUME_LOCK_OFFSET,
-				OCFS_DLM_EXCLUSIVE_LOCK, 0, LockResource, lock_bh, NULL);
+				OCFS_DLM_EXCLUSIVE_LOCK, 0, lockres, lock_bh, NULL);
 		if (tempstat < 0) {
 			LOG_ERROR_STATUS (tempstat);
 			osb->vol_state = VOLUME_DISABLED;
@@ -650,7 +652,7 @@
 	
 	ocfs_safefree(dirnode_bhs);
 	ocfs_safefree (buf);
-	ocfs_put_lockres (LockResource);
+	ocfs_put_lockres (lockres);
 	LOG_EXIT_STATUS (status);
 	return status;
 }				/* ocfs_create_root_dir_node */

Modified: trunk/src/proc.c
===================================================================
--- trunk/src/proc.c	2004-04-16 22:38:55 UTC (rev 847)
+++ trunk/src/proc.c	2004-04-17 00:48:30 UTC (rev 848)
@@ -44,6 +44,7 @@
 static int ocfs_proc_hash_stats (char *page, char **start, off_t off, int count, int *eof, void *data);
 static int ocfs_proc_device (char *page, char **start, off_t off, int count, int *eof, void *data);
 static int ocfs_proc_nodes (char *page, char **start, off_t off, int count, int *eof, void *data);
+static int ocfs_proc_net_vote_obj (char *page, char **start, off_t off, int count, int *eof, void *data);
 #ifdef OCFS_LINUX_MEM_DEBUG
 static int ocfs_proc_memallocs (char *page, char **start, off_t off, int count, int *eof, void *data);
 #endif
@@ -180,12 +181,12 @@
 				slabname = "unknown";
 
 			if (item->u.slab == OcfsGlobalCtxt.lockres_cache) {
-				ocfs_lock_res *p = item->address;
+				ocfs_lock_res *lockres = item->address;
 				sprintf(tmpstr,
 				       	"%08x  %9s  %-40s  %5d  %u.%u\n",
 				       	item->address, slabname, item->tag,
-				       	atomic_read(&p->lr_ref_cnt),
-				       	HILO(p->sector_num));
+				       	atomic_read(&lockres->lr_ref_cnt),
+				       	HILO(lockres->sector_num));
 			} else
 				sprintf(tmpstr, "%08x  %9s  %-40s\n", item->address,
 				       	slabname, item->tag);
@@ -393,6 +394,26 @@
 	return ret;
 }				/* ocfs_proc_nodename */
 
+typedef struct _ocfs_proc_list
+{
+	char *name;
+	char *data;
+	int (*read_proc) (char *, char **, off_t, int, int *, void *);
+} ocfs_proc_list;
+
+static ocfs_proc_list ProcList[] =
+{
+	{ "nodenum", NULL, ocfs_proc_nodenum },
+	{ "mountpoint", NULL, ocfs_proc_mountpoint },
+	{ "statistics", NULL, ocfs_proc_statistics },
+	{ "hashstat", NULL, ocfs_proc_hash_stats },
+	{ "lockstat", NULL, ocfs_proc_dlm_stats },
+	{ "device", NULL, ocfs_proc_device },
+	{ "nodes", NULL, ocfs_proc_nodes },
+	{ "sent-votes", NULL, ocfs_proc_net_vote_obj },
+	{ NULL, }
+};
+
 /*
  * ocfs_proc_add_volume()
  *
@@ -401,23 +422,7 @@
 {
 	char *newdir = NULL;
 	char *tmp = NULL;
-	static struct
-	{
-		char *name;
-		char *data;
-		int (*read_proc) (char *, char **, off_t, int, int *, void *);
-	}
-	*p, ProcList[] =
-	{
-		{ "nodenum", NULL, ocfs_proc_nodenum },
-		{ "mountpoint", NULL, ocfs_proc_mountpoint },
-		{ "statistics", NULL, ocfs_proc_statistics },
-		{ "hashstat", NULL, ocfs_proc_hash_stats },
-		{ "lockstat", NULL, ocfs_proc_dlm_stats },
-		{ "device", NULL, ocfs_proc_device },
-		{ "nodes", NULL, ocfs_proc_nodes },
-		{ NULL, }
-	};
+	ocfs_proc_list *p;
 
 	LOG_ENTRY ();
 
@@ -435,6 +440,7 @@
 	ProcList[4].data = (char *) osb;
 	ProcList[5].data = (char *) osb;
 	ProcList[6].data = (char *) osb;
+	ProcList[7].data = (char *) osb;
 
 	sprintf (newdir, "ocfs2/%-d", osb->osb_id);
 	proc_mkdir (newdir, 0);
@@ -457,34 +463,27 @@
  */
 void ocfs_proc_remove_volume (ocfs_super * osb)
 {
-	char tmp[50];
+	char *tmp = NULL;
+	ocfs_proc_list *p;
 
 	LOG_ENTRY ();
 
-	sprintf (tmp, "ocfs2/%-d/nodenum", osb->osb_id);
-	remove_proc_entry (tmp, NULL);
+	tmp = ocfs_malloc (100);
+	if (!tmp) {
+		LOG_ERROR_STATUS (-ENOMEM);
+		goto bail;
+	}
+	
+	for (p = ProcList; p->name; p++) {
+		sprintf (tmp, "ocfs2/%-d/%s", osb->osb_id, p->name);
+		remove_proc_entry (tmp, NULL);
+	}
 
-	sprintf (tmp, "ocfs2/%-d/mountpoint", osb->osb_id);
-	remove_proc_entry (tmp, NULL);
-
-	sprintf (tmp, "ocfs2/%-d/statistics", osb->osb_id);
-	remove_proc_entry (tmp, NULL);
-
-	sprintf (tmp, "ocfs/%-d/hashstat", osb->osb_id);
-	remove_proc_entry (tmp, NULL);
-
-	sprintf (tmp, "ocfs/%-d/lockstat", osb->osb_id);
-	remove_proc_entry (tmp, NULL);
-
-	sprintf (tmp, "ocfs/%-d/device", osb->osb_id);
-	remove_proc_entry (tmp, NULL);
-
-	sprintf (tmp, "ocfs/%-d/nodes", osb->osb_id);
-	remove_proc_entry (tmp, NULL);
-
 	sprintf (tmp, "ocfs2/%-d", osb->osb_id);
 	remove_proc_entry (tmp, NULL);
 
+bail:
+	ocfs_safefree (tmp);
 	LOG_EXIT ();
 	return;
 }				/* ocfs_proc_remove_volume */
@@ -664,3 +663,30 @@
 	return ret;
 }				/* ocfs_proc_nodes */
 
+/*
+ * ocfs_proc_net_vote_obj()
+ *
+ */
+static int ocfs_proc_net_vote_obj (char *page, char **start, off_t off,
+			    int count, int *eof, void *data)
+{
+	int len = 0, ret;
+	ocfs_super *osb;
+	ocfs_vote_obj_lookup_data d;  // 24 bytes
+
+	LOG_ENTRY ();
+
+	osb = (ocfs_super *) data;
+
+	d.func = ocfs_lookup_obj_for_proc;
+	d.ret = NULL;
+	d.u.proc.page = page;
+	d.u.proc.len = &len;
+	d.u.proc.max = 4096;
+	ret = ocfs_lookup_vote_request_obj (osb, &d);
+	ret = ocfs_proc_calc_metrics (page, start, off, count, eof, len);
+
+	LOG_EXIT_LONG (ret);
+	return ret;
+}				/* ocfs_proc_net_vote_obj */
+

Modified: trunk/src/super.c
===================================================================
--- trunk/src/super.c	2004-04-16 22:38:55 UTC (rev 847)
+++ trunk/src/super.c	2004-04-17 00:48:30 UTC (rev 848)
@@ -422,13 +422,6 @@
 		goto leave;
 	}
 
-        /* Initialize the DLM */
-	status = ocfs_init_dlm ();
-	if (status < 0) {
-		LOG_ERROR_STATUS (status);
-		goto leave;
-	}
-
 	OcfsGlobalCtxt.hbm = DISK_HBEAT_COMM_ON;
 
 	spin_lock_init (&osb_id_lock);
@@ -664,7 +657,6 @@
 {
         ocfs_super *osb = NULL;
         __u32 numbits, freebits = 0;
-        // ocfs_lock_res *pLockResource;
         int status = 0;
         ocfs_bitmap_lock *bm_lock = NULL;
 	struct buffer_head *bh = NULL;
@@ -722,23 +714,7 @@
         return 0;
 }                               /* ocfs_statfs */
 
-#ifdef CDTOR_FOR_SLAB
-static void lockres_ctor(void *p, kmem_cache_t *slab, unsigned long flags)
-{
-	ocfs_lock_res *lockres;
-	lockres = p;
-	lockres->signature = 0x55AA;
-}
 
-static void lockres_dtor(void *p, kmem_cache_t *slab, unsigned long flags)
-{
-	ocfs_lock_res *lockres;
-	lockres = p;
-	if (lockres->signature == 0x55AA)
-		ocfs_free_lockres((ocfs_lock_res *)p);
-}
-#endif
-
 /*
  * ocfs_initialize_mem_lists()
  *
@@ -775,7 +751,7 @@
 	OcfsGlobalCtxt.inum_cache = kmem_cache_create("ocfs2_inum", 
 		sizeof(ocfs_inode_num), 0, SLAB_NO_REAP | SLAB_HWCACHE_ALIGN, 
 		NULL, NULL);
-
+	
 	OCFS_SET_FLAG (OcfsGlobalCtxt.flags, OCFS_FLAG_MEM_LISTS_INITIALIZED);
 
 	return 0;
@@ -967,7 +943,6 @@
 			goto leave;
 		}
 
-		OcfsIpcCtxt.init = true;
 #if LINUX_VERSION_CODE < KERNEL_VERSION(2,6,0)
 		if (mount_cnt_inc == false) {
 			MOD_INC_USE_COUNT;

Modified: trunk/src/vote.c
===================================================================
--- trunk/src/vote.c	2004-04-16 22:38:55 UTC (rev 847)
+++ trunk/src/vote.c	2004-04-17 00:48:30 UTC (rev 848)
@@ -29,15 +29,39 @@
 /* Tracing */
 #define  OCFS_DEBUG_CONTEXT  OCFS_DEBUG_CONTEXT_VOTE
 
+
+
 ocfs_ipc_ctxt OcfsIpcCtxt;
 
+static const char vote_state_str[] = { 'U', 'S', 'P', 'F', 'D' };
 
+static ocfs_vote_obj * ocfs_alloc_vote_obj (int bytes, __u32 reqlock, __u32 oldlock, __u64 votemap);
 static void ocfs_dlm_recv_msg (void *val);
 static bool ocfs_check_ipc_msg (__u8 * msg, __u32 msg_len);
 static int ocfs_comm_process_vote_reply (ocfs_super * osb, ocfs_dlm_msg * dlm_msg);
 static int ocfs_comm_process_msg (__u8 * msg);
 
 
+static spinlock_t vote_obj_lock = SPIN_LOCK_UNLOCKED;
+
+
+void ocfs_put_vote_obj (ocfs_vote_obj *obj)
+{
+	spin_lock(&vote_obj_lock);
+	if (atomic_dec_and_test(&obj->refcount))
+		ocfs_safefree (obj);
+	spin_unlock(&vote_obj_lock);
+}
+
+void ocfs_get_vote_obj (ocfs_vote_obj *obj)
+{
+	spin_lock(&vote_obj_lock);
+	atomic_inc(&obj->refcount);
+	spin_unlock(&vote_obj_lock);
+}
+
+
+
 /*
  * ocfs_recv_udp_msg()
  *
@@ -246,39 +270,58 @@
 	ocfs_dlm_req_master *req_master;
 	ocfs_dlm_reply_master *reply_master;
 	ocfs_dlm_msg *send_dlm_msg;
+	ocfs_vote_obj *obj;
 	__u64 vote_map;
 	int status = 0;
 	__u8 *buf = NULL;
-	__u32 msg_len;
+	__u32 msg_len, obj_len;
 
 	LOG_ENTRY ();
 
-	msg_len = sizeof (ocfs_dlm_msg) - 1 + sizeof (ocfs_dlm_reply_master);
+	req_master = (ocfs_dlm_req_master *) dlm_msg->msg_buf;
 
-	buf = ocfs_malloc (msg_len);
-	if (buf == NULL) {
+	msg_len = sizeof (ocfs_dlm_msg) + sizeof (ocfs_dlm_reply_master);
+	obj_len = sizeof (ocfs_vote_obj) + sizeof (ocfs_dlm_reply_master);
+
+	obj = ocfs_alloc_vote_obj (obj_len, 0, 0, 0ULL);
+	if (obj == NULL) {
 		LOG_ERROR_STATUS (status = -ENOMEM);
 		goto finally;
 	}
-
-	req_master = (ocfs_dlm_req_master *) dlm_msg->msg_buf;
-
+	buf = (__u8 *)&(obj->m);
 	send_dlm_msg = (ocfs_dlm_msg *)buf;
-	ocfs_init_dlm_msg (osb, send_dlm_msg, msg_len);
-	send_dlm_msg->msg_type = OCFS_VOTE_REPLY;
+	reply_master = (ocfs_dlm_reply_master *) send_dlm_msg->msg_buf;
+	
+	ocfs_init_dlm_msg (osb, send_dlm_msg, msg_len, OCFS_VOTE_REPLY);
 
-	reply_master = (ocfs_dlm_reply_master *) send_dlm_msg->msg_buf;
 	reply_master->h.lock_id = req_master->lock_id;
 	reply_master->status = vote_status;
 	reply_master->h.lock_seq_num = req_master->lock_seq_num;
 	reply_master->h.open_handle = inode_open;
 	reply_master->h.flags = req_master->flags;
+	vote_map = (1 << dlm_msg->src_node);
+	obj->req_vote_map = vote_map;
 
-	vote_map = (1 << dlm_msg->src_node);
+	spin_lock(&osb->vote_obj_queue_lock);
+	list_add_tail(&obj->list, &osb->vote_obj_queue);
+	spin_unlock(&osb->vote_obj_queue_lock);
+
 	ocfs_send_bcast (osb, vote_map, send_dlm_msg);
+	spin_lock (&obj->lock);
+	obj->vote_state = VOTE_OBJ_STATE_SENT;
+	spin_unlock (&obj->lock);
 
+	// nothing waits on this obj here, so it goes straight from SENT to DESTROYING
+	spin_lock (&obj->lock);
+	obj->vote_state = VOTE_OBJ_STATE_DESTROYING;
+	spin_unlock (&obj->lock);
+
+	spin_lock(&osb->vote_obj_queue_lock);
+	list_del(&obj->list);
+	spin_unlock(&osb->vote_obj_queue_lock);
+
 finally:
-	ocfs_safefree (buf);
+	ocfs_put_vote_obj (obj);
 	LOG_EXIT_STATUS (status);
 	return status;
 }				/* ocfs_send_vote_reply */
@@ -330,17 +373,144 @@
 }				/* ocfs_check_ipc_msg */
 
 
+int ocfs_lookup_obj_for_proc (ocfs_vote_obj *obj, ocfs_vote_obj_lookup_data *data)
+{
+	int status = -ENOENT;
+	ocfs_dlm_msg *dlm_msg = NULL;
+	ocfs_dlm_msg_hdr *request = NULL;
+	ocfs_dlm_reply_master *reply = NULL;
+	int *len = data->u.proc.len;
+	int max = data->u.proc.max - *len;
+	char *p = data->u.proc.page + *len;
+	int ret = 0;
+
+	/* just run thru everything to populate /proc */
+	/* return -ENOENT to keep going */
+	dlm_msg = &(obj->m);
+
+	switch (dlm_msg->msg_type) {
+		case OCFS_VOTE_REQUEST:
+			request = (ocfs_dlm_msg_hdr *) dlm_msg->msg_buf;
+			ret = snprintf(p, max, "REQST: %d %c %3d %08x %10u.%-10u %10u.%-10u %08x | %08x %08x\n",
+				obj->pid, vote_state_str[obj->vote_state], obj->vote_status,
+				LO(obj->req_vote_map), HILO(request->lock_id),
+				HILO(request->lock_seq_num), request->flags, 
+				LO(obj->got_vote_map), LO(obj->tmp_openmap));
+			break;
+		case OCFS_VOTE_REPLY:
+			reply = (ocfs_dlm_reply_master *) dlm_msg->msg_buf;
+			ret = snprintf(p, max, "REPLY: %d %c %3d %08x %10u.%10u %10u.%10u %08x | %3d %c\n",
+				obj->pid, vote_state_str[obj->vote_state], obj->vote_status,
+				LO(obj->req_vote_map), HILO(reply->h.lock_id),
+				HILO(reply->h.lock_seq_num), reply->h.flags, 
+				reply->status, reply->h.open_handle ? 'Y' : 'N');
+
+			break;
+		case OCFS_INFO_DISMOUNT:
+			ret = snprintf(p, max, "UNMNT: %d\n", obj->pid);
+			break;
+		default:
+			ret = snprintf(p, max, "BAD!!: %d\n", obj->pid);
+			break;
+	}
+	(*len) += ret;
+	p[max-1] = '\0';
+	return status;
+}
+
+
+int ocfs_lookup_obj_by_lockid (ocfs_vote_obj *obj, ocfs_vote_obj_lookup_data *data)
+{
+	int status = 0;
+	ocfs_dlm_msg *dlm_msg = NULL;
+	ocfs_dlm_msg_hdr *req = NULL;
+
+	dlm_msg = &(obj->m);
+	req = (ocfs_dlm_msg_hdr *) dlm_msg->msg_buf;
+	if (dlm_msg->msg_type != OCFS_VOTE_REQUEST ||
+	    obj->vote_state == VOTE_OBJ_STATE_DESTROYING ||
+	    req->lock_id != data->u.s.lock_id) {
+		status = -ENOENT;
+	}
+	return status;
+}
+
+int ocfs_lookup_obj_by_seq (ocfs_vote_obj *obj, ocfs_vote_obj_lookup_data *data)
+{
+	int status = -ENOENT;
+	ocfs_dlm_msg *dlm_msg = NULL;
+	ocfs_dlm_msg_hdr *req = NULL;
+
+	if (obj->seq_num == data->u.s.seq_num) {
+		status = 0;
+		dlm_msg = &(obj->m);
+		req = (ocfs_dlm_msg_hdr *) dlm_msg->msg_buf;
+		// error if there is a non-request with a matching seqnum, or
+		// a vote object that is in too early or too late a state, or
+		// a vote object with the right seqnum but wrong lockid
+		if (dlm_msg->msg_type != OCFS_VOTE_REQUEST ||
+		    obj->vote_state == VOTE_OBJ_STATE_DESTROYING ||
+		    obj->vote_state == VOTE_OBJ_STATE_UNSENT ||
+		    req->lock_id != data->u.s.lock_id) {
+			LOG_ERROR_ARGS("bad message: vote_state=%d type=%d "
+				       "lockid=%u.%u expected=%u.%u\n",
+				      obj->vote_state, dlm_msg->msg_type,
+				      HILO(req->lock_id), HILO(data->u.s.lock_id));
+			status = -EINVAL;
+		}
+	}
+	return status;
+}
+
 /*
+ * returns the callback status; on a match the obj (ref held) is stored in *(data->ret) if set, else NULL is stored
+ */
+int ocfs_lookup_vote_request_obj (ocfs_super *osb, ocfs_vote_obj_lookup_data *data)
+{
+	int status = -ENOENT;
+	struct list_head *iter;
+	ocfs_vote_obj *obj = NULL;
+
+	spin_lock(&osb->vote_obj_queue_lock);
+
+	list_for_each (iter, &osb->vote_obj_queue) {
+		obj = list_entry (iter, ocfs_vote_obj, list);
+		ocfs_get_vote_obj (obj);
+		spin_lock(&obj->lock);
+		status = data->func(obj, data);
+		spin_unlock(&obj->lock);
+		if (status < 0) {
+			ocfs_put_vote_obj (obj);
+			obj = NULL;
+		}
+		if (status != -ENOENT)
+			break;
+		obj = NULL;
+	}
+
+	spin_unlock(&osb->vote_obj_queue_lock);
+
+	// return the obj, or drop the ref
+	if (data->ret)
+		*(data->ret) = obj;
+	else if (obj)
+		ocfs_put_vote_obj (obj);
+	return status;
+}
+
+
+/*
  * ocfs_comm_process_vote_reply()
  *
  */
 int ocfs_comm_process_vote_reply (ocfs_super * osb, ocfs_dlm_msg * dlm_msg)
 {
 	int status = 0;
-	ocfs_lock_res *lockres = NULL;
 	ocfs_dlm_reply_master *reply;
 	ocfs_dlm_msg_hdr *reply_msg;
 	ocfs_vote_reply_ctxt ctxt;
+	ocfs_vote_obj *obj = NULL;
+	ocfs_vote_obj_lookup_data data;
 
 	LOG_ENTRY ();
 
@@ -349,57 +519,58 @@
 	reply = (ocfs_dlm_reply_master *) dlm_msg->msg_buf;
 	reply_msg = &(reply->h);
 
-	status = ocfs_lookup_sector_node (osb, reply_msg->lock_id, &lockres);
-	if (status < 0) {
-		lockres = NULL;
+	/* find the original request object for this reply */
+	data.u.s.seq_num = reply_msg->lock_seq_num;
+	data.u.s.lock_id = reply_msg->lock_id;
+	data.func = ocfs_lookup_obj_by_seq;
+	data.ret = &obj;
+	status = ocfs_lookup_vote_request_obj (osb, &data);
+	if (status < 0 || obj==NULL) {
 		LOG_ERROR_STATUS (status);
 		goto bail;
 	}
 
-	if (!(lockres->vote_state & LOCK_STATE_IN_VOTING) ||
-	    (lockres->last_upd_seq_num != reply_msg->lock_seq_num)) {
-		LOG_TRACE_ARGS ("Ignoring netdlm message, invoting=%s, msgseq=%u.%u, lockseq=%u.%u\n",
-				lockres->vote_state & LOCK_STATE_IN_VOTING ? "true" : "false",
-				HILO(reply_msg->lock_seq_num), HILO(lockres->last_upd_seq_num));
-		goto bail;
+	spin_lock(&obj->lock);
+	if (obj->vote_state != VOTE_OBJ_STATE_SENT &&
+	    obj->vote_state != VOTE_OBJ_STATE_PARTIAL_REPLY) {
+		LOG_ERROR_ARGS("bad vote reply state=%d, node=%u, lockid=%u.%u, seq=%u.%u, vote=%d\n",
+			       obj->vote_state, dlm_msg->src_node, HI(reply_msg->lock_id),
+			       LO(reply_msg->lock_id), HI(reply_msg->lock_seq_num),
+			       LO(reply_msg->lock_seq_num), reply->status);
+		status = -EINVAL;
+		goto unlock;
 	}
 
 	LOG_TRACE_ARGS("node=%u, lockid=%u.%u, seq=%u.%u, vote=%d\n",
 		       dlm_msg->src_node, HI(reply_msg->lock_id),
 		       LO(reply_msg->lock_id), HI(reply_msg->lock_seq_num),
 		       LO(reply_msg->lock_seq_num), reply->status);
-
+	
 	ctxt.reply_method = COMM_VOTE;
-	ctxt.got_vote_map = &(lockres->got_vote_map);
-	ctxt.open_map = &(lockres->tmp_openmap);
-	ctxt.status = &(lockres->vote_status);
+	ctxt.got_vote_map = &(obj->got_vote_map);
+	ctxt.open_map = &(obj->tmp_openmap);
+	ctxt.status = &(obj->vote_status);
 	ctxt.flags = reply_msg->flags;
 	ctxt.u.reply = reply;
 
 	ocfs_process_one_vote_reply(osb, &ctxt, dlm_msg->src_node);
 
-	if (lockres->vote_status != 0) {
-		lockres->vote_state = 0;
-		atomic_set (&lockres->voted_event_woken, 1);
-		wake_up (&lockres->voted_event);
-		goto bail;
+	if (obj->got_vote_map == obj->req_vote_map)
+		obj->vote_state = VOTE_OBJ_STATE_FULL_REPLY;
+	else 
+		obj->vote_state = VOTE_OBJ_STATE_PARTIAL_REPLY;
+	
+unlock:
+	// wake if complete or error
+	if (obj->vote_status < 0 || status < 0 ||
+	    obj->vote_state == VOTE_OBJ_STATE_FULL_REPLY) {
+		atomic_set (&obj->voted_event_woken, 1);
+		wake_up (&obj->voted_event);
 	}
+	spin_unlock(&obj->lock);
+	ocfs_put_vote_obj (obj);
 
-	if (lockres->got_vote_map == lockres->req_vote_map) {
-		if ((reply_msg->flags & FLAG_FILE_EXTEND) ||
-		    (reply_msg->flags & FLAG_FILE_UPDATE))
-			lockres->oin_openmap = lockres->tmp_openmap;
-		lockres->tmp_openmap = 0;
-		LOG_TRACE_ARGS ("OK vote, lockid=%u.%u, map: 0x%08x\n",
-				HI(lockres->sector_num), LO(lockres->sector_num),
-				LO(lockres->got_vote_map));
-		lockres->vote_state = 0;
-		atomic_set (&lockres->voted_event_woken, 1);
-		wake_up (&lockres->voted_event);
-	}
-
 bail:
-	ocfs_put_lockres (lockres);
 	up (&(osb->comm_lock));
 	LOG_EXIT_STATUS (status);
 	return status;
@@ -476,10 +647,12 @@
 		ctxt.u.dlm_msg = dlm_msg;
 		ctxt.node_num = dlm_msg->src_node;
 		ctxt.status = 0;
+		// this calls acquire_lockres
 		ocfs_process_vote (osb, &ctxt);
 		break;
 
 	case OCFS_VOTE_REPLY:
+		// this DOES NOT call acquire_lockres
 		ocfs_comm_process_vote_reply (osb, dlm_msg);
 		break;
 
@@ -511,33 +684,232 @@
 {
 	int status = 0;
 	ocfs_dlm_msg *dlm_msg = NULL;
-	__u32 msg_len;
 	ocfs_dlm_msg_hdr *req;
+	ocfs_vote_obj *obj;
+	__u32 msg_len, obj_len;
 
 	LOG_ENTRY_ARGS ("(osb=0x%08x, vm=0x%08x)\n", osb, LO(vote_map));
 
-	msg_len = sizeof (ocfs_dlm_msg) - 1 + sizeof (ocfs_dlm_req_master);
+	msg_len = sizeof (ocfs_dlm_msg) + sizeof (ocfs_dlm_req_master);
+	obj_len = sizeof (ocfs_vote_obj) + sizeof (ocfs_dlm_req_master);
 
-	dlm_msg = ocfs_malloc (msg_len);
-	if (dlm_msg == NULL) {
+	obj = ocfs_alloc_vote_obj (obj_len, 0, 0, 0ULL);
+	if (obj == NULL) {
 		LOG_ERROR_STATUS (status = -ENOMEM);
 		goto finally;
 	}
-
-	ocfs_init_dlm_msg (osb, dlm_msg, msg_len);
-
-	dlm_msg->msg_type = OCFS_INFO_DISMOUNT;
-
+	dlm_msg = &(obj->m);
 	req = (ocfs_dlm_msg_hdr *) dlm_msg->msg_buf;
+	ocfs_init_dlm_msg (osb, dlm_msg, msg_len, OCFS_INFO_DISMOUNT);
 	req->lock_id = 0;
 	req->flags = 0;
 	req->lock_seq_num = 0;
 	req->open_handle = 0;
 
+	spin_lock(&osb->vote_obj_queue_lock);
+	list_add_tail(&obj->list, &osb->vote_obj_queue);
+	spin_unlock(&osb->vote_obj_queue_lock);
+
 	ocfs_send_bcast (osb, vote_map, dlm_msg);
+	spin_lock (&obj->lock);
+	obj->vote_state = VOTE_OBJ_STATE_SENT;
+	spin_unlock (&obj->lock);
 
+	// nothing waits on this obj here, so it goes straight from SENT to DESTROYING
+	spin_lock (&obj->lock);
+	obj->vote_state = VOTE_OBJ_STATE_DESTROYING;
+	spin_unlock (&obj->lock);
+
+	spin_lock(&osb->vote_obj_queue_lock);
+	list_del(&obj->list);
+	spin_unlock(&osb->vote_obj_queue_lock);
+
 finally:
-	ocfs_safefree (dlm_msg);
+	ocfs_put_vote_obj (obj);
 	LOG_EXIT_STATUS (status);
 	return status;
 }				/* ocfs_send_dismount_msg */
+	
+/*
+ * ocfs_init_dlm_msg()
+ *
+ */
+void ocfs_init_dlm_msg (ocfs_super * osb, ocfs_dlm_msg * dlm_msg, __u32 msg_len, __u32 type)
+{
+	LOG_ENTRY ();
+
+	dlm_msg->magic = OCFS_DLM_MSG_MAGIC;
+	dlm_msg->msg_len = msg_len;
+	dlm_msg->src_node = osb->node_num;
+	dlm_msg->msg_type = type;
+	memcpy (dlm_msg->vol_id, osb->vol_layout.vol_id, MAX_VOL_ID_LENGTH);
+
+	LOG_EXIT ();
+	return;
+}				/* ocfs_init_dlm_msg */
+
+
+static ocfs_vote_obj * ocfs_alloc_vote_obj (int bytes, __u32 reqlock, __u32 oldlock, __u64 votemap)
+{
+	ocfs_vote_obj *obj = NULL;
+	
+	obj = ocfs_malloc (bytes);
+	if (obj == NULL)
+		return NULL;
+
+	memset(obj, 0, bytes);
+	obj->vote_state = VOTE_OBJ_STATE_UNSENT;
+	spin_lock_init (&obj->lock);
+	atomic_set(&obj->refcount, 1);
+	atomic_set(&obj->voted_event_woken, 0);
+	init_waitqueue_head (&obj->voted_event);
+	INIT_LIST_HEAD (&obj->list);
+
+	obj->req_vote_map = votemap;
+	obj->got_vote_map = 0ULL;
+	obj->tmp_openmap = 0ULL;
+	obj->seq_num = 0ULL;
+	obj->req_lock_type = reqlock;
+	obj->old_lock_type = oldlock;
+	obj->vote_status = 0;
+	obj->pid = ocfs_getpid();
+	
+	return obj;
+}
+
+#define OCFS_DLM_NET_TIMEOUT   (30000)   // 30 seconds
+
+/*
+ * ocfs_send_dlm_request_msg()
+ *
+ * lockres should be held when calling this
+ */
+int ocfs_send_dlm_request_msg (ocfs_super * osb, __u64 lock_id, __u32 lock_type, __u32 flags, ocfs_lock_res * lockres, __u64 vote_map, struct inode *inode, int *vote_status)
+{
+	int status = 0;
+	ocfs_dlm_msg *dlm_msg = NULL;
+	ocfs_dlm_msg_hdr *req;
+	ocfs_vote_obj *obj;
+	__u32 msg_len, obj_len;
+
+	LOG_ENTRY_ARGS ("(osb=0x%08x, id=%u.%u, ty=%u, fl=%u, vm=0x%08x)\n",
+			osb, HILO (lock_id), lock_type, flags, LO(vote_map));
+
+	msg_len = sizeof (ocfs_dlm_msg) + sizeof (ocfs_dlm_req_master);
+	obj_len = sizeof (ocfs_vote_obj) + sizeof (ocfs_dlm_req_master);
+
+	obj = ocfs_alloc_vote_obj (obj_len, lock_type, lockres->lock_type, vote_map);
+	if (obj == NULL) {
+		LOG_ERROR_STATUS (status = -ENOMEM);
+		goto finally;
+	}
+	dlm_msg = &(obj->m);
+	req = (ocfs_dlm_msg_hdr *) dlm_msg->msg_buf;
+	ocfs_init_dlm_msg (osb, dlm_msg, msg_len, OCFS_VOTE_REQUEST);
+
+	spin_lock (&OcfsGlobalCtxt.comm_seq_lock);
+	req->lock_seq_num = ++OcfsGlobalCtxt.comm_seq_num;
+	obj->seq_num = req->lock_seq_num;
+	spin_unlock (&OcfsGlobalCtxt.comm_seq_lock);
+
+	req->lock_id = lock_id;
+	req->flags = flags;
+	if (inode)
+		req->fe_off = GET_INODE_FEOFF(inode);
+	else
+		req->fe_off = 0;
+
+#ifdef VERBOSE_LOCKING_TRACE
+	printk("ocfs_send_dlm_request_msg: inode=%p, lockid = %u.%u, "
+	       "fe_off=%u.%u\n", inode, HILO(lock_id), HILO(req->fe_off));
+#endif
+
+	spin_lock(&osb->vote_obj_queue_lock);
+	list_add_tail(&obj->list, &osb->vote_obj_queue);
+	spin_unlock(&osb->vote_obj_queue_lock);
+
+	ocfs_send_bcast (osb, vote_map, dlm_msg);
+	spin_lock (&obj->lock);
+	obj->vote_state = VOTE_OBJ_STATE_SENT;
+	spin_unlock (&obj->lock);
+	status = ocfs_wait (obj->voted_event,
+			    atomic_read (&obj->voted_event_woken), 
+			    OCFS_DLM_NET_TIMEOUT);
+
+	spin_lock (&obj->lock);
+	if (obj->vote_status >= 0 && obj->vote_state == VOTE_OBJ_STATE_FULL_REPLY) {
+ 	    	if (flags & (FLAG_FILE_EXTEND | FLAG_FILE_UPDATE)) 
+			// extend and update need to update the openmap
+			lockres->oin_openmap = obj->tmp_openmap;
+		LOG_TRACE_ARGS ("OK vote, lockid=%u.%u, map: 0x%08x\n",
+			HILO(lockres->sector_num), LO(obj->got_vote_map));
+	} else {
+		LOG_ERROR_ARGS("vote_status=%d, vote_state=%d, lockid=%u.%u, map=0x%08x, got=0x%08x\n",
+			       obj->vote_status, obj->vote_state, HILO(lockres->sector_num),
+			       LO(obj->req_vote_map), LO(obj->got_vote_map));
+	}
+	*vote_status = obj->vote_status;
+	obj->vote_state = VOTE_OBJ_STATE_DESTROYING;
+	spin_unlock (&obj->lock);
+
+	spin_lock(&osb->vote_obj_queue_lock);
+	list_del(&obj->list);
+	spin_unlock(&osb->vote_obj_queue_lock);
+	
+
+	ocfs_compute_dlm_stats (status, *vote_status,
+			       	&(OcfsGlobalCtxt.net_reqst_stats));
+
+	ocfs_compute_dlm_stats (status, *vote_status,
+			       	&(osb->net_reqst_stats));
+finally:
+	ocfs_put_vote_obj (obj);
+	LOG_EXIT_STATUS (status);
+	return status;
+}				/* ocfs_send_dlm_request_msg */
+
+
+void ocfs_process_one_vote_reply(ocfs_super *osb, ocfs_vote_reply_ctxt *ctxt, __u32 node_num)
+{
+	int status;
+	int reply_status;
+	int open_handle = 0;
+	__u64 mask = 0;
+
+	if (ctxt->reply_method == DISK_VOTE) {
+		reply_status = ctxt->u.vote->vote[osb->node_num];
+		open_handle = ctxt->u.vote->open_handle;
+	} else {
+		reply_status = ctxt->u.reply->status;
+		open_handle = ctxt->u.reply->h.open_handle;
+	}
+
+	status = 0;
+	mask = 1 << node_num;
+
+	switch (reply_status) {
+		case FLAG_VOTE_NODE:
+			*(ctxt->got_vote_map) |= mask;
+			if (ctxt->flags & (FLAG_FILE_EXTEND|FLAG_FILE_UPDATE) && 
+			    open_handle)
+				*(ctxt->open_map) |= mask;
+			break;
+		case FLAG_VOTE_OIN_ALREADY_INUSE:
+			*(ctxt->got_vote_map) |= mask;
+			status = -EFAIL;
+			if (ctxt->flags & FLAG_FILE_DELETE)
+				status = -EBUSY;
+			break;
+		case FLAG_VOTE_OIN_UPDATED:
+			status = 0;
+			*(ctxt->got_vote_map) |= mask;
+			break;
+		case FLAG_VOTE_UPDATE_RETRY:
+			status = -EAGAIN;
+			break;
+		case FLAG_VOTE_FILE_DEL:
+			status = -ENOENT;
+			break;
+	}
+	*(ctxt->status) = status;
+}



More information about the Ocfs2-commits mailing list