[Ocfs2-devel] [PATCH 1/2] Introduce ocfs2_recover_node

Goldwyn Rodrigues rgoldwyn at gmail.com
Wed Nov 17 07:50:04 PST 2010


Review comments by Wengang incorporated.

ocfs2_recover_node is a new data structure to include all information required
to recover one node. The structure is maintainer as a list in the super block.

recovery_map is no longer required with ocfs2_recover_node.

Signed-off-by: Goldwyn Rodrigues <rgoldwyn at suse.de>
---
 fs/ocfs2/journal.c |  171 ++++++++++++++++++++++++----------------------------
 fs/ocfs2/journal.h |    9 ++-
 fs/ocfs2/ocfs2.h   |    3 +-
 fs/ocfs2/super.c   |   16 -----
 4 files changed, 88 insertions(+), 111 deletions(-)

diff --git a/fs/ocfs2/journal.c b/fs/ocfs2/journal.c
index faa2303..277b810 100644
--- a/fs/ocfs2/journal.c
+++ b/fs/ocfs2/journal.c
@@ -58,8 +58,7 @@ DEFINE_SPINLOCK(trans_inc_lock);
 #define ORPHAN_SCAN_SCHEDULE_TIMEOUT 300000

 static int ocfs2_force_read_journal(struct inode *inode);
-static int ocfs2_recover_node(struct ocfs2_super *osb,
-			      int node_num, int slot_num);
+static int ocfs2_recover_one_node(struct ocfs2_recover_node *rn);
 static int __ocfs2_recovery_thread(void *arg);
 static int ocfs2_commit_cache(struct ocfs2_super *osb);
 static int __ocfs2_wait_on_mount(struct ocfs2_super *osb, int quota);
@@ -179,24 +178,11 @@ void ocfs2_free_replay_slots(struct ocfs2_super *osb)

 int ocfs2_recovery_init(struct ocfs2_super *osb)
 {
-	struct ocfs2_recovery_map *rm;
-
 	mutex_init(&osb->recovery_lock);
 	osb->disable_recovery = 0;
 	osb->recovery_thread_task = NULL;
 	init_waitqueue_head(&osb->recovery_event);
-
-	rm = kzalloc(sizeof(struct ocfs2_recovery_map) +
-		     osb->max_slots * sizeof(unsigned int),
-		     GFP_KERNEL);
-	if (!rm) {
-		mlog_errno(-ENOMEM);
-		return -ENOMEM;
-	}
-
-	rm->rm_entries = (unsigned int *)((char *)rm +
-					  sizeof(struct ocfs2_recovery_map));
-	osb->recovery_map = rm;
+	INIT_LIST_HEAD(&osb->s_recovery_nodes);

 	return 0;
 }
@@ -212,8 +198,8 @@ static int ocfs2_recovery_thread_running(struct
ocfs2_super *osb)

 void ocfs2_recovery_exit(struct ocfs2_super *osb)
 {
-	struct ocfs2_recovery_map *rm;
-
+	struct list_head *iter, *next;
+	struct ocfs2_recover_node *rn;
 	/* disable any new recovery threads and wait for any currently
 	 * running ones to exit. Do this before setting the vol_state. */
 	mutex_lock(&osb->recovery_lock);
@@ -226,75 +212,67 @@ void ocfs2_recovery_exit(struct ocfs2_super *osb)
 	 * complete. */
 	flush_workqueue(ocfs2_wq);

-	/*
-	 * Now that recovery is shut down, and the osb is about to be
-	 * freed,  the osb_lock is not taken here.
-	 */
-	rm = osb->recovery_map;
-	/* XXX: Should we bug if there are dirty entries? */
-
-	kfree(rm);
+	spin_lock(&osb->osb_lock);
+	list_for_each_safe(iter, next, &osb->s_recovery_nodes) {
+			rn = list_entry(iter, struct ocfs2_recover_node,
+					rn_list);
+			list_del(&rn->rn_list);
+			kfree(rn);
+	}
+	spin_unlock(&osb->osb_lock);
 }

