[Ocfs2-commits] rev 7 - in trunk: . inc

svn-commits at oss.oracle.com svn-commits at oss.oracle.com
Thu Dec 4 21:45:28 CST 2003


Author: manish
Date: 2003-12-04 21:45:26 -0600 (Thu, 04 Dec 2003)
New Revision: 7

Modified:
   trunk/Config.make
   trunk/dlm.c
   trunk/heartbeat.c
   trunk/inc/journal.h
   trunk/inc/ocfs.h
   trunk/journal.c
   trunk/nm.c
   trunk/osb.c
Log:
sync


Modified: trunk/Config.make
===================================================================
--- trunk/Config.make	2003-12-04 23:52:47 UTC (rev 6)
+++ trunk/Config.make	2003-12-05 03:45:26 UTC (rev 7)
@@ -66,3 +66,6 @@
 #This should be defined for all kernels <= 2.4.21 except 
 #for rhel3 and latest rhas update.
 #USE_JOURNAL_CREATE_REPLACEMENT = yes
+
+#define this for RHEL3 systems and systems that have NPTL.
+#HAVE_NPTL = yes

Modified: trunk/dlm.c
===================================================================
--- trunk/dlm.c	2003-12-04 23:52:47 UTC (rev 6)
+++ trunk/dlm.c	2003-12-05 03:45:26 UTC (rev 7)
@@ -151,7 +151,7 @@
 	int i = 0; 
 
 	LOG_ENTRY_ARGS("do_other_stupid_things = %s\n", do_other_stupid_things ? "true" : "false");
-
+#if 0
 	if((osb->trans_in_progress) && (osb->needs_flush)) 
 	{ 
 		osb->trans_in_progress = false; 
@@ -184,7 +184,7 @@
 		if (osb->needs_flush)
 			LOG_ERROR_STR("CHANGE TO TRACE >>> Trans and needs flush both are set");
 	}
-
+#endif
 	LOG_EXIT();
 	return;
 }
@@ -1212,6 +1212,7 @@
 	lockres->reader_node_num = OCFS_INVALID_NODE_NUM;
 
 	lockres->lock_holders = 0;
+	LOG_TRACE_ARGS("lockres->lock_holders = %u\n", lockres->lock_holders);
 
 	LOG_EXIT ();
 	return;
@@ -1420,6 +1421,9 @@
 			ocfs_release_lockres (lockres);
 			goto bail;
 		}
+		lockres->lock_holders++;
+		LOG_TRACE_ARGS("lockres->lock_holders = %u\n", 
+			       lockres->lock_holders);
 		atomic_inc (&(lockres->lr_share_cnt));
 		ocfs_release_lockres (lockres);
 		goto bail;
@@ -1652,6 +1656,7 @@
 
 skip_lock_write:
 	lockres->lock_holders++;
+	LOG_TRACE_ARGS("lockres->lock_holders = %u\n", lockres->lock_holders);
 	ocfs_release_lockres (lockres);
 
 finally:
@@ -1911,6 +1916,7 @@
 
 finally:
 	lockres->lock_holders--;
+	LOG_TRACE_ARGS("lockres->lock_holders = %u\n", lockres->lock_holders);
 	ocfs_release_lockres (lockres);
 	LOG_EXIT_STATUS (status);
 	return (status);

