[Ocfs2-devel] [PATCH 1/2] Introduce ocfs2_recover_node
Sunil Mushran
sunil.mushran at oracle.com
Wed Dec 1 20:49:37 PST 2010
On 11/17/2010 07:50 AM, Goldwyn Rodrigues wrote:
> Review comments by Wengang incorporated.
>
> ocfs2_recover_node is a new data structure to include all information required
> to recover one node. The structure is maintained as a list in the super block.
ocfs2_recover_node suggests an action. How about calling it
ocfs2_recovery_list?
> recovery_map is no longer required with ocfs2_recover_node.
>
> Signed-off-by: Goldwyn Rodrigues<rgoldwyn at suse.de>
> ---
> fs/ocfs2/journal.c | 171 ++++++++++++++++++++++++----------------------------
> fs/ocfs2/journal.h | 9 ++-
> fs/ocfs2/ocfs2.h | 3 +-
> fs/ocfs2/super.c | 16 -----
> 4 files changed, 88 insertions(+), 111 deletions(-)
>
> diff --git a/fs/ocfs2/journal.c b/fs/ocfs2/journal.c
> index faa2303..277b810 100644
> --- a/fs/ocfs2/journal.c
> +++ b/fs/ocfs2/journal.c
> @@ -58,8 +58,7 @@ DEFINE_SPINLOCK(trans_inc_lock);
> #define ORPHAN_SCAN_SCHEDULE_TIMEOUT 300000
>
> static int ocfs2_force_read_journal(struct inode *inode);
> -static int ocfs2_recover_node(struct ocfs2_super *osb,
> - int node_num, int slot_num);
> +static int ocfs2_recover_one_node(struct ocfs2_recover_node *rn);
> static int __ocfs2_recovery_thread(void *arg);
> static int ocfs2_commit_cache(struct ocfs2_super *osb);
> static int __ocfs2_wait_on_mount(struct ocfs2_super *osb, int quota);
> @@ -179,24 +178,11 @@ void ocfs2_free_replay_slots(struct ocfs2_super *osb)
>
> int ocfs2_recovery_init(struct ocfs2_super *osb)
> {
> - struct ocfs2_recovery_map *rm;
> -
> mutex_init(&osb->recovery_lock);
> osb->disable_recovery = 0;
> osb->recovery_thread_task = NULL;
> init_waitqueue_head(&osb->recovery_event);
> -
> - rm = kzalloc(sizeof(struct ocfs2_recovery_map) +
> - osb->max_slots * sizeof(unsigned int),
> - GFP_KERNEL);
> - if (!rm) {
> - mlog_errno(-ENOMEM);
> - return -ENOMEM;
> - }
> -
> - rm->rm_entries = (unsigned int *)((char *)rm +
> - sizeof(struct ocfs2_recovery_map));
> - osb->recovery_map = rm;
> + INIT_LIST_HEAD(&osb->s_recovery_nodes);
>
> return 0;
> }
> @@ -212,8 +198,8 @@ static int ocfs2_recovery_thread_running(struct
> ocfs2_super *osb)
>
> void ocfs2_recovery_exit(struct ocfs2_super *osb)
> {
> - struct ocfs2_recovery_map *rm;
> -
> + struct list_head *iter, *next;
> + struct ocfs2_recover_node *rn;
> /* disable any new recovery threads and wait for any currently
> * running ones to exit. Do this before setting the vol_state. */
> mutex_lock(&osb->recovery_lock);
> @@ -226,75 +212,67 @@ void ocfs2_recovery_exit(struct ocfs2_super *osb)
> * complete. */
> flush_workqueue(ocfs2_wq);
>
> - /*
> - * Now that recovery is shut down, and the osb is about to be
> - * freed, the osb_lock is not taken here.
> - */
> - rm = osb->recovery_map;
> - /* XXX: Should we bug if there are dirty entries? */
This comment is still relevant.
> -
> - kfree(rm);
> + spin_lock(&osb->osb_lock);
> + list_for_each_safe(iter, next,&osb->s_recovery_nodes) {
> + rn = list_entry(iter, struct ocfs2_recover_node,
> + rn_list);
> + list_del(&rn->rn_list);
> + kfree(rn);
> + }
> + spin_unlock(&osb->osb_lock);
extra indentation
> }
>
> -static int __ocfs2_recovery_map_test(struct ocfs2_super *osb,
> +static int __ocfs2_recovery_node_test(struct ocfs2_super *osb,
> unsigned int node_num)
> {
> - int i;
> - struct ocfs2_recovery_map *rm = osb->recovery_map;
> + struct ocfs2_recover_node *rn;
> + struct list_head *iter;
>
> assert_spin_locked(&osb->osb_lock);
> -
> - for (i = 0; i< rm->rm_used; i++) {
> - if (rm->rm_entries[i] == node_num)
> + list_for_each(iter,&osb->s_recovery_nodes) {
> + rn = list_entry(iter, struct ocfs2_recover_node, rn_list);
> + if (rn->rn_node_num == node_num)
> return 1;
> }
> -
> return 0;
> }
>
> /* Behaves like test-and-set. Returns the previous value */
> -static int ocfs2_recovery_map_set(struct ocfs2_super *osb,
> +static int ocfs2_recovery_node_set(struct ocfs2_super *osb,
> unsigned int node_num)
> {
> - struct ocfs2_recovery_map *rm = osb->recovery_map;
> -
> - spin_lock(&osb->osb_lock);
> - if (__ocfs2_recovery_map_test(osb, node_num)) {
> - spin_unlock(&osb->osb_lock);
> - return 1;
> - }
> + struct list_head *iter;
> + struct ocfs2_recover_node *rn = NULL, *trn;
>
> - /* XXX: Can this be exploited? Not from o2dlm... */
> - BUG_ON(rm->rm_used>= osb->max_slots);
> + trn = kzalloc(sizeof(struct ocfs2_recover_node), GFP_NOFS);
> + if (!trn)
> + return -ENOMEM;
Is this ENOMEM case handled properly up the stack? This is new.
Previously we allocated memory upfront. I am wondering why we cannot
do the same here.
> - rm->rm_entries[rm->rm_used] = node_num;
> - rm->rm_used++;
> + spin_lock(&osb->osb_lock);
> + list_for_each(iter,&osb->s_recovery_nodes) {
> + rn = list_entry(iter, struct ocfs2_recover_node,
> + rn_list);
> + if (rn->rn_node_num == node_num) {
> + spin_unlock(&osb->osb_lock);
> + kfree(trn);
> + return 1;
> + }
> + }
extra indentation
> + trn->rn_node_num = node_num;
> + trn->rn_osb = osb;
> + list_add_tail(&trn->rn_list,&osb->s_recovery_nodes);
> spin_unlock(&osb->osb_lock);
>
> return 0;
> }
>
> -static void ocfs2_recovery_map_clear(struct ocfs2_super *osb,
> - unsigned int node_num)
> +static void ocfs2_recovery_node_clear(struct ocfs2_super *osb,
> + struct ocfs2_recover_node *rn)
> {
> - int i;
> - struct ocfs2_recovery_map *rm = osb->recovery_map;
> -
> spin_lock(&osb->osb_lock);
> -
> - for (i = 0; i< rm->rm_used; i++) {
> - if (rm->rm_entries[i] == node_num)
> - break;
> - }
> -
> - if (i< rm->rm_used) {
> - /* XXX: be careful with the pointer math */
> - memmove(&(rm->rm_entries[i]),&(rm->rm_entries[i + 1]),
> - (rm->rm_used - i - 1) * sizeof(unsigned int));
> - rm->rm_used--;
> - }
> -
> + list_del(&rn->rn_list);
> spin_unlock(&osb->osb_lock);
> + kfree(rn);
Aah...we did memmove. That won't work here. So list manipulation it is.
> }
>
> static int ocfs2_commit_cache(struct ocfs2_super *osb)
> @@ -1092,10 +1070,9 @@ bail:
> static int ocfs2_recovery_completed(struct ocfs2_super *osb)
> {
> int empty;
> - struct ocfs2_recovery_map *rm = osb->recovery_map;
>
> spin_lock(&osb->osb_lock);
> - empty = (rm->rm_used == 0);
> + empty = list_empty(&osb->s_recovery_nodes);
> spin_unlock(&osb->osb_lock);
>
> return empty;
> @@ -1332,12 +1309,13 @@ void ocfs2_complete_quota_recovery(struct
> ocfs2_super *osb)
>
> static int __ocfs2_recovery_thread(void *arg)
> {
> - int status, node_num, slot_num;
> + int status;
> struct ocfs2_super *osb = arg;
> - struct ocfs2_recovery_map *rm = osb->recovery_map;
> int *rm_quota = NULL;
> int rm_quota_used = 0, i;
> struct ocfs2_quota_recovery *qrec;
> + struct list_head *iter;
> + struct ocfs2_recover_node *rn = NULL;
>
> mlog_entry_void();
>
> @@ -1367,20 +1345,21 @@ restart:
> NULL, NULL);
>
> spin_lock(&osb->osb_lock);
> - while (rm->rm_used) {
> + list_for_each(iter,&osb->s_recovery_nodes) {
> + rn = list_entry(iter, struct ocfs2_recover_node, rn_list);
> /* It's always safe to remove entry zero, as we won't
> * clear it until ocfs2_recover_node() has succeeded. */
> - node_num = rm->rm_entries[0];
> spin_unlock(&osb->osb_lock);
> - mlog(0, "checking node %d\n", node_num);
> - slot_num = ocfs2_node_num_to_slot(osb, node_num);
> - if (slot_num == -ENOENT) {
> + mlog(0, "checking node %d\n", rn->rn_node_num);
> + rn->rn_slot_num = ocfs2_node_num_to_slot(osb, rn->rn_node_num);
> + if (rn->rn_slot_num == -ENOENT) {
> status = 0;
> mlog(0, "no slot for this node, so no recovery"
> "required.\n");
> goto skip_recovery;
> }
> - mlog(0, "node %d was using slot %d\n", node_num, slot_num);
> + mlog(0, "node %d was using slot %d\n",
> + rn->rn_node_num, rn->rn_slot_num);
>
> /* It is a bit subtle with quota recovery. We cannot do it
> * immediately because we have to obtain cluster locks from
> @@ -1388,18 +1367,19 @@ restart:
> * then quota usage would be out of sync until some node takes
> * the slot. So we remember which nodes need quota recovery
> * and when everything else is done, we recover quotas. */
> - for (i = 0; i< rm_quota_used&& rm_quota[i] != slot_num; i++);
> + for (i = 0; i< rm_quota_used&& rm_quota[i] != rn->rn_slot_num;
> + i++);
> if (i == rm_quota_used)
> - rm_quota[rm_quota_used++] = slot_num;
> + rm_quota[rm_quota_used++] = rn->rn_slot_num;
>
> - status = ocfs2_recover_node(osb, node_num, slot_num);
> + status = ocfs2_recover_one_node(rn);
> skip_recovery:
> if (!status) {
> - ocfs2_recovery_map_clear(osb, node_num);
> + ocfs2_recovery_node_clear(osb, rn);
> } else {
> mlog(ML_ERROR,
> "Error %d recovering node %d on device (%u,%u)!\n",
> - status, node_num,
> + status, rn->rn_node_num,
> MAJOR(osb->sb->s_dev), MINOR(osb->sb->s_dev));
> mlog(ML_ERROR, "Volume requires unmount.\n");
Hmm... this mlog could do with some clean up. But that can be
a separate change.
> }
> @@ -1461,6 +1441,7 @@ bail:
>
> void ocfs2_recovery_thread(struct ocfs2_super *osb, int node_num)
> {
> + int ret = 0;
> mlog_entry("(node_num=%d, osb->node_num = %d)\n",
> node_num, osb->node_num);
>
> @@ -1470,7 +1451,11 @@ void ocfs2_recovery_thread(struct ocfs2_super
> *osb, int node_num)
>
> /* People waiting on recovery will wait on
> * the recovery map to empty. */
> - if (ocfs2_recovery_map_set(osb, node_num))
> + ret = ocfs2_recovery_node_set(osb, node_num);
> + if (ret == -ENOMEM) {
> + mlog_errno(ret);
> + goto out;
> + } else if (ret)
> mlog(0, "node %d already in recovery map.\n", node_num);
>
> mlog(0, "starting recovery thread...\n");
> @@ -1681,26 +1666,27 @@ done:
> * second part of a nodes recovery process (local alloc recovery) is
> * far less concerning.
> */
> -static int ocfs2_recover_node(struct ocfs2_super *osb,
> - int node_num, int slot_num)
> +static int ocfs2_recover_one_node(struct ocfs2_recover_node *rn)
> {
> int status = 0;
> struct ocfs2_dinode *la_copy = NULL;
> struct ocfs2_dinode *tl_copy = NULL;
> + struct ocfs2_super *osb = rn->rn_osb;
>
> mlog_entry("(node_num=%d, slot_num=%d, osb->node_num = %d)\n",
> - node_num, slot_num, osb->node_num);
> + rn->rn_node_num, rn->rn_slot_num, osb->node_num);
>
> /* Should not ever be called to recover ourselves -- in that
> * case we should've called ocfs2_journal_load instead. */
> - BUG_ON(osb->node_num == node_num);
> + BUG_ON(osb->node_num == rn->rn_node_num);
>
> - status = ocfs2_replay_journal(osb, node_num, slot_num);
> + status = ocfs2_replay_journal(osb, rn->rn_node_num,
> + rn->rn_slot_num);
> if (status< 0) {
> if (status == -EBUSY) {
> mlog(0, "Skipping recovery for slot %u (node %u) "
> - "as another node has recovered it\n", slot_num,
> - node_num);
> + "as another node has recovered it\n",
> + rn->rn_slot_num, rn->rn_node_num);
> status = 0;
> goto done;
> }
> @@ -1709,7 +1695,8 @@ static int ocfs2_recover_node(struct ocfs2_super *osb,
> }
>
> /* Stamp a clean local alloc file AFTER recovering the journal... */
> - status = ocfs2_begin_local_alloc_recovery(osb, slot_num,&la_copy);
> + status = ocfs2_begin_local_alloc_recovery(osb, rn->rn_slot_num,
> + &la_copy);
> if (status< 0) {
> mlog_errno(status);
> goto done;
> @@ -1718,22 +1705,24 @@ static int ocfs2_recover_node(struct ocfs2_super *osb,
> /* An error from begin_truncate_log_recovery is not
> * serious enough to warrant halting the rest of
> * recovery. */
> - status = ocfs2_begin_truncate_log_recovery(osb, slot_num,&tl_copy);
> + status = ocfs2_begin_truncate_log_recovery(osb,
> + rn->rn_slot_num,&tl_copy);
> if (status< 0)
> mlog_errno(status);
>
> /* Likewise, this would be a strange but ultimately not so
> * harmful place to get an error... */
> - status = ocfs2_clear_slot(osb, slot_num);
> + status = ocfs2_clear_slot(osb, rn->rn_slot_num);
> if (status< 0)
> mlog_errno(status);
>
> /* This will kfree the memory pointed to by la_copy and tl_copy */
> - ocfs2_queue_recovery_completion(osb->journal, slot_num, la_copy,
> - tl_copy, NULL);
> + ocfs2_queue_recovery_completion(osb->journal, rn->rn_slot_num,
> + la_copy, tl_copy, NULL);
>
> status = 0;
> done:
> + rn->rn_status = status;
>
> mlog_exit(status);
> return status;
> @@ -1822,7 +1811,7 @@ int ocfs2_mark_dead_nodes(struct ocfs2_super *osb)
> continue;
> }
>
> - if (__ocfs2_recovery_map_test(osb, node_num)) {
> + if (__ocfs2_recovery_node_test(osb, node_num)) {
> spin_unlock(&osb->osb_lock);
> continue;
> }
> diff --git a/fs/ocfs2/journal.h b/fs/ocfs2/journal.h
> index 43e56b9..0325d81 100644
> --- a/fs/ocfs2/journal.h
> +++ b/fs/ocfs2/journal.h
> @@ -43,9 +43,12 @@ struct ocfs2_dinode;
> * It is protected by the recovery_lock.
> */
>
> -struct ocfs2_recovery_map {
> - unsigned int rm_used;
> - unsigned int *rm_entries;
> +struct ocfs2_recover_node {
> + struct ocfs2_super *rn_osb;
> + int rn_node_num;
> + int rn_slot_num;
> + int rn_status;
> + struct list_head rn_list;
> };
See comment above.
>
> diff --git a/fs/ocfs2/ocfs2.h b/fs/ocfs2/ocfs2.h
> index 1efea36..f70c25a 100644
> --- a/fs/ocfs2/ocfs2.h
> +++ b/fs/ocfs2/ocfs2.h
> @@ -337,7 +337,8 @@ struct ocfs2_super
>
> atomic_t vol_state;
> struct mutex recovery_lock;
> - struct ocfs2_recovery_map *recovery_map;
> + struct list_head s_recovery_nodes;
s_recovery_list will fit better.
> +
> struct ocfs2_replay_map *replay_map;
> struct task_struct *recovery_thread_task;
> int disable_recovery;
> diff --git a/fs/ocfs2/super.c b/fs/ocfs2/super.c
> index f02c0ef..478715b 100644
> --- a/fs/ocfs2/super.c
> +++ b/fs/ocfs2/super.c
> @@ -220,7 +220,6 @@ static const match_table_t tokens = {
> static int ocfs2_osb_dump(struct ocfs2_super *osb, char *buf, int len)
> {
> struct ocfs2_cluster_connection *cconn = osb->cconn;
> - struct ocfs2_recovery_map *rm = osb->recovery_map;
> struct ocfs2_orphan_scan *os =&osb->osb_orphan_scan;
> int i, out = 0;
>
> @@ -267,21 +266,6 @@ static int ocfs2_osb_dump(struct ocfs2_super
> *osb, char *buf, int len)
> osb->dc_work_sequence);
> spin_unlock(&osb->dc_task_lock);
>
> - spin_lock(&osb->osb_lock);
> - out += snprintf(buf + out, len - out, "%10s => Pid: %d Nodes:",
> - "Recovery",
> - (osb->recovery_thread_task ?
> - task_pid_nr(osb->recovery_thread_task) : -1));
> - if (rm->rm_used == 0)
> - out += snprintf(buf + out, len - out, " None\n");
> - else {
> - for (i = 0; i< rm->rm_used; i++)
> - out += snprintf(buf + out, len - out, " %d",
> - rm->rm_entries[i]);
> - out += snprintf(buf + out, len - out, "\n");
> - }
> - spin_unlock(&osb->osb_lock);
> -
> out += snprintf(buf + out, len - out,
> "%10s => Pid: %d Interval: %lu Needs: %d\n", "Commit",
> (osb->commit_task ? task_pid_nr(osb->commit_task) : -1),
More information about the Ocfs2-devel
mailing list