[Ocfs2-commits] khackel commits r791 - in trunk/src: . inc

svn-commits at oss.oracle.com svn-commits at oss.oracle.com
Fri Mar 19 01:37:44 CST 2004


Author: khackel
Date: 2004-03-19 01:37:42 -0600 (Fri, 19 Mar 2004)
New Revision: 791

Modified:
   trunk/src/dir.c
   trunk/src/dlm.c
   trunk/src/file.c
   trunk/src/inc/ocfs.h
   trunk/src/inc/proto.h
   trunk/src/namei.c
   trunk/src/nm.c
   trunk/src/oin.c
Log:
Big change to add support for read-only cache locks.

Modified: trunk/src/dir.c
===================================================================
--- trunk/src/dir.c	2004-03-18 22:41:21 UTC (rev 790)
+++ trunk/src/dir.c	2004-03-19 07:37:42 UTC (rev 791)
@@ -157,7 +157,7 @@
 }				/* ocfs_readdir */
 
 /* ocfs_find_files_on_disk()
- *
+ * NOTE: this should always be called with inode->i_sem taken!
  */
 int ocfs_find_files_on_disk (ocfs_super * osb, __u64 parent_off, struct qstr * file_name, struct buffer_head ** fe_bh, ocfs_file * ofile, struct inode *inode, bool take_lock)
 {
@@ -171,10 +171,8 @@
 	struct buffer_head *bh = NULL;
 	struct buffer_head **bhs = NULL;
 	int bufsz, nbhs, i;
-	__u32 lock_type = OCFS_DLM_SHARED_LOCK;
+	__u32 lock_type = OCFS_DLM_ENABLE_CACHE_LOCK;
 
-	/* TODO: change this to take a buffer head instead of fe */
-
 	LOG_ENTRY_ARGS ("(osb=%p, parent=%u.%u, fname=%p, fe_bh=%p, ofile=%p, inode=%p)\n", osb, parent_off, file_name, fe_bh, ofile, inode);
 
 	nbhs = osb->vol_layout.dir_node_size >> 9;
@@ -197,12 +195,11 @@
 	}
 	OCFS_ASSERT(bhs);
 