Modified: trunk/heartbeat.c
===================================================================
--- trunk/heartbeat.c	2003-12-04 23:52:47 UTC (rev 6)
+++ trunk/heartbeat.c	2003-12-05 03:45:26 UTC (rev 7)
@@ -60,6 +60,9 @@
 			read_publish ? "true" : "false");
 
 	if (flag & HEARTBEAT_METHOD_DISK) {
+		if (pub_bh == NULL && !read_publish)
+			BUG();
+
                 if (read_publish) {
 			status = ocfs_read_bh(osb, node_publ_off, pub_bh, 0, NULL);
 		        if (status < 0) {

Modified: trunk/inc/journal.h
===================================================================
--- trunk/inc/journal.h	2003-12-04 23:52:47 UTC (rev 6)
+++ trunk/inc/journal.h	2003-12-05 03:45:26 UTC (rev 7)
@@ -75,11 +75,16 @@
 					       * multiple concurrent
 					       * transactions this may
 					       * become a list.*/
-	/* This is protected by the trans_lock. */
+	/* locking order: trans_lock -> commit_sem -> journal.curr->list_lock */
+	struct semaphore          commit_sem; /* protects *everything*
+					       * in the commited list
+					       * and also protects
+					       * 'curr' from
+					       * removal/creation. */
 	struct list_head          commited;   /* doubly linked list of all
 					       * commited handles awaiting
 					       * checkpointing.           */
-#define OCFS_JOURNAL_CREATE_MAX_BMAPS 1000
+#define OCFS_JOURNAL_CREATE_MAX_BMAPS 600
 	__u32                     bmaps;      /* only used during
 					       * journal_create. see
 					       * ocfs_journal_create
@@ -113,7 +118,14 @@
 	struct buffer_head  **buffs;
 
 	/* The following three fields are for ocfs_journal_add_lock */
-	int                 num_locks;  
+	spinlock_t          list_lock; /* Used to protect the 'locks'
+					* list. Only used if the
+					* handle is the same as
+					* journal->curr. otherwise, we
+					* should be in the commited
+					* list in which case we're
+					* protected by commit_sem */
+	int                 num_locks; 
 	struct list_head    locks;     /* A bunch of locks to 
 					* release on commit/abort. This 
 					* should be a list_head */

Modified: trunk/inc/ocfs.h
===================================================================
--- trunk/inc/ocfs.h	2003-12-04 23:52:47 UTC (rev 6)
+++ trunk/inc/ocfs.h	2003-12-05 03:45:26 UTC (rev 7)
@@ -1852,7 +1852,6 @@
 	__u64 log_file_size;
 	__u32 sect_size;
 	bool needs_flush;
-	bool commit_cache_exec;
 	ocfs_sem map_lock;
 	ocfs_extent_map metadata_map;
 	ocfs_extent_map trans_map;

Modified: trunk/journal.c
===================================================================
--- trunk/journal.c	2003-12-04 23:52:47 UTC (rev 6)
+++ trunk/journal.c	2003-12-05 03:45:26 UTC (rev 7)
@@ -71,8 +71,8 @@
 	}
 	memset(retval->buffs, 0, sizeof(struct buffer_head *) * max_buffs);
 
+	spin_lock_init(&(retval->list_lock));
 	INIT_LIST_HEAD(&(retval->h_list));
-
 	INIT_LIST_HEAD(&(retval->locks));
 	retval->max_buffs = max_buffs;
 	retval->num_buffs = 0;
@@ -92,7 +92,10 @@
 	retval->k_handle->h_sync = 1;
 
 	atomic_inc(&(osb->journal.num_trans));
+
+	down(&osb->journal.commit_sem);
 	osb->journal.curr = retval;
+	up(&osb->journal.commit_sem);
 
 	/* default handle flags! */
 	retval->flags = OCFS_HANDLE_CHECKPOINT;
@@ -189,7 +192,9 @@
 	return(retval);
 }
 
-
+/* This does no locking of the handle, so make sure that the handle
+ * isn't on journal->curr. If the handle is on journal->commited, then
+ * you want to be holding the commit_sem before calling this. */
 static int ocfs_journal_release_locks(ocfs_journal_handle *handle, int abort) 
 {
 	ocfs_super *osb;
@@ -202,6 +207,9 @@
 
 	osb = handle->osb;
 
+	if (osb->journal.curr == handle)
+		BUG();
+
 	LOG_TRACE_ARGS("num_locks = %d\n", handle->num_locks);
 
 	list_for_each_safe(p, n, &(handle->locks)) {
@@ -211,7 +219,8 @@
 					    lock->flags, lock->res, 
 					    (abort ? NULL : lock->bh), NULL);
 		if (tmpstat < 0) {
-			LOG_ERROR_ARGS("Could not release lock %u.%u\n", HILO(lock->id));
+			LOG_ERROR_ARGS("Could not release lock %u.%u\n", 
+				       HILO(lock->id));
 			LOG_ERROR_STATUS(tmpstat);
 			status = tmpstat;
 		}
@@ -305,10 +314,6 @@
 
 		revoked = true;
 	} else {
-		/* If we're not going to checkpoint the handle on
-		 * commit then we need to add it to our journals list
-		 * so it can be done later */
-		list_add_tail(&(handle->h_list), &(journal->commited));
 
 		/* we'll want to get rid of the buffers now as
 		 * journal_flush does the other work for us, so leave
@@ -319,11 +324,9 @@
 done:
 	if (!revoked) {
 		/* usually the journal_revoke in ocfs_revoke_handle
-		 * will brelse the buffers for us, but if we've gotten
-		 * here it's because of error and we have to do it
-		 * manually. Additionally, if we ever decide to not do
-		 * our revoke during commit, we should unconditionally
-		 * execute this block. */
+		 * will brelse the buffers for us, but if we aren't
+		 * checkpointing this handle, or we've gotten here
+		 * because of error then we have to do it manually. */
 		for(i = 0; i < handle->num_buffs; i++) {
 			bh = handle->buffs[i];
 			handle->buffs[i] = NULL;
@@ -331,18 +334,28 @@
 		}
 	}
 
+	down(&journal->commit_sem);
+	journal->curr = NULL;
+
 	if (checkpoint) {
+		up(&journal->commit_sem);
 		atomic_dec(&(osb->journal.num_trans));
 
 		/* Release locks associated with this handle. */
 		retval = ocfs_journal_release_locks(handle, 0);
 		if (retval < 0)
 			LOG_ERROR_STATUS(retval);
-	} else 
+
+
+	} else {
+		/* If we're not going to checkpoint the handle on
+		 * commit then we need to add it to our journals list
+		 * so it can be done later */
+		list_add_tail(&(handle->h_list), &(journal->commited));
 		osb->needs_flush = true;
+		up(&journal->commit_sem);
+	}
 
