[Ocfs2-devel] [PATCH 1/2] ocfs2: add flock lock type

Joel Becker Joel.Becker at oracle.com
Mon Dec 24 13:57:54 PST 2007


On Thu, Dec 20, 2007 at 04:55:42PM -0800, Mark Fasheh wrote:
> This adds a new dlmglue lock type which is intended to back flock()
> requests.
> 
> Since these locks are driven from userspace, usage rules are much more
> liberal than the typical Ocfs2 internal cluster lock. As a result, we can't
> make use of most dlmglue features - lock caching and lock level
> optimizations in particular. Additionally, userspace is free to deadlock
> itself, so we have to deal with that in the same way as the rest of the
> kernel - by allowing a signal to abort a lock request.
> 
> In order to keep ocfs2_cluster_lock() complexity down, ocfs2_file_lock()
> does its own dlm coordination. We still use the same helper functions
> though, so duplicated code is kept to a minimum.
> 
> Signed-off-by: Mark Fasheh <mark.fasheh at oracle.com>

Signed-off-by: Joel Becker <joel.becker at oracle.com>
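
For context, the new dlmglue entry points (ocfs2_file_lock() and
ocfs2_file_unlock()) are meant to be driven from an ->flock file
operation, presumably the subject of patch 2/2. The sketch below is only
an illustration of that wiring and of the serialization rule described
in the big comment above ocfs2_file_lock(); the handler name, the unlock
path, and the error handling here are assumptions, not the actual patch.

	/*
	 * Hypothetical ->flock handler (illustrative only).  fp->fp_mutex
	 * provides the serialization dlmglue asks of its callers; the
	 * cluster lock is taken or dropped first, then the local flock
	 * state is updated via the generic flock machinery.
	 */
	static int ocfs2_flock(struct file *file, int cmd, struct file_lock *fl)
	{
		int ret;
		int ex = (fl->fl_type == F_WRLCK);
		int trylock = !IS_SETLKW(cmd);
		struct ocfs2_file_private *fp = file->private_data;

		if (!(fl->fl_flags & FL_FLOCK))
			return -ENOLCK;

		mutex_lock(&fp->fp_mutex);

		if (fl->fl_type == F_UNLCK) {
			ocfs2_file_unlock(file);
		} else {
			ret = ocfs2_file_lock(file, ex, trylock);
			if (ret) {
				mutex_unlock(&fp->fp_mutex);
				return ret;
			}
		}

		/*
		 * Record the lock against the struct file so the usual
		 * local flock() semantics (conversion, release on close)
		 * still apply on this node.
		 */
		ret = flock_lock_file_wait(file, fl);

		mutex_unlock(&fp->fp_mutex);
		return ret;
	}

The point of the sketch is the ordering: the cluster lock is acquired or
dropped before the local flock state is touched, and fp_mutex supplies
the serialization of dlmglue flock calls that the caller is required to
guarantee.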

> ---
>  fs/ocfs2/dlmglue.c      |  267 +++++++++++++++++++++++++++++++++++++++++++++++
>  fs/ocfs2/dlmglue.h      |    5 +
>  fs/ocfs2/file.h         |    6 +
>  fs/ocfs2/ocfs2.h        |    1 +
>  fs/ocfs2/ocfs2_lockid.h |    5 +
>  5 files changed, 284 insertions(+), 0 deletions(-)
> 
> diff --git a/fs/ocfs2/dlmglue.c b/fs/ocfs2/dlmglue.c
> index 4e97dcc..2a17305 100644
> --- a/fs/ocfs2/dlmglue.c
> +++ b/fs/ocfs2/dlmglue.c
> @@ -69,6 +69,7 @@ struct ocfs2_mask_waiter {
>  
>  static struct ocfs2_super *ocfs2_get_dentry_osb(struct ocfs2_lock_res *lockres);
>  static struct ocfs2_super *ocfs2_get_inode_osb(struct ocfs2_lock_res *lockres);
> +static struct ocfs2_super *ocfs2_get_file_osb(struct ocfs2_lock_res *lockres);
>  
>  /*
>   * Return value from ->downconvert_worker functions.
> @@ -258,6 +259,11 @@ static struct ocfs2_lock_res_ops ocfs2_inode_open_lops = {
>  	.flags		= 0,
>  };
>  
> +static struct ocfs2_lock_res_ops ocfs2_flock_lops = {
> +	.get_osb	= ocfs2_get_file_osb,
> +	.flags		= 0,
> +};
> +
>  static inline int ocfs2_is_inode_lock(struct ocfs2_lock_res *lockres)
>  {
>  	return lockres->l_type == OCFS2_LOCK_TYPE_META ||
> @@ -316,6 +322,17 @@ static int ocfs2_meta_lock_update(struct inode *inode,
>  				  struct buffer_head **bh);
>  static void ocfs2_drop_osb_locks(struct ocfs2_super *osb);
>  static inline int ocfs2_highest_compat_lock_level(int level);
> +static void ocfs2_prepare_downconvert(struct ocfs2_lock_res *lockres,
> +				      int new_level);
> +static int ocfs2_downconvert_lock(struct ocfs2_super *osb,
> +				  struct ocfs2_lock_res *lockres,
> +				  int new_level,
> +				  int lvb);
> +static int ocfs2_prepare_cancel_convert(struct ocfs2_super *osb,
> +				        struct ocfs2_lock_res *lockres);
> +static int ocfs2_cancel_convert(struct ocfs2_super *osb,
> +				struct ocfs2_lock_res *lockres);
> +
>  
>  static void ocfs2_build_lock_name(enum ocfs2_lock_type type,
>  				  u64 blkno,
> @@ -428,6 +445,13 @@ static struct ocfs2_super *ocfs2_get_inode_osb(struct ocfs2_lock_res *lockres)
>  	return OCFS2_SB(inode->i_sb);
>  }
>  
> +static struct ocfs2_super *ocfs2_get_file_osb(struct ocfs2_lock_res *lockres)
> +{
> +	struct ocfs2_file_private *fp = lockres->l_priv;
> +
> +	return OCFS2_SB(fp->fp_file->f_mapping->host->i_sb);
> +}
> +
>  static __u64 ocfs2_get_dentry_lock_ino(struct ocfs2_lock_res *lockres)
>  {
>  	__be64 inode_blkno_be;
> @@ -508,6 +532,21 @@ static void ocfs2_rename_lock_res_init(struct ocfs2_lock_res *res,
>  				   &ocfs2_rename_lops, osb);
>  }
>  
> +void ocfs2_file_lock_res_init(struct ocfs2_lock_res *lockres,
> +			      struct ocfs2_file_private *fp)
> +{
> +	struct inode *inode = fp->fp_file->f_mapping->host;
> +	struct ocfs2_inode_info *oi = OCFS2_I(inode);
> +
> +	ocfs2_lock_res_init_once(lockres);
> +	ocfs2_build_lock_name(OCFS2_LOCK_TYPE_FLOCK, oi->ip_blkno,
> +			      inode->i_generation, lockres->l_name);
> +	ocfs2_lock_res_init_common(OCFS2_SB(inode->i_sb), lockres,
> +				   OCFS2_LOCK_TYPE_FLOCK, &ocfs2_flock_lops,
> +				   fp);
> +	lockres->l_flags |= OCFS2_LOCK_NOCACHE;
> +}
> +
>  void ocfs2_lock_res_free(struct ocfs2_lock_res *res)
>  {
>  	mlog_entry_void();
> @@ -724,6 +763,13 @@ static void ocfs2_blocking_ast(void *opaque, int level)
>  	     lockres->l_name, level, lockres->l_level,
>  	     ocfs2_lock_type_string(lockres->l_type));
>  
> +	/*
> +	 * We can skip the bast for locks which don't enable caching -
> +	 * they'll be dropped at the earliest possible time anyway.
> +	 */
> +	if (lockres->l_flags & OCFS2_LOCK_NOCACHE)
> +		return;
> +
>  	spin_lock_irqsave(&lockres->l_lock, flags);
>  	needs_downconvert = ocfs2_generic_handle_bast(lockres, level);
>  	if (needs_downconvert)
> @@ -935,6 +981,21 @@ static int lockres_remove_mask_waiter(struct ocfs2_lock_res *lockres,
>  
>  }
>  
> +static int ocfs2_wait_for_mask_interruptible(struct ocfs2_mask_waiter *mw,
> +					     struct ocfs2_lock_res *lockres)
> +{
> +	int ret;
> +
> +	ret = wait_for_completion_interruptible(&mw->mw_complete);
> +	if (ret)
> +		lockres_remove_mask_waiter(lockres, mw);
> +	else
> +		ret = mw->mw_status;
> +	/* Re-arm the completion in case we want to wait on it again */
> +	INIT_COMPLETION(mw->mw_complete);
> +	return ret;
> +}
> +
>  static int ocfs2_cluster_lock(struct ocfs2_super *osb,
>  			      struct ocfs2_lock_res *lockres,
>  			      int level,
> @@ -1372,6 +1433,212 @@ int ocfs2_data_lock_with_page(struct inode *inode,
>  	return ret;
>  }
>  
> +static int ocfs2_flock_handle_signal(struct ocfs2_lock_res *lockres,
> +				     int level)
> +{
> +	int ret;
> +	struct ocfs2_super *osb = ocfs2_get_lockres_osb(lockres);
> +	unsigned long flags;
> +	struct ocfs2_mask_waiter mw;
> +
> +	ocfs2_init_mask_waiter(&mw);
> +
> +retry_cancel:
> +	spin_lock_irqsave(&lockres->l_lock, flags);
> +	if (lockres->l_flags & OCFS2_LOCK_BUSY) {
> +		ret = ocfs2_prepare_cancel_convert(osb, lockres);
> +		if (ret) {
> +			spin_unlock_irqrestore(&lockres->l_lock, flags);
> +			ret = ocfs2_cancel_convert(osb, lockres);
> +			if (ret < 0) {
> +				mlog_errno(ret);
> +				goto out;
> +			}
> +			goto retry_cancel;
> +		}
> +		lockres_add_mask_waiter(lockres, &mw, OCFS2_LOCK_BUSY, 0);
> +		spin_unlock_irqrestore(&lockres->l_lock, flags);
> +
> +		ocfs2_wait_for_mask(&mw);
> +		goto retry_cancel;
> +	}
> +
> +	ret = -ERESTARTSYS;
> +	/*
> +	 * We may still have gotten the lock, in which case there's no
> +	 * point to restarting the syscall.
> +	 */
> +	if (lockres->l_level == level)
> +		ret = 0;
> +
> +	mlog(0, "Cancel returning %d. flags: 0x%lx, level: %d, act: %d\n", ret,
> +	     lockres->l_flags, lockres->l_level, lockres->l_action);
> +
> +	spin_unlock_irqrestore(&lockres->l_lock, flags);
> +
> +out:
> +	return ret;
> +}
> +
> +/*
> + * ocfs2_file_lock() and ocfs2_file_unlock() map to a single pair of
> + * flock() calls. The locking approach this requires is sufficiently
> + * different from all other cluster lock types that we implement a
> + * separate path to the "low-level" dlm calls. In particular:
> + *
> + * - No optimization of lock levels is done - we take exactly
> + *   what's been requested.
> + *
> + * - No lock caching is employed. We immediately downconvert to
> + *   no-lock at unlock time. This also means flock locks never go on
> + *   the blocking list.
> + *
> + * - Since userspace can trivially deadlock itself with flock, we make
> + *   sure to allow cancellation of a misbehaving application's flock()
> + *   request.
> + *
> + * - Access to any flock lockres never needs to be concurrent, so we
> + *   can simplify the code by requiring the caller to guarantee
> + *   serialization of dlmglue flock calls.
> + */
> +int ocfs2_file_lock(struct file *file, int ex, int trylock)
> +{
> +	int ret, level = ex ? LKM_EXMODE : LKM_PRMODE;
> +	unsigned int lkm_flags = trylock ? LKM_NOQUEUE : 0;
> +	unsigned long flags;
> +	struct ocfs2_file_private *fp = file->private_data;
> +	struct ocfs2_lock_res *lockres = &fp->fp_flock;
> +	struct ocfs2_super *osb = OCFS2_SB(file->f_mapping->host->i_sb);
> +	struct ocfs2_mask_waiter mw;
> +
> +	ocfs2_init_mask_waiter(&mw);
> +
> +	if ((lockres->l_flags & OCFS2_LOCK_BUSY) ||
> +	    (lockres->l_level > LKM_NLMODE)) {
> +		mlog(ML_ERROR,
> +		     "File lock \"%s\" has busy or locked state: flags: 0x%lx, "
> +		     "level: %u\n", lockres->l_name, lockres->l_flags,
> +		     lockres->l_level);
> +		return -EINVAL;
> +	}
> +
> +	spin_lock_irqsave(&lockres->l_lock, flags);
> +	if (!(lockres->l_flags & OCFS2_LOCK_ATTACHED)) {
> +		lockres_add_mask_waiter(lockres, &mw, OCFS2_LOCK_BUSY, 0);
> +		spin_unlock_irqrestore(&lockres->l_lock, flags);
> +
> +		/*
> +		 * Get the lock at NLMODE to start - that way we
> +		 * can cancel the upconvert request if need be.
> +		 */
> +		ret = ocfs2_lock_create(osb, lockres, LKM_NLMODE, 0);
> +		if (ret < 0) {
> +			mlog_errno(ret);
> +			goto out;
> +		}
> +
> +		ret = ocfs2_wait_for_mask(&mw);
> +		if (ret) {
> +			mlog_errno(ret);
> +			goto out;
> +		}
> +		spin_lock_irqsave(&lockres->l_lock, flags);
> +	}
> +
> +	lockres->l_action = OCFS2_AST_CONVERT;
> +	lkm_flags |= LKM_CONVERT;
> +	lockres->l_requested = level;
> +	lockres_or_flags(lockres, OCFS2_LOCK_BUSY);
> +
> +	lockres_add_mask_waiter(lockres, &mw, OCFS2_LOCK_BUSY, 0);
> +	spin_unlock_irqrestore(&lockres->l_lock, flags);
> +
> +	ret = dlmlock(osb->dlm, level, &lockres->l_lksb, lkm_flags,
> +		      lockres->l_name, OCFS2_LOCK_ID_MAX_LEN - 1,
> +		      ocfs2_locking_ast, lockres, ocfs2_blocking_ast);
> +	if (ret != DLM_NORMAL) {
> +		if (trylock && ret == DLM_NOTQUEUED)
> +			ret = -EAGAIN;
> +		else {
> +			ocfs2_log_dlm_error("dlmlock", ret, lockres);
> +			ret = -EINVAL;
> +		}
> +
> +		ocfs2_recover_from_dlm_error(lockres, 1);
> +		lockres_remove_mask_waiter(lockres, &mw);
> +		goto out;
> +	}
> +
> +	ret = ocfs2_wait_for_mask_interruptible(&mw, lockres);
> +	if (ret == -ERESTARTSYS) {
> +		/*
> +		 * Userspace can deadlock itself with
> +		 * flock(). Current behavior locally is to allow the
> +		 * deadlock, but abort the system call if a signal is
> +		 * received. We follow this example, otherwise a
> +		 * poorly written program could sit in the kernel until
> +		 * reboot.
> +		 *
> +		 * Handling this is a bit more complicated for Ocfs2
> +		 * though. We can't exit this function with an
> +		 * outstanding lock request, so a cancel convert is
> +		 * required. We intentionally overwrite 'ret' - if the
> +		 * cancel fails and the lock was granted, it's easier
> +		 * to just bubble success back up to the user.
> +		 */
> +		ret = ocfs2_flock_handle_signal(lockres, level);
> +	}
> +
> +out:
> +
> +	mlog(0, "Lock: \"%s\" ex: %d, trylock: %d, returns: %d\n",
> +	     lockres->l_name, ex, trylock, ret);
> +	return ret;
> +}
> +
> +void ocfs2_file_unlock(struct file *file)
> +{
> +	int ret;
> +	unsigned long flags;
> +	struct ocfs2_file_private *fp = file->private_data;
> +	struct ocfs2_lock_res *lockres = &fp->fp_flock;
> +	struct ocfs2_super *osb = OCFS2_SB(file->f_mapping->host->i_sb);
> +	struct ocfs2_mask_waiter mw;
> +
> +	ocfs2_init_mask_waiter(&mw);
> +
> +	if (!(lockres->l_flags & OCFS2_LOCK_ATTACHED))
> +		return;
> +
> +	if (lockres->l_level == LKM_NLMODE)
> +		return;
> +
> +	mlog(0, "Unlock: \"%s\" flags: 0x%lx, level: %d, act: %d\n",
> +	     lockres->l_name, lockres->l_flags, lockres->l_level,
> +	     lockres->l_action);
> +
> +	spin_lock_irqsave(&lockres->l_lock, flags);
> +	/*
> +	 * Fake a blocking ast for the downconvert code.
> +	 */
> +	lockres_or_flags(lockres, OCFS2_LOCK_BLOCKED);
> +	lockres->l_blocking = LKM_EXMODE;
> +
> +	ocfs2_prepare_downconvert(lockres, LKM_NLMODE);
> +	lockres_add_mask_waiter(lockres, &mw, OCFS2_LOCK_BUSY, 0);
> +	spin_unlock_irqrestore(&lockres->l_lock, flags);
> +
> +	ret = ocfs2_downconvert_lock(osb, lockres, LKM_NLMODE, 0);
> +	if (ret) {
> +		mlog_errno(ret);
> +		return;
> +	}
> +
> +	ret = ocfs2_wait_for_mask(&mw);
> +	if (ret)
> +		mlog_errno(ret);
> +}
> +
>  static void ocfs2_vote_on_unlock(struct ocfs2_super *osb,
>  				 struct ocfs2_lock_res *lockres)
>  {
> diff --git a/fs/ocfs2/dlmglue.h b/fs/ocfs2/dlmglue.h
> index 87a785e..5a58f8b 100644
> --- a/fs/ocfs2/dlmglue.h
> +++ b/fs/ocfs2/dlmglue.h
> @@ -66,6 +66,9 @@ void ocfs2_inode_lock_res_init(struct ocfs2_lock_res *res,
>  			       struct inode *inode);
>  void ocfs2_dentry_lock_res_init(struct ocfs2_dentry_lock *dl,
>  				u64 parent, struct inode *inode);
> +struct ocfs2_file_private;
> +void ocfs2_file_lock_res_init(struct ocfs2_lock_res *lockres,
> +			      struct ocfs2_file_private *fp);
>  void ocfs2_lock_res_free(struct ocfs2_lock_res *res);
>  int ocfs2_create_new_inode_locks(struct inode *inode);
>  int ocfs2_drop_inode_locks(struct inode *inode);
> @@ -107,6 +110,8 @@ int ocfs2_rename_lock(struct ocfs2_super *osb);
>  void ocfs2_rename_unlock(struct ocfs2_super *osb);
>  int ocfs2_dentry_lock(struct dentry *dentry, int ex);
>  void ocfs2_dentry_unlock(struct dentry *dentry, int ex);
> +int ocfs2_file_lock(struct file *file, int ex, int trylock);
> +void ocfs2_file_unlock(struct file *file);
>  
>  void ocfs2_mark_lockres_freeing(struct ocfs2_lock_res *lockres);
>  void ocfs2_simple_drop_lockres(struct ocfs2_super *osb,
> diff --git a/fs/ocfs2/file.h b/fs/ocfs2/file.h
> index 066f14a..048ddca 100644
> --- a/fs/ocfs2/file.h
> +++ b/fs/ocfs2/file.h
> @@ -32,6 +32,12 @@ extern const struct inode_operations ocfs2_file_iops;
>  extern const struct inode_operations ocfs2_special_file_iops;
>  struct ocfs2_alloc_context;
>  
> +struct ocfs2_file_private {
> +	struct file		*fp_file;
> +	struct mutex		fp_mutex;
> +	struct ocfs2_lock_res	fp_flock;
> +};
> +
>  enum ocfs2_alloc_restarted {
>  	RESTART_NONE = 0,
>  	RESTART_TRANS,
> diff --git a/fs/ocfs2/ocfs2.h b/fs/ocfs2/ocfs2.h
> index 60a23e1..9c34b83 100644
> --- a/fs/ocfs2/ocfs2.h
> +++ b/fs/ocfs2/ocfs2.h
> @@ -101,6 +101,7 @@ enum ocfs2_unlock_action {
>  					       * about to be
>  					       * dropped. */
>  #define OCFS2_LOCK_QUEUED        (0x00000100) /* queued for downconvert */
> +#define OCFS2_LOCK_NOCACHE       (0x00000200) /* don't use a holder count */
>  
>  struct ocfs2_lock_res_ops;
>  
> diff --git a/fs/ocfs2/ocfs2_lockid.h b/fs/ocfs2/ocfs2_lockid.h
> index 4ca02b1..86f3e37 100644
> --- a/fs/ocfs2/ocfs2_lockid.h
> +++ b/fs/ocfs2/ocfs2_lockid.h
> @@ -45,6 +45,7 @@ enum ocfs2_lock_type {
>  	OCFS2_LOCK_TYPE_RW,
>  	OCFS2_LOCK_TYPE_DENTRY,
>  	OCFS2_LOCK_TYPE_OPEN,
> +	OCFS2_LOCK_TYPE_FLOCK,
>  	OCFS2_NUM_LOCK_TYPES
>  };
>  
> @@ -73,6 +74,9 @@ static inline char ocfs2_lock_type_char(enum ocfs2_lock_type type)
>  		case OCFS2_LOCK_TYPE_OPEN:
>  			c = 'O';
>  			break;
> +		case OCFS2_LOCK_TYPE_FLOCK:
> +			c = 'F';
> +			break;
>  		default:
>  			c = '\0';
>  	}
> @@ -90,6 +94,7 @@ static char *ocfs2_lock_type_strings[] = {
>  	[OCFS2_LOCK_TYPE_RW] = "Write/Read",
>  	[OCFS2_LOCK_TYPE_DENTRY] = "Dentry",
>  	[OCFS2_LOCK_TYPE_OPEN] = "Open",
> +	[OCFS2_LOCK_TYPE_FLOCK] = "Flock",
>  };
>  
>  static inline const char *ocfs2_lock_type_string(enum ocfs2_lock_type type)
> -- 
> 1.5.3.6
> 
> 

-- 

Life's Little Instruction Book #452

	"Never compromise your integrity."

Joel Becker
Principal Software Developer
Oracle
E-mail: joel.becker at oracle.com
Phone: (650) 506-8127