+	sync = false;
 	if (take_lock) {
-		/* Get a shared lock on the directory... */
-		// temp change... try this out
-		lock_type = OCFS_DLM_ENABLE_CACHE_LOCK;
-		status = ocfs_acquire_lock (osb, parent_off, lock_type, FLAG_DIR,
-				    	&lockres, &bh, inode);
+		/* Get a lock on the directory... */
+		status = ocfs_acquire_lock (osb, parent_off, lock_type, FLAG_DIR|FLAG_READDIR, 
+					    &lockres, &bh, inode);
 		if (status < 0) {
 			/* Volume should be disabled in this case */
 			if (status != -EINTR)
@@ -210,14 +207,9 @@
 			goto leave;
 		}
 		lock_acq = true;
-		if (lockres->master_node_num == osb->node_num &&
-		    lockres->lock_type > OCFS_DLM_SHARED_LOCK)
-			sync = false;
-		else 
+		if (lockres->master_node_num != osb->node_num ||
+		    lockres->lock_type < OCFS_DLM_EXCLUSIVE_LOCK)
 			sync = true;
-	} else {
-		/* calling function has already taken a cache or exclusive lock */
-		sync = false;
 	}
 
 	if (bhs[0]==NULL || bhs[0]->b_blocknr != (thisDirNode >> 9)) {
@@ -247,7 +239,7 @@
 	if (take_lock && lock_acq)
 	{
 		tmpstat = ocfs_release_lock (osb, parent_off, lock_type,
-					     FLAG_DIR, lockres, bh, inode);
+					     FLAG_DIR|FLAG_READDIR, lockres, bh, inode);
 		if (tmpstat < 0) {
 			LOG_ERROR_STATUS (tmpstat);
 			/* Volume should be disabled in this case */

Modified: trunk/src/dlm.c
===================================================================
--- trunk/src/dlm.c	2004-03-18 22:41:21 UTC (rev 790)
+++ trunk/src/dlm.c	2004-03-19 07:37:42 UTC (rev 791)
@@ -33,17 +33,19 @@
 /* Tracing */
 #define OCFS_DEBUG_CONTEXT OCFS_DEBUG_CONTEXT_DLM
 
+int new_lock_function(ocfs_super * osb, __u32 requested_lock, __u32 flags, ocfs_lock_res * lockres, struct buffer_head *bh, bool *disk_vote, struct inode *inode);
+
+static inline int ocfs_wait_for_readonly_drop(ocfs_super *osb, ocfs_lock_res *lockres);
+
 static int ocfs_insert_cache_link (ocfs_super * osb, ocfs_lock_res * lockres);
-static int ocfs_update_lock_state (ocfs_super * osb, ocfs_lock_res * lockres, __u32 flags, bool *disk_vote, struct inode *inode);
 static int ocfs_send_dlm_request_msg (ocfs_super * osb, __u64 lock_id, __u32 lock_type, __u32 flags, ocfs_lock_res * lockres, __u64 vote_map);
 static int ocfs_request_vote (ocfs_super * osb, __u64 lock_id, __u32 lock_type, __u32 flags, __u64 vote_map, __u64 * lock_seq_num, struct inode *inode);
 static int ocfs_wait_for_vote (ocfs_super * osb, __u64 lock_id, __u32 lock_type, __u32 flags, __u64 vote_map, __u32 time_to_wait, __u64 lock_seq_num, ocfs_lock_res * lockres);
-static int ocfs_reset_voting (ocfs_super * osb, __u64 lock_id, __u32 lock_type, __u64 vote_map);
+static int ocfs_reset_voting (ocfs_super * osb);
 static int ocfs_wait_for_lock_release (ocfs_super * osb, __u64 offset, __u32 time_to_wait, ocfs_lock_res * lockres, __u32 lock_type, struct inode *inode);
 static int ocfs_get_vote_on_disk (ocfs_super * osb, __u64 lock_id, __u32 lock_type, __u32 flags, __u64 * got_vote_map, __u64 vote_map, __u64 lock_seq_num, __u64 * oin_open_map);
 static int ocfs_break_cache_lock (ocfs_super * osb, ocfs_lock_res * lockres, struct inode *inode);
 static int ocfs_disk_request_vote (ocfs_super * osb, __u64 lock_id, __u32 lock_type, __u32 flags, __u64 vote_map, __u64 * lock_seq_num);
-int ocfs_make_lock_master (ocfs_super * osb, __u64 lock_id, __u32 lock_type, __u32 flags, ocfs_lock_res * lockres, struct buffer_head *bh, bool *disk_vote, struct inode *inode);
 int ocfs_update_disk_lock (ocfs_super * osb, ocfs_lock_res * lockres, __u32 flags, struct buffer_head **bh, struct inode *inode);
 static int ocfs_update_master_on_open (ocfs_super * osb, ocfs_lock_res * lockres, struct inode *inode);
 int ocfs_disk_release_lock (ocfs_super * osb, __u64 lock_id, __u32 lock_type, __u32 flags, ocfs_lock_res * lockres, struct buffer_head *bh, struct inode *inode);
@@ -67,88 +69,8 @@
 	return status;
 }				/* ocfs_insert_cache_link */
 
-/*
- * ocfs_update_lock_state()
- *
- */
-static int ocfs_update_lock_state (ocfs_super * osb, ocfs_lock_res * lockres, __u32 flags, bool *disk_vote, struct inode *inode)
-{
-	__u32 votemap;
-	int status = 0;
-	int tmpstat;
-	__u64 lockseqno = 0;
-	unsigned long jif = 0;
 
-	LOG_ENTRY_ARGS ("(0x%08x, 0x%08x, %u)\n", osb, lockres, flags);
 
-	ocfs_acquire_lockres (lockres);
-	votemap = (1 << lockres->master_node_num);
-
-	if (votemap == (1 << osb->node_num)) {
-		status = 0;
-		goto vote_success;
-	}
-
-	if (comm_voting && !*disk_vote) {
-		LOG_TRACE_STR ("Network vote");
-		jif = jiffies;
-		status = ocfs_send_dlm_request_msg (osb, lockres->sector_num,
-				lockres->lock_type, flags, lockres, votemap);
-		if (status >= 0) {
-			status = lockres->vote_status;
-			if (status >= 0)
-				goto vote_success;
-			else
-				goto finito;
-		} else if (status == -ETIMEDOUT) {
-			LOG_TRACE_STR ("Network voting timed out");
-		}
-		else
-			LOG_ERROR_STATUS (status);
-		lockres->vote_state = 0;
-	}
-
-	LOG_TRACE_STR ("Disk vote");
-	*disk_vote = true;
-	jif = jiffies;
-	status = ocfs_request_vote (osb, lockres->sector_num,
-			lockres->lock_type, flags, votemap, &lockseqno, inode);
-	if (status < 0) {
-		if (status != -EAGAIN)
-			LOG_ERROR_STATUS (status);
-		goto finito;
-	}
-
-	status = ocfs_wait_for_vote (osb, lockres->sector_num,
-				     lockres->lock_type, flags, votemap, 5000,
-				     lockseqno, lockres);
-	if (status < 0) {
-		if (status != -EAGAIN)
-			LOG_ERROR_STATUS (status);
-		goto finito;
-	}
-
-vote_success:
-	ocfs_break_cache_lock_zap_buffers(osb, inode);
-
-	jif = jiffies - jif;
-	LOG_TRACE_ARGS ("Lock time:%u\n", jif);
-
-	if (flags & FLAG_CHANGE_MASTER)
-		lockres->master_node_num = osb->node_num;
-finito:
-	if (*disk_vote) {
-		tmpstat = ocfs_reset_voting (osb, lockres->sector_num,
-					     lockres->lock_type, votemap);
-		if (tmpstat < 0)
-			LOG_ERROR_STATUS (tmpstat);
-	}
-	ocfs_release_lockres (lockres);
-	
-	LOG_EXIT_STATUS (status);
-	return status;
-}				/* ocfs_update_lock_state */
-
 /*
  * ocfs_disk_request_vote()
  *
@@ -360,6 +282,7 @@
 	struct buffer_head *bh = NULL;
 	__u32 curr_master;
 	__u8 lock_level;
+	bool is_dir = false, disk_vote = false;
 
 	LOG_ENTRY_ARGS ("(0x%08x, %u.%u, %u, 0x%08x, %u)\n", osb,
 			HI (offset), LO (offset), time_to_wait,
@@ -377,6 +300,7 @@
 		fe = (ocfs_file_entry *)OCFS_BH_GET_DATA_READ(bh); /* read */
 		curr_master = DISK_LOCK_CURRENT_MASTER (fe);
 		lock_level = DISK_LOCK_FILE_LOCK (fe);
+		is_dir = (fe->attribs & OCFS_ATTRIB_DIRECTORY);
 		OCFS_BH_PUT_DATA(bh);
 
 		if ((curr_master == OCFS_INVALID_NODE_NUM) ||
@@ -384,7 +308,8 @@
 			goto got_it;
 		}
 		
-		if ((!IS_NODE_ALIVE (osb->publ_map, curr_master, OCFS_MAXIMUM_NODES)) && (!TEST_NODE_IN_RECOVERY(osb, curr_master))) {
+		if ((!IS_NODE_ALIVE (osb->publ_map, curr_master, OCFS_MAXIMUM_NODES)) && 
+		    (!TEST_NODE_IN_RECOVERY(osb, curr_master))) {
 			/* Reset the lock as not owned and return success?? */
 			/* This needs to be under some sort of cluster wide lock, */
 			fe = (ocfs_file_entry *)OCFS_BH_GET_DATA_WRITE(bh); /* write */
@@ -400,38 +325,81 @@
 
 		/* The local node is not the master */
 		if (lock_level == OCFS_DLM_ENABLE_CACHE_LOCK) {
-			int tmpstat;
-
+			ocfs_acquire_lockres(lockres);
 			lockres->lock_type = lock_level;
 			lockres->master_node_num = curr_master;
-			status = ocfs_break_cache_lock (osb, lockres, inode);
-			if (status < 0) {
-				if (status != -EINTR)
-					LOG_ERROR_STATUS (status);
-				goto finally;
+				
+			if (is_dir) {
+				if (lockres->readonly_node != OCFS_INVALID_NODE_NUM) {
+					if (lockres->readonly_node == curr_master) {
+						// readonly cachelock already on this dir
+						printk("ocfs_wait_for_lock_release: ronode=master=%d\n", curr_master);
+						ocfs_release_lockres(lockres);
+						goto got_it;
+					} else {
+						LOG_ERROR_ARGS("(1) readonly node changed! was %d, now master is %d\n",
+						       	lockres->readonly_node, curr_master);
+						lockres->readonly_node = OCFS_INVALID_NODE_NUM;
+					}
+				}
+
+				// no readonly node, need to alert owner to get readonly access
+				status = new_lock_function(osb, lockres->lock_type, 
+							   FLAG_DIR | FLAG_READDIR | FLAG_ACQUIRE_LOCK,
+							   lockres, bh, &disk_vote, inode);
+				if (status < 0) {
+					ocfs_release_lockres(lockres);
+					if (status == -EAGAIN) {
+						if (ocfs_task_interruptible (osb)) {
+							LOG_TRACE_ARGS("interrupted... lockid=%u.%u\n",
+								HILO(lockres->sector_num));
+							status = -EINTR;
+							goto finally;
+						}
+						goto again;
+					}
+					goto finally;
+				}
+			
+				printk("waitforlockrelease: setting ronode, was=%d, now=%d\n", lockres->readonly_node, lockres->master_node_num);
+				lockres->readonly_node = lockres->master_node_num;	
+				printk("ocfs_wait_for_lock_release: cache->readonly ronode=master=%d\n", curr_master);
+				ocfs_release_lockres(lockres);
+				goto got_it;
+			} else {
+				ocfs_release_lockres(lockres);
+#warning need to deal with this
+				status = ocfs_break_cache_lock (osb, lockres, inode);
+				if (status < 0) {
+					if (status != -EINTR)
+						LOG_ERROR_STATUS (status);
+					goto finally;
+				}
+				tmpstat = ocfs_read_bh (osb, offset, &bh, 0, inode);
+				if (tmpstat < 0) {
+					LOG_ERROR_STATUS (tmpstat);
+					status = tmpstat;
+					goto finally;
+				}
+				LOG_TRACE_ARGS("broke cache lock, setting to NO_LOCK\n");
+				fe = (ocfs_file_entry *)OCFS_BH_GET_DATA_WRITE(bh); /* write */
+				DISK_LOCK_FILE_LOCK (fe) = OCFS_DLM_NO_LOCK;
+				lock_level = OCFS_DLM_NO_LOCK;
+				OCFS_BH_PUT_DATA(bh);
+				tmpstat = ocfs_write_bh (osb, bh, 0, inode);
+				if (tmpstat < 0) {
+					LOG_ERROR_STATUS (tmpstat);
+					status = tmpstat;
+					goto finally;
+				}
 			}
-			tmpstat = ocfs_read_bh (osb, offset, &bh, 0, inode);
-			if (tmpstat < 0) {
-				LOG_ERROR_STATUS (tmpstat);
-				status = tmpstat;
-				goto finally;
-			}
-			LOG_TRACE_ARGS("broke cache lock, setting to NO_LOCK\n");
-			fe = (ocfs_file_entry *)OCFS_BH_GET_DATA_WRITE(bh); /* write */
-			DISK_LOCK_FILE_LOCK (fe) = OCFS_DLM_NO_LOCK;
-			lock_level = OCFS_DLM_NO_LOCK;
-			OCFS_BH_PUT_DATA(bh);
-			tmpstat = ocfs_write_bh (osb, bh, 0, inode);
-			if (tmpstat < 0) {
-				LOG_ERROR_STATUS (tmpstat);
-				status = tmpstat;
-				goto finally;
-			}
 		}
 
+
 		if (lock_level <= lock_type)
 			goto got_it;
-	
+
+again:	
 		brelse(bh);
 		ocfs_sleep (WAIT_FOR_VOTE_INCREMENT);
 		timewaited += WAIT_FOR_VOTE_INCREMENT;
@@ -449,6 +417,16 @@
 		lockres->master_node_num = DISK_LOCK_CURRENT_MASTER (fe);
 		lockres->oin_openmap = DISK_LOCK_OIN_MAP (fe);
 		lockres->last_lock_upd = DISK_LOCK_LAST_WRITE (fe);
+		if (lockres->readonly_node != OCFS_INVALID_NODE_NUM) {
+			if (lockres->readonly_node != lockres->master_node_num) {
+				LOG_ERROR_ARGS("(2) readonly node changed! was %d, now master is %d\n",
+				       lockres->readonly_node, lockres->master_node_num);
+				lockres->readonly_node = OCFS_INVALID_NODE_NUM;
+			} else if (lockres->lock_type != OCFS_DLM_ENABLE_CACHE_LOCK) {
+				LOG_ERROR_ARGS("readonly lock is not a cachelock any more!\n");
+				lockres->readonly_node = OCFS_INVALID_NODE_NUM;
+			}
+		}
 		ocfs_release_lockres (lockres);
 		OCFS_BH_PUT_DATA(bh);
 	}
@@ -581,63 +559,7 @@
 	return (status);
 }				/* ocfs_get_vote_on_disk */
 
-/*
- * ocfs_disk_reset_voting()
- *
- */
-int ocfs_disk_reset_voting (ocfs_super * osb, __u64 lock_id, __u32 lock_type)
-{
-	int status = 0;
-	ocfs_publish *pubsect = NULL;
-	__u64 offset = 0;
-	struct buffer_head *bh = NULL;
 
-	LOG_ENTRY_ARGS ("(0x%08x, %u.%u, %u)\n", osb, HI (lock_id),
-			LO (lock_id), lock_type);
-
-	LOG_TRACE_ARGS ("0x%08x, %u.%u, %u\n", osb, HI (lock_id),
-			LO (lock_id), lock_type);
-
-	/* take lock to prevent publish overwrites by vote_req and nm thread */
-	down (&(osb->publish_lock));
-
-	/* Read node's publish sector */
-	offset = osb->vol_layout.publ_sect_off + (osb->node_num * osb->sect_size);
-
-	status = ocfs_read_bh (osb, offset, &bh, 0, NULL);
-	if (status < 0) {
-		LOG_ERROR_STATUS (status);
-		goto finally;
-	}
-	pubsect = (ocfs_publish *)OCFS_BH_GET_DATA_WRITE(bh); /* write */
-
-	pubsect->dirty = false;
-	pubsect->vote = 0;
-	pubsect->vote_type = 0;
-	pubsect->vote_map = 0;
-	pubsect->dir_ent = 0;
-
-	/* Write it back */
-	OCFS_BH_PUT_DATA(bh);
-	status = ocfs_write_bh (osb, bh, 0, NULL);
-	if (status < 0) {
-		LOG_ERROR_STATUS (status);
-		goto finally;
-	}
-
-
-	osb->publish_dirty = false;
-
-	atomic_set (&osb->node_req_vote, 0);
-
-finally:
-	if (bh != NULL)
-		brelse(bh);
-	up (&(osb->publish_lock));
-	LOG_EXIT_STATUS (status);
-	return (status);
-}				/* ocfs_disk_reset_voting */
-
 /*
  * ocfs_wait_for_vote()
  *
@@ -709,16 +631,53 @@
  * ocfs_reset_voting()
  *
  */
-static int ocfs_reset_voting (ocfs_super * osb, __u64 lock_id, __u32 lock_type, __u64 vote_map)
+static int ocfs_reset_voting (ocfs_super * osb)
 {
-	int status;
+	int status = 0;
+	ocfs_publish *pubsect = NULL;
+	__u64 offset = 0;
+	struct buffer_head *bh = NULL;
 
 	LOG_ENTRY ();
 
-	status = ocfs_disk_reset_voting (osb, lock_id, lock_type);
+	/* take lock to prevent publish overwrites by vote_req and nm thread */
+	down (&(osb->publish_lock));
 
+	/* Read node's publish sector */
+	offset = osb->vol_layout.publ_sect_off + (osb->node_num * osb->sect_size);
+
+	status = ocfs_read_bh (osb, offset, &bh, 0, NULL);
+	if (status < 0) {
+		LOG_ERROR_STATUS (status);
+		goto finally;
+	}
+	pubsect = (ocfs_publish *)OCFS_BH_GET_DATA_WRITE(bh); /* write */
+
+	pubsect->dirty = false;
+	pubsect->vote = 0;
+	pubsect->vote_type = 0;
+	pubsect->vote_map = 0;
+	pubsect->dir_ent = 0;
+
+	/* Write it back */
+	OCFS_BH_PUT_DATA(bh);
+	status = ocfs_write_bh (osb, bh, 0, NULL);
+	if (status < 0) {
+		LOG_ERROR_STATUS (status);
+		goto finally;
+	}
+
+
+	osb->publish_dirty = false;
+
+	atomic_set (&osb->node_req_vote, 0);
+
+finally:
+	if (bh != NULL)
+		brelse(bh);
+	up (&(osb->publish_lock));
 	LOG_EXIT_STATUS (status);
-	return status;
+	return (status);
 }				/* ocfs_reset_voting */
 
 /*
@@ -822,119 +781,7 @@
 	return status;
 }				/* ocfs_send_dlm_request_msg */
 
-/*
- * ocfs_make_lock_master()
- *
- */
-int ocfs_make_lock_master (ocfs_super * osb, __u64 lock_id, __u32 lock_type, __u32 flags, ocfs_lock_res * lockres, struct buffer_head *bh, bool *disk_vote, struct inode *inode)
-{
-	__u64 vote_map = 0;
-	__u64 lockseqnum = 0;
-	int status = 0;
-	int tmpstat;
-	unsigned long jif;
-	ocfs_file_entry *fe = NULL;
 
-	LOG_ENTRY ();
-
-	ocfs_acquire_lockres (lockres);
-	fe = (ocfs_file_entry *)OCFS_BH_GET_DATA_READ(bh); /* read */
-
-	vote_map = osb->publ_map;
-	if (((flags & FLAG_FILE_DELETE) || (flags & FLAG_FILE_RENAME)) &&
-	    (!(flags & FLAG_DIR)) &&
-	    (DISK_LOCK_CURRENT_MASTER (fe) == osb->node_num)) {
-		vote_map = DISK_LOCK_OIN_MAP (fe);
-		vote_map &= osb->publ_map;	/* remove all dead nodes */
-	}
-	vote_map &= ~(1 << osb->node_num);
-	OCFS_BH_PUT_DATA(bh);
-
-	if (vote_map == 0) {
-		/* As this is the only node alive, make it master of the lock */
-		if (lockres->lock_type <= lock_type)
-			lockres->lock_type = (__u8) lock_type;
-		lockres->master_node_num = osb->node_num;
-
-		status = ocfs_update_disk_lock (osb, lockres,
-				DLOCK_FLAG_MASTER | DLOCK_FLAG_LOCK, &bh, inode);
-		if (status < 0) {
-			LOG_ERROR_STATUS (status);
-			goto bail;
-		}
-		goto bail;
-	}
-
-
-	if (comm_voting && !*disk_vote) {
-		LOG_TRACE_STR ("Network vote");
-		jif = jiffies;
-		status = ocfs_send_dlm_request_msg (osb, lock_id, lock_type,
-						    flags, lockres, vote_map);
-		if (status >= 0) {
-			status = lockres->vote_status;
-			if (status >= 0)
-				goto vote_success;
-			else
-				goto bail;
-		} else if (status == -ETIMEDOUT) {
-			LOG_TRACE_STR ("Network voting timed out");
-		}
-		else
-			LOG_ERROR_STATUS (status);
-		lockres->vote_state = 0;
-	}
-
-	LOG_TRACE_STR ("Disk vote");
-	*disk_vote = true;
-	jif = jiffies;
-	status = ocfs_request_vote (osb, lock_id, lock_type, flags, vote_map,
-				    &lockseqnum, inode);
-	if (status < 0) {
-		if (status != -EAGAIN)
-			LOG_ERROR_STATUS (status);
-		goto bail;
-	}
-
-	status = ocfs_wait_for_vote (osb, lock_id, lock_type, flags, vote_map,
-				     5000, lockseqnum, lockres);
-	if (status < 0) {
-		if (status != -EAGAIN)
-			LOG_ERROR_STATUS (status);
-		goto bail;
-	}
-
-vote_success:
-	jif = jiffies - jif;
-	LOG_TRACE_ARGS ("Lock time: %u\n", jif);
-
-	/* Make this node the master of this lock */
-	if (lockres->lock_type <= lock_type)
-		lockres->lock_type = (__u8) lock_type;
-
-	lockres->master_node_num = osb->node_num;
-
-	/* Write that we now are the master to the disk */
-	status = ocfs_update_disk_lock (osb, lockres,
-		 DLOCK_FLAG_MASTER | DLOCK_FLAG_LOCK | DLOCK_FLAG_OPEN_MAP, &bh, inode);
-	if (status < 0) {
-		LOG_ERROR_STATUS (status);
-		goto bail;
-	}
-
-bail:
-
-	if (*disk_vote) {
-		tmpstat = ocfs_reset_voting (osb, lock_id, lock_type, vote_map);
-		if (tmpstat < 0)
-			LOG_ERROR_STATUS (tmpstat);
-	}
-	ocfs_release_lockres (lockres);
-
-	LOG_EXIT_STATUS (status);
-	return status;
-}				/* ocfs_make_lock_master */
-
 /*
  * ocfs_acquire_lockres_ex()
  *
@@ -1136,8 +983,8 @@
 			}
 			ocfs_release_lockres (lockres);
 		} else {
-			status = ocfs_update_lock_state (osb, lockres,
-						 FLAG_ADD_OIN_MAP, &disk_vote, inode);
+			status = new_lock_function(osb, lockres->lock_type, FLAG_ADD_OIN_MAP, lockres, 
+						   NULL, &disk_vote, inode);
 			if (status < 0) {
 				if (status != -EAGAIN)
 					LOG_ERROR_STATUS (status);
@@ -1198,6 +1045,10 @@
 	lockres->writer_node_num = OCFS_INVALID_NODE_NUM;
 	lockres->reader_node_num = OCFS_INVALID_NODE_NUM;
 
+	lockres->readonly_map = 0ULL;
+	lockres->readonly_node = OCFS_INVALID_NODE_NUM;
+	lockres->readonly_dropping = false;
+
 	lockres->lock_holders = 0;
 	LOG_TRACE_ARGS("lockres->lock_holders = %u\n", lockres->lock_holders);
 
@@ -1385,13 +1236,17 @@
 	int lockflags = (lock_id >= osb->vol_layout.bitmap_off ? OCFS_BH_CACHED : 0);
 	/* TODO: 40 bytes of "bool" sitting on the stack for now. move    */
 	/*       mutually exclusive flags into an enum and switch on them */
-	bool disk_vote = false, keep_exclusive = false, local_lock = false;
+	bool disk_vote = false;
 	bool no_owner = false, owner_dead = false, wait_on_recovery = false;
-	bool truncate_extend = false, have_cache_already = false;
 	int lock_path = invalid_path;
+	__u32 extra_lock_flags = 0;
 
 	LOG_ENTRY_ARGS ("(0x%08x, %u.%u, %u, %u, 0x%08x, 0x%08x)\n", osb,
 			HI (lock_id), LO (lock_id), lock_type, flags, lr, bh);
+	
+	
+	OCFS_ASSERT(lock_type != OCFS_DLM_NO_LOCK);
+	OCFS_ASSERT(lock_type != OCFS_DLM_SHARED_LOCK);
 
 	if (bh != NULL)
 		b = bh;
@@ -1407,44 +1262,12 @@
 		LOG_ERROR_STATUS (status);
 		goto bail;
 	}
-
-	/* NO_LOCK */
-	if (lock_type == OCFS_DLM_NO_LOCK)
-		goto bail;
-
-	/* SHARED */
-	if (lock_type == OCFS_DLM_SHARED_LOCK) {
-		if (!(flags & FLAG_DIR))
-			goto bail;
-		ocfs_acquire_lockres (lockres);
-		if (lockres->lock_type == OCFS_DLM_NO_LOCK)
-			lockres->lock_type = OCFS_DLM_SHARED_LOCK;
-		else if ((lockres->lock_type == OCFS_DLM_ENABLE_CACHE_LOCK) &&
-			 (lockres->master_node_num != osb->node_num))
-			status = ocfs_break_cache_lock (osb, lockres, inode);
-		if (status < 0) {
-			if (status != -EINTR)
-				LOG_ERROR_STATUS (status);
-			ocfs_release_lockres (lockres);
-			goto bail;
-		}
-		lockres->lock_holders++;
-		LOG_TRACE_ARGS("lockres->lock_holders = %u\n", 
-			       lockres->lock_holders);
-		atomic_inc (&(lockres->lr_share_cnt));
-		ocfs_release_lockres (lockres);
-		goto bail;
-	}
-
-	/* EXCLUSIVE or CACHE */
-	status = 0;
 	ocfs_get_lockres (lockres);
 
 again:
 	ocfs_acquire_lockres (lockres);
 
-	k++;
-	LOG_TRACE_ARGS("attempting to get lock, pass: %d\n", k);
+	LOG_TRACE_ARGS("attempting to get lock, pass: %d\n", ++k);
 
 	if (lockres->master_node_num == osb->node_num)
 		updated = true;
@@ -1458,18 +1281,6 @@
 		}
 		disklock = (ocfs_file_entry *)OCFS_BH_GET_DATA_READ(*b); /* read */
 