-static int __ocfs2_recovery_map_test(struct ocfs2_super *osb,
+static int __ocfs2_recovery_node_test(struct ocfs2_super *osb,
 				     unsigned int node_num)
 {
-	int i;
-	struct ocfs2_recovery_map *rm = osb->recovery_map;
+	struct ocfs2_recover_node *rn;
+	struct list_head *iter;

 	assert_spin_locked(&osb->osb_lock);
-
-	for (i = 0; i < rm->rm_used; i++) {
-		if (rm->rm_entries[i] == node_num)
+	list_for_each(iter, &osb->s_recovery_nodes) {
+		rn = list_entry(iter, struct ocfs2_recover_node, rn_list);
+		if (rn->rn_node_num == node_num)
 			return 1;
 	}
-
 	return 0;
 }

 /* Behaves like test-and-set.  Returns the previous value */
-static int ocfs2_recovery_map_set(struct ocfs2_super *osb,
+static int ocfs2_recovery_node_set(struct ocfs2_super *osb,
 				  unsigned int node_num)
 {
-	struct ocfs2_recovery_map *rm = osb->recovery_map;
-
-	spin_lock(&osb->osb_lock);
-	if (__ocfs2_recovery_map_test(osb, node_num)) {
-		spin_unlock(&osb->osb_lock);
-		return 1;
-	}
+	struct list_head *iter;
+	struct ocfs2_recover_node *rn = NULL, *trn;

-	/* XXX: Can this be exploited? Not from o2dlm... */
-	BUG_ON(rm->rm_used >= osb->max_slots);
+	trn = kzalloc(sizeof(struct ocfs2_recover_node), GFP_NOFS);
+	if (!trn)
+		return -ENOMEM;

-	rm->rm_entries[rm->rm_used] = node_num;
-	rm->rm_used++;
+	spin_lock(&osb->osb_lock);
+	list_for_each(iter, &osb->s_recovery_nodes) {
+			rn = list_entry(iter, struct ocfs2_recover_node,
+					rn_list);
+			if (rn->rn_node_num == node_num) {
+				spin_unlock(&osb->osb_lock);
+				kfree(trn);
+				return 1;
+			}
+	}
+	trn->rn_node_num = node_num;
+	trn->rn_osb = osb;
+	list_add_tail(&trn->rn_list, &osb->s_recovery_nodes);
 	spin_unlock(&osb->osb_lock);

 	return 0;
 }

-static void ocfs2_recovery_map_clear(struct ocfs2_super *osb,
-				     unsigned int node_num)
+static void ocfs2_recovery_node_clear(struct ocfs2_super *osb,
+			struct ocfs2_recover_node *rn)
 {
-	int i;
-	struct ocfs2_recovery_map *rm = osb->recovery_map;
-
 	spin_lock(&osb->osb_lock);
-
-	for (i = 0; i < rm->rm_used; i++) {
-		if (rm->rm_entries[i] == node_num)
-			break;
-	}
-
-	if (i < rm->rm_used) {
-		/* XXX: be careful with the pointer math */
-		memmove(&(rm->rm_entries[i]), &(rm->rm_entries[i + 1]),
-			(rm->rm_used - i - 1) * sizeof(unsigned int));
-		rm->rm_used--;
-	}
-
+	list_del(&rn->rn_list);
 	spin_unlock(&osb->osb_lock);
+	kfree(rn);
 }

 static int ocfs2_commit_cache(struct ocfs2_super *osb)
@@ -1092,10 +1070,9 @@ bail:
 static int ocfs2_recovery_completed(struct ocfs2_super *osb)
 {
 	int empty;
-	struct ocfs2_recovery_map *rm = osb->recovery_map;

 	spin_lock(&osb->osb_lock);
-	empty = (rm->rm_used == 0);
+	empty = list_empty(&osb->s_recovery_nodes);
 	spin_unlock(&osb->osb_lock);

 	return empty;
@@ -1332,12 +1309,13 @@ void ocfs2_complete_quota_recovery(struct
ocfs2_super *osb)

 static int __ocfs2_recovery_thread(void *arg)
 {
-	int status, node_num, slot_num;
+	int status;
 	struct ocfs2_super *osb = arg;
-	struct ocfs2_recovery_map *rm = osb->recovery_map;
 	int *rm_quota = NULL;
 	int rm_quota_used = 0, i;
 	struct ocfs2_quota_recovery *qrec;
+	struct list_head *iter;
+	struct ocfs2_recover_node *rn = NULL;

 	mlog_entry_void();

@@ -1367,20 +1345,21 @@ restart:
 					NULL, NULL);

 	spin_lock(&osb->osb_lock);
-	while (rm->rm_used) {
+	list_for_each(iter, &osb->s_recovery_nodes) {
+		rn = list_entry(iter, struct ocfs2_recover_node, rn_list);
 		/* It's always safe to remove entry zero, as we won't
 		 * clear it until ocfs2_recover_node() has succeeded. */
-		node_num = rm->rm_entries[0];
 		spin_unlock(&osb->osb_lock);
-		mlog(0, "checking node %d\n", node_num);
-		slot_num = ocfs2_node_num_to_slot(osb, node_num);
-		if (slot_num == -ENOENT) {
+		mlog(0, "checking node %d\n", rn->rn_node_num);
+		rn->rn_slot_num = ocfs2_node_num_to_slot(osb, rn->rn_node_num);
+		if (rn->rn_slot_num == -ENOENT) {
 			status = 0;
 			mlog(0, "no slot for this node, so no recovery"
 			     "required.\n");
 			goto skip_recovery;
 		}
-		mlog(0, "node %d was using slot %d\n", node_num, slot_num);
+		mlog(0, "node %d was using slot %d\n",
+				rn->rn_node_num, rn->rn_slot_num);

 		/* It is a bit subtle with quota recovery. We cannot do it
 		 * immediately because we have to obtain cluster locks from
@@ -1388,18 +1367,19 @@ restart:
 		 * then quota usage would be out of sync until some node takes
 		 * the slot. So we remember which nodes need quota recovery
 		 * and when everything else is done, we recover quotas. */
-		for (i = 0; i < rm_quota_used && rm_quota[i] != slot_num; i++);
+		for (i = 0; i < rm_quota_used && rm_quota[i] != rn->rn_slot_num;
+					i++);
 		if (i == rm_quota_used)
-			rm_quota[rm_quota_used++] = slot_num;
+			rm_quota[rm_quota_used++] = rn->rn_slot_num;

-		status = ocfs2_recover_node(osb, node_num, slot_num);
+		status = ocfs2_recover_one_node(rn);
 skip_recovery:
 		if (!status) {
-			ocfs2_recovery_map_clear(osb, node_num);
+			ocfs2_recovery_node_clear(osb, rn);
 		} else {
 			mlog(ML_ERROR,
 			     "Error %d recovering node %d on device (%u,%u)!\n",
-			     status, node_num,
+			     status, rn->rn_node_num,
 			     MAJOR(osb->sb->s_dev), MINOR(osb->sb->s_dev));
 			mlog(ML_ERROR, "Volume requires unmount.\n");
 		}
@@ -1461,6 +1441,7 @@ bail:

 void ocfs2_recovery_thread(struct ocfs2_super *osb, int node_num)
 {
+	int ret = 0;
 	mlog_entry("(node_num=%d, osb->node_num = %d)\n",
 		   node_num, osb->node_num);

@@ -1470,7 +1451,11 @@ void ocfs2_recovery_thread(struct ocfs2_super
*osb, int node_num)

 	/* People waiting on recovery will wait on
 	 * the recovery map to empty. */
-	if (ocfs2_recovery_map_set(osb, node_num))
+	ret = ocfs2_recovery_node_set(osb, node_num);
+	if (ret == -ENOMEM) {
+		mlog_errno(ret);
+		goto out;
+	} else if (ret)
 		mlog(0, "node %d already in recovery map.\n", node_num);

 	mlog(0, "starting recovery thread...\n");
@@ -1681,26 +1666,27 @@ done:
  * second part of a nodes recovery process (local alloc recovery) is
  * far less concerning.
  */
-static int ocfs2_recover_node(struct ocfs2_super *osb,
-			      int node_num, int slot_num)
+static int ocfs2_recover_one_node(struct ocfs2_recover_node *rn)
 {
 	int status = 0;
 	struct ocfs2_dinode *la_copy = NULL;
 	struct ocfs2_dinode *tl_copy = NULL;
+	struct ocfs2_super *osb = rn->rn_osb;

 	mlog_entry("(node_num=%d, slot_num=%d, osb->node_num = %d)\n",
-		   node_num, slot_num, osb->node_num);
+		   rn->rn_node_num, rn->rn_slot_num, osb->node_num);

 	/* Should not ever be called to recover ourselves -- in that
 	 * case we should've called ocfs2_journal_load instead. */
-	BUG_ON(osb->node_num == node_num);
+	BUG_ON(osb->node_num == rn->rn_node_num);

-	status = ocfs2_replay_journal(osb, node_num, slot_num);
+	status = ocfs2_replay_journal(osb, rn->rn_node_num,
+			rn->rn_slot_num);
 	if (status < 0) {
 		if (status == -EBUSY) {
 			mlog(0, "Skipping recovery for slot %u (node %u) "
-			     "as another node has recovered it\n", slot_num,
-			     node_num);
+			     "as another node has recovered it\n",
+			     rn->rn_slot_num, rn->rn_node_num);
 			status = 0;
 			goto done;
 		}
@@ -1709,7 +1695,8 @@ static int ocfs2_recover_node(struct ocfs2_super *osb,
 	}

 	/* Stamp a clean local alloc file AFTER recovering the journal... */
-	status = ocfs2_begin_local_alloc_recovery(osb, slot_num, &la_copy);
+	status = ocfs2_begin_local_alloc_recovery(osb, rn->rn_slot_num,
+			&la_copy);
 	if (status < 0) {
 		mlog_errno(status);
 		goto done;
@@ -1718,22 +1705,24 @@ static int ocfs2_recover_node(struct ocfs2_super *osb,
 	/* An error from begin_truncate_log_recovery is not
 	 * serious enough to warrant halting the rest of
 	 * recovery. */
-	status = ocfs2_begin_truncate_log_recovery(osb, slot_num, &tl_copy);
+	status = ocfs2_begin_truncate_log_recovery(osb,
+			rn->rn_slot_num, &tl_copy);
 	if (status < 0)
 		mlog_errno(status);

 	/* Likewise, this would be a strange but ultimately not so
 	 * harmful place to get an error... */
-	status = ocfs2_clear_slot(osb, slot_num);
+	status = ocfs2_clear_slot(osb, rn->rn_slot_num);
 	if (status < 0)
 		mlog_errno(status);

 	/* This will kfree the memory pointed to by la_copy and tl_copy */
-	ocfs2_queue_recovery_completion(osb->journal, slot_num, la_copy,
-					tl_copy, NULL);
+	ocfs2_queue_recovery_completion(osb->journal, rn->rn_slot_num,
+				la_copy, tl_copy, NULL);

 	status = 0;
 done:
+	rn->rn_status = status;

 	mlog_exit(status);
 	return status;
@@ -1822,7 +1811,7 @@ int ocfs2_mark_dead_nodes(struct ocfs2_super *osb)
 			continue;
 		}

-		if (__ocfs2_recovery_map_test(osb, node_num)) {
+		if (__ocfs2_recovery_node_test(osb, node_num)) {
 			spin_unlock(&osb->osb_lock);
 			continue;
 		}
diff --git a/fs/ocfs2/journal.h b/fs/ocfs2/journal.h
index 43e56b9..0325d81 100644
--- a/fs/ocfs2/journal.h
+++ b/fs/ocfs2/journal.h
@@ -43,9 +43,12 @@ struct ocfs2_dinode;
  * It is protected by the recovery_lock.
  */

-struct ocfs2_recovery_map {
-	unsigned int rm_used;
-	unsigned int *rm_entries;
+struct ocfs2_recover_node {
+	struct ocfs2_super *rn_osb;
+	int rn_node_num;
+	int rn_slot_num;
+	int rn_status;
+	struct list_head rn_list;
 };


diff --git a/fs/ocfs2/ocfs2.h b/fs/ocfs2/ocfs2.h
index 1efea36..f70c25a 100644
--- a/fs/ocfs2/ocfs2.h
+++ b/fs/ocfs2/ocfs2.h
@@ -337,7 +337,8 @@ struct ocfs2_super

 	atomic_t vol_state;
 	struct mutex recovery_lock;
-	struct ocfs2_recovery_map *recovery_map;
+	struct list_head s_recovery_nodes;
+
 	struct ocfs2_replay_map *replay_map;
 	struct task_struct *recovery_thread_task;
 	int disable_recovery;
diff --git a/fs/ocfs2/super.c b/fs/ocfs2/super.c
index f02c0ef..478715b 100644
--- a/fs/ocfs2/super.c
+++ b/fs/ocfs2/super.c
@@ -220,7 +220,6 @@ static const match_table_t tokens = {
 static int ocfs2_osb_dump(struct ocfs2_super *osb, char *buf, int len)
 {
 	struct ocfs2_cluster_connection *cconn = osb->cconn;
-	struct ocfs2_recovery_map *rm = osb->recovery_map;
 	struct ocfs2_orphan_scan *os = &osb->osb_orphan_scan;
 	int i, out = 0;

@@ -267,21 +266,6 @@ static int ocfs2_osb_dump(struct ocfs2_super
*osb, char *buf, int len)
 			osb->dc_work_sequence);
 	spin_unlock(&osb->dc_task_lock);

-	spin_lock(&osb->osb_lock);
-	out += snprintf(buf + out, len - out, "%10s => Pid: %d  Nodes:",
-			"Recovery",
-			(osb->recovery_thread_task ?
-			 task_pid_nr(osb->recovery_thread_task) : -1));
-	if (rm->rm_used == 0)
-		out += snprintf(buf + out, len - out, " None\n");
-	else {
-		for (i = 0; i < rm->rm_used; i++)
-			out += snprintf(buf + out, len - out, " %d",
-					rm->rm_entries[i]);
-		out += snprintf(buf + out, len - out, "\n");
-	}
-	spin_unlock(&osb->osb_lock);
-
 	out += snprintf(buf + out, len - out,
 			"%10s => Pid: %d  Interval: %lu  Needs: %d\n", "Commit",
 			(osb->commit_task ? task_pid_nr(osb->commit_task) : -1),
-- 
1.7.1


-- 
Goldwyn



More information about the Ocfs2-devel mailing list