-	journal->curr = NULL;
-
 	/* we don't free the kernel handle because jbd has freed it. */
 	if (handle->buffs) {
 		ocfs_free(handle->buffs);
@@ -417,12 +430,15 @@
 
 	atomic_dec(&(osb->journal.num_trans));
 done:
+
+	down(&osb->journal.commit_sem);
+	osb->journal.curr = NULL;
+	up(&osb->journal.commit_sem);
+
 	retval = ocfs_journal_release_locks(handle, 1);
 	if (retval < 0)
 		LOG_ERROR_STATUS(retval);
 
-	osb->journal.curr = NULL;
-
 	/* This has to happen after we release the other locks. */
 	ocfs_release_trans_lock(osb);
 
@@ -527,6 +543,10 @@
 	return(status);
 }
 
+
+/* We are expecting to be run on the current running transaction, so
+ * we use the spin_lock here. You really shouldn't be calling this on
+ * other transactions anyway... */
 void ocfs_journal_add_lock(ocfs_journal_handle *handle, __u64 id,  __u32 type, 
 			   __u32 flags, struct _ocfs_lock_res *res, 
 			   struct buffer_head *bh) 
@@ -550,8 +570,10 @@
 	lock->res   = res;
 	lock->bh    = bh;
 
+	spin_lock(&handle->list_lock);
 	list_add_tail(&(lock->lock_list), &(handle->locks));
 	handle->num_locks++;
+	spin_unlock(&handle->list_lock);
 
 	if (bh)
 		get_bh(bh);
@@ -633,6 +655,7 @@
 	memset(&osb->journal, 0, sizeof(ocfs_journal));
 
 	INIT_LIST_HEAD(&(osb->journal.commited));