-#ifdef SUSPICIOUS_CODE
-		// This code is added to avoid the case when fileentry is not yet updated 
-		// but the lockresource is updated by NMthread and needsflush is set to FALSE. 
-		if (lockres->master_node_num != osb->node_num &&
-		    DISK_LOCK_CURRENT_MASTER (disklock) == osb->node_num) {
-			OCFS_BH_PUT_DATA(*b);
-			ocfs_release_lockres (lockres);
-			ocfs_sleep (1000);
-			goto again;
-		}
-#endif
-
 		if (lockres->master_node_num != osb->node_num || 
 		    lockres->master_node_num != DISK_LOCK_CURRENT_MASTER (disklock)) {
 			lockres->master_node_num = DISK_LOCK_CURRENT_MASTER (disklock);
@@ -1480,6 +1291,7 @@
 		OCFS_BH_PUT_DATA(*b);
 	}
 
+reevaluate:
 	no_owner = (lockres->master_node_num == OCFS_INVALID_NODE_NUM);
 
 	/* master node is an invalid node */
@@ -1489,181 +1301,122 @@
 		goto finally;
 	}
 
-	truncate_extend = (flags & (FLAG_FILE_EXTEND | FLAG_FILE_TRUNCATE));
-	local_lock = (lockres->master_node_num == osb->node_num);
 	wait_on_recovery = TEST_NODE_IN_RECOVERY(osb, lockres->master_node_num);
 	owner_dead = !(no_owner || IS_NODE_ALIVE(osb->publ_map, 
 			 lockres->master_node_num, OCFS_MAXIMUM_NODES));
+	if ((owner_dead || wait_on_recovery) && 
+	    lockres->readonly_node == lockres->master_node_num) {
+		// if owner is dead or in recovery and the lockres 
+		// has the readonly owner set, clear it
+		lockres->readonly_node = OCFS_INVALID_NODE_NUM;
+	}
 
