[Ocfs2-devel] [PATCH] ocfs2: re-queue AST or BAST if sending is failed to improve the reliability

Gang He ghe at suse.com
Mon Aug 7 00:43:17 PDT 2017


Base on your description, this case should be a corner case, NOT a fatal error.
Should we use mlog(ML_NOTICE, ...) to print these logs?

Thanks
Gang


>>> 
> Hi,
> 
> In current code, while flushing AST, we don't handle an exception that
> sending AST or BAST is failed.
> But it is indeed possible that AST or BAST is lost due to some kind of
> networks fault.
> 
> If above exception happens, the requesting node will never obtain an AST
> back, hence, it will never acquire the lock or abort current locking.
> 
> With this patch, I'd like to fix this issue by re-queuing the AST or
> BAST if sending is failed due to networks fault.
> 
> And the re-queuing AST or BAST will be dropped if the requesting node is
> dead!
> 
> It will improve the reliability a lot.
> 
> 
> Thanks.
> 
> Changwei.
> 
> Signed-off-by: Changwei Ge <ge.changwei at h3c.com>
> ---
>  fs/ocfs2/dlm/dlmrecovery.c |   51
> ++++++++++++++++++++++++++++++++++++++++++--
>  fs/ocfs2/dlm/dlmthread.c   |   39 +++++++++++++++++++++++++++------
>  2 files changed, 81 insertions(+), 9 deletions(-)
> 
> diff --git a/fs/ocfs2/dlm/dlmrecovery.c b/fs/ocfs2/dlm/dlmrecovery.c
> index 74407c6..ddfaf74 100644
> --- a/fs/ocfs2/dlm/dlmrecovery.c
> +++ b/fs/ocfs2/dlm/dlmrecovery.c
> @@ -2263,11 +2263,45 @@ static void dlm_revalidate_lvb(struct dlm_ctxt *dlm,
>      }
>  }
>  
> +static int dlm_drop_pending_ast_bast(struct dlm_ctxt *dlm,
> +                     struct dlm_lock *lock)
> +{
> +    int reserved = 0;
> +
> +    spin_lock(&dlm->ast_lock);
> +    if (!list_empty(&lock->ast_list)) {
> +        mlog(0, "%s: drop pending AST for lock(cookie=%u:%llu).\n",
> +             dlm->name,
> +             dlm_get_lock_cookie_node(be64_to_cpu(lock->ml.cookie)),
> +             dlm_get_lock_cookie_seq(be64_to_cpu(lock->ml.cookie)));
> +        list_del_init(&lock->ast_list);
> +        lock->ast_pending = 0;
> +        dlm_lock_put(lock);
> +        reserved++;
> +    }
> +
> +    if (!list_empty(&lock->bast_list)) {
> +        mlog(0, "%s: drop pending BAST for lock(cookie=%u:%llu).\n",
> +             dlm->name,
> +             dlm_get_lock_cookie_node(be64_to_cpu(lock->ml.cookie)),
> +             dlm_get_lock_cookie_seq(be64_to_cpu(lock->ml.cookie)));
> +        list_del_init(&lock->bast_list);
> +        lock->bast_pending = 0;
> +        dlm_lock_put(lock);
> +        reserved++;
> +    }
> +    spin_unlock(&dlm->ast_lock);
> +
> +    return reserved;
> +}
> +
>  static void dlm_free_dead_locks(struct dlm_ctxt *dlm,
> -                struct dlm_lock_resource *res, u8 dead_node)
> +                struct dlm_lock_resource *res, u8 dead_node,
> +                int *reserved)
>  {
>      struct dlm_lock *lock, *next;
>      unsigned int freed = 0;
> +    int reserved_tmp = 0;
>  
>      /* this node is the lockres master:
>       * 1) remove any stale locks for the dead node
> @@ -2284,6 +2318,9 @@ static void dlm_free_dead_locks(struct dlm_ctxt *dlm,
>          if (lock->ml.node == dead_node) {
>              list_del_init(&lock->list);
>              dlm_lock_put(lock);
> +
> +            reserved_tmp += dlm_drop_pending_ast_bast(dlm, lock);
> +
>              /* Can't schedule DLM_UNLOCK_FREE_LOCK - do manually */
>              dlm_lock_put(lock);
>              freed++;
> @@ -2293,6 +2330,9 @@ static void dlm_free_dead_locks(struct dlm_ctxt *dlm,
>          if (lock->ml.node == dead_node) {
>              list_del_init(&lock->list);
>              dlm_lock_put(lock);
> +
> +            reserved_tmp += dlm_drop_pending_ast_bast(dlm, lock);
> +
>              /* Can't schedule DLM_UNLOCK_FREE_LOCK - do manually */
>              dlm_lock_put(lock);
>              freed++;
> @@ -2308,6 +2348,8 @@ static void dlm_free_dead_locks(struct dlm_ctxt *dlm,
>          }
>      }
>  
> +    *reserved = reserved_tmp;
> +
>      if (freed) {
>          mlog(0, "%s:%.*s: freed %u locks for dead node %u, "
>               "dropping ref from lockres\n", dlm->name,
> @@ -2367,6 +2409,7 @@ static void dlm_do_local_recovery_cleanup(struct
> dlm_ctxt *dlm, u8 dead_node)
>      for (i = 0; i < DLM_HASH_BUCKETS; i++) {
>          bucket = dlm_lockres_hash(dlm, i);
>          hlist_for_each_entry_safe(res, tmp, bucket, hash_node) {
> +            int reserved = 0;
>               /* always prune any $RECOVERY entries for dead nodes,
>                * otherwise hangs can occur during later recovery */
>              if (dlm_is_recovery_lock(res->lockname.name,
> @@ -2420,7 +2463,7 @@ static void dlm_do_local_recovery_cleanup(struct
> dlm_ctxt *dlm, u8 dead_node)
>                      continue;
>                  }
>              } else if (res->owner == dlm->node_num) {
> -                dlm_free_dead_locks(dlm, res, dead_node);
> +                dlm_free_dead_locks(dlm, res, dead_node, &reserved);
>                  __dlm_lockres_calc_usage(dlm, res);
>              } else if (res->owner == DLM_LOCK_RES_OWNER_UNKNOWN) {
>                  if (test_bit(dead_node, res->refmap)) {
> @@ -2432,6 +2475,10 @@ static void dlm_do_local_recovery_cleanup(struct
> dlm_ctxt *dlm, u8 dead_node)
>                  }
>              }
>              spin_unlock(&res->spinlock);
> +            while (reserved) {
> +                dlm_lockres_release_ast(dlm, res);
> +                reserved--;
> +            }
>          }
>      }
>  
> diff --git a/fs/ocfs2/dlm/dlmthread.c b/fs/ocfs2/dlm/dlmthread.c
> index 838a06d..c34a619 100644
> --- a/fs/ocfs2/dlm/dlmthread.c
> +++ b/fs/ocfs2/dlm/dlmthread.c
> @@ -587,13 +587,13 @@ static int dlm_dirty_list_empty(struct dlm_ctxt *dlm)
>  
>  static void dlm_flush_asts(struct dlm_ctxt *dlm)
>  {
> -    int ret;
> +    int ret = 0;
>      struct dlm_lock *lock;
>      struct dlm_lock_resource *res;
>      u8 hi;
>  
>      spin_lock(&dlm->ast_lock);
> -    while (!list_empty(&dlm->pending_asts)) {
> +    while (!list_empty(&dlm->pending_asts) && !ret) {
>          lock = list_entry(dlm->pending_asts.next,
>                    struct dlm_lock, ast_list);
>          /* get an extra ref on lock */
> @@ -628,8 +628,20 @@ static void dlm_flush_asts(struct dlm_ctxt *dlm)
>              mlog(0, "%s: res %.*s, AST queued while flushing last "
>                   "one\n", dlm->name, res->lockname.len,
>                   res->lockname.name);
> -        } else
> -            lock->ast_pending = 0;
> +        } else {
> +            if (unlikely(ret < 0)) {
> +                /* If this AST is not sent back successfully,
> +                 * there is no chance that the second lock
> +                 * request comes.
> +                 */
> +                spin_lock(&res->spinlock);
> +                __dlm_lockres_reserve_ast(res);
> +                spin_unlock(&res->spinlock);
> +                __dlm_queue_ast(dlm, lock);
> +            } else {
> +                lock->ast_pending = 0;
> +            }
> +        }
>  
>          /* drop the extra ref.
>           * this may drop it completely. */
> @@ -637,7 +649,9 @@ static void dlm_flush_asts(struct dlm_ctxt *dlm)
>          dlm_lockres_release_ast(dlm, res);
>      }
>  
> -    while (!list_empty(&dlm->pending_basts)) {
> +    ret = 0;
> +
> +    while (!list_empty(&dlm->pending_basts) && !ret) {
>          lock = list_entry(dlm->pending_basts.next,
>                    struct dlm_lock, bast_list);
>          /* get an extra ref on lock */
> @@ -650,7 +664,6 @@ static void dlm_flush_asts(struct dlm_ctxt *dlm)
>          spin_lock(&lock->spinlock);
>          BUG_ON(lock->ml.highest_blocked <= LKM_IVMODE);
>          hi = lock->ml.highest_blocked;
> -        lock->ml.highest_blocked = LKM_IVMODE;
>          spin_unlock(&lock->spinlock);
>  
>          /* remove from list (including ref) */
> @@ -681,7 +694,19 @@ static void dlm_flush_asts(struct dlm_ctxt *dlm)
>                   "one\n", dlm->name, res->lockname.len,
>                   res->lockname.name);
>          } else
> -            lock->bast_pending = 0;
> +            if (unlikely(ret)) {
> +                spin_lock(&res->spinlock);
> +                __dlm_lockres_reserve_ast(res);
> +                spin_unlock(&res->spinlock);
> +                __dlm_queue_bast(dlm, lock);
> +            } else {
> +                lock->bast_pending = 0;
> +                /* Set ::highest_blocked to invalid after
> +                 * sending BAST successfully so that
> +                 * no more BAST would be queued.
> +                 */
> +                lock->ml.highest_blocked = LKM_IVMODE;
> +            }
>  
>          /* drop the extra ref.
>           * this may drop it completely. */
> -- 
> 1.7.9.5
> 
> 
> _______________________________________________
> Ocfs2-devel mailing list
> Ocfs2-devel at oss.oracle.com 
> https://oss.oracle.com/mailman/listinfo/ocfs2-devel



More information about the Ocfs2-devel mailing list