+	init_MUTEX(&(osb->journal.commit_sem));
 
 	/* get the cleanup file fe and lock */
 	cleanup_file_id = (__u32) (JOURNAL_FILE_BASE_ID + osb->node_num);
@@ -778,9 +801,10 @@
 
 	num_running_trans = atomic_read(&(osb->journal.num_trans));
 	if (num_running_trans > 0)
-		LOG_ERROR_ARGS("Shutting down journal but there are %d "      \
+		LOG_TRACE_ARGS("Shutting down journal: must wait on %d"
 			       " running transactions!\n", num_running_trans);
 
+	down(&osb->trans_lock);
 	journal_lock_updates(journal->k_journal);
 	status = journal_flush(journal->k_journal);
 	journal_unlock_updates(journal->k_journal);
@@ -817,6 +841,7 @@
 
 	journal->state = OCFS_JOURNAL_FREE;
 
+	up (&osb->trans_lock);
 done:
 	if (inode)
 		iput(inode);
@@ -1495,6 +1520,7 @@
 
 	/* now we can run an unlock against any pending handles and
 	 * release them. */
+	down(&journal->commit_sem);
 	list_for_each_safe(p, n, &journal->commited) {
 		handle = list_entry(p, ocfs_journal_handle, h_list);
 		tmpstat = ocfs_journal_release_locks(handle, 0);
@@ -1504,6 +1530,7 @@
 		ocfs_free(handle);
 		atomic_dec(&journal->num_trans);
 	}
+	up(&journal->commit_sem);
 
 flush_data:
 	/* flush data buffers if asked. */

Modified: trunk/nm.c
===================================================================
--- trunk/nm.c	2003-12-04 23:52:47 UTC (rev 6)
+++ trunk/nm.c	2003-12-05 03:45:26 UTC (rev 7)
@@ -34,7 +34,7 @@
 static inline int get_process_vote_action(ocfs_super * osb, ocfs_lock_res *lockres, __u32 node_num, 
 					  __u32 flags, int status, bool *master_alive, ocfs_inode **oin);
 static int ocfs_disk_update_resource (ocfs_super * osb, ocfs_lock_res * lock_res, struct buffer_head **bh, __u32 timeout, struct inode *inode);
-
+static int ocfs_search_commited(ocfs_super *osb, ocfs_lock_res *lockres);
 static void ocfs_inc_inode_seq(ocfs_super *osb, struct inode *inode);
 
 static const char *process_vote_strings[] = {
@@ -88,7 +88,7 @@
 		}
 	}
 
-      finally:
+finally:
 	if (OcfsIpcCtxt.send_sock) {
 		sock_release (OcfsIpcCtxt.send_sock);
 		OcfsIpcCtxt.send_sock = NULL;
@@ -108,7 +108,7 @@
 	/* signal main thread of ipcdlm's exit */
 	complete (&(OcfsIpcCtxt.complete));
 
-      bail:
+bail:
 	LOG_EXIT ();
 	return 0;
 }				/* ocfs_recv_thread */
@@ -182,7 +182,7 @@
 
 		if (!time_after (jiffies, (unsigned long) (osb->hbt)))
 			goto finally;
-	
+
 		if (osb->vol_state == VOLUME_MOUNTED) {
 			if (osb->needs_flush && down_trylock(&osb->trans_lock) == 0) {
 				if (osb->trans_in_progress == false) {
@@ -195,7 +195,9 @@
 				up(&osb->trans_lock);
 			}
 		}
-	
+
+		/* Force a flush every 300 iterations. No longer
+		 * necessary, but I suppose it doesn't hurt... */
 		if (osb->needs_flush)
 			osb->num_nm_thread_iter = 0;
 		else {
@@ -204,7 +206,7 @@
 				osb->needs_flush = true;
 			}
 		}
-	
+
 		/* lock publish to prevent overwrites from vote_req and vote_reset */
 		down (&(osb->publish_lock));
 
