[Ocfs2-devel] [PATCH 1/2] ocfs2: timer to queue scan of all orphan slots
Sunil Mushran
sunil.mushran at oracle.com
Tue Jun 9 16:47:14 PDT 2009
Srini,
I was re-reviewing these patches as part of the 1.4 merge and something
caught my eye. Specifically, this may slow down umount unnecessarily.
Consider the case where scan_work() fires a tick before scan_stop().
Currently, scan_stop() will cancel the newly queued scan_work() but do
nothing about the tasks queued by the earlier firing. The umount thread will
have to wait for all those tasks to complete.
One solution is to add a flag, atomic_t os_stop_scan, in struct
ocfs2_orphan_scan. Call atomic_set() at the top of ocfs2_orphan_scan_stop().
In ocfs2_queue_orphan_scan(), check if the flag is set before and after
ocfs2_orphan_scan_lock(). If set, exit without queuing the tasks.
Secondly, call scan_stop() earlier in umount: definitely before the truncate
log shutdown. Actually, make it even before the localalloc shutdown.
Make the patch atop what is in Joel's git tree already.
Thanks
Sunil
Srinivas Eeda wrote:
> +void ocfs2_queue_orphan_scan(struct ocfs2_super *osb)
> +{
> + struct ocfs2_orphan_scan *os;
> + int status, i;
> + u32 seqno = 0;
> +
> + os = &osb->osb_orphan_scan;
> +
> + status = ocfs2_orphan_scan_lock(osb, &seqno, LKM_EXMODE);
> + if (status < 0) {
> + if (status != -EAGAIN)
> + mlog_errno(status);
> + goto out;
> + }
> +
> + if (os->os_seqno != seqno) {
> + os->os_seqno = seqno;
> + goto unlock;
> + }
> +
> + for (i = 0; i < osb->max_slots; i++)
> + ocfs2_queue_recovery_completion(osb->journal, i, NULL, NULL);
> +
> + /*
> + * We queued a recovery on orphan slots, increment the sequence
> + * number and update LVB so other node will skip the scan for a while
> + */
> + seqno++;
> +unlock:
> + ocfs2_orphan_scan_unlock(osb, seqno, LKM_EXMODE);
> +out:
> + return;
> +}
> +
> +/* Worker task that gets fired every ORPHAN_SCAN_SCHEDULE_TIMEOUT millsec */
> +void ocfs2_orphan_scan_work(kapi_work_struct_t *work)
> +{
> + struct ocfs2_orphan_scan *os;
> + struct ocfs2_super *osb;
> +
> + os = work_to_object(work, struct ocfs2_orphan_scan,
> + os_orphan_scan_work.work);
> + osb = os->os_osb;
> +
> + mutex_lock(&os->os_lock);
> + ocfs2_queue_orphan_scan(osb);
> + schedule_delayed_work(&os->os_orphan_scan_work,
> + ocfs2_orphan_scan_timeout());
> + mutex_unlock(&os->os_lock);
> +}
> +
> +void ocfs2_orphan_scan_stop(struct ocfs2_super *osb)
> +{
> + struct ocfs2_orphan_scan *os;
> +
> + os = &osb->osb_orphan_scan;
> + mutex_lock(&os->os_lock);
> + cancel_delayed_work(&os->os_orphan_scan_work);
> + mutex_unlock(&os->os_lock);
> +}
> +
> +int ocfs2_orphan_scan_init(struct ocfs2_super *osb)
> +{
> + struct ocfs2_orphan_scan *os;
> +
> + os = &osb->osb_orphan_scan;
> + os->os_osb = osb;
> + mutex_init(&os->os_lock);
> +
> + KAPI_INIT_DELAYED_WORK(&os->os_orphan_scan_work,
> + ocfs2_orphan_scan_work, os);
> + schedule_delayed_work(&os->os_orphan_scan_work,
> + ocfs2_orphan_scan_timeout());
> + return 0;
> +}
> +
>
> diff --git a/fs/ocfs2/ocfs2.h b/fs/ocfs2/ocfs2.h
> index e84185d..7989966 100644
> --- a/fs/ocfs2/ocfs2.h
> +++ b/fs/ocfs2/ocfs2.h
> @@ -145,6 +145,14 @@ struct ocfs2_lock_res {
> #endif
> };
>
> +struct ocfs2_orphan_scan {
> + struct mutex os_lock;
> + struct ocfs2_super *os_osb;
> + struct ocfs2_lock_res os_lockres; /* lock to synchronize scans */
> + struct delayed_work os_orphan_scan_work;
> + u32 os_seqno; /* incremented on every scan */
> +};
> +
>
> diff --git a/fs/ocfs2/super.c b/fs/ocfs2/super.c
> index a421e7d..cd66b4d 100644
> --- a/fs/ocfs2/super.c
> +++ b/fs/ocfs2/super.c
> @@ -1495,6 +1495,8 @@ static void ocfs2_dismount_volume(struct super_block *sb, int mnt_err)
>
> ocfs2_truncate_log_shutdown(osb);
>
> + ocfs2_orphan_scan_stop(osb);
> +
> /* disable any new recovery threads and wait for any currently
> * running ones to exit. Do this before setting the vol_state. */
> mutex_lock(&osb->recovery_lock);
> @@ -1640,6 +1642,13 @@ static int ocfs2_initialize_super(struct super_block *sb,
> osb->disable_recovery = 0;
> osb->recovery_thread_task = NULL;
>
> + status = ocfs2_orphan_scan_init(osb);
> + if (status) {
> + mlog(ML_ERROR, "Unable to initialize delayed orphan scan\n");
> + mlog_errno(status);
> + goto bail;
> + }
> +
> init_waitqueue_head(&osb->checkpoint_event);
> atomic_set(&osb->needs_checkpoint, 0);
>
More information about the Ocfs2-devel
mailing list