[Ocfs2-devel] [PATCH 1/2] ocfs2: timer to queue scan of all orphan slots

Sunil Mushran sunil.mushran at oracle.com
Wed Jun 3 17:16:18 PDT 2009


Signed-off-by: Sunil Mushran <sunil.mushran at oracle.com>


Srinivas Eeda wrote:
> When a dentry is unlinked, the unlinking node takes an EX on the dentry lock
> before moving the dentry to the orphan directory. The other nodes, that all had
> a PR on the same dentry lock, flag the corresponding inode as MAYBE_ORPHANED
> during the downconvert. The inode is finally deleted when the last node to iput
> the inode notices the MAYBE_ORPHANED flag.
>
> A problem arises if a node is forced to free dentry locks because of memory
> pressure. If this happens, the node will no longer get downconvert notifications
> for the dentries that have been unlinked on another node. If it also happens
> that the node is actively using the corresponding inode and happens to be the one
> performing the last iput on that inode, it will fail to delete the inode as it
> will not have the MAYBE_ORPHANED flag set.
>
> This patch fixes this shortcoming by introducing a periodic scan of the orphan
> directories to delete such inodes. Care has been taken to distribute the
> workload across the cluster so that no one node has to perform the task all the
> time.
>
> Signed-off-by: Srinivas Eeda <srinivas.eeda at oracle.com>
> ---
>  fs/ocfs2/dlmglue.c      |   51 ++++++++++++++++++++++
>  fs/ocfs2/dlmglue.h      |   11 +++++
>  fs/ocfs2/journal.c      |  106 +++++++++++++++++++++++++++++++++++++++++++++++
>  fs/ocfs2/journal.h      |    4 ++
>  fs/ocfs2/ocfs2.h        |   10 ++++
>  fs/ocfs2/ocfs2_lockid.h |    5 ++
>  fs/ocfs2/super.c        |    9 ++++
>  7 files changed, 196 insertions(+), 0 deletions(-)
>
> diff --git a/fs/ocfs2/dlmglue.c b/fs/ocfs2/dlmglue.c
> index e15fc7d..0f35b83 100644
> --- a/fs/ocfs2/dlmglue.c
> +++ b/fs/ocfs2/dlmglue.c
> @@ -248,6 +248,10 @@ static struct ocfs2_lock_res_ops ocfs2_nfs_sync_lops = {
>  	.flags		= 0,
>  };
>  
> +static struct ocfs2_lock_res_ops ocfs2_orphan_scan_lops = {
> +	.flags		= LOCK_TYPE_REQUIRES_REFRESH|LOCK_TYPE_USES_LVB,
> +};
> +
>  static struct ocfs2_lock_res_ops ocfs2_dentry_lops = {
>  	.get_osb	= ocfs2_get_dentry_osb,
>  	.post_unlock	= ocfs2_dentry_post_unlock,
> @@ -637,6 +641,19 @@ static void ocfs2_nfs_sync_lock_res_init(struct ocfs2_lock_res *res,
>  				   &ocfs2_nfs_sync_lops, osb);
>  }
>  
> +static void ocfs2_orphan_scan_lock_res_init(struct ocfs2_lock_res *res,
> +					    struct ocfs2_super *osb)
> +{
> +	struct ocfs2_orphan_scan_lvb *lvb;
> +
> +	ocfs2_lock_res_init_once(res);
> +	ocfs2_build_lock_name(OCFS2_LOCK_TYPE_ORPHAN_SCAN, 0, 0, res->l_name);
> +	ocfs2_lock_res_init_common(osb, res, OCFS2_LOCK_TYPE_ORPHAN_SCAN,
> +				   &ocfs2_orphan_scan_lops, osb);
> +	lvb = ocfs2_dlm_lvb(&res->l_lksb);
> +	lvb->lvb_version = OCFS2_ORPHAN_LVB_VERSION;
> +}
> +
>  void ocfs2_file_lock_res_init(struct ocfs2_lock_res *lockres,
>  			      struct ocfs2_file_private *fp)
>  {
> @@ -2352,6 +2369,37 @@ void ocfs2_inode_unlock(struct inode *inode,
>  	mlog_exit_void();
>  }
>  
> +int ocfs2_orphan_scan_lock(struct ocfs2_super *osb, u32 *seqno, int ex)
> +{
> +	struct ocfs2_lock_res *lockres;
> +	struct ocfs2_orphan_scan_lvb *lvb;
> +	int level = ex ? DLM_LOCK_EX : DLM_LOCK_PR;
> +	int status = 0;
> +
> +	lockres = &osb->osb_orphan_scan.os_lockres;
> +	status = ocfs2_cluster_lock(osb, lockres, level, 0, 0); 
> +	if (status < 0)
> +		return status;
> +
> +	lvb = ocfs2_dlm_lvb(&lockres->l_lksb);
> +	if (lvb->lvb_version == OCFS2_ORPHAN_LVB_VERSION)
> +		*seqno = be32_to_cpu(lvb->lvb_os_seqno); 
> +	return status;
> +}
> +
> +void ocfs2_orphan_scan_unlock(struct ocfs2_super *osb, u32 seqno, int ex)
> +{
> +	struct ocfs2_lock_res *lockres;
> +	struct ocfs2_orphan_scan_lvb *lvb;
> +	int level = ex ? DLM_LOCK_EX : DLM_LOCK_PR;
> +
> +	lockres = &osb->osb_orphan_scan.os_lockres;
> +	lvb = ocfs2_dlm_lvb(&lockres->l_lksb);
> +	lvb->lvb_version = OCFS2_ORPHAN_LVB_VERSION;
> +	lvb->lvb_os_seqno = cpu_to_be32(seqno);
> +	ocfs2_cluster_unlock(osb, lockres, level); 
> +}
> +
>  int ocfs2_super_lock(struct ocfs2_super *osb,
>  		     int ex)
>  {
> @@ -2842,6 +2890,7 @@ local:
>  	ocfs2_super_lock_res_init(&osb->osb_super_lockres, osb);
>  	ocfs2_rename_lock_res_init(&osb->osb_rename_lockres, osb);
>  	ocfs2_nfs_sync_lock_res_init(&osb->osb_nfs_sync_lockres, osb);
> +	ocfs2_orphan_scan_lock_res_init(&osb->osb_orphan_scan.os_lockres, osb);
>  
>  	osb->cconn = conn;
>  
> @@ -2878,6 +2927,7 @@ void ocfs2_dlm_shutdown(struct ocfs2_super *osb,
>  	ocfs2_lock_res_free(&osb->osb_super_lockres);
>  	ocfs2_lock_res_free(&osb->osb_rename_lockres);
>  	ocfs2_lock_res_free(&osb->osb_nfs_sync_lockres);
> +	ocfs2_lock_res_free(&osb->osb_orphan_scan.os_lockres);
>  
>  	ocfs2_cluster_disconnect(osb->cconn, hangup_pending);
>  	osb->cconn = NULL;
> @@ -3061,6 +3111,7 @@ static void ocfs2_drop_osb_locks(struct ocfs2_super *osb)
>  	ocfs2_simple_drop_lockres(osb, &osb->osb_super_lockres);
>  	ocfs2_simple_drop_lockres(osb, &osb->osb_rename_lockres);
>  	ocfs2_simple_drop_lockres(osb, &osb->osb_nfs_sync_lockres);
> +	ocfs2_simple_drop_lockres(osb, &osb->osb_orphan_scan.os_lockres);
>  }
>  
>  int ocfs2_drop_inode_locks(struct inode *inode)
> diff --git a/fs/ocfs2/dlmglue.h b/fs/ocfs2/dlmglue.h
> index e1fd572..31b90d7 100644
> --- a/fs/ocfs2/dlmglue.h
> +++ b/fs/ocfs2/dlmglue.h
> @@ -62,6 +62,14 @@ struct ocfs2_qinfo_lvb {
>  	__be32	lvb_free_entry;
>  };
>  
> +#define OCFS2_ORPHAN_LVB_VERSION 1
> +
> +struct ocfs2_orphan_scan_lvb {
> +	__u8	lvb_version;
> +	__u8	lvb_reserved[3];
> +	__be32	lvb_os_seqno;
> +};
> +
>  /* ocfs2_inode_lock_full() 'arg_flags' flags */
>  /* don't wait on recovery. */
>  #define OCFS2_META_LOCK_RECOVERY	(0x01)
> @@ -113,6 +121,9 @@ int ocfs2_super_lock(struct ocfs2_super *osb,
>  		     int ex);
>  void ocfs2_super_unlock(struct ocfs2_super *osb,
>  			int ex);
> +int ocfs2_orphan_scan_lock(struct ocfs2_super *osb, u32 *seqno, int ex);
> +void ocfs2_orphan_scan_unlock(struct ocfs2_super *osb, u32 seqno, int ex);
> +
>  int ocfs2_rename_lock(struct ocfs2_super *osb);
>  void ocfs2_rename_unlock(struct ocfs2_super *osb);
>  int ocfs2_nfs_sync_lock(struct ocfs2_super *osb, int ex);
> diff --git a/fs/ocfs2/journal.c b/fs/ocfs2/journal.c
> index a20a0f1..dc7cea3 100644
> --- a/fs/ocfs2/journal.c
> +++ b/fs/ocfs2/journal.c
> @@ -28,6 +28,8 @@
>  #include <linux/slab.h>
>  #include <linux/highmem.h>
>  #include <linux/kthread.h>
> +#include <linux/time.h>
> +#include <linux/random.h>
>  
>  #define MLOG_MASK_PREFIX ML_JOURNAL
>  #include <cluster/masklog.h>
> @@ -52,6 +54,8 @@
>  
>  DEFINE_SPINLOCK(trans_inc_lock);
>  
> +#define ORPHAN_SCAN_SCHEDULE_TIMEOUT 300000
> +
>  static int ocfs2_force_read_journal(struct inode *inode);
>  static int ocfs2_recover_node(struct ocfs2_super *osb,
>  			      int node_num, int slot_num);
> @@ -1841,6 +1845,108 @@ bail:
>  	return status;
>  }
>  
> +/*
> + * Scan timer should get fired every ORPHAN_SCAN_SCHEDULE_TIMEOUT. Add some
> + * randomness to the timeout to minimize multiple nodes firing the timer at the
> + * same time.
> + */
> +static inline unsigned long ocfs2_orphan_scan_timeout(void)
> +{
> +	unsigned long time;
> +
> +	get_random_bytes(&time, sizeof(time));
> +	time = ORPHAN_SCAN_SCHEDULE_TIMEOUT + (time % 5000);
> +	return msecs_to_jiffies(time);
> +}
> +
> +/*
> + * ocfs2_queue_orphan_scan calls ocfs2_queue_recovery_completion for
> + * every slot which queues a recovery of slot on ocfs2_wq thread. This is done
> + * to cleanup any orphans that are left over in orphan slots.
> + *
> + * ocfs2_queue_orphan_scan gets called every ORPHAN_SCAN_SCHEDULE_TIMEOUT
> + * milliseconds. It gets an EX lock on os_lockres and checks the sequence number
> + * stored in the LVB. If the sequence number has changed, some other node has
> + * already done the scan, so this node skips the scan and records the new sequence
> + * number. If the sequence number didn't change, no scan has happened, so the node
> + * queues a scan and increments the sequence number in the LVB.
> + */ 
> +void ocfs2_queue_orphan_scan(struct ocfs2_super *osb)
> +{
> +	struct ocfs2_orphan_scan *os;
> +	int status, i;
> +	u32 seqno = 0;
> +
> +	os = &osb->osb_orphan_scan;
> +
> +	status = ocfs2_orphan_scan_lock(osb, &seqno, DLM_LOCK_EX);
> +	if (status < 0) {
> +		if (status != -EAGAIN)
> +			mlog_errno(status);
> +		goto out;
> +	}
> +
> +	if (os->os_seqno != seqno) {
> +		os->os_seqno = seqno;
> +		goto unlock;
> +	}
> +
> +	for (i = 0; i < osb->max_slots; i++)
> +		ocfs2_queue_recovery_completion(osb->journal, i, NULL, NULL,
> +						NULL);
> +	/*
> +	 * We queued a recovery on orphan slots, increment the sequence
> +	 * number and update LVB so other node will skip the scan for a while
> +	 */
> +	seqno++;
> +unlock:
> +	ocfs2_orphan_scan_unlock(osb, seqno, DLM_LOCK_EX);
> +out:
> +	return;
> +}
> +
> +/* Worker task that gets fired every ORPHAN_SCAN_SCHEDULE_TIMEOUT milliseconds */
> +void ocfs2_orphan_scan_work(struct work_struct *work)
> +{
> +	struct ocfs2_orphan_scan *os;
> +	struct ocfs2_super *osb;
> +
> +	os = container_of(work, struct ocfs2_orphan_scan,
> +			  os_orphan_scan_work.work);
> +	osb = os->os_osb;
> +
> +	mutex_lock(&os->os_lock);
> +	ocfs2_queue_orphan_scan(osb);
> +	schedule_delayed_work(&os->os_orphan_scan_work,
> +			      ocfs2_orphan_scan_timeout());
> +	mutex_unlock(&os->os_lock);
> +}
> +
> +void ocfs2_orphan_scan_stop(struct ocfs2_super *osb)
> +{
> +	struct ocfs2_orphan_scan *os;
> +
> +	os = &osb->osb_orphan_scan;
> +	mutex_lock(&os->os_lock);
> +	cancel_delayed_work(&os->os_orphan_scan_work);
> +	mutex_unlock(&os->os_lock);
> +}
> +
> +int ocfs2_orphan_scan_init(struct ocfs2_super *osb)
> +{
> +	struct ocfs2_orphan_scan *os;
> +
> +	os = &osb->osb_orphan_scan;
> +	os->os_osb = osb;
> +	mutex_init(&os->os_lock);
> +
> +	INIT_DELAYED_WORK(&os->os_orphan_scan_work,
> +			  ocfs2_orphan_scan_work);
> +	schedule_delayed_work(&os->os_orphan_scan_work,
> +			      ocfs2_orphan_scan_timeout());
> +	return 0;
> +}
> +
>  struct ocfs2_orphan_filldir_priv {
>  	struct inode		*head;
>  	struct ocfs2_super	*osb;
> diff --git a/fs/ocfs2/journal.h b/fs/ocfs2/journal.h
> index 619dd7f..3483202 100644
> --- a/fs/ocfs2/journal.h
> +++ b/fs/ocfs2/journal.h
> @@ -144,6 +144,10 @@ static inline void ocfs2_inode_set_new(struct ocfs2_super *osb,
>  }
>  
>  /* Exported only for the journal struct init code in super.c. Do not call. */
> +int ocfs2_orphan_scan_init(struct ocfs2_super *osb);
> +void ocfs2_orphan_scan_stop(struct ocfs2_super *osb);
> +void ocfs2_orphan_scan_exit(struct ocfs2_super *osb);
> +
>  void ocfs2_complete_recovery(struct work_struct *work);
>  void ocfs2_wait_for_recovery(struct ocfs2_super *osb);
>  
> diff --git a/fs/ocfs2/ocfs2.h b/fs/ocfs2/ocfs2.h
> index 1386281..373fb1c 100644
> --- a/fs/ocfs2/ocfs2.h
> +++ b/fs/ocfs2/ocfs2.h
> @@ -151,6 +151,14 @@ struct ocfs2_lock_res {
>  #endif
>  };
>  
> +struct ocfs2_orphan_scan {
> +	struct mutex 		os_lock;
> +	struct ocfs2_super 	*os_osb;
> +	struct ocfs2_lock_res 	os_lockres;     /* lock to synchronize scans */
> +	struct delayed_work 	os_orphan_scan_work;
> +	u32  			os_seqno;       /* incremented on every scan */
> +};
> +
>  struct ocfs2_dlm_debug {
>  	struct kref d_refcnt;
>  	struct dentry *d_locking_state;
> @@ -341,6 +349,8 @@ struct ocfs2_super
>  	unsigned int			*osb_orphan_wipes;
>  	wait_queue_head_t		osb_wipe_event;
>  
> +	struct ocfs2_orphan_scan 	osb_orphan_scan; 
> +
>  	/* used to protect metaecc calculation check of xattr. */
>  	spinlock_t osb_xattr_lock;
>  
> diff --git a/fs/ocfs2/ocfs2_lockid.h b/fs/ocfs2/ocfs2_lockid.h
> index a53ce87..fcdba09 100644
> --- a/fs/ocfs2/ocfs2_lockid.h
> +++ b/fs/ocfs2/ocfs2_lockid.h
> @@ -48,6 +48,7 @@ enum ocfs2_lock_type {
>  	OCFS2_LOCK_TYPE_FLOCK,
>  	OCFS2_LOCK_TYPE_QINFO,
>  	OCFS2_LOCK_TYPE_NFS_SYNC,
> +	OCFS2_LOCK_TYPE_ORPHAN_SCAN,
>  	OCFS2_NUM_LOCK_TYPES
>  };
>  
> @@ -85,6 +86,9 @@ static inline char ocfs2_lock_type_char(enum ocfs2_lock_type type)
>  		case OCFS2_LOCK_TYPE_NFS_SYNC:
>  			c = 'Y';
>  			break;
> +		case OCFS2_LOCK_TYPE_ORPHAN_SCAN:
> +			c = 'P';
> +			break;
>  		default:
>  			c = '\0';
>  	}
> @@ -104,6 +108,7 @@ static char *ocfs2_lock_type_strings[] = {
>  	[OCFS2_LOCK_TYPE_OPEN] = "Open",
>  	[OCFS2_LOCK_TYPE_FLOCK] = "Flock",
>  	[OCFS2_LOCK_TYPE_QINFO] = "Quota",
> +	[OCFS2_LOCK_TYPE_ORPHAN_SCAN] = "OrphanScan",
>  };
>  
>  static inline const char *ocfs2_lock_type_string(enum ocfs2_lock_type type)
> diff --git a/fs/ocfs2/super.c b/fs/ocfs2/super.c
> index 79ff8d9..44ac27e 100644
> --- a/fs/ocfs2/super.c
> +++ b/fs/ocfs2/super.c
> @@ -1802,6 +1802,8 @@ static void ocfs2_dismount_volume(struct super_block *sb, int mnt_err)
>  
>  	ocfs2_truncate_log_shutdown(osb);
>  
> +	ocfs2_orphan_scan_stop(osb);
> +
>  	/* This will disable recovery and flush any recovery work. */
>  	ocfs2_recovery_exit(osb);
>  
> @@ -1957,6 +1959,13 @@ static int ocfs2_initialize_super(struct super_block *sb,
>  		goto bail;
>  	}
>  
> +	status = ocfs2_orphan_scan_init(osb);
> +	if (status) {
> +		mlog(ML_ERROR, "Unable to initialize delayed orphan scan\n");
> +		mlog_errno(status);
> +		goto bail;
> +	}
> +
>  	init_waitqueue_head(&osb->checkpoint_event);
>  	atomic_set(&osb->needs_checkpoint, 0);
>  
>   




More information about the Ocfs2-devel mailing list