@@ -212,7 +214,6 @@
 		offset = osb->vol_layout.new_cfg_off;
 
 		/* Read disk for Publish Sectors of all nodes */
-//		status = ocfs_read_force_disk (osb, osb->cfg_prealloc, osb->cfg_len, offset);
 		status = ocfs_read_bhs(osb, offset, osb->cfg_len, osb->cfg_bhs, 0, NULL);
 		if (status < 0) {
 			up (&(osb->publish_lock));
@@ -262,7 +263,7 @@
 		}
 
 		LOG_TRACE_ARGS ("Publish map: 0x%08x\n", LO (osb->publ_map));
-	
+
 		/* map of local node */
 		curr_node_map = (__u64) ((__u64)1 << osb->node_num);
 	
@@ -297,7 +298,7 @@
 					       highest_vote_node);
 				continue;
 			}
-		loop:
+loop:
 			publish = NULL;
 			OCFS_BH_PUT_DATA(osb->cfg_bhs[which]);
 		}
@@ -620,6 +621,40 @@
 }
 
 
+/* Search the journal's committed transactions list for a given
+ * lockres. Returns 1 if a lock with its id is found there, 0
+ * otherwise. Must hold the journal->commit_sem before going here! */
+static int ocfs_search_commited(ocfs_super *osb, ocfs_lock_res *lockres)
+{
+	struct list_head *handle_p;
+	struct list_head *lock_p;
+	int found = 0;
+	ocfs_journal *journal;
+	ocfs_journal_handle *handle = NULL;
+	ocfs_journal_lock *lock;
+
+	LOG_ENTRY();
+
+	journal = &osb->journal;
+
+	list_for_each(handle_p, &journal->commited) {
+		handle = list_entry(handle_p, ocfs_journal_handle, h_list);
+
+		list_for_each(lock_p, &(handle->locks)) {
+			lock= list_entry(lock_p, ocfs_journal_lock, lock_list);
+
+			if (lock->id == lockres->sector_num) {
+				found = 1;
+				break;
+			}
+		}
+	}
+
+	LOG_EXIT_STATUS(found);
+
+	return(found);
+}
+
 /*
  * ocfs_process_vote()
  *
@@ -644,10 +679,11 @@
 	struct inode *inode = NULL;
 	bool master_alive = true, is_dir = false;
 	bool is_locked, open_handle;
-	int lockflags = 0;
+	int lockflags = 0, in_cache = 0;
 	bool inc_inode_seq = false;
 	bool disk_vote = (ctxt->request_method == DISK_VOTE);
 	bool comm_vote = (ctxt->request_method == COMM_VOTE);
+	bool have_trans_lock = false;
 	ocfs_publish *publish = (disk_vote ? ctxt->u.publish : NULL);
 	ocfs_dlm_msg *dlm_msg = (comm_vote ? ctxt->u.dlm_msg : NULL);
 	__u32 node_num = ctxt->node_num;
@@ -836,36 +872,101 @@
 			else
 				LOG_TRACE_STR("CHANGE_MASTER");
 
-			if (vote_type == RELEASE_CACHE && osb->commit_cache_exec)
-				break;
 			status = -EFAIL;
-			osb->needs_flush = true;
-			for (i=0; i<10 && osb->trans_in_progress; i++)
-				ocfs_sleep (100);
-	
-			if ((vote_type == RELEASE_CACHE && osb->trans_in_progress) ||
-			    (vote_type == CHANGE_MASTER && lockres->lock_type != OCFS_DLM_NO_LOCK)) {
-				/* Ask for a retry as txn is in progress */
+
+#if 0
+			if (vote_type == CHANGE_MASTER 
+			    && lockres->lock_type != OCFS_DLM_NO_LOCK) {
+				LOG_TRACE_STR("FLAG_VOTE_UPDATE_RETRY (1)");
 				vote_response = FLAG_VOTE_UPDATE_RETRY;
 				status = 0;
 				break;
 			}
+#endif
+			/* If nobody currently owns the lock, then
+			 * fastpath it. */
+			if (lockres->lock_holders == 0)
+				goto give_lock;
 