-	if (!local_lock && (wait_on_recovery || no_owner || owner_dead)) {
-		lock_path = become_master;
+	status = 0;
+	extra_lock_flags = 0;
+
+	if (flags & FLAG_READDIR) {
+		if (lockres->readonly_node != OCFS_INVALID_NODE_NUM)
+			goto skip_lock_write;
+		if (lockres->master_node_num == osb->node_num &&
+		    lockres->lock_type == OCFS_DLM_ENABLE_CACHE_LOCK) {
+			/* local node is master */
+			printk("acquirelock: setting ronode, was=%d, now=%d, master=%d\n", 
+			       lockres->readonly_node, osb->node_num, lockres->master_node_num);
+			lockres->readonly_node = osb->node_num;
+			goto skip_lock_write;
+		}
+
+		if (lockres->master_node_num == OCFS_INVALID_NODE_NUM ||
+			   owner_dead || wait_on_recovery) {
+			/* no master or dead master */
+			extra_lock_flags = FLAG_REMASTER;
+		} else {
+			/* valid master, but either not cachelock or elsewhere */
+			if (lockres->lock_type != OCFS_DLM_ENABLE_CACHE_LOCK) {
+				/* treat just like a normal master change request */
+				extra_lock_flags = FLAG_CHANGE_MASTER;
+			}
+		}
+		goto do_lock;
+	} 
+
+	// anything else is NOT a readdir request
+	if (lockres->readonly_node != osb->node_num)
+		lockres->readonly_node = OCFS_INVALID_NODE_NUM; // clear any owner
+
+	status = ocfs_wait_for_readonly_drop(osb, lockres);
+	if (status < 0) {
+		if (status == -ETIMEDOUT)
+			goto again;
+		if (status == -EAGAIN)
+			goto reevaluate;
+		LOG_ERROR_STATUS(status);
+		goto finally;
+	}
+
+	if (lockres->master_node_num != osb->node_num &&
+	    (wait_on_recovery || no_owner || owner_dead)) {
+		extra_lock_flags = FLAG_REMASTER;
 	} else if (flags & (FLAG_FILE_DELETE | FLAG_FILE_RENAME)) {
-		lock_path = get_x;
-	} else if (local_lock) {
-		if (truncate_extend)
-			lock_path = become_master;
+		if (ocfs_journal_new_file_search(osb, lock_id)!=0) {
+			extra_lock_flags = 0;
+		} else if (lockres->lock_type == OCFS_DLM_ENABLE_CACHE_LOCK)
+			extra_lock_flags = FLAG_FAST_PATH_LOCK;
 		else 
-			lock_path = fast_path;
+			extra_lock_flags = FLAG_CHANGE_MASTER;
+	} else if (lockres->master_node_num == osb->node_num) {
+		if (flags & (FLAG_FILE_EXTEND | FLAG_FILE_TRUNCATE) && 
+		    ocfs_journal_new_file_search(osb, lock_id)!=0)
+			extra_lock_flags = FLAG_REMASTER;
+		else if (lockres->lock_type == OCFS_DLM_ENABLE_CACHE_LOCK)
+			extra_lock_flags = FLAG_FAST_PATH_LOCK;
+		else 
+			extra_lock_flags = FLAG_CHANGE_MASTER;
 	} else {
-		lock_path = master_request;
+		extra_lock_flags = FLAG_CHANGE_MASTER;
 	}
 			
-	/* hack upon hack... if the cachelock is still sitting around, skip voting */
-	if ((lock_path == become_master || lock_path == get_x) &&
-		ocfs_journal_new_file_search(osb, lock_id)==0)
-			lock_path = fast_path;
 
-	if (lock_path == fast_path && lockres->lock_type != OCFS_DLM_ENABLE_CACHE_LOCK) {
-		LOG_TRACE_ARGS("testing testing!!! flipping this fast_path to master_request\n");
-		lock_path = master_request;
-	}
-		
+do_lock:
+	flags |= extra_lock_flags;
 
-	LOG_TRACE_ARGS("lockres: master=%d, locktype=%d, flags: %d, lock_path: %s\n",
-		       lockres->master_node_num, lockres->lock_type, flags, 
-		       lock_path_str(lock_path));
-
-	switch (lock_path) {
-		case fast_path: /* master node is this node */
-		{
-			/* specifically keep an exclusive if we already have one on */
-			/* this node even if we are asking for a cache lock */
-			disklock = (ocfs_file_entry *)OCFS_BH_GET_DATA_READ(*b); /* read */
-			keep_exclusive = (DISK_LOCK_FILE_LOCK (disklock) == OCFS_DLM_EXCLUSIVE_LOCK);
-			if (keep_exclusive)
-				LOG_ERROR_STR("keep_exclusive set!");
-			OCFS_BH_PUT_DATA(*b);
-			break;
-		}
-		case become_master:  	/* there is no master, or master needs recovery */
-		case get_x:		/* a delete or rename request */
-		{
-			if (wait_on_recovery && !(flags & FLAG_FILE_RECOVERY)) {
-				int waitcnt = 0;
-				LOG_TRACE_ARGS("Waiting on node %u to be recovered\n",
-				       	lockres->master_node_num);
-				while (1) {
-					LOG_TRACE_ARGS("waitcnt = %d\n", waitcnt);
-					if (!TEST_NODE_IN_RECOVERY(osb, lockres->master_node_num))
-						break;
-					ocfs_sleep(500);
-				}
-			}
+	LOG_TRACE_ARGS("lockres: master=%d, locktype=%d, flags: %08x\n",
+		       lockres->master_node_num, lockres->lock_type, flags);
 	
-			status = ocfs_make_lock_master (osb, lock_id, lock_type,
-					   	flags, lockres, *b, &disk_vote, inode);
+	printk("lockres: lockid=%u.%u, this=%d, master=%d, locktype=%d, flags=%08x, ronode=%d, romap=%08x\n",
+		       lockres->sector_num, osb->node_num, lockres->master_node_num, lockres->lock_type, flags,
+		       lockres->readonly_node, lockres->readonly_map);
 	
-			if (status < 0) {
-				ocfs_release_lockres (lockres);
-				if (status == -EAGAIN) {
-					ocfs_sleep (500);
-					if (ocfs_task_interruptible (osb)) {
-						LOG_TRACE_ARGS("interrupted... lockid=%u.%u\n", HILO(lock_id));
-						status = -EINTR;
-						goto finally;
-					}
-	
-					updated = false;
-					goto again;
-				}
-				goto finally;
-			}
-	
-			/* make lock master succeeded */
-			/* so why, if get_x and the make lock master do the same thing,
-		 	* does the make lock master path need to rewrite the stuff to disk
-		 	* but the get_x path doesn't ???? */
-			if (get_x)
-				goto skip_lock_write;
-			keep_exclusive = false;
-			break;
+	if (wait_on_recovery && !(flags & FLAG_FILE_RECOVERY)) {
+		int waitcnt = 0;
+		LOG_TRACE_ARGS("Waiting on node %u to be recovered\n",
+			       	lockres->master_node_num);
+		while (1) {
+			LOG_TRACE_ARGS("waitcnt = %d\n", waitcnt);
+			if (!TEST_NODE_IN_RECOVERY(osb, lockres->master_node_num))
+				break;
+			ocfs_sleep(500);
 		}
-#if 0   // we never hit this case anymore. lets not bloat things...
-		case wait_for_release: 	/* there is a valid, live master and it's not this node */
-					/* if the lock is acquired already by the master wait */
-					/* for release, else change master */
-		{
-			ocfs_release_lockres(lockres);
-			status = ocfs_wait_for_lock_release (osb, lock_id, 30000, lockres,
-			     	((flags & FLAG_DIR) ?  OCFS_DLM_SHARED_LOCK : OCFS_DLM_NO_LOCK), inode);
-			if (status == 0 || status == -ETIMEDOUT) {
-				/* lock released or waited too long, back to top */
-				if (status == -ETIMEDOUT) {
-					LOG_TRACE_ARGS("lock %u.%u, level %d, not being freed by node %u\n", 
-					       	HILO(lock_id), lockres->lock_type, lockres->master_node_num);
-				}
-				updated = false;
-				goto again;
-			}
-			if (status != -EINTR) {
-				LOG_ERROR_STR ("Lock owner is alive and taking too much time");
-						LOG_ERROR_STATUS(status);
-			}
-			goto finally;
-		}
-#endif
-		case master_request:
-		{
-			status = ocfs_update_lock_state (osb, lockres, flags | FLAG_CHANGE_MASTER, 
-							 &disk_vote, inode);
-			if (status < 0) {
-				ocfs_release_lockres (lockres);
-				if (status == -EAGAIN) {
-					ocfs_sleep (500);
-					if (ocfs_task_interruptible (osb)) {
-						LOG_TRACE_ARGS("interrupted... lockid=%u.%u\n",
-							HILO(lockres->sector_num));
-						status = -EINTR;
-						goto finally;
-					}
-					updated = false;
-					goto again;
-				}
+	}
+
+	status = new_lock_function(osb, lock_type, flags, lockres, *b, &disk_vote, inode);
+	if (status < 0) {
+		ocfs_release_lockres (lockres);
+		if (status == -EAGAIN) {
+			ocfs_sleep (500);
+			if (ocfs_task_interruptible (osb)) {
+				LOG_TRACE_ARGS("interrupted... lockid=%u.%u\n",
+					HILO(lockres->sector_num));
+				status = -EINTR;
 				goto finally;
 			}
-			// successfully got vote to change master
-			status = ocfs_read_bh (osb, lock_id, b, lockflags, inode);
-			if (status < 0) {
-				LOG_ERROR_STATUS (status);
-				goto finally;
-			}
-		
-			keep_exclusive = false;
-			break;
+			updated = false;
+			goto again;
 		}
-		default:
-		{
-			LOG_ERROR_ARGS("unknown lock type (path=%d)\n", 
-				       lock_path);
-			status = -EINVAL;
-			goto finally;
-		}
-	}
-
-	disklock = (ocfs_file_entry *)OCFS_BH_GET_DATA_READ(*b); /* read */
-	have_cache_already = (DISK_LOCK_CURRENT_MASTER (disklock) == osb->node_num &&
-			      DISK_LOCK_FILE_LOCK (disklock) == OCFS_DLM_ENABLE_CACHE_LOCK);
-	OCFS_BH_PUT_DATA(*b);
-
-	if (!keep_exclusive && !have_cache_already) {
-		disklock = (ocfs_file_entry *)OCFS_BH_GET_DATA_WRITE(*b); /* write */
-		DISK_LOCK_CURRENT_MASTER (disklock) = osb->node_num;
-		DISK_LOCK_FILE_LOCK (disklock) = lock_type;
-		OCFS_BH_PUT_DATA(*b);
-
-		status = ocfs_write_bh (osb, *b, 0, inode);
-		if (status < 0) {
-			LOG_ERROR_STATUS (status);
-			goto finally;
-		}
-	}
-
+		goto finally;
+	} 
+			
 	/* We got the lock */
-	disklock = (ocfs_file_entry *)OCFS_BH_GET_DATA_READ(*b); /* read */
-	lockres->lock_type = lock_type;
-	lockres->master_node_num = osb->node_num;
-	lockres->oin_openmap = DISK_LOCK_OIN_MAP (disklock);
-	OCFS_BH_PUT_DATA(*b);
 	status = 0;
 
 skip_lock_write:
@@ -1763,23 +1516,6 @@
 	if (!(flags & FLAG_FILE_UPDATE_OIN) && !(flags & FLAG_FILE_DELETE))
 		goto finally;
 
-#if 0
-	if (comm_voting) {
-		LOG_TRACE_STR ("Network vote");
-		status = ocfs_send_dlm_request_msg (osb, lock_id, lock_type,
-						    flags, lockres, votemap);
-		if (status >= 0)
-			goto finally;
-		if (status == -ETIMEDOUT) {
-			LOG_TRACE_STR ("Network voting timed out");
-			lockres->vote_state = 0;
-		}
-	}
-
-	LOG_TRACE_STR ("Disk vote");
-	disk_vote = true;
-	jif = jiffies;
-#endif
 	status = -EAGAIN;
 	while (status == -EAGAIN) {
 		if (comm_voting && !disk_vote) {
@@ -1820,7 +1556,7 @@
 			goto finito;
 		}
 
-		tmpstat = ocfs_reset_voting (osb, lock_id, lock_type, oin_node_map);
+		tmpstat = ocfs_reset_voting (osb);
 		if (tmpstat < 0) {
 			LOG_ERROR_STATUS (status = tmpstat);
 			goto finito;
@@ -1841,7 +1577,7 @@
 	LOG_TRACE_ARGS ("Lock time: %u\n", jif);
 
 	if (disk_vote && !disk_reset) {
-		tmpstat = ocfs_reset_voting (osb, lock_id, lock_type, oin_node_map);
+		tmpstat = ocfs_reset_voting (osb);
 		if (tmpstat < 0)
 			LOG_ERROR_STATUS (tmpstat);
 	}
@@ -1849,14 +1585,15 @@
 	fe = (ocfs_file_entry *)OCFS_BH_GET_DATA_WRITE(*b); /* write */
 
 	LOG_TRACE_ARGS("writing lock now... releasemaster: %s, level: %d, master: %d\n",
-		       flags & FLAG_FILE_RELEASE_MASTER, DISK_LOCK_FILE_LOCK (fe),
+		       flags & FLAG_FILE_RELEASE_MASTER ? "yes" : "no", 
+		       DISK_LOCK_FILE_LOCK (fe),
 		       DISK_LOCK_CURRENT_MASTER (fe));
 	if (flags & FLAG_FILE_RELEASE_MASTER)
 		DISK_LOCK_CURRENT_MASTER (fe) = OCFS_INVALID_NODE_NUM;
 
 	if ((DISK_LOCK_FILE_LOCK (fe) == OCFS_DLM_ENABLE_CACHE_LOCK) &&
 	    (DISK_LOCK_CURRENT_MASTER (fe) == osb->node_num)) {
-		lockres->lock_state = OCFS_DLM_ENABLE_CACHE_LOCK; 
+		lockres->lock_type = OCFS_DLM_ENABLE_CACHE_LOCK; 
 		cachelock = true;
 		LOG_TRACE_STR("keeping at CACHE_LOCK");
 	}
@@ -1907,26 +1644,21 @@
 		OCFS_BH_PUT_DATA(bh);
 	}
 
+	OCFS_ASSERT(lock_type != OCFS_DLM_SHARED_LOCK);
 
-	if (lock_type == OCFS_DLM_SHARED_LOCK) {
-		if (atomic_dec_and_test (&lockres->lr_share_cnt)) {
-			if (lockres->lock_type == OCFS_DLM_SHARED_LOCK)
-				lockres->lock_type = OCFS_DLM_NO_LOCK;
-		}
+	if ((lockres->lock_type == OCFS_DLM_ENABLE_CACHE_LOCK) &&
+	    (lockres->master_node_num == osb->node_num) &&
+	    !(flags & FLAG_FILE_DELETE)) {
 		status = 0;
 		goto finally;
 	}
-	/*
-	 * Change flags based on which kind of lock we are releasing
-	 * For directory we need special handling of oin updates when the release
-	 * is for XBcast
-	 * For file we need to update oin's
-	 * For Shared we need to update the lock state locally only
-	 */
 
-	if ((lockres->lock_type == OCFS_DLM_ENABLE_CACHE_LOCK) &&
-	    (lockres->master_node_num == osb->node_num) &&
-	    !(flags & FLAG_FILE_DELETE)) {
+	if (flags & FLAG_READDIR) {
+		if (lockres->lock_type != OCFS_DLM_ENABLE_CACHE_LOCK ||
+		    lockres->master_node_num != lockres->readonly_node ||
+		    lockres->master_node_num == OCFS_INVALID_NODE_NUM)
+			LOG_ERROR_ARGS("READDIR release has issues! type=%d, master=%d, ronode=%d\n",
+				       lockres->lock_type, lockres->master_node_num, lockres->readonly_node);
 		status = 0;
 		goto finally;
 	}
@@ -1968,96 +1700,10 @@
 }				/* ocfs_init_dlm */
 
 /*
- * ocfs_create_log_extent_map()
- *
- */
-int ocfs_create_log_extent_map (ocfs_super * osb, __u64 diskOffset, __u64 ByteCount)
-{
-	int status = 0;
-	__u32 i;
-	__u64 remainingLength;
-	__u32 length, byte_cnt;
-	__u32 num_runs = 0;
-	__s64 to_find = 0, found_foff = 0, found_doff = 0;
-
-	LOG_ENTRY ();
-
-	to_find = diskOffset;
-	remainingLength = ByteCount;
-
-	ocfs_down_sem (&(osb->map_lock), true);
-
-	num_runs = osb->metadata_map.count;
-
-	for (i = 0; i < num_runs; i++) {
-		if (!ocfs_get_next_extent_map_entry
-		    (osb, &osb->metadata_map, i, &found_foff,
-		     &found_doff, &length))
-			continue;
-
-		if (found_doff >= (to_find + remainingLength))
-			break;
-
-		if (to_find >= (found_doff + length)) {
-			/* This meta data run is before the relevant stf */
-			continue;
-		}
-	
-		if (to_find < found_doff) {
-			if (to_find + remainingLength > found_doff) {
-				/* We have a data run and a metadata run */
-				remainingLength -= found_doff - to_find;
-				to_find = found_doff;
-				byte_cnt = remainingLength<length?remainingLength:length;
-				osb->log_file_size += byte_cnt;
-				ocfs_add_extent_map_entry (osb, &osb->trans_map, 
-				   	found_doff, found_doff, byte_cnt);
-				remainingLength -= byte_cnt;
-				to_find += byte_cnt;
-				if (remainingLength > 0)
-					continue;
-				else
-					break;
-			}
-		} else { /* if ((to_find >= found_doff)) */
-			if (to_find + remainingLength <= found_doff + length) {
-				/* It is only metadata */				
-				osb->log_file_size += remainingLength;
-				ocfs_add_extent_map_entry (osb, &osb->trans_map, 
-				   	to_find, to_find, remainingLength);
-				remainingLength -= remainingLength;
-				to_find += remainingLength;
-				break;
-			} else {
-				/* Meta data and as yet unknown data */
-				byte_cnt = length - (to_find - found_doff);
-				osb->log_file_size += byte_cnt;
-				ocfs_add_extent_map_entry (osb, &osb->trans_map, 
-				   	to_find, to_find, byte_cnt);
-				remainingLength -= byte_cnt;
-				to_find += byte_cnt;
-				continue;
-			}
-		}
-	}
-
-	ocfs_up_sem (&(osb->map_lock));
-
-	/* Create new extent map from real runs */
-
-
-	if (osb->log_file_size >= OCFS_TRANS_FLUSH_LIMIT) {
-		osb->needs_flush = true;
-	}
-
-	LOG_EXIT_STATUS (status);
-	return status;
-}				/* ocfs_create_log_extent_map */
-
-/*
  * ocfs_break_cache_lock()
  *
  */
+/* TODO: merge down into new lock function */
 static int ocfs_break_cache_lock (ocfs_super * osb, ocfs_lock_res * lockres, struct inode *inode)
 {
 	int status;
@@ -2078,27 +1724,6 @@
 
 	jif = jiffies;
 
-#if 0
-	if (comm_voting) {
-		LOG_TRACE_STR ("Network vote");
-		status = ocfs_send_dlm_request_msg (osb, lockres->sector_num,
-						    lockres->lock_type,
-						    FLAG_FILE_RELEASE_CACHE,
-						    lockres, votemap);
-		if (status >= 0) {
-			lockres->lock_type = OCFS_DLM_NO_LOCK;
-			goto finally;
-		}
-		if (status == -ETIMEDOUT) {
-			LOG_TRACE_STR ("Network voting timed out");
-			lockres->vote_state = 0;
-		}
-	}
-
-	LOG_TRACE_STR ("Disk vote");
-	disk_vote = true;
-	jif = jiffies;
-#endif
 	status = -EAGAIN;
 	while (status == -EAGAIN) {
 		if (!IS_NODE_ALIVE (osb->publ_map, lockres->master_node_num,
@@ -2165,8 +1790,7 @@
 		}
 
 reset:
-		tmpstat = ocfs_reset_voting (osb, lockres->sector_num,
-					    lockres->lock_type, votemap);
+		tmpstat = ocfs_reset_voting (osb);
 		if (tmpstat < 0) {
 			LOG_ERROR_STATUS (status = tmpstat);
 			goto finally;
@@ -2203,8 +1827,7 @@
 	LOG_TRACE_ARGS ("Lock time: %u\n", jif);
 
 	if (disk_vote && !disk_reset) {
-		tmpstat = ocfs_reset_voting (osb, lockres->sector_num,
-					    lockres->lock_type, votemap);
+		tmpstat = ocfs_reset_voting (osb);
 		if (tmpstat < 0)
 			LOG_ERROR_STATUS (tmpstat);
 	}
@@ -2239,3 +1862,290 @@
 	ocfs_inc_inode_seq(osb, inode, false);
 	return 0;
 }
+
+/* ocfs_send_readonly_drop_message(): send a FLAG_DROP_READONLY vote request for lockres to the nodes in vote_map. */
+/* Tries the network vote first when comm_voting is set, else disk voting. TODO: merge down into new lock function */
+int ocfs_send_readonly_drop_message(ocfs_super *osb, ocfs_lock_res *lockres, __u64 vote_map)
+{
+	int status = 0, tmpstat;
+	__u64 lock_id = lockres->sector_num, lockseqnum = 0;
+	bool disk_vote = false;	/* set once disk voting starts, so bail: knows to reset it */
+
+	LOG_ENTRY ();
+
+	if (comm_voting) {	/* network vote first */
+		status = ocfs_send_dlm_request_msg (osb, lock_id, OCFS_DLM_ENABLE_CACHE_LOCK,
+						    FLAG_DROP_READONLY, lockres, vote_map);
+		if (status >= 0) {
+			status = lockres->vote_status;	/* request returned without error: report the recorded vote status */
+			goto bail;
+		} else if (status == -ETIMEDOUT)
+			LOG_TRACE_STR ("Network voting timed out");
+		else
+			LOG_ERROR_STATUS (status);
+		lockres->vote_state = 0;	/* network request failed or timed out: clear vote_state, fall through to disk voting */
+	}
+
+	disk_vote = true;
+	status = ocfs_request_vote (osb, lock_id, OCFS_DLM_ENABLE_CACHE_LOCK, FLAG_DROP_READONLY, 
+				    vote_map, &lockseqnum, NULL);
+	if (status < 0) {
+		if (status != -EAGAIN)	/* -EAGAIN is returned to the caller without being logged as an error */
+			LOG_ERROR_STATUS (status);
+		goto bail;
+	}
+
+	status = ocfs_wait_for_vote (osb, lock_id, OCFS_DLM_ENABLE_CACHE_LOCK, FLAG_DROP_READONLY, 
+				     vote_map, 5000, lockseqnum, lockres);
+	if (status < 0) {
+		if (status != -EAGAIN)
+			LOG_ERROR_STATUS (status);
+		goto bail;
+	}
+
+bail:
+	if (disk_vote) {	/* a reset failure is only logged; it does not replace the status returned below */
+		tmpstat = ocfs_reset_voting (osb);
+		if (tmpstat < 0)
+			LOG_ERROR_STATUS (tmpstat);
+	}
+
+	LOG_EXIT_STATUS (status);
+	return status;
+}
+
+/* new_lock_function(): negotiate requested_lock on lockres with the other nodes (net vote, then disk vote); returns 0 or a negative status. */
+int new_lock_function(ocfs_super * osb, __u32 requested_lock, __u32 flags, ocfs_lock_res * lockres, struct buffer_head *bh, bool *disk_vote, struct inode *inode)
+{
+	__u64 vote_map = 0;		/* bitmap of the nodes asked to vote */
+	__u64 lockseqnum = 0;		/* filled in by ocfs_request_vote, consumed by ocfs_wait_for_vote */
+	int tmpstat;
+	ocfs_file_entry *fe = NULL;
+	__u64 lock_id;
+	__u32 lock_write_flags = DLOCK_FLAG_MASTER | DLOCK_FLAG_LOCK | DLOCK_FLAG_OPEN_MAP;
+	__u32 lock_type = requested_lock;	/* lock level sent in the vote; may be replaced by the current level below */
+	bool need_to_zap_buffers = false, need_lock_write = true;
+	bool is_readdir = (flags & FLAG_READDIR) ? true : false;	/* remembered: FLAG_READDIR may be cleared from flags below */
+	int status = 0;
+
+	LOG_ENTRY ();
+
+	ocfs_acquire_lockres (lockres);	/* held until just before return, including across the votes */
+	lock_id = lockres->sector_num;
+
+	if (flags & FLAG_READDIR) {
+		if (flags & (FLAG_CHANGE_MASTER | FLAG_REMASTER)) {
+			/* there is no readonly_node.  treat like normal change master. */
+			flags &= ~FLAG_READDIR;
+		}
+	} else if (flags & FLAG_CHANGE_MASTER) {
+		/* non-readdir with CHANGE_MASTER should have no readonly_node */
+		if (lockres->readonly_node != OCFS_INVALID_NODE_NUM) {
+			LOG_ERROR_ARGS("change_master but readonly_node was %d\n",
+				       lockres->readonly_node);
+			lockres->readonly_node = OCFS_INVALID_NODE_NUM;
+		}
+	}
+
+	/* TODO: take this out when all is ok */
+	if (flags & FLAG_READDIR) {
+		/* only send a message with FLAG_READDIR in it if
+		 * the recipient already has a cachelock but is not
+		 * currently set as the readonly_node */
+		OCFS_ASSERT(lockres->master_node_num != osb->node_num);
+		OCFS_ASSERT(lockres->master_node_num != OCFS_INVALID_NODE_NUM);
+		OCFS_ASSERT(lockres->readonly_node == OCFS_INVALID_NODE_NUM);
+		OCFS_ASSERT(lockres->lock_type == OCFS_DLM_ENABLE_CACHE_LOCK);
+	}
+
+	/* decide from the request flags whether local buffers must be dropped and the disk lock rewritten */
+	if (flags & (FLAG_CHANGE_MASTER | FLAG_REMASTER)) {
+		/* on a master change... */
+		need_to_zap_buffers = true; /* need to dump local buffers */
+		need_lock_write = true;     /* and rewrite the lock */
+	} else if (flags & FLAG_ADD_OIN_MAP) {
+		need_lock_write = false;
+	} else if (flags & FLAG_READDIR) {
+		need_lock_write = false;
+		need_to_zap_buffers = true;
+	} else {
+		fe = (ocfs_file_entry *)OCFS_BH_GET_DATA_READ(bh); /* read */
+		/* may not need to rewrite the lock later if we already have a cachelock */
+		need_lock_write = !(DISK_LOCK_CURRENT_MASTER (fe) == osb->node_num &&
+					DISK_LOCK_FILE_LOCK (fe) == OCFS_DLM_ENABLE_CACHE_LOCK);
+		OCFS_BH_PUT_DATA(bh);
+	}
+
+	/* that's why it's called fast path */
+	if (flags & FLAG_FAST_PATH_LOCK)
+		goto vote_success;
+
+
+	/* vote_map is a __u64 bitmap: build node bits with 1ULL, a plain int shift is only 32 bits wide */
+	/* figure out who to vote with */
+	if (flags & (FLAG_REMASTER | FLAG_FILE_DELETE | FLAG_FILE_RENAME))
+		vote_map = osb->publ_map; /* broadcast */
+	else {
+		vote_map = (1ULL << lockres->master_node_num);  /* just owner */
+		lock_type = lockres->lock_type;
+	}
+	vote_map &= ~(1ULL << osb->node_num); // remove this node
+	if (vote_map == 0) {
+		/* As this is the only node alive, make it master of the lock */
+		/* no need to update open map */
+		lock_write_flags &= ~(DLOCK_FLAG_OPEN_MAP);
+		goto vote_success;
+	}
+
+
+
+	/* net voting */
+	if (comm_voting && !*disk_vote) {
+		LOG_TRACE_STR ("Network vote");
+		status = ocfs_send_dlm_request_msg (osb, lock_id, lock_type, flags, lockres, vote_map);
+		if (status >= 0) {
+			status = lockres->vote_status;
+			if (status >= 0)
+				goto vote_success;
+			else
+				goto bail;
+		} else if (status == -ETIMEDOUT) {
+			LOG_TRACE_STR ("Network voting timed out");
+		}
+		else
+			LOG_ERROR_STATUS (status);
+		lockres->vote_state = 0;	/* net request failed or timed out: fall through to disk voting */
+	}
+
+
+
+	/* disk voting */
+	LOG_TRACE_STR ("Disk vote");
+	*disk_vote = true;	/* also reported back to the caller through the pointer */
+	status = ocfs_request_vote (osb, lock_id, lock_type, flags, vote_map, &lockseqnum, inode);
+	if (status < 0) {
+		if (status != -EAGAIN)
+			LOG_ERROR_STATUS (status);
+		goto bail;
+	}
+
+	status = ocfs_wait_for_vote (osb, lock_id, lock_type, flags, vote_map, 5000, lockseqnum, lockres);
+	if (status < 0) {
+		if (status != -EAGAIN)
+			LOG_ERROR_STATUS (status);
+		goto bail;
+	}
+
+vote_success:
+	if (need_to_zap_buffers)
+		ocfs_break_cache_lock_zap_buffers(osb, inode);
+
+	/* just alerting owner on open */
+	if (flags & FLAG_ADD_OIN_MAP)
+		goto bail;
+
+	/* converted cachelock to readonly cachelock */
+	if (flags & FLAG_READDIR) {
+		lockres->readonly_node = lockres->master_node_num;
+		goto bail;
+	}
+
+	/* update the lockres */
+	printk("new_lock_function: set lockid=%u.%u, locktype=%d->%d, master=%d->%d\n",
+	       HILO(lockres->sector_num), lockres->lock_type, requested_lock,
+	       lockres->master_node_num, osb->node_num);
+	lockres->master_node_num = osb->node_num;
+	lockres->lock_type = requested_lock;
+
+	/* update the disk lock */
+	if (need_lock_write) {
+		status = ocfs_update_disk_lock (osb, lockres, lock_write_flags, &bh, inode);
+		if (status < 0)
+			LOG_ERROR_STATUS (status);
+	}
+
+	fe = (ocfs_file_entry *)OCFS_BH_GET_DATA_READ(bh); /* read */
+	lockres->oin_openmap = DISK_LOCK_OIN_MAP (fe);
+	OCFS_BH_PUT_DATA(bh);
+
+bail:
+	if (*disk_vote) {	/* a reset failure is logged but does not replace the returned status */
+		tmpstat = ocfs_reset_voting (osb);
+		if (tmpstat < 0)
+			LOG_ERROR_STATUS (tmpstat);
+	}
+
+	/* if we removed FLAG_READDIR above, set the readonly_node now */
+	if (is_readdir && !(flags & FLAG_READDIR)) {
+		lockres->readonly_node = lockres->master_node_num;
+	}
+
+	ocfs_release_lockres (lockres);
+
+	LOG_EXIT_STATUS (status);
+	return status;
+}
+
+static int _ocfs_wait_for_readonly_drop(ocfs_super *osb, ocfs_lock_res *lockres);
+
+static inline int ocfs_wait_for_readonly_drop(ocfs_super *osb, ocfs_lock_res *lockres)
+{
+	/* An empty readonly map means nothing has to be dropped: answer 0 without the slow path. */
+	bool ro_pending = (lockres->readonly_map != 0ULL);
+	return ro_pending ? _ocfs_wait_for_readonly_drop(osb, lockres) : 0;
+}
+	
+#define READONLY_DROP_TRIES 5	/* polls of the readonly state before giving up with -ETIMEDOUT */
+static int _ocfs_wait_for_readonly_drop(ocfs_super *osb, ocfs_lock_res *lockres)
+{
+	/* Start the readonly drop (unless one is in flight) and poll for it to finish.  Returns 0 if the
+	 * map was already empty, -EAGAIN once the readonly state is gone, -ETIMEDOUT after the last poll. */
+	int tries, status = 0;
+	LOG_ENTRY();
+
+	if (lockres->readonly_map != 0ULL) {
+		// if this node is the owner, need to alert all nodes
+		// in map, set map to 0, ro_node=-1, continue as if normal cache lock
+
+		// if there is a readonly_map, we had better be the owner
+		OCFS_ASSERT(lockres->readonly_node == osb->node_num);
+		if (!lockres->readonly_dropping) {
+			ocfs_get_lockres(lockres);
+			status = ocfs_drop_readonly_cache_lock(osb, lockres);
+			if (status < 0) {
+				LOG_ERROR_STATUS (status);
+				ocfs_release_lockres (lockres);	/* NOTE(review): pairs with ocfs_get_lockres above; nm.c uses ocfs_put_lockres here -- confirm */
+				goto exit;
+			}
+		}
+		for (tries = 0; tries < READONLY_DROP_TRIES; tries++) {	/* counted loop: the original never advanced tries and could spin forever */
+			if (lockres->readonly_node != osb->node_num) {
+				if (lockres->readonly_map != 0ULL)
+					LOG_ERROR_STR("readonly_node is not this node, but map is still set");
+				lockres->readonly_map = 0ULL;
+				status = -EAGAIN;
+				goto exit;
+			} else {
+				OCFS_ASSERT(lockres->master_node_num == osb->node_num);
+				if (lockres->readonly_map == 0ULL) {
+					lockres->readonly_node = OCFS_INVALID_NODE_NUM;
+					status = -EAGAIN;
+					goto exit;
+				}
+			}
+
+			ocfs_release_lockres(lockres);	/* let the drop thread and vote handlers take the lockres while we sleep */
+			ocfs_sleep (OCFS_NM_HEARTBEAT_TIME / 10);
+			ocfs_acquire_lockres(lockres);
+		}
+
+		// not good.  could not get everyone to release in time.
+		// ????: what do we do here?!
+		ocfs_release_lockres(lockres);
+		status = -ETIMEDOUT;
+	}
+exit:
+
+	LOG_EXIT_STATUS(status);
+	return status;
+}

Modified: trunk/src/file.c
===================================================================
--- trunk/src/file.c	2004-03-18 22:41:21 UTC (rev 790)
+++ trunk/src/file.c	2004-03-19 07:37:42 UTC (rev 791)
@@ -150,8 +150,11 @@
 		}
 
 		/*  Look on the disk now ... */
+		down(&parent->i_sem);
 		status = ocfs_find_files_on_disk (osb, parent_off, &(dentry->d_name),
 					  &fe_bh, NULL, parent, true);
+		up(&parent->i_sem);
+
 		if (status >= 0) {
 			oin = NULL;
 			ocfs_down_sem (&(osb->osb_res), true);
@@ -665,7 +668,7 @@
 	ocfs_get_lockres (lockres);
 
 	if ((lockres->master_node_num != osb->node_num) ||
-	    (lockres->lock_state != OCFS_DLM_ENABLE_CACHE_LOCK)) {
+	    (lockres->lock_type != OCFS_DLM_ENABLE_CACHE_LOCK)) {
 		oin->cache_enabled = false;
 	} else {
 		oin->cache_enabled = true;

Modified: trunk/src/inc/ocfs.h
===================================================================
--- trunk/src/inc/ocfs.h	2004-03-18 22:41:21 UTC (rev 790)
+++ trunk/src/inc/ocfs.h	2004-03-19 07:37:42 UTC (rev 791)
@@ -138,7 +138,9 @@
 	ADD_OIN_MAP,          // add requestor into oin map
 	NOT_MASTER,           // I am not master, retry
 	REMASTER_THIS,        // remaster lock to me
-	REMASTER_REQUESTOR    // remaster lock to requestor
+	REMASTER_REQUESTOR,   // remaster lock to requestor
+	DROP_READONLY,	      // RO cachelock needs to convert to RW
+	READONLY	      // a RW or RO cachelock, requesting RO
 };
 
 enum {
@@ -335,8 +337,8 @@
 #define  FLAG_CHANGE_MASTER       0x00000400
 #define  FLAG_ADD_OIN_MAP         0x00000800
 #define  FLAG_DIR                 0x00001000
-#define  FLAG_FILE_UNUSED3        0x00002000
-#define  FLAG_FILE_UNUSED4        0x00004000
+#define  FLAG_REMASTER            0x00002000
+#define  FLAG_FAST_PATH_LOCK      0x00004000
 #define  FLAG_FILE_UNUSED5        0x00008000
 #define  FLAG_FILE_UNUSED6        0x00010000
 #define  FLAG_DEL_NAME            0x00020000
@@ -350,8 +352,8 @@
 #define  FLAG_FILE_UNUSED12       0x02000000
 #define  FLAG_FILE_UNUSED13       0x04000000
 #define  FLAG_FILE_TRUNCATE       0x08000000
-#define  FLAG_FILE_UNUSED14       0x10000000 
-#define  FLAG_FILE_UNUSED15       0x20000000 
+#define  FLAG_DROP_READONLY       0x10000000 
+#define  FLAG_READDIR             0x20000000 
 #define  FLAG_ACQUIRE_LOCK        0x40000000 
 #define  FLAG_RELEASE_LOCK        0x80000000 
 									    
@@ -1831,6 +1833,9 @@
 	__u32 writer_node_num;
 	__u32 reader_node_num;
 	__u32 lock_holders;
+	bool readonly_dropping;
+	__u32 readonly_node;
+	__u64 readonly_map;
 };
 
 struct _ocfs_inode

Modified: trunk/src/inc/proto.h
===================================================================
--- trunk/src/inc/proto.h	2004-03-18 22:41:21 UTC (rev 790)
+++ trunk/src/inc/proto.h	2004-03-19 07:37:42 UTC (rev 791)
@@ -51,6 +51,7 @@
 int ocfs_init_dlm (void);
 void ocfs_process_one_vote_reply(ocfs_super *osb, ocfs_vote_reply_ctxt *ctxt, __u32 node_num);
 int ocfs_break_cache_lock_zap_buffers(ocfs_super * osb, struct inode * inode);
+int ocfs_send_readonly_drop_message(ocfs_super *osb, ocfs_lock_res *lockres, __u64 vote_map);
 
 
 int ocfs_create_log_extent_map (ocfs_super * osb, __u64 diskOffset, __u64 ByteCount);
@@ -208,6 +209,7 @@
 int ocfs_recv_udp_msg (ocfs_recv_ctxt * recv_ctxt);
 int ocfs_send_dismount_msg (ocfs_super * osb, __u64 vote_map);
 int ocfs_send_vote_reply (ocfs_super * osb, ocfs_dlm_msg * dlm_msg, __u32 vote_status, bool inode_open);
+int ocfs_drop_readonly_cache_lock(ocfs_super *osb, ocfs_lock_res *lockres);
 
 
 void ocfs_initialize_bitmap (ocfs_alloc_bm * bitmap, __u32 validbits, __u32 allocbits);

Modified: trunk/src/namei.c
===================================================================
--- trunk/src/namei.c	2004-03-18 22:41:21 UTC (rev 790)
+++ trunk/src/namei.c	2004-03-19 07:37:42 UTC (rev 791)
@@ -273,10 +273,19 @@
 
 	fe = (ocfs_file_entry *) OCFS_BH_GET_DATA_READ(new_fe_bh); /* read */
 
-	/* is this safe if we no longer have it locked? */
 	if (oin->lock_res != NULL) {
-		oin->lock_res->master_node_num = DISK_LOCK_CURRENT_MASTER (fe);
-		oin->lock_res->lock_state = DISK_LOCK_FILE_LOCK (fe);
+		ocfs_lock_res *lockres = oin->lock_res;
+		ocfs_acquire_lockres(lockres);
+		lockres->master_node_num = DISK_LOCK_CURRENT_MASTER (fe);
+		lockres->lock_type = DISK_LOCK_FILE_LOCK (fe);
+		if (lockres->readonly_node != OCFS_INVALID_NODE_NUM &&
+	    	    lockres->readonly_node != lockres->master_node_num) {
+			LOG_ERROR_ARGS("no longer readonly! ronode=%d, master=%d, lockid=%u.%u\n",
+					lockres->readonly_node, lockres->master_node_num,
+					lockres->sector_num);
+			lockres->readonly_node = OCFS_INVALID_NODE_NUM;
+		}
+		ocfs_release_lockres(lockres);
 	}
 	
         /*  Insert the OFile on the OIN list */

Modified: trunk/src/nm.c
===================================================================
--- trunk/src/nm.c	2004-03-18 22:41:21 UTC (rev 790)
+++ trunk/src/nm.c	2004-03-19 07:37:42 UTC (rev 791)
@@ -37,22 +37,30 @@
 static int ocfs_search_commited(ocfs_super *osb, ocfs_lock_res *lockres);
 static int ocfs_schedule_process_vote(ocfs_super *osb, struct buffer_head *bh, int vote_node);
 
+static int _ocfs_drop_readonly_cache_lock(void *arg);
+
+typedef struct _ocfs_ro_cache_drop_ctxt	/* argument bundle handed to the _ocfs_drop_readonly_cache_lock() thread */
+{
+	ocfs_super *osb;	/* osb passed to ocfs_drop_readonly_cache_lock() */
+	ocfs_lock_res *lockres;	/* lock resource whose readonly cache lock is being dropped */
+} ocfs_ro_cache_drop_ctxt;
+
+
 void ocfs_process_vote_worker(void *val);
 
-#ifdef VERBOSE_PROCESS_VOTE
 static const char *process_vote_strings[] = {
 	"INVALID_REQUEST",      // reply with a NO vote
 	"UPDATE_OIN_INODE",     // update both oin and inode
-	"UPDATE_INODE",	      // no oin, so only update inode
+	"UPDATE_INODE",	        // no oin, so only update inode
 	"DELETE_RENAME",        // delete or rename request (EX)
 	"RELEASE_CACHE",        // release a cache lock I hold
 	"CHANGE_MASTER",        // request to change master to requestor
 	"ADD_OIN_MAP",          // add requestor into oin map
 	"NOT_MASTER",           // I am not master, retry
 	"REMASTER_THIS",        // remaster lock to me
-	"REMASTER_REQUESTOR"    // remaster lock to requestor
+	"REMASTER_REQUESTOR",   // remaster lock to requestor
+	"DROP_READONLY", "READONLY"  // RO cachelock needs to convert to RW; a RW or RO cachelock, requesting RO (one name per vote-type enum value)
 };
-#endif
 
 /*
  * ocfs_recv_thread()
@@ -485,6 +493,15 @@
 	lock_res->lock_type = DISK_LOCK_FILE_LOCK (fe);
 	lock_res->master_node_num = DISK_LOCK_CURRENT_MASTER (fe);
 	lock_res->oin_openmap = DISK_LOCK_OIN_MAP (fe);
+
+	if (lock_res->readonly_node != OCFS_INVALID_NODE_NUM &&
+    	    lock_res->readonly_node != lock_res->master_node_num) {
+		LOG_ERROR_ARGS("no longer readonly! ronode=%d, master=%d, lockid=%u.%u\n",
+				lock_res->readonly_node, lock_res->master_node_num,
+				lock_res->sector_num);
+		lock_res->readonly_node = OCFS_INVALID_NODE_NUM;
+	}
+
 	OCFS_BH_PUT_DATA(*bh);
 
 	ocfs_release_lockres (lock_res);
@@ -620,8 +637,10 @@
 {
 	int vote_type = INVALID_REQUEST;
 	bool my_node_wins = false;
+	__u64 lockid = lockres ? lockres->sector_num : 0ULL;
 
-	LOG_ENTRY_ARGS("(status=%d)\n", status);
+	LOG_ENTRY_ARGS("(status=%d, lockid=%u.%u, node_num=%d, flags=%08x)\n", status,
+		       HILO(lockid), node_num, flags);
 
 	*oin = NULL;
 	*master_alive = true;
@@ -640,6 +659,18 @@
 			*oin = lockres->oin;
 	}
 
+	if (flags & FLAG_DROP_READONLY) {
+		vote_type = DROP_READONLY;
+		goto done;
+	} else if (flags & FLAG_READDIR) {
+		if (lockres->master_node_num == osb->node_num &&
+		    lockres->lock_type == OCFS_DLM_ENABLE_CACHE_LOCK) {
+			vote_type = READONLY;
+		} else 
+			vote_type = INVALID_REQUEST;
+		goto done;
+	}
+		
 	if (flags & (FLAG_FILE_DELETE | FLAG_FILE_RENAME))
 		vote_type = DELETE_RENAME;
 	else if (flags & FLAG_FILE_RELEASE_CACHE)
@@ -666,7 +697,7 @@
 		else
 			vote_type = REMASTER_REQUESTOR;
 	}
-	
+done:
 	/* the only allowable action if we failed to */
 	/* get the lockres is a simple inode update */
 	if (status < 0 && vote_type != UPDATE_INODE) {
@@ -738,11 +769,10 @@
 	struct inode *inode = NULL;
 	bool master_alive = true, is_dir = false;
 	bool is_locked, open_handle;
-	int lockflags = 0, in_cache = 0;
+	int lockflags = 0;
 	bool inc_inode_seq = false;
 	bool disk_vote = (ctxt->request_method == DISK_VOTE);
 	bool comm_vote = (ctxt->request_method == COMM_VOTE);
-	bool have_trans_lock = false;
 	bool have_i_sem = false;
 	ocfs_publish *publish = (disk_vote ? ctxt->u.publish : NULL);
 	ocfs_dlm_msg *dlm_msg = (comm_vote ? ctxt->u.dlm_msg : NULL);
@@ -814,14 +844,39 @@
 
 	vote_type = get_process_vote_action(osb, lockres, node_num, flags, 
 					    status, &master_alive, &oin);
-	
+
 #ifdef VERBOSE_PROCESS_VOTE
 	printk("(%u) ocfs_process_vote: %s request for lockid: %u.%u, action: %s, type: %s\n", ocfs_getpid(),
 	       flags & FLAG_RELEASE_LOCK ? "RELEASE" : 
 	       (flags & FLAG_ACQUIRE_LOCK ? "ACQUIRE" : "MODIFY"), lock_id,
 	       process_vote_strings[vote_type], disk_vote ? "disk vote" : "net vote" );
 #endif
+	printk("process_vote: this=%d, master=%d, locktype=%d, flags=%08x, ronode=%d, romap=%08x\n",
+		       osb->node_num, lockres->master_node_num, lockres->lock_type, flags,
+		       lockres->readonly_node, lockres->readonly_map);
 
+	/* get_process_vote_action will only allow CHANGE_MASTER, RELEASE_CACHE, and
+	 * ADD_OIN_MAP on a CACHE lock held by this node.  the CHANGE_MASTER/RELEASE_CACHE
+	 * path needs to check the readonly map to see if any nodes need to be updated.  this
+	 * is not necessary for the ADD_OIN_MAP path since it cannot actually modify any
+	 * data or metadata under the lock.
+	 */
+
+#if 0
+/* TODO: REMOVEME! */
+if (flags & FLAG_READDIR) {
+	printk("ocfs_process_vote: READDIR %s request for lockid: %u.%u, action: %s, type: %s\n",
+       		flags & FLAG_RELEASE_LOCK ? "RELEASE" : 
+       		(flags & FLAG_ACQUIRE_LOCK ? "ACQUIRE" : "MODIFY"), lock_id,
+       		process_vote_strings[vote_type], disk_vote ? "disk vote" : "net vote" );
+} else if (vote_type == DROP_READONLY) {
+	printk("ocfs_process_vote: DROP_READONLY %s request for lockid: %u.%u, action: %s, type: %s\n",
+       		flags & FLAG_RELEASE_LOCK ? "RELEASE" : 
+       		(flags & FLAG_ACQUIRE_LOCK ? "ACQUIRE" : "MODIFY"), lock_id,
+       		process_vote_strings[vote_type], disk_vote ? "disk vote" : "net vote" );
+}
+#endif
+
 	if (inode && (vote_type != DELETE_RENAME)) {
 		/* Ok, for all operations where we no longer need
 		 * isem, drop it now. */
@@ -969,7 +1024,7 @@
 			
 			/* Change the master if there is no lock */
 			if (lockres->master_node_num == osb->node_num &&
-			    lockres->lock_state <= OCFS_DLM_SHARED_LOCK) {
+			    lockres->lock_type < OCFS_DLM_EXCLUSIVE_LOCK) {
 				/* Change the lock ownership to the node asking for vote */
 				/* and write new master on the disk */
 	
@@ -998,6 +1053,32 @@
 
 			break;
 
+		case READONLY:
+			LOG_TRACE_STR("READONLY");
+			OCFS_ASSERT(lockres->readonly_node==osb->node_num ||
+				    lockres->readonly_node==OCFS_INVALID_NODE_NUM);
+
+			// if the requestor just wants to do readdir, we 
+			// drop our buffers, so switch to readonly and done
+			if (inode) {
+#if LINUX_VERSION_CODE >= KERNEL_VERSION(2,6,0)
+				sync_mapping_buffers(inode->i_mapping);
+#else
+				fsync_inode_buffers(inode);
+#if LINUX_VERSION_CODE >= KERNEL_VERSION(2,4,18)
+				fsync_inode_data_buffers(inode);
+#endif
+#endif
+			}
+			printk("do i need a zap buffers here?\n");
+//			ocfs_break_cache_lock_zap_buffers(osb, inode);
+			lockres->readonly_map |= (1 << node_num);
+			printk("READONLY: setting ronode, was=%d, now=%d, master=%d\n", lockres->readonly_node, osb->node_num, lockres->master_node_num);
+			lockres->readonly_node = osb->node_num;
+			vote_response = FLAG_VOTE_NODE;
+			status = 0;
+			break;
+
 		case RELEASE_CACHE:
 		case CHANGE_MASTER:
 			if (vote_type == RELEASE_CACHE)
@@ -1007,66 +1088,48 @@
 
 			status = -EFAIL;
 
-			/* If nobody currently owns the lock, then
-			 * fastpath it. */
-			if (lockres->lock_holders == 0)
-				goto give_lock;
-
-			/* Slow path. We might still be able to give
-			 * him the lock if it's part of the cache and
-			 * we can flush it... */
-
-			LOG_TRACE_ARGS("Lock id (%u.%u) has %u holders\n", 
-				       HILO(lockres->sector_num), 
-				       lockres->lock_holders);
-
-			/* Try to take the trans_lock. We try a couple
-			 * times, with some sleep just in case a
-			 * transaction is about to complete. */
-			have_trans_lock = false;
-			for(i = 0; i < 2; i++) {
-				if (down_trylock(&osb->trans_lock) == 0) {
-					have_trans_lock = true;
-					break;
+			/* requestor will need to retry if anyone is using the lockres */
+			if (lockres->lock_holders > 0) {
+				LOG_TRACE_ARGS("Lock id (%u.%u) has %u holders\n", 
+				       HILO(lockres->sector_num), lockres->lock_holders);
+				down(&(osb->journal.commit_sem));
+				if (ocfs_search_commited(osb, lockres)) {
+					// kick the commit thread
+					atomic_set (&osb->flush_event_woken, 1);
+					wake_up (&osb->flush_event);
 				}
-				ocfs_sleep(100);
-			}
-
-			/* We couldn't get the trans_lock. There's no
-			 * point in going any further. */
-			if (!have_trans_lock) {
-				LOG_TRACE_STR("FLAG_VOTE_UPDATE_RETRY (2)");
+				up(&(osb->journal.commit_sem));
 				vote_response = FLAG_VOTE_UPDATE_RETRY;
 				status = 0;
 				break;
 			}
 
-			/* We have the trans_lock! If it's in the
-			 * commited list, then kick the commit thread
-			 * and vote RETRY this time. Otherwise, it's
-			 * currently in use by another transaction. */
-			down(&(osb->journal.commit_sem));
-			in_cache = ocfs_search_commited(osb, lockres);
-			up(&(osb->journal.commit_sem));
-
-			if (in_cache) {
-				atomic_set (&osb->flush_event_woken, 1);
-				wake_up (&osb->flush_event);
+			/* this is currently a readonly cache lock.
+			 * need to communicate to all the nodes in the 
+			 * map that lock will be changing to RW before we
+			 * continue.  RETRY this request while we spawn 
+			 * off a thread to collect up the communication */
+			if (lockres->readonly_map != 0ULL) {
+				// assumption: node asking for vote has already dropped readonly_node
+				lockres->readonly_map &= ~(1 << node_num);
+				if (lockres->readonly_map != 0ULL) {
+					OCFS_ASSERT(lockres->readonly_node == osb->node_num);
+					status = 0;
+					if (!lockres->readonly_dropping) {
+						ocfs_get_lockres(lockres);
+						if (ocfs_drop_readonly_cache_lock(osb, lockres) < 0) {
+							LOG_ERROR_STATUS(status = -ENOMEM);
+							ocfs_put_lockres(lockres);
+						}
+					}
+					vote_response = FLAG_VOTE_UPDATE_RETRY;
+					break;
+				}
+				// noone left in map, so continue
+				printk("noone left in map, so continue...\n");
+				lockres->readonly_node = OCFS_INVALID_NODE_NUM;
 			}
-			up(&osb->trans_lock);
 
-			/* Ok, either we couldn't find it in the
-			 * cache, or it became busy again while we
-			 * were dumping cache. */
-			LOG_TRACE_STR("FLAG_VOTE_UPDATE_RETRY (3)");
-			vote_response = FLAG_VOTE_UPDATE_RETRY;
-			status = 0;
-			break;
-
-give_lock:
-			if (vote_type == CHANGE_MASTER)	
-				lockres->master_node_num = node_num;
-
 			if (inode) {
 #if LINUX_VERSION_CODE >= KERNEL_VERSION(2,6,0)
 				sync_mapping_buffers(inode->i_mapping);
@@ -1077,8 +1140,15 @@
 #endif
 #endif
 			}
+
+			/* nobody currently owns the lock so fastpath it */
+			if (vote_type == CHANGE_MASTER)
+				lockres->master_node_num = node_num;
+
+
 			if (oin != NULL) {
-				lockres->lock_type = lockres->lock_state = OCFS_DLM_NO_LOCK;
+				printk("setting locktype to nolock\n");
+				lockres->lock_type = OCFS_DLM_NO_LOCK;
 				lockres->cache_lock_held = false;
 			}
 
@@ -1115,8 +1185,10 @@
 					brelse(fe_bh);
 					break;
 				}
-				if (vote_type == RELEASE_CACHE)
-					lockres->lock_type = lockres->lock_state = OCFS_DLM_NO_LOCK;
+				if (vote_type == RELEASE_CACHE) {
+					printk("setting locktype to nolock\n");
+					lockres->lock_type = OCFS_DLM_NO_LOCK;
+				}
 				else // CHANGE_MASTER
 					lockres->master_node_num = node_num;
 			} else {
@@ -1162,6 +1234,35 @@
 			}
 			brelse(fe_bh);
 			break;
+
+		case DROP_READONLY:
+			/* TODO: may need locking in here to lock out 
+			 * the actual IO that a readdir may have in 
+			 * progress, if it's possible to have a corrupt 
+			 * readdir.  for now, skip it.
+			 * NOTE: can't just take i_sem because lock order
+			 * needs to be i_sem->lockres... would have to 
+			 * drop lockres, take i_sem, take lockres, then 
+			 * recheck all the conditions to see if still 
+			 * appropriate, then do the work and drop both.
+			 * seems like a lot of work.  almost as many lines
+			 * of code as there are lines of comments right here.
+			 */
+
+			/* this path should always succeed on the vote *
+			 * even in the error case.  do nothing for error. */	
+			if (lockres->master_node_num != node_num ||
+			    lockres->lock_type != OCFS_DLM_ENABLE_CACHE_LOCK ||
+			    lockres->readonly_map != 0ULL)
+				LOG_ERROR_ARGS("(drop-ro) master=%d node_num=%d locktype=%d map=%08x.%08x ronode=%d\n",
+				       lockres->master_node_num, node_num, lockres->lock_type, 
+				       HILO(lockres->readonly_map), lockres->readonly_node);
+			else
+				lockres->readonly_node = OCFS_INVALID_NODE_NUM;
+
+			status = 0;
+			vote_response = FLAG_VOTE_NODE;
+			break;
 			
 		case NOT_MASTER:
 			LOG_TRACE_STR("NOT_MASTER");
@@ -1371,3 +1472,89 @@
 
 	LOG_EXIT();
 }
+
+/* Start the thread that drops the readonly cache lock; 0 or negative errno. */
+int ocfs_drop_readonly_cache_lock(ocfs_super *osb, ocfs_lock_res *lockres)
+{
+	int pid;
+	ocfs_ro_cache_drop_ctxt *arg = kmalloc(sizeof(*arg), GFP_KERNEL);
+	if (arg == NULL)
+		return -ENOMEM;
+	arg->osb = osb;		/* context is freed by the thread when it exits */
+	arg->lockres = lockres;
+	pid = kernel_thread(_ocfs_drop_readonly_cache_lock, (void *) arg,
+			    CLONE_VM | CLONE_FS | CLONE_FILES);
+	if (pid < 0)
+		kfree(arg);	/* no thread was created, so free it here */
+	return (pid < 0) ? pid : 0;
+}
+/* Thread body started by ocfs_drop_readonly_cache_lock().  Asks the live nodes
+ * in lockres->readonly_map to drop their readonly lock, retrying on -EAGAIN,
+ * and clears the map on success.  Always releases and puts lockres and frees
+ * arg (an ocfs_ro_cache_drop_ctxt).  Returns 0 or a negative error code. */
+static int _ocfs_drop_readonly_cache_lock(void *arg)
+{
+	ocfs_ro_cache_drop_ctxt *ctxt = (ocfs_ro_cache_drop_ctxt *)arg;
+	ocfs_super *osb = ctxt->osb;
+	ocfs_lock_res *lockres = ctxt->lockres;
+	__u64 map;
+	int status = 0;
+
+#define OCFS_DROP_RO_THREAD_NAME   "ocfs2dropro"
+	ocfs_daemonize (OCFS_DROP_RO_THREAD_NAME, strlen(OCFS_DROP_RO_THREAD_NAME));
+	/* this will wait until process_vote gets to the release */
+	ocfs_acquire_lockres(lockres);
+
+	/* check these under the lock */
+	if (lockres->readonly_node != osb->node_num ||
+	    lockres->master_node_num != osb->node_num ||
+	    lockres->lock_type != OCFS_DLM_ENABLE_CACHE_LOCK) {
+		LOG_ERROR_ARGS("bad RO lockres!  this=%d, ro_node=%d, master=%d, locktype=%u\n",
+			       osb->node_num, lockres->readonly_node,
+			       lockres->master_node_num, lockres->lock_type);
+		status = -EINVAL;
+		goto leave;
+	}
+
+	/* flag is held for the whole loop below: a drop is already running */
+	if (lockres->readonly_dropping)
+		goto leave;
+
+	lockres->readonly_dropping = true;
+	map = lockres->readonly_map;
+	map &= osb->publ_map;      /* remove all dead nodes */
+
+	/* shift a __u64: a plain (1 << n) is an int shift, wrong for n >= 31 */
+	while (map != 0ULL && map != ((__u64)1 << osb->node_num)) {
+		// TODO: need to check all members of the map
+		// in each run thru the loop to see if they died
+		// and eliminate them from the map
+		/* cannot hold lockres while waiting for vote */
+		ocfs_release_lockres(lockres);
+		status = ocfs_send_readonly_drop_message(osb, lockres, map);
+		if (status >= 0) {
+			ocfs_acquire_lockres(lockres);
+			break;
+		} else if (status != -EAGAIN) {
+			LOG_ERROR_STATUS (status);
+			ocfs_acquire_lockres(lockres);
+			break;
+		}
+
+		/* yes, disgusting.  need a waitqueue on lockres */
+		ocfs_sleep (OCFS_NM_HEARTBEAT_TIME / 10);
+		ocfs_acquire_lockres(lockres);
+		map = lockres->readonly_map;
+		map &= osb->publ_map;      /* remove all dead nodes */
+		status = 0;	/* -EAGAIN was consumed by the retry, don't leak it */
+	}
+	if (status >= 0)
+		lockres->readonly_map = 0ULL;
+	lockres->readonly_dropping = false;
+
+leave:
+	ocfs_release_lockres(lockres);
+	ocfs_put_lockres(lockres);
+	kfree(arg);
+	return status;
+}

Modified: trunk/src/oin.c
===================================================================
--- trunk/src/oin.c	2004-03-18 22:41:21 UTC (rev 790)
+++ trunk/src/oin.c	2004-03-19 07:37:42 UTC (rev 791)
@@ -47,6 +47,7 @@
         struct list_head *iter;
         struct list_head *temp_iter;
         int disk_len;
+	ocfs_disk_lock dlock;   /* ???: is this too much on the stack? */
 
 	/* We are setting the oin Updated flag in the end. */
 	LOG_ENTRY ();
@@ -212,18 +213,30 @@
 
 		/* ??? we need to the lock resource before updating it */
 		if (oin->lock_res) {
-			ocfs_get_lockres(oin->lock_res);
+			/* cannot hold bhsem while taking lockres... baaad */
+			memcpy(&dlock, (ocfs_disk_lock *)fe, sizeof(ocfs_disk_lock));
+			OCFS_BH_PUT_DATA(fe_bh);
+			fe = NULL;
 
 			pLockRes = oin->lock_res;
-			pLockRes->lock_type = DISK_LOCK_FILE_LOCK (fe);
-			pLockRes->master_node_num = DISK_LOCK_CURRENT_MASTER (fe);
-			pLockRes->oin_openmap = DISK_LOCK_OIN_MAP (fe);
-			pLockRes->last_write_time = DISK_LOCK_LAST_WRITE (fe);
-			pLockRes->last_read_time = DISK_LOCK_LAST_READ (fe);
-			pLockRes->reader_node_num = DISK_LOCK_READER_NODE (fe);
-			pLockRes->writer_node_num = DISK_LOCK_WRITER_NODE (fe);
+			ocfs_acquire_lockres(pLockRes);
+			pLockRes->lock_type = DISK_LOCK_FILE_LOCK (&dlock);
+			pLockRes->master_node_num = DISK_LOCK_CURRENT_MASTER (&dlock);
+			pLockRes->oin_openmap = DISK_LOCK_OIN_MAP (&dlock);
+			pLockRes->last_write_time = DISK_LOCK_LAST_WRITE (&dlock);
+			pLockRes->last_read_time = DISK_LOCK_LAST_READ (&dlock);
+			pLockRes->reader_node_num = DISK_LOCK_READER_NODE (&dlock);
+			pLockRes->writer_node_num = DISK_LOCK_WRITER_NODE (&dlock);
 
-			ocfs_put_lockres(oin->lock_res);
+			if (pLockRes->readonly_node != OCFS_INVALID_NODE_NUM &&
+    	    		    pLockRes->readonly_node != pLockRes->master_node_num) {
+				LOG_ERROR_ARGS("no longer readonly! ronode=%d, master=%d, lockid=%u.%u\n",
+					pLockRes->readonly_node, pLockRes->master_node_num,
+					pLockRes->sector_num);
+				pLockRes->readonly_node = OCFS_INVALID_NODE_NUM;
+			}
+
+			ocfs_release_lockres(pLockRes);
 		}
 
 		status = 0;



More information about the Ocfs2-commits mailing list