[Ocfs2-commits] mfasheh commits r2056 - trunk/fs/ocfs2

svn-commits at oss.oracle.com svn-commits at oss.oracle.com
Fri Mar 25 18:08:02 CST 2005


Author: mfasheh
Signed-off-by: khackel
Date: 2005-03-25 18:08:00 -0600 (Fri, 25 Mar 2005)
New Revision: 2056

Modified:
   trunk/fs/ocfs2/heartbeat.c
   trunk/fs/ocfs2/journal.c
   trunk/fs/ocfs2/ocfs_journal.h
   trunk/fs/ocfs2/super.c
Log:
* Pull the orphan dir / local alloc recovery work out of ocfs_recover_node
  and have the recovery thread queue it up for keventd instead.
  
* Initialize the journal structure a little earlier in journal_init. I'm
  worried that we may hit an error there before the structure is set up and
  then try to access the uninitialized spinlock later in unmount.

* Now that recovery is minimally tested, let's turn some of those printks
  back into LOG_TRACE calls.

Signed-off-by: khackel



Modified: trunk/fs/ocfs2/heartbeat.c
===================================================================
--- trunk/fs/ocfs2/heartbeat.c	2005-03-25 21:36:27 UTC (rev 2055)
+++ trunk/fs/ocfs2/heartbeat.c	2005-03-26 00:08:00 UTC (rev 2056)
@@ -87,7 +87,7 @@
 
 	OCFS_ASSERT(osb->node_num != node_num);
 
-	printk("ocfs2: node down event for %d\n", node_num);
+	LOG_TRACE_ARGS("ocfs2: node down event for %d\n", node_num);
 
 	if (ocfs_node_map_test_bit(osb, &osb->umount_map, node_num)) {
 		/* If a node is in the umount map, then we've been
@@ -108,7 +108,7 @@
 
 	OCFS_ASSERT(osb->node_num != node_num);
 
-	printk("ocfs2: node up event for %d\n", node_num);
+	LOG_TRACE_ARGS("ocfs2: node up event for %d\n", node_num);
 	ocfs_node_map_clear_bit(osb, &osb->umount_map, node_num);
 }
 

Modified: trunk/fs/ocfs2/journal.c
===================================================================
--- trunk/fs/ocfs2/journal.c	2005-03-25 21:36:27 UTC (rev 2055)
+++ trunk/fs/ocfs2/journal.c	2005-03-26 00:08:00 UTC (rev 2056)
@@ -56,7 +56,10 @@
 spinlock_t trans_inc_lock = SPIN_LOCK_UNLOCKED;
 
 static int ocfs_force_read_journal(struct inode *inode);
-static int ocfs_recover_node(struct _ocfs_super *osb, int node_num);
+static int ocfs_recover_node(ocfs_super *osb,
+			     int node_num,
+			     ocfs2_dinode **la_copy);
+static void ocfs2_complete_recovery(void *data);
 static int __ocfs_recovery_thread(void *arg);
 static int ocfs_commit_cache (ocfs_super * osb);
 static int ocfs_wait_on_mount(ocfs_super *osb);
@@ -610,12 +613,24 @@
 	journal_t * j_journal = NULL;
 	ocfs2_dinode *fe = NULL;
 	struct buffer_head *bh = NULL;
+	ocfs_journal *journal;
 
 	LOG_ENTRY();
 
-	if (!osb)
-		BUG();
+	OCFS_ASSERT(osb);
 
+	journal = osb->journal;
+	journal->j_osb = osb;
+
+	atomic_set(&journal->j_num_trans, 0);
+	init_rwsem(&journal->j_trans_barrier);
+	init_waitqueue_head(&journal->j_checkpointed);
+	spin_lock_init(&journal->j_lock);
+	journal->j_trans_id = (unsigned long) 1;
+	journal->j_cleanup_orphans = 0;
+	INIT_LIST_HEAD(&journal->j_la_cleanups);
+	INIT_WORK(&journal->j_recovery_work, ocfs2_complete_recovery, osb);
+
 	/* already have the inode for our journal */
 	inode = ocfs_get_system_file_inode(osb, JOURNAL_SYSTEM_INODE, 
 					   osb->slot_num);
@@ -677,20 +692,15 @@
 	LOG_TRACE_ARGS("j_journal->j_maxlen = %u\n", j_journal->j_maxlen);
 	j_journal->j_commit_interval = OCFS_DEFAULT_COMMIT_INTERVAL;
 
-	/* yay, pass the proper info back to our journal structure. */
-	osb->journal->j_osb = osb;
-	osb->journal->j_journal = j_journal;
-	osb->journal->j_inode = inode;
-	osb->journal->j_bh = bh;
-	atomic_set(&(osb->journal->j_num_trans), 0);
-	init_rwsem(&(osb->journal->j_trans_barrier));
-	init_waitqueue_head(&osb->journal->j_checkpointed);
-	spin_lock_init(&osb->journal->j_lock);
-	osb->journal->j_state = OCFS_JOURNAL_LOADED;
-	osb->journal->j_trans_id = (unsigned long) 1;
-
 	*dirty = (le32_to_cpu(fe->id1.journal1.ij_flags) &
 		  OCFS2_JOURNAL_DIRTY_FL);
+
+	journal->j_journal = j_journal;
+	journal->j_inode = inode;
+	journal->j_bh = bh;
+
+	journal->j_state = OCFS_JOURNAL_LOADED;
+
 	status = 0;
 done:
 	if (status < 0) {
@@ -895,9 +905,7 @@
  * up to date. We know things can't change on this file underneath us
  * as we have the lock by now :)
  *
- * size should be file_size, NOT alloc_size
  */
-#warning ocfs_force_read_journal() needs retesting!
 static int ocfs_force_read_journal(struct inode *inode)
 {
 	int status = 0;
@@ -940,7 +948,7 @@
 			brelse(bhs[i]);
 			bhs[i] = NULL;
 		}
-		
+
 		v_blkno += p_blocks;
 	}
 
@@ -952,11 +960,116 @@
 	return status;
 }
 
+struct ocfs2_la_recovery_item {
+	struct list_head lri_list;
+	ocfs2_dinode *lri_dinode;
+};
+
+/* Does the second half of the recovery process. By this point, the
+ * node is marked clean and can actually be considered recovered,
+ * hence it's no longer in the recovery map, but there's still some
+ * cleanup we can do which shouldn't happen within the recovery thread
+ * as locking in that context becomes very difficult if we are to take
+ * recovering nodes into account.
+ *
+ * NOTE: This function can and will sleep on recovery of other nodes
+ * during cluster locking, just like any other ocfs2 process.
+ */
+static void ocfs2_complete_recovery(void *data)
+{
+	int ret, cleanup_orphans;
+	ocfs_super *osb = data;
+	ocfs_journal *journal = osb->journal;
+	struct ocfs2_la_recovery_item *item;
+	struct list_head *p, *n;
+	LIST_HEAD(tmp_la_list);
+
+	LOG_ENTRY();
+
+	LOG_TRACE_ARGS("completing recovery from keventd\n");
+
+	spin_lock(&journal->j_lock);
+	list_splice_init(&journal->j_la_cleanups, &tmp_la_list);
+	spin_unlock(&journal->j_lock);
+
+	list_for_each_safe(p, n, &tmp_la_list) {
+		item = list_entry(p, struct ocfs2_la_recovery_item, lri_list);
+		list_del_init(&item->lri_list);
+
+		LOG_TRACE_ARGS("Clean up local alloc %llu\n",
+			       item->lri_dinode->i_blkno);
+
+		ret = ocfs_complete_local_alloc_recovery(osb,
+							 item->lri_dinode);
+
+		kfree(item->lri_dinode);
+		kfree(item);
+
+		if (ret < 0)
+			LOG_ERROR_STATUS(ret);
+	}
+
+	spin_lock(&journal->j_lock);
+	cleanup_orphans = journal->j_cleanup_orphans;
+	journal->j_cleanup_orphans = 0;
+	spin_unlock(&journal->j_lock);
+
+	if (cleanup_orphans) {
+		LOG_TRACE_STR("Cleanup the orphan dir\n");
+		ret = ocfs_recover_orphans(osb);
+		if (ret < 0)
+			LOG_ERROR_STATUS(ret);
+	}
+
+	LOG_TRACE_STR("Recovery completion\n");
+
+	LOG_EXIT();
+}
+
+/* NOTE: This function always eats the reference to la_dinode, either
+ * manually on error, or by passing it to ocfs2_complete_recovery */
+static void ocfs2_queue_local_alloc_cleanup(ocfs_journal *journal,
+					    ocfs2_dinode *la_dinode)
+{
+	struct ocfs2_la_recovery_item *item;
+
+	item = kmalloc(sizeof(struct ocfs2_la_recovery_item), GFP_KERNEL);
+	if (!item) {
+		/* Though we wish to avoid it, we are in fact safe in
+		 * skipping local alloc cleanup as fsck.ocfs2 is more
+		 * than capable of reclaiming unused space. */
+		kfree(la_dinode);
+
+		LOG_ERROR_STATUS(-ENOMEM);
+		return;
+	}
+
+	INIT_LIST_HEAD(&item->lri_list);
+	item->lri_dinode = la_dinode;
+
+	spin_lock(&journal->j_lock);
+	list_add_tail(&item->lri_list, &journal->j_la_cleanups);
+	schedule_work(&journal->j_recovery_work);
+	spin_unlock(&journal->j_lock);
+}
+
+static void ocfs2_queue_orphan_dir_cleanup(ocfs_journal *journal)
+{
+	spin_lock(&journal->j_lock);
+	if (!journal->j_cleanup_orphans) {
+		/* No need to schedule again if someone's already
+		 * doing this. */
+		journal->j_cleanup_orphans = 1;
+		schedule_work(&journal->j_recovery_work);
+	}
+	spin_unlock(&journal->j_lock);
+}
+
 static int __ocfs_recovery_thread(void *arg)
 {
+	int status, node_num, recovered;
 	ocfs_super *osb = arg;
-	int status = 0;
-	int node_num;
+	ocfs2_dinode *la_dinode_cp;
 
 	LOG_ENTRY();
 
@@ -974,6 +1087,7 @@
 		goto bail;
 	}
 
+	recovered = 0;
 	while(!ocfs_node_map_is_empty(osb, &osb->recovery_map)) {
 		node_num = ocfs_node_map_first_set_bit(osb,
 						       &osb->recovery_map);
@@ -982,11 +1096,8 @@
 			break;
 		}
 
-		ocfs_recovery_map_clear(osb, node_num);
-		/* TODO: Figure out how we're going to save all the
-		 * local alloc stuff for after recovery on all nodes
-		 * is complete? */
-		status = ocfs_recover_node(osb, node_num);
+		la_dinode_cp = NULL;
+		status = ocfs_recover_node(osb, node_num, &la_dinode_cp);
 		if (status < 0) {
 			printk("ocfs2: Error %d recovering node %d on device "
 				"(%u,%u)!\n", status, node_num,
@@ -994,9 +1105,24 @@
 			printk("ocfs2: Volume requires unmount.\n");
 			continue;
 		}
+
+		ocfs_recovery_map_clear(osb, node_num);
+
+		if (la_dinode_cp) {
+			LOG_TRACE_ARGS("queueing local alloc cleanup for "
+				       "node %d\n", node_num);
+			ocfs2_queue_local_alloc_cleanup(osb->journal,
+							la_dinode_cp);
+		}
+		recovered++;
 	}
 	ocfs2_super_unlock(osb, 1);
 
+	/* Let's not fire off orphan dir cleanup unless we actually had
+	 * to recover a node. */
+	if (recovered)
+		ocfs2_queue_orphan_dir_cleanup(osb->journal);
+
 bail:
 	down(&osb->recovery_lock);
 	if (!ocfs_node_map_is_empty(osb, &osb->recovery_map)) {
@@ -1049,59 +1175,34 @@
 	LOG_EXIT();
 }
 
-static int ocfs_recover_node(ocfs_super *osb, int node_num) 
+/* Does the actual journal replay and marks the journal inode as
+ * clean. Will only replay if the journal inode is marked dirty. */
+static int ocfs2_replay_journal(ocfs_super *osb,
+				int node_num,
+				int slot_num)
 {
-	int status = 0;
-	int slot_num;
+	int status;
+	int got_lock = 0;
 	unsigned int flags;
+	struct inode *inode = NULL;
 	ocfs2_dinode *fe;
-	ocfs2_dinode *local_alloc = NULL;
-	struct inode *inode = NULL;
-	journal_t *j_journal = NULL;
+	journal_t *journal = NULL;
 	struct buffer_head *bh = NULL;
-	ocfs_journal * journal = NULL;
-	int got_lock = 0, clean_orphans = 0;
-	ocfs2_slot_info *si = osb->slot_info;
 
-	LOG_ENTRY_ARGS("(node_num=%d, osb->node_num = %d)\n",
-		       node_num, osb->node_num);
-
-	printk("ocfs2_recover_node: checking node %d\n", node_num);
-
-	/* Should not ever be called to recover ourselves -- in that
-	 * case we should've called ocfs_journal_load instead. */
-	if (osb->node_num == node_num)
-		BUG();
-
-	ocfs2_update_slot_info(si);
-	slot_num = ocfs2_node_num_to_slot(si, node_num);
-	if (slot_num == OCFS_INVALID_NODE_NUM) {
-		printk("ocfs2_recover_node: no slot for this node, so no "
-		       "recovery required.\n");
-		goto done;
-	}
-
-	printk("ocfs2_recover_node: node %d was using slot %d\n", node_num,
-	       slot_num);
-
-	journal = osb->journal;
-
-	/* Ok, look up the inode for our journal */
 	inode = ocfs_get_system_file_inode(osb, JOURNAL_SYSTEM_INODE,
 					   slot_num);
 	if (inode == NULL) {
-		LOG_ERROR_STR("access error");
 		status = -EACCES;
+		LOG_ERROR_STATUS(status);
 		goto done;
 	}
 	if (is_bad_inode (inode)) {
-		LOG_ERROR_STR("access error (bad inode)");
+		status = -EACCES;
 		iput (inode);
 		inode = NULL;
-		status = -EACCES;
+		LOG_ERROR_STATUS(status);
 		goto done;
 	}
-
 	SET_INODE_JOURNAL(inode);
 
 	status = ocfs2_meta_lock_flags(inode, NULL, &bh, 1,
@@ -1117,17 +1218,16 @@
 
 	fe = (ocfs2_dinode *) bh->b_data;
 
-	if (!(le32_to_cpu(fe->id1.journal1.ij_flags) & OCFS2_JOURNAL_DIRTY_FL)) {
+	flags = le32_to_cpu(fe->id1.journal1.ij_flags);
+
+	if (!(flags & OCFS2_JOURNAL_DIRTY_FL)) {
 		LOG_TRACE_ARGS("No recovery required for node %d\n", node_num);
-		printk("ocfs2_recover_node: No recovery required for node "
-		       "%d\n", node_num);
-		goto clear_node;
+		goto done;
 	}
 
 	printk("ocfs2: Recovering node %d from slot %d on device (%u,%u)\n",
 	       node_num, slot_num, MAJOR(osb->sb->s_dev),
 	       MINOR(osb->sb->s_dev));
-	clean_orphans = 1;
 
 	OCFS_I(inode)->ip_clusters = fe->i_clusters;
 
@@ -1137,34 +1237,34 @@
 		goto done;
 	}
 
-	/* init the journal, load it and possibly replay it */
 	LOG_TRACE_STR("calling journal_init_inode");
-	j_journal = journal_init_inode(inode);
-	if (j_journal == NULL) {
+	journal = journal_init_inode(inode);
+	if (journal == NULL) {
 		LOG_ERROR_STR("Linux journal layer error");
 		status = -EIO;
 		goto done;
 	}
 
-	status = journal_load(j_journal);
+	status = journal_load(journal);
 	if (status < 0) {
 		LOG_ERROR_STATUS(status);
 		if (!igrab(inode))
 			BUG();
-		journal_destroy(j_journal);
+		journal_destroy(journal);
 		goto done;
 	}
 
 	/* wipe the journal */
 	LOG_TRACE_STR("flushing the journal.");
-	journal_lock_updates(j_journal);
-	status = journal_flush(j_journal);
-	journal_unlock_updates(j_journal);
+	journal_lock_updates(journal);
+	status = journal_flush(journal);
+	journal_unlock_updates(journal);
 	if (status < 0)
 		LOG_ERROR_STATUS(status);
 
-	/* mark the node clean. */
-	flags = le32_to_cpu(fe->id1.journal1.ij_flags) & ~OCFS2_JOURNAL_DIRTY_FL;
+	/* This will mark the node clean */
+	flags = le32_to_cpu(fe->id1.journal1.ij_flags);
+	flags &= ~OCFS2_JOURNAL_DIRTY_FL;
 	fe->id1.journal1.ij_flags = cpu_to_le32(flags);
 
 	status = ocfs_write_block(osb, bh, inode);
@@ -1174,23 +1274,8 @@
 	if (!igrab(inode))
 		BUG();
 
-	/* shutdown the journal */
-	journal_destroy(j_journal);
+	journal_destroy(journal);
 
-	/* recover his local alloc file, AFTER recovering his journal... */
-	status = ocfs_begin_local_alloc_recovery(osb, slot_num, &local_alloc);
-	if (status < 0) {
-		LOG_ERROR_STATUS(status);
-		goto done;
-	}
-
-	status = 0;
-
-clear_node:
-	ocfs2_clear_slot(si, slot_num);
-	status = ocfs2_update_disk_slots(osb, si);
-	if (status < 0)
-		LOG_ERROR_STATUS(status);
 done:
 	/* drop the lock on this nodes journal */
 	if (got_lock)
@@ -1201,23 +1286,82 @@
 
 	if (bh)
 		brelse(bh);
-#if 0
-	if (local_alloc && !status) {
-		tmpstat = ocfs_complete_local_alloc_recovery(osb, local_alloc);
-		if (tmpstat < 0)
-			LOG_ERROR_STATUS(tmpstat);
+
+	LOG_EXIT_STATUS(status);
+	return status;
+
+}
+
+/*
+ * Do the most important parts of node recovery:
+ *  - Replay its journal
+ *  - Stamp a clean local allocator file
+ *  - Mark the node clean
+ *
+ * If this function completes without error, a node in OCFS2 can be
+ * said to have been safely recovered. As a result, failure during the
+ * second part of a node's recovery process (local alloc recovery) is
+ * far less concerning.
+ *
+ * A copy of the node's local alloc file is passed back so unused space
+ * can be reclaimed once all nodes are recovered. This must be kfree'd
+ * by the caller.
+ */
+static int ocfs_recover_node(ocfs_super *osb,
+			     int node_num,
+			     ocfs2_dinode **la_copy)
+{
+	int status = 0;
+	int slot_num;
+	ocfs2_slot_info *si = osb->slot_info;
+
+	LOG_ENTRY_ARGS("(node_num=%d, osb->node_num = %d)\n",
+		       node_num, osb->node_num);
+
+	*la_copy = NULL;
+
+	LOG_TRACE_ARGS("ocfs2_recover_node: checking node %d\n", node_num);
+
+	/* Should not ever be called to recover ourselves -- in that
+	 * case we should've called ocfs_journal_load instead. */
+	if (osb->node_num == node_num)
+		BUG();
+
+	ocfs2_update_slot_info(si);
+	slot_num = ocfs2_node_num_to_slot(si, node_num);
+	if (slot_num == OCFS_INVALID_NODE_NUM) {
+		status = 0;
+		LOG_TRACE_STR("ocfs2_recover_node: no slot for this node, so "
+			      "no recovery required.");
+		goto done;
 	}
-#endif
-	if (local_alloc)
-		kfree(local_alloc);
-#if 0
-	if (clean_orphans && !status) {
-		tmpstat = ocfs_recover_orphans(osb);
-		if (tmpstat < 0)
-			LOG_ERROR_STATUS(tmpstat);
+
+	LOG_TRACE_ARGS("ocfs2_recover_node: node %d was using slot %d\n",
+		       node_num, slot_num);
+
+	status = ocfs2_replay_journal(osb, node_num, slot_num);
+	if (status < 0) {
+		LOG_ERROR_STATUS(status);
+		goto done;
 	}
-#endif
 
+	/* Stamp a clean local alloc file AFTER recovering the journal... */
+	status = ocfs_begin_local_alloc_recovery(osb, slot_num, la_copy);
+	if (status < 0) {
+		LOG_ERROR_STATUS(status);
+		goto done;
+	}
+
+	/* This would be a strange but ultimately not so harmful place
+	 * to get an error... */
+	ocfs2_clear_slot(si, slot_num);
+	status = ocfs2_update_disk_slots(osb, si);
+	if (status < 0)
+		LOG_ERROR_STATUS(status);
+
+	status = 0;
+done:
+
 	LOG_EXIT_STATUS(status);
 	return status;
 }

Modified: trunk/fs/ocfs2/ocfs_journal.h
===================================================================
--- trunk/fs/ocfs2/ocfs_journal.h	2005-03-25 21:36:27 UTC (rev 2055)
+++ trunk/fs/ocfs2/ocfs_journal.h	2005-03-26 00:08:00 UTC (rev 2056)
@@ -64,7 +64,10 @@
 	struct rw_semaphore       j_trans_barrier;
 	wait_queue_head_t         j_checkpointed;
 
-	spinlock_t                j_lock;     /* */
+	spinlock_t                j_lock;
+	unsigned int              j_cleanup_orphans;
+	struct list_head          j_la_cleanups;
+	struct work_struct        j_recovery_work;
 };
 
 extern spinlock_t trans_inc_lock;

Modified: trunk/fs/ocfs2/super.c
===================================================================
--- trunk/fs/ocfs2/super.c	2005-03-25 21:36:27 UTC (rev 2055)
+++ trunk/fs/ocfs2/super.c	2005-03-26 00:08:00 UTC (rev 2056)
@@ -968,6 +968,11 @@
 	up(&osb->recovery_lock);
 	wait_event(osb->recovery_event, !ocfs2_recovery_thread_running(osb));
 
+	/* At this point, we know that no more recovery threads can be
+	 * launched, so wait for any recovery completion work to
+	 * complete. */
+	flush_scheduled_work();
+
 	ocfs_journal_shutdown(osb);
 
 	ocfs_sync_blockdev(sb);



More information about the Ocfs2-commits mailing list