-			if (vote_type == RELEASE_CACHE)
-				osb->commit_cache_exec = true;
+			/* Slow path. We might still be able to give
+			 * him the lock if it's part of the cache and
+			 * we can flush it... */
+
+			LOG_TRACE_ARGS("Lock id (%u.%u) has %u holders\n", 
+				       HILO(lockres->sector_num), 
+				       lockres->lock_holders);
+
+			/* Try to take the trans_lock. We try a couple
+			 * times, with some sleep just in case a
+			 * transaction is about to complete. */
+			have_trans_lock = false;
+			for(i = 0; i < 2; i++) {
+				if (down_trylock(&osb->trans_lock) == 0) {
+					have_trans_lock = true;
+					break;
+				}
+				ocfs_sleep(100);
+			}
+
+			/* We couldn't get the trans_lock. There's no
+			 * point in going any further. */
+			if (!have_trans_lock) {
+				LOG_TRACE_STR("FLAG_VOTE_UPDATE_RETRY (2)");
+				vote_response = FLAG_VOTE_UPDATE_RETRY;
+				status = 0;
+				break;
+			}
+
+			/* We have the trans_lock! If it's in the
+			 * commited list, then dump cache and give it
+			 * to the other node. Otherwise, it's
+			 * currently in use by another transaction. */
+			down(&(osb->journal.commit_sem));
+			in_cache = ocfs_search_commited(osb, lockres);
+			up(&(osb->journal.commit_sem));
+
+			if (in_cache) {
+				/* if we keep the lockres locked, then
+				 * the call to release_lock in
+				 * commit_cache will deadlock. On the
+				 * other hand, we don't want it
+				 * destroyed behind us. */
+				ocfs_get_lockres(lockres);
+				ocfs_release_lockres(lockres);
+
+				status = ocfs_commit_cache(osb, false);
+				if (status < 0) {
+					LOG_ERROR_STATUS(status);
+					ocfs_put_lockres(lockres);
+					goto leave;
+				}
+				osb->needs_flush = false;
+				up(&osb->trans_lock);
+
+				status = ocfs_acquire_lockres_ex(lockres, 
+						   (OCFS_NM_HEARTBEAT_TIME/2));
+				ocfs_put_lockres(lockres);
+				if (status < 0)
+					LOG_TRACE_STR("Timed out locking "
+						      "lockres again.");
+				else if (lockres->lock_holders == 0)
+					goto give_lock;
+			} else
+				up(&osb->trans_lock);
+
+			/* Ok, either we couldn't find it in the
+			 * cache, or it became busy again while we
+			 * were dumping cache. */
+			LOG_TRACE_STR("FLAG_VOTE_UPDATE_RETRY (3)");
+			vote_response = FLAG_VOTE_UPDATE_RETRY;
+			status = 0;
+			break;
+
+give_lock:
 			osb->num_nm_thread_iter = 0;
 			
-			down (&osb->trans_lock);
-			status = ocfs_commit_cache (osb, true);
-			if (status < 0)
-				LOG_ERROR_STATUS (status);
-			osb->needs_flush = false;
-			up (&osb->trans_lock);
-
 			if (vote_type == CHANGE_MASTER)	
 				lockres->master_node_num = node_num;
-			else
-				osb->commit_cache_exec = false;
 
 			if (inode) {
 				fsync_inode_buffers(inode);

Modified: trunk/osb.c
===================================================================
--- trunk/osb.c	2003-12-04 23:52:47 UTC (rev 6)
+++ trunk/osb.c	2003-12-05 03:45:26 UTC (rev 7)
@@ -59,7 +59,6 @@
 	osb->recovery_map = 0;
 
 	osb->needs_flush = false;
-	osb->commit_cache_exec = false;
 	osb->log_disk_off = 0;
 	osb->log_meta_disk_off = 0;
 	osb->trans_in_progress = false;




More information about the Ocfs2-commits mailing list