[Ocfs2-commits] mfasheh commits r2056 - trunk/fs/ocfs2
svn-commits at oss.oracle.com
svn-commits at oss.oracle.com
Fri Mar 25 18:08:02 CST 2005
Author: mfasheh
Signed-off-by: khackel
Date: 2005-03-25 18:08:00 -0600 (Fri, 25 Mar 2005)
New Revision: 2056
Modified:
trunk/fs/ocfs2/heartbeat.c
trunk/fs/ocfs2/journal.c
trunk/fs/ocfs2/ocfs_journal.h
trunk/fs/ocfs2/super.c
Log:
* Pull the orphan dir / local alloc recovery work out of recover_node and
have the recovery thread queue it up in keventd.
* Initialize the journal structure a little earlier in journal_init. I'm
worried that we may error there and try to access the spinlock later in
unmount.
* Now that recovery is minimally tested, let's turn some of those printks
back into LOG_TRACE calls.
Signed-off-by: khackel
Modified: trunk/fs/ocfs2/heartbeat.c
===================================================================
--- trunk/fs/ocfs2/heartbeat.c 2005-03-25 21:36:27 UTC (rev 2055)
+++ trunk/fs/ocfs2/heartbeat.c 2005-03-26 00:08:00 UTC (rev 2056)
@@ -87,7 +87,7 @@
OCFS_ASSERT(osb->node_num != node_num);
- printk("ocfs2: node down event for %d\n", node_num);
+ LOG_TRACE_ARGS("ocfs2: node down event for %d\n", node_num);
if (ocfs_node_map_test_bit(osb, &osb->umount_map, node_num)) {
/* If a node is in the umount map, then we've been
@@ -108,7 +108,7 @@
OCFS_ASSERT(osb->node_num != node_num);
- printk("ocfs2: node up event for %d\n", node_num);
+ LOG_TRACE_ARGS("ocfs2: node up event for %d\n", node_num);
ocfs_node_map_clear_bit(osb, &osb->umount_map, node_num);
}
Modified: trunk/fs/ocfs2/journal.c
===================================================================
--- trunk/fs/ocfs2/journal.c 2005-03-25 21:36:27 UTC (rev 2055)
+++ trunk/fs/ocfs2/journal.c 2005-03-26 00:08:00 UTC (rev 2056)
@@ -56,7 +56,10 @@
spinlock_t trans_inc_lock = SPIN_LOCK_UNLOCKED;
static int ocfs_force_read_journal(struct inode *inode);
-static int ocfs_recover_node(struct _ocfs_super *osb, int node_num);
+static int ocfs_recover_node(ocfs_super *osb,
+ int node_num,
+ ocfs2_dinode **la_copy);
+static void ocfs2_complete_recovery(void *data);
static int __ocfs_recovery_thread(void *arg);
static int ocfs_commit_cache (ocfs_super * osb);
static int ocfs_wait_on_mount(ocfs_super *osb);
@@ -610,12 +613,24 @@
journal_t * j_journal = NULL;
ocfs2_dinode *fe = NULL;
struct buffer_head *bh = NULL;
+ ocfs_journal *journal;
LOG_ENTRY();
- if (!osb)
- BUG();
+ OCFS_ASSERT(osb);
+ journal = osb->journal;
+ journal->j_osb = osb;
+
+ atomic_set(&journal->j_num_trans, 0);
+ init_rwsem(&journal->j_trans_barrier);
+ init_waitqueue_head(&journal->j_checkpointed);
+ spin_lock_init(&journal->j_lock);
+ journal->j_trans_id = (unsigned long) 1;
+ journal->j_cleanup_orphans = 0;
+ INIT_LIST_HEAD(&journal->j_la_cleanups);
+ INIT_WORK(&journal->j_recovery_work, ocfs2_complete_recovery, osb);
+
/* already have the inode for our journal */
inode = ocfs_get_system_file_inode(osb, JOURNAL_SYSTEM_INODE,
osb->slot_num);
@@ -677,20 +692,15 @@
LOG_TRACE_ARGS("j_journal->j_maxlen = %u\n", j_journal->j_maxlen);
j_journal->j_commit_interval = OCFS_DEFAULT_COMMIT_INTERVAL;
- /* yay, pass the proper info back to our journal structure. */
- osb->journal->j_osb = osb;
- osb->journal->j_journal = j_journal;
- osb->journal->j_inode = inode;
- osb->journal->j_bh = bh;
- atomic_set(&(osb->journal->j_num_trans), 0);
- init_rwsem(&(osb->journal->j_trans_barrier));
- init_waitqueue_head(&osb->journal->j_checkpointed);
- spin_lock_init(&osb->journal->j_lock);
- osb->journal->j_state = OCFS_JOURNAL_LOADED;
- osb->journal->j_trans_id = (unsigned long) 1;
-
*dirty = (le32_to_cpu(fe->id1.journal1.ij_flags) &
OCFS2_JOURNAL_DIRTY_FL);
+
+ journal->j_journal = j_journal;
+ journal->j_inode = inode;
+ journal->j_bh = bh;
+
+ journal->j_state = OCFS_JOURNAL_LOADED;
+
status = 0;
done:
if (status < 0) {
@@ -895,9 +905,7 @@
* up to date. We know things can't change on this file underneath us
* as we have the lock by now :)
*
- * size should be file_size, NOT alloc_size
*/
-#warning ocfs_force_read_journal() needs retesting!
static int ocfs_force_read_journal(struct inode *inode)
{
int status = 0;
@@ -940,7 +948,7 @@
brelse(bhs[i]);
bhs[i] = NULL;
}
-
+
v_blkno += p_blocks;
}
@@ -952,11 +960,116 @@
return status;
}
+struct ocfs2_la_recovery_item {
+ struct list_head lri_list;
+ ocfs2_dinode *lri_dinode;
+};
+
+/* Does the second half of the recovery process. By this point, the
+ * node is marked clean and can actually be considered recovered,
+ * hence it's no longer in the recovery map, but there's still some
+ * cleanup we can do which shouldn't happen within the recovery thread
+ * as locking in that context becomes very difficult if we are to take
+ * recovering nodes into account.
+ *
+ * NOTE: This function can and will sleep on recovery of other nodes
+ * during cluster locking, just like any other ocfs2 process.
+ */
+static void ocfs2_complete_recovery(void *data)
+{
+ int ret, cleanup_orphans;
+ ocfs_super *osb = data;
+ ocfs_journal *journal = osb->journal;
+ struct ocfs2_la_recovery_item *item;
+ struct list_head *p, *n;
+ LIST_HEAD(tmp_la_list);
+
+ LOG_ENTRY();
+
+ LOG_TRACE_ARGS("completing recovery from keventd\n");
+
+ spin_lock(&journal->j_lock);
+ list_splice_init(&journal->j_la_cleanups, &tmp_la_list);
+ spin_unlock(&journal->j_lock);
+
+ list_for_each_safe(p, n, &tmp_la_list) {
+ item = list_entry(p, struct ocfs2_la_recovery_item, lri_list);
+ list_del_init(&item->lri_list);
+
+ LOG_TRACE_ARGS("Clean up local alloc %llu\n",
+ item->lri_dinode->i_blkno);
+
+ ret = ocfs_complete_local_alloc_recovery(osb,
+ item->lri_dinode);
+
+ kfree(item->lri_dinode);
+ kfree(item);
+
+ if (ret < 0)
+ LOG_ERROR_STATUS(ret);
+ }
+
+ spin_lock(&journal->j_lock);
+ cleanup_orphans = journal->j_cleanup_orphans;
+ journal->j_cleanup_orphans = 0;
+ spin_unlock(&journal->j_lock);
+
+ if (cleanup_orphans) {
+ LOG_TRACE_STR("Cleanup the orphan dir\n");
+ ret = ocfs_recover_orphans(osb);
+ if (ret < 0)
+ LOG_ERROR_STATUS(ret);
+ }
+
+ LOG_TRACE_STR("Recovery completion\n");
+
+ LOG_EXIT();
+}
+
+/* NOTE: This function always eats the reference to la_dinode, either
+ * manually on error, or by passing it to ocfs2_complete_recovery */
+static void ocfs2_queue_local_alloc_cleanup(ocfs_journal *journal,
+ ocfs2_dinode *la_dinode)
+{
+ struct ocfs2_la_recovery_item *item;
+
+ item = kmalloc(sizeof(struct ocfs2_la_recovery_item), GFP_KERNEL);
+ if (!item) {
+ /* Though we wish to avoid it, we are in fact safe in
+ * skipping local alloc cleanup as fsck.ocfs2 is more
+ * than capable of reclaiming unused space. */
+ kfree(la_dinode);
+
+ LOG_ERROR_STATUS(-ENOMEM);
+ return;
+ }
+
+ INIT_LIST_HEAD(&item->lri_list);
+ item->lri_dinode = la_dinode;
+
+ spin_lock(&journal->j_lock);
+ list_add_tail(&item->lri_list, &journal->j_la_cleanups);
+ schedule_work(&journal->j_recovery_work);
+ spin_unlock(&journal->j_lock);
+}
+
+static void ocfs2_queue_orphan_dir_cleanup(ocfs_journal *journal)
+{
+ spin_lock(&journal->j_lock);
+ if (!journal->j_cleanup_orphans) {
+ /* No need to schedule again if someone's already
+ * doing this. */
+ journal->j_cleanup_orphans = 1;
+ schedule_work(&journal->j_recovery_work);
+ }
+ spin_unlock(&journal->j_lock);
+}
+
static int __ocfs_recovery_thread(void *arg)
{
+ int status, node_num, recovered;
ocfs_super *osb = arg;
- int status = 0;
- int node_num;
+ ocfs2_dinode *la_dinode_cp;
LOG_ENTRY();
@@ -974,6 +1087,7 @@
goto bail;
}
+ recovered = 0;
while(!ocfs_node_map_is_empty(osb, &osb->recovery_map)) {
node_num = ocfs_node_map_first_set_bit(osb,
&osb->recovery_map);
@@ -982,11 +1096,8 @@
break;
}
- ocfs_recovery_map_clear(osb, node_num);
- /* TODO: Figure out how we're going to save all the
- * local alloc stuff for after recovery on all nodes
- * is complete? */
- status = ocfs_recover_node(osb, node_num);
+ la_dinode_cp = NULL;
+ status = ocfs_recover_node(osb, node_num, &la_dinode_cp);
if (status < 0) {
printk("ocfs2: Error %d recovering node %d on device "
"(%u,%u)!\n", status, node_num,
@@ -994,9 +1105,24 @@
printk("ocfs2: Volume requires unmount.\n");
continue;
}
+
+ ocfs_recovery_map_clear(osb, node_num);
+
+ if (la_dinode_cp) {
+ LOG_TRACE_ARGS("queueing local alloc cleanup for "
+ "node %d\n", node_num);
+ ocfs2_queue_local_alloc_cleanup(osb->journal,
+ la_dinode_cp);
+ }
+ recovered++;
}
ocfs2_super_unlock(osb, 1);
+ /* Lets not fire off orphan dir cleanup unless we actually had
+ * to recover a node. */
+ if (recovered)
+ ocfs2_queue_orphan_dir_cleanup(osb->journal);
+
bail:
down(&osb->recovery_lock);
if (!ocfs_node_map_is_empty(osb, &osb->recovery_map)) {
@@ -1049,59 +1175,34 @@
LOG_EXIT();
}
-static int ocfs_recover_node(ocfs_super *osb, int node_num)
+/* Does the actual journal replay and marks the journal inode as
+ * clean. Will only replay if the journal inode is marked dirty. */
+static int ocfs2_replay_journal(ocfs_super *osb,
+ int node_num,
+ int slot_num)
{
- int status = 0;
- int slot_num;
+ int status;
+ int got_lock = 0;
unsigned int flags;
+ struct inode *inode = NULL;
ocfs2_dinode *fe;
- ocfs2_dinode *local_alloc = NULL;
- struct inode *inode = NULL;
- journal_t *j_journal = NULL;
+ journal_t *journal = NULL;
struct buffer_head *bh = NULL;
- ocfs_journal * journal = NULL;
- int got_lock = 0, clean_orphans = 0;
- ocfs2_slot_info *si = osb->slot_info;
- LOG_ENTRY_ARGS("(node_num=%d, osb->node_num = %d)\n",
- node_num, osb->node_num);
-
- printk("ocfs2_recover_node: checking node %d\n", node_num);
-
- /* Should not ever be called to recover ourselves -- in that
- * case we should've called ocfs_journal_load instead. */
- if (osb->node_num == node_num)
- BUG();
-
- ocfs2_update_slot_info(si);
- slot_num = ocfs2_node_num_to_slot(si, node_num);
- if (slot_num == OCFS_INVALID_NODE_NUM) {
- printk("ocfs2_recover_node: no slot for this node, so no "
- "recovery required.\n");
- goto done;
- }
-
- printk("ocfs2_recover_node: node %d was using slot %d\n", node_num,
- slot_num);
-
- journal = osb->journal;
-
- /* Ok, look up the inode for our journal */
inode = ocfs_get_system_file_inode(osb, JOURNAL_SYSTEM_INODE,
slot_num);
if (inode == NULL) {
- LOG_ERROR_STR("access error");
status = -EACCES;
+ LOG_ERROR_STATUS(status);
goto done;
}
if (is_bad_inode (inode)) {
- LOG_ERROR_STR("access error (bad inode)");
+ status = -EACCES;
iput (inode);
inode = NULL;
- status = -EACCES;
+ LOG_ERROR_STATUS(status);
goto done;
}
-
SET_INODE_JOURNAL(inode);
status = ocfs2_meta_lock_flags(inode, NULL, &bh, 1,
@@ -1117,17 +1218,16 @@
fe = (ocfs2_dinode *) bh->b_data;
- if (!(le32_to_cpu(fe->id1.journal1.ij_flags) & OCFS2_JOURNAL_DIRTY_FL)) {
+ flags = le32_to_cpu(fe->id1.journal1.ij_flags);
+
+ if (!(flags & OCFS2_JOURNAL_DIRTY_FL)) {
LOG_TRACE_ARGS("No recovery required for node %d\n", node_num);
- printk("ocfs2_recover_node: No recovery required for node "
- "%d\n", node_num);
- goto clear_node;
+ goto done;
}
printk("ocfs2: Recovering node %d from slot %d on device (%u,%u)\n",
node_num, slot_num, MAJOR(osb->sb->s_dev),
MINOR(osb->sb->s_dev));
- clean_orphans = 1;
OCFS_I(inode)->ip_clusters = fe->i_clusters;
@@ -1137,34 +1237,34 @@
goto done;
}
- /* init the journal, load it and possibly replay it */
LOG_TRACE_STR("calling journal_init_inode");
- j_journal = journal_init_inode(inode);
- if (j_journal == NULL) {
+ journal = journal_init_inode(inode);
+ if (journal == NULL) {
LOG_ERROR_STR("Linux journal layer error");
status = -EIO;
goto done;
}
- status = journal_load(j_journal);
+ status = journal_load(journal);
if (status < 0) {
LOG_ERROR_STATUS(status);
if (!igrab(inode))
BUG();
- journal_destroy(j_journal);
+ journal_destroy(journal);
goto done;
}
/* wipe the journal */
LOG_TRACE_STR("flushing the journal.");
- journal_lock_updates(j_journal);
- status = journal_flush(j_journal);
- journal_unlock_updates(j_journal);
+ journal_lock_updates(journal);
+ status = journal_flush(journal);
+ journal_unlock_updates(journal);
if (status < 0)
LOG_ERROR_STATUS(status);
- /* mark the node clean. */
- flags = le32_to_cpu(fe->id1.journal1.ij_flags) & ~OCFS2_JOURNAL_DIRTY_FL;
+ /* This will mark the node clean */
+ flags = le32_to_cpu(fe->id1.journal1.ij_flags);
+ flags &= ~OCFS2_JOURNAL_DIRTY_FL;
fe->id1.journal1.ij_flags = cpu_to_le32(flags);
status = ocfs_write_block(osb, bh, inode);
@@ -1174,23 +1274,8 @@
if (!igrab(inode))
BUG();
- /* shutdown the journal */
- journal_destroy(j_journal);
+ journal_destroy(journal);
- /* recover his local alloc file, AFTER recovering his journal... */
- status = ocfs_begin_local_alloc_recovery(osb, slot_num, &local_alloc);
- if (status < 0) {
- LOG_ERROR_STATUS(status);
- goto done;
- }
-
- status = 0;
-
-clear_node:
- ocfs2_clear_slot(si, slot_num);
- status = ocfs2_update_disk_slots(osb, si);
- if (status < 0)
- LOG_ERROR_STATUS(status);
done:
/* drop the lock on this nodes journal */
if (got_lock)
@@ -1201,23 +1286,82 @@
if (bh)
brelse(bh);
-#if 0
- if (local_alloc && !status) {
- tmpstat = ocfs_complete_local_alloc_recovery(osb, local_alloc);
- if (tmpstat < 0)
- LOG_ERROR_STATUS(tmpstat);
+
+ LOG_EXIT_STATUS(status);
+ return status;
+
+}
+
+/*
+ * Do the most important parts of node recovery:
+ * - Replay it's journal
+ * - Stamp a clean local allocator file
+ * - Mark the node clean
+ *
+ * If this function completes without error, a node in OCFS2 can be
+ * said to have been safely recovered. As a result, failure during the
+ * second part of a nodes recovery process (local alloc recovery) is
+ * far less concerning.
+ *
+ * A copy of the nodes local alloc file is passed back so unused space
+ * can be reclaimed once all nodes are recovered. This must be kfree'd
+ * by the caller.
+ */
+static int ocfs_recover_node(ocfs_super *osb,
+ int node_num,
+ ocfs2_dinode **la_copy)
+{
+ int status = 0;
+ int slot_num;
+ ocfs2_slot_info *si = osb->slot_info;
+
+ LOG_ENTRY_ARGS("(node_num=%d, osb->node_num = %d)\n",
+ node_num, osb->node_num);
+
+ *la_copy = NULL;
+
+ LOG_TRACE_ARGS("ocfs2_recover_node: checking node %d\n", node_num);
+
+ /* Should not ever be called to recover ourselves -- in that
+ * case we should've called ocfs_journal_load instead. */
+ if (osb->node_num == node_num)
+ BUG();
+
+ ocfs2_update_slot_info(si);
+ slot_num = ocfs2_node_num_to_slot(si, node_num);
+ if (slot_num == OCFS_INVALID_NODE_NUM) {
+ status = 0;
+ LOG_TRACE_STR("ocfs2_recover_node: no slot for this node, so "
+ "no recovery required.");
+ goto done;
}
-#endif
- if (local_alloc)
- kfree(local_alloc);
-#if 0
- if (clean_orphans && !status) {
- tmpstat = ocfs_recover_orphans(osb);
- if (tmpstat < 0)
- LOG_ERROR_STATUS(tmpstat);
+
+ LOG_TRACE_ARGS("ocfs2_recover_node: node %d was using slot %d\n",
+ node_num, slot_num);
+
+ status = ocfs2_replay_journal(osb, node_num, slot_num);
+ if (status < 0) {
+ LOG_ERROR_STATUS(status);
+ goto done;
}
-#endif
+ /* Stamp a clean local alloc file AFTER recovering the journal... */
+ status = ocfs_begin_local_alloc_recovery(osb, slot_num, la_copy);
+ if (status < 0) {
+ LOG_ERROR_STATUS(status);
+ goto done;
+ }
+
+ /* This would be a strange but ultimately not so harmful place
+ * to get an error... */
+ ocfs2_clear_slot(si, slot_num);
+ status = ocfs2_update_disk_slots(osb, si);
+ if (status < 0)
+ LOG_ERROR_STATUS(status);
+
+ status = 0;
+done:
+
LOG_EXIT_STATUS(status);
return status;
}
Modified: trunk/fs/ocfs2/ocfs_journal.h
===================================================================
--- trunk/fs/ocfs2/ocfs_journal.h 2005-03-25 21:36:27 UTC (rev 2055)
+++ trunk/fs/ocfs2/ocfs_journal.h 2005-03-26 00:08:00 UTC (rev 2056)
@@ -64,7 +64,10 @@
struct rw_semaphore j_trans_barrier;
wait_queue_head_t j_checkpointed;
- spinlock_t j_lock; /* */
+ spinlock_t j_lock;
+ unsigned int j_cleanup_orphans;
+ struct list_head j_la_cleanups;
+ struct work_struct j_recovery_work;
};
extern spinlock_t trans_inc_lock;
Modified: trunk/fs/ocfs2/super.c
===================================================================
--- trunk/fs/ocfs2/super.c 2005-03-25 21:36:27 UTC (rev 2055)
+++ trunk/fs/ocfs2/super.c 2005-03-26 00:08:00 UTC (rev 2056)
@@ -968,6 +968,11 @@
up(&osb->recovery_lock);
wait_event(osb->recovery_event, !ocfs2_recovery_thread_running(osb));
+ /* At this point, we know that no more recovery threads can be
+ * launched, so wait for any recovery completion work to
+ * complete. */
+ flush_scheduled_work();
+
ocfs_journal_shutdown(osb);
ocfs_sync_blockdev(sb);
More information about the Ocfs2-commits
mailing list