[Ocfs2-devel] [PATCH 1/2] Use ocfs2_recovery_node instead of recovery map

Goldwyn Rodrigues rgoldwyn at gmail.com
Tue Sep 20 08:11:43 PDT 2011


The ocfs2_recovery_node structure replaces the ocfs2_recovery_map.
For each node to be recovered, a recovery node structure is added to
s_active_reco_list. ocfs2rec kernel thread picks up the recovery node
structure from s_active_reco_list and processes it.

Signed-off-by: Goldwyn Rodrigues <rgoldwyn at suse.de>
---
 fs/ocfs2/heartbeat.c |   12 +++-
 fs/ocfs2/journal.c   |  174 +++++++++++++++++++++-----------------------------
 fs/ocfs2/journal.h   |   10 ++-
 fs/ocfs2/ocfs2.h     |    4 +-
 fs/ocfs2/super.c     |   10 ++-
 5 files changed, 98 insertions(+), 112 deletions(-)

diff --git a/fs/ocfs2/heartbeat.c b/fs/ocfs2/heartbeat.c
index d8208b2..2218721 100644
--- a/fs/ocfs2/heartbeat.c
+++ b/fs/ocfs2/heartbeat.c
@@ -37,7 +37,7 @@
 #include "inode.h"
 #include "journal.h"
 #include "ocfs2_trace.h"
-
+#include "slot_map.h"
 #include "buffer_head_io.h"

 static inline void __ocfs2_node_map_set_bit(struct ocfs2_node_map *map,
@@ -63,6 +63,7 @@ void ocfs2_init_node_maps(struct ocfs2_super *osb)
 void ocfs2_do_node_down(int node_num, void *data)
 {
 	struct ocfs2_super *osb = data;
+	int slot_num;

 	BUG_ON(osb->node_num == node_num);

@@ -78,7 +79,14 @@ void ocfs2_do_node_down(int node_num, void *data)
 		return;
 	}

-	ocfs2_recovery_thread(osb, node_num);
+	slot_num = ocfs2_node_num_to_slot(osb, node_num);
+	if (slot_num == -ENOENT) {
+		mlog(ML_ERROR, "Skipping recovery on node %d because "
+		     "could not find corresponding slot.\n", node_num);
+		return;
+	}
+
+	ocfs2_recovery_thread(osb, node_num, slot_num);
 }

 static inline void __ocfs2_node_map_set_bit(struct ocfs2_node_map *map,
diff --git a/fs/ocfs2/journal.c b/fs/ocfs2/journal.c
index 295d564..2e07c67 100644
--- a/fs/ocfs2/journal.c
+++ b/fs/ocfs2/journal.c
@@ -58,8 +58,7 @@ DEFINE_SPINLOCK(trans_inc_lock);
 #define ORPHAN_SCAN_SCHEDULE_TIMEOUT 300000

 static int ocfs2_force_read_journal(struct inode *inode);
-static int ocfs2_recover_node(struct ocfs2_super *osb,
-			      int node_num, int slot_num);
+static int ocfs2_recover_node(struct ocfs2_recovery_node *rn);
 static int __ocfs2_recovery_thread(void *arg);
 static int ocfs2_commit_cache(struct ocfs2_super *osb);
 static int __ocfs2_wait_on_mount(struct ocfs2_super *osb, int quota);
@@ -179,25 +178,21 @@ void ocfs2_free_replay_slots(struct ocfs2_super *osb)

 int ocfs2_recovery_init(struct ocfs2_super *osb)
 {
-	struct ocfs2_recovery_map *rm;
-
+	struct ocfs2_recovery_node *rn;
+	int i;
 	mutex_init(&osb->recovery_lock);
 	osb->disable_recovery = 0;
 	osb->recovery_thread_task = NULL;
 	init_waitqueue_head(&osb->recovery_event);
-
-	rm = kzalloc(sizeof(struct ocfs2_recovery_map) +
-		     osb->max_slots * sizeof(unsigned int),
-		     GFP_KERNEL);
-	if (!rm) {
-		mlog_errno(-ENOMEM);
-		return -ENOMEM;
+	INIT_LIST_HEAD(&osb->s_active_reco_list);
+	INIT_LIST_HEAD(&osb->s_recovery_list);
+	for (i = 0; i < osb->max_slots - 1; i++) {
+		rn = kzalloc(sizeof(struct ocfs2_recovery_node), GFP_KERNEL);
+		if (!rn)
+			return -ENOMEM;
+		rn->rn_osb = osb;
+		list_add(&rn->rn_list, &osb->s_recovery_list);
 	}
-
-	rm->rm_entries = (unsigned int *)((char *)rm +
-					  sizeof(struct ocfs2_recovery_map));
-	osb->recovery_map = rm;
-
 	return 0;
 }

@@ -212,8 +207,7 @@ static int ocfs2_recovery_thread_running(struct
ocfs2_super *osb)

 void ocfs2_recovery_exit(struct ocfs2_super *osb)
 {
-	struct ocfs2_recovery_map *rm;
-
+	struct ocfs2_recovery_node *rn, *tmp;
 	/* disable any new recovery threads and wait for any currently
 	 * running ones to exit. Do this before setting the vol_state. */
 	mutex_lock(&osb->recovery_lock);
@@ -226,75 +220,57 @@ void ocfs2_recovery_exit(struct ocfs2_super *osb)
 	 * complete. */
 	flush_workqueue(ocfs2_wq);

-	/*
-	 * Now that recovery is shut down, and the osb is about to be
-	 * freed,  the osb_lock is not taken here.
-	 */
-	rm = osb->recovery_map;
-	/* XXX: Should we bug if there are dirty entries? */
-
-	kfree(rm);
+	spin_lock(&osb->osb_lock);
+	list_for_each_entry_safe(rn, tmp, &osb->s_recovery_list, rn_list) {
+			list_del(&rn->rn_list);
+			kfree(rn);
+	}
+	spin_unlock(&osb->osb_lock);
 }

-static int __ocfs2_recovery_map_test(struct ocfs2_super *osb,
+static int __ocfs2_recovery_node_test(struct ocfs2_super *osb,
 				     unsigned int node_num)
 {
-	int i;
-	struct ocfs2_recovery_map *rm = osb->recovery_map;
+	struct ocfs2_recovery_node *rn;

 	assert_spin_locked(&osb->osb_lock);
-
-	for (i = 0; i < rm->rm_used; i++) {
-		if (rm->rm_entries[i] == node_num)
+	list_for_each_entry(rn, &osb->s_active_reco_list, rn_list) {
+		if (rn->rn_node_num == node_num)
 			return 1;
 	}
-
 	return 0;
 }

 /* Behaves like test-and-set.  Returns the previous value */
-static int ocfs2_recovery_map_set(struct ocfs2_super *osb,
-				  unsigned int node_num)
+static int ocfs2_recovery_node_set(struct ocfs2_super *osb,
+				  unsigned int node_num, unsigned int slot_num)
 {
-	struct ocfs2_recovery_map *rm = osb->recovery_map;
+	struct ocfs2_recovery_node *rn;
+	int ret = 0;

 	spin_lock(&osb->osb_lock);
-	if (__ocfs2_recovery_map_test(osb, node_num)) {
-		spin_unlock(&osb->osb_lock);
-		return 1;
+	if (unlikely(__ocfs2_recovery_node_test(osb, node_num))) {
+		ret = 1;
+		goto out;
 	}
-
-	/* XXX: Can this be exploited? Not from o2dlm... */
-	BUG_ON(rm->rm_used >= osb->max_slots);
-
-	rm->rm_entries[rm->rm_used] = node_num;
-	rm->rm_used++;
+	BUG_ON(list_empty(&osb->s_recovery_list));
+	rn = list_first_entry(&osb->s_recovery_list,
+			struct ocfs2_recovery_node, rn_list);
+	rn->rn_node_num = node_num;
+	rn->rn_slot_num = slot_num;
+	list_move(&rn->rn_list, &osb->s_active_reco_list);
+out:
 	spin_unlock(&osb->osb_lock);
-
-	return 0;
+	return ret;
 }

-static void ocfs2_recovery_map_clear(struct ocfs2_super *osb,
-				     unsigned int node_num)
+static void ocfs2_recovery_node_clear(struct ocfs2_recovery_node *rn)
 {
-	int i;
-	struct ocfs2_recovery_map *rm = osb->recovery_map;
-
+	struct ocfs2_super *osb = rn->rn_osb;
 	spin_lock(&osb->osb_lock);
-
-	for (i = 0; i < rm->rm_used; i++) {
-		if (rm->rm_entries[i] == node_num)
-			break;
-	}
-
-	if (i < rm->rm_used) {
-		/* XXX: be careful with the pointer math */
-		memmove(&(rm->rm_entries[i]), &(rm->rm_entries[i + 1]),
-			(rm->rm_used - i - 1) * sizeof(unsigned int));
-		rm->rm_used--;
-	}
-
+	list_move(&rn->rn_list, &osb->s_recovery_list);
 	spin_unlock(&osb->osb_lock);
+	kfree(rn);
 }

 static int ocfs2_commit_cache(struct ocfs2_super *osb)
@@ -1057,10 +1033,9 @@ bail:
 static int ocfs2_recovery_completed(struct ocfs2_super *osb)
 {
 	int empty;
-	struct ocfs2_recovery_map *rm = osb->recovery_map;

 	spin_lock(&osb->osb_lock);
-	empty = (rm->rm_used == 0);
+	empty = list_empty(&osb->s_active_reco_list);
 	spin_unlock(&osb->osb_lock);

 	return empty;
@@ -1292,12 +1267,12 @@ void ocfs2_complete_quota_recovery(struct
ocfs2_super *osb)

 static int __ocfs2_recovery_thread(void *arg)
 {
-	int status, node_num, slot_num;
+	int status;
 	struct ocfs2_super *osb = arg;
-	struct ocfs2_recovery_map *rm = osb->recovery_map;
 	int *rm_quota = NULL;
 	int rm_quota_used = 0, i;
 	struct ocfs2_quota_recovery *qrec;
+	struct ocfs2_recovery_node *rn;

 	status = ocfs2_wait_on_mount(osb);
 	if (status < 0) {
@@ -1325,17 +1300,10 @@ restart:
 					NULL, NULL);

 	spin_lock(&osb->osb_lock);
-	while (rm->rm_used) {
-		/* It's always safe to remove entry zero, as we won't
-		 * clear it until ocfs2_recover_node() has succeeded. */
-		node_num = rm->rm_entries[0];
+	list_for_each_entry(rn, &osb->s_active_reco_list, rn_list) {
 		spin_unlock(&osb->osb_lock);
-		slot_num = ocfs2_node_num_to_slot(osb, node_num);
-		trace_ocfs2_recovery_thread_node(node_num, slot_num);
-		if (slot_num == -ENOENT) {
-			status = 0;
-			goto skip_recovery;
-		}
+		trace_ocfs2_recovery_thread_node(rn->rn_node_num,
+				rn->rn_slot_num);

 		/* It is a bit subtle with quota recovery. We cannot do it
 		 * immediately because we have to obtain cluster locks from
@@ -1343,18 +1311,18 @@ restart:
 		 * then quota usage would be out of sync until some node takes
 		 * the slot. So we remember which nodes need quota recovery
 		 * and when everything else is done, we recover quotas. */
-		for (i = 0; i < rm_quota_used && rm_quota[i] != slot_num; i++);
+		for (i = 0; i < rm_quota_used && rm_quota[i] != rn->rn_slot_num;
+				i++);
 		if (i == rm_quota_used)
-			rm_quota[rm_quota_used++] = slot_num;
+			rm_quota[rm_quota_used++] = rn->rn_slot_num;

-		status = ocfs2_recover_node(osb, node_num, slot_num);
-skip_recovery:
+		status = ocfs2_recover_node(rn);
 		if (!status) {
-			ocfs2_recovery_map_clear(osb, node_num);
+			ocfs2_recovery_node_clear(rn);
 		} else {
 			mlog(ML_ERROR,
 			     "Error %d recovering node %d on device (%u,%u)!\n",
-			     status, node_num,
+			     status, rn->rn_node_num,
 			     MAJOR(osb->sb->s_dev), MINOR(osb->sb->s_dev));
 			mlog(ML_ERROR, "Volume requires unmount.\n");
 		}
@@ -1413,14 +1381,14 @@ bail:
 	return status;
 }

-void ocfs2_recovery_thread(struct ocfs2_super *osb, int node_num)
+void ocfs2_recovery_thread(struct ocfs2_super *osb, int node_num, int slot_num)
 {
 	mutex_lock(&osb->recovery_lock);

 	trace_ocfs2_recovery_thread(node_num, osb->node_num,
 		osb->disable_recovery, osb->recovery_thread_task,
 		osb->disable_recovery ?
-		-1 : ocfs2_recovery_map_set(osb, node_num));
+		-1 : ocfs2_recovery_node_set(osb, node_num, slot_num));

 	if (osb->disable_recovery)
 		goto out;
@@ -1626,23 +1594,26 @@ done:
  * second part of a nodes recovery process (local alloc recovery) is
  * far less concerning.
  */
-static int ocfs2_recover_node(struct ocfs2_super *osb,
-			      int node_num, int slot_num)
+static int ocfs2_recover_node(struct ocfs2_recovery_node *rn)
 {
 	int status = 0;
 	struct ocfs2_dinode *la_copy = NULL;
 	struct ocfs2_dinode *tl_copy = NULL;
+	struct ocfs2_super *osb = rn->rn_osb;

-	trace_ocfs2_recover_node(node_num, slot_num, osb->node_num);
+	trace_ocfs2_recover_node(rn->rn_node_num, rn->rn_slot_num,
+			osb->node_num);

 	/* Should not ever be called to recover ourselves -- in that
 	 * case we should've called ocfs2_journal_load instead. */
-	BUG_ON(osb->node_num == node_num);
+	BUG_ON(osb->node_num == rn->rn_node_num);

-	status = ocfs2_replay_journal(osb, node_num, slot_num);
+	status = ocfs2_replay_journal(osb, rn->rn_node_num,
+			rn->rn_slot_num);
 	if (status < 0) {
 		if (status == -EBUSY) {
-			trace_ocfs2_recover_node_skip(slot_num, node_num);
+			trace_ocfs2_recover_node_skip(rn->rn_slot_num,
+					rn->rn_node_num);
 			status = 0;
 			goto done;
 		}
@@ -1651,7 +1622,8 @@ static int ocfs2_recover_node(struct ocfs2_super *osb,
 	}

 	/* Stamp a clean local alloc file AFTER recovering the journal... */
-	status = ocfs2_begin_local_alloc_recovery(osb, slot_num, &la_copy);
+	status = ocfs2_begin_local_alloc_recovery(osb, rn->rn_slot_num,
+			&la_copy);
 	if (status < 0) {
 		mlog_errno(status);
 		goto done;
@@ -1660,23 +1632,23 @@ static int ocfs2_recover_node(struct ocfs2_super *osb,
 	/* An error from begin_truncate_log_recovery is not
 	 * serious enough to warrant halting the rest of
 	 * recovery. */
-	status = ocfs2_begin_truncate_log_recovery(osb, slot_num, &tl_copy);
+	status = ocfs2_begin_truncate_log_recovery(osb,
+			rn->rn_slot_num, &tl_copy);
 	if (status < 0)
 		mlog_errno(status);

 	/* Likewise, this would be a strange but ultimately not so
 	 * harmful place to get an error... */
-	status = ocfs2_clear_slot(osb, slot_num);
+	status = ocfs2_clear_slot(osb, rn->rn_slot_num);
 	if (status < 0)
 		mlog_errno(status);

 	/* This will kfree the memory pointed to by la_copy and tl_copy */
-	ocfs2_queue_recovery_completion(osb->journal, slot_num, la_copy,
-					tl_copy, NULL);
+	ocfs2_queue_recovery_completion(osb->journal, rn->rn_slot_num,
+				la_copy, tl_copy, NULL);

 	status = 0;
 done:
-
 	return status;
 }

@@ -1763,7 +1735,7 @@ int ocfs2_mark_dead_nodes(struct ocfs2_super *osb)
 			continue;
 		}

-		if (__ocfs2_recovery_map_test(osb, node_num)) {
+		if (__ocfs2_recovery_node_test(osb, node_num)) {
 			spin_unlock(&osb->osb_lock);
 			continue;
 		}
@@ -1777,7 +1749,7 @@ int ocfs2_mark_dead_nodes(struct ocfs2_super *osb)
 			/* Since we're called from mount, we know that
 			 * the recovery thread can't race us on
 			 * setting / checking the recovery bits. */
-			ocfs2_recovery_thread(osb, node_num);
+			ocfs2_recovery_thread(osb, node_num, i);
 		} else if ((status < 0) && (status != -EAGAIN)) {
 			mlog_errno(status);
 			goto bail;
diff --git a/fs/ocfs2/journal.h b/fs/ocfs2/journal.h
index 68cf2f6..4447964 100644
--- a/fs/ocfs2/journal.h
+++ b/fs/ocfs2/journal.h
@@ -43,9 +43,11 @@ struct ocfs2_dinode;
  * It is protected by the recovery_lock.
  */

-struct ocfs2_recovery_map {
-	unsigned int rm_used;
-	unsigned int *rm_entries;
+struct ocfs2_recovery_node {
+	struct ocfs2_super *rn_osb;
+	int rn_node_num;
+	int rn_slot_num;
+	struct list_head rn_list;
 };


@@ -193,7 +195,7 @@ int    ocfs2_journal_load(struct ocfs2_journal
*journal, int local,
 			  int replayed);
 int    ocfs2_check_journals_nolocks(struct ocfs2_super *osb);
 void   ocfs2_recovery_thread(struct ocfs2_super *osb,
-			     int node_num);
+			     int node_num, int slot_num);
 int    ocfs2_mark_dead_nodes(struct ocfs2_super *osb);
 void   ocfs2_complete_mount_recovery(struct ocfs2_super *osb);
 void ocfs2_complete_quota_recovery(struct ocfs2_super *osb);
diff --git a/fs/ocfs2/ocfs2.h b/fs/ocfs2/ocfs2.h
index 4092858..03a625e 100644
--- a/fs/ocfs2/ocfs2.h
+++ b/fs/ocfs2/ocfs2.h
@@ -342,7 +342,9 @@ struct ocfs2_super

 	atomic_t vol_state;
 	struct mutex recovery_lock;
-	struct ocfs2_recovery_map *recovery_map;
+	struct list_head s_active_reco_list;
+	struct list_head s_recovery_list;
+
 	struct ocfs2_replay_map *replay_map;
 	struct task_struct *recovery_thread_task;
 	int disable_recovery;
diff --git a/fs/ocfs2/super.c b/fs/ocfs2/super.c
index 56f6102..6250ec2 100644
--- a/fs/ocfs2/super.c
+++ b/fs/ocfs2/super.c
@@ -222,7 +222,7 @@ static const match_table_t tokens = {
 static int ocfs2_osb_dump(struct ocfs2_super *osb, char *buf, int len)
 {
 	struct ocfs2_cluster_connection *cconn = osb->cconn;
-	struct ocfs2_recovery_map *rm = osb->recovery_map;
+	struct ocfs2_recovery_node *rn;
 	struct ocfs2_orphan_scan *os = &osb->osb_orphan_scan;
 	int i, out = 0;

@@ -274,12 +274,14 @@ static int ocfs2_osb_dump(struct ocfs2_super
*osb, char *buf, int len)
 			"Recovery",
 			(osb->recovery_thread_task ?
 			 task_pid_nr(osb->recovery_thread_task) : -1));
-	if (rm->rm_used == 0)
+	if (list_empty(&osb->s_active_reco_list))
 		out += snprintf(buf + out, len - out, " None\n");
 	else {
-		for (i = 0; i < rm->rm_used; i++)
+		list_for_each_entry(rn, &osb->s_active_reco_list,
+				rn_list) {
 			out += snprintf(buf + out, len - out, " %d",
-					rm->rm_entries[i]);
+					rn->rn_node_num);
+		}
 		out += snprintf(buf + out, len - out, "\n");
 	}
 	spin_unlock(&osb->osb_lock);
-- 
1.7.6



More information about the Ocfs2-devel mailing list