[Ocfs2-devel] [PATCH] ocfs2: re-queue AST or BAST if sending fails, to improve reliability
Gang He
ghe at suse.com
Mon Aug 7 00:43:17 PDT 2017
Based on your description, this case should be a corner case, NOT a fatal error.
Should we use mlog(ML_NOTICE, ...) to print these logs?
Thanks
Gang
>>>
> Hi,
>
> In the current code, while flushing an AST, we don't handle the case
> where sending an AST or BAST fails.
> But it is indeed possible that an AST or BAST is lost due to some kind
> of network fault.
>
> If the above exception happens, the requesting node will never receive
> an AST back; hence, it will never acquire the lock or abort the current
> locking attempt.
>
> With this patch, I'd like to fix this issue by re-queuing the AST or
> BAST if sending fails due to a network fault.
>
> The re-queued AST or BAST will be dropped if the requesting node is
> dead.
>
> It will improve the reliability a lot.
>
>
> Thanks.
>
> Changwei.
>
> Signed-off-by: Changwei Ge <ge.changwei at h3c.com>
> ---
> fs/ocfs2/dlm/dlmrecovery.c | 51
> ++++++++++++++++++++++++++++++++++++++++++--
> fs/ocfs2/dlm/dlmthread.c | 39 +++++++++++++++++++++++++++------
> 2 files changed, 81 insertions(+), 9 deletions(-)
>
> diff --git a/fs/ocfs2/dlm/dlmrecovery.c b/fs/ocfs2/dlm/dlmrecovery.c
> index 74407c6..ddfaf74 100644
> --- a/fs/ocfs2/dlm/dlmrecovery.c
> +++ b/fs/ocfs2/dlm/dlmrecovery.c
> @@ -2263,11 +2263,45 @@ static void dlm_revalidate_lvb(struct dlm_ctxt *dlm,
> }
> }
>
> +static int dlm_drop_pending_ast_bast(struct dlm_ctxt *dlm,
> + struct dlm_lock *lock)
> +{
> + int reserved = 0;
> +
> + spin_lock(&dlm->ast_lock);
> + if (!list_empty(&lock->ast_list)) {
> + mlog(0, "%s: drop pending AST for lock(cookie=%u:%llu).\n",
> + dlm->name,
> + dlm_get_lock_cookie_node(be64_to_cpu(lock->ml.cookie)),
> + dlm_get_lock_cookie_seq(be64_to_cpu(lock->ml.cookie)));
> + list_del_init(&lock->ast_list);
> + lock->ast_pending = 0;
> + dlm_lock_put(lock);
> + reserved++;
> + }
> +
> + if (!list_empty(&lock->bast_list)) {
> + mlog(0, "%s: drop pending BAST for lock(cookie=%u:%llu).\n",
> + dlm->name,
> + dlm_get_lock_cookie_node(be64_to_cpu(lock->ml.cookie)),
> + dlm_get_lock_cookie_seq(be64_to_cpu(lock->ml.cookie)));
> + list_del_init(&lock->bast_list);
> + lock->bast_pending = 0;
> + dlm_lock_put(lock);
> + reserved++;
> + }
> + spin_unlock(&dlm->ast_lock);
> +
> + return reserved;
> +}
> +
> static void dlm_free_dead_locks(struct dlm_ctxt *dlm,
> - struct dlm_lock_resource *res, u8 dead_node)
> + struct dlm_lock_resource *res, u8 dead_node,
> + int *reserved)
> {
> struct dlm_lock *lock, *next;
> unsigned int freed = 0;
> + int reserved_tmp = 0;
>
> /* this node is the lockres master:
> * 1) remove any stale locks for the dead node
> @@ -2284,6 +2318,9 @@ static void dlm_free_dead_locks(struct dlm_ctxt *dlm,
> if (lock->ml.node == dead_node) {
> list_del_init(&lock->list);
> dlm_lock_put(lock);
> +
> + reserved_tmp += dlm_drop_pending_ast_bast(dlm, lock);
> +
> /* Can't schedule DLM_UNLOCK_FREE_LOCK - do manually */
> dlm_lock_put(lock);
> freed++;
> @@ -2293,6 +2330,9 @@ static void dlm_free_dead_locks(struct dlm_ctxt *dlm,
> if (lock->ml.node == dead_node) {
> list_del_init(&lock->list);
> dlm_lock_put(lock);
> +
> + reserved_tmp += dlm_drop_pending_ast_bast(dlm, lock);
> +
> /* Can't schedule DLM_UNLOCK_FREE_LOCK - do manually */
> dlm_lock_put(lock);
> freed++;
> @@ -2308,6 +2348,8 @@ static void dlm_free_dead_locks(struct dlm_ctxt *dlm,
> }
> }
>
> + *reserved = reserved_tmp;
> +
> if (freed) {
> mlog(0, "%s:%.*s: freed %u locks for dead node %u, "
> "dropping ref from lockres\n", dlm->name,
> @@ -2367,6 +2409,7 @@ static void dlm_do_local_recovery_cleanup(struct
> dlm_ctxt *dlm, u8 dead_node)
> for (i = 0; i < DLM_HASH_BUCKETS; i++) {
> bucket = dlm_lockres_hash(dlm, i);
> hlist_for_each_entry_safe(res, tmp, bucket, hash_node) {
> + int reserved = 0;
> /* always prune any $RECOVERY entries for dead nodes,
> * otherwise hangs can occur during later recovery */
> if (dlm_is_recovery_lock(res->lockname.name,
> @@ -2420,7 +2463,7 @@ static void dlm_do_local_recovery_cleanup(struct
> dlm_ctxt *dlm, u8 dead_node)
> continue;
> }
> } else if (res->owner == dlm->node_num) {
> - dlm_free_dead_locks(dlm, res, dead_node);
> + dlm_free_dead_locks(dlm, res, dead_node, &reserved);
> __dlm_lockres_calc_usage(dlm, res);
> } else if (res->owner == DLM_LOCK_RES_OWNER_UNKNOWN) {
> if (test_bit(dead_node, res->refmap)) {
> @@ -2432,6 +2475,10 @@ static void dlm_do_local_recovery_cleanup(struct
> dlm_ctxt *dlm, u8 dead_node)
> }
> }
> spin_unlock(&res->spinlock);
> + while (reserved) {
> + dlm_lockres_release_ast(dlm, res);
> + reserved--;
> + }
> }
> }
>
> diff --git a/fs/ocfs2/dlm/dlmthread.c b/fs/ocfs2/dlm/dlmthread.c
> index 838a06d..c34a619 100644
> --- a/fs/ocfs2/dlm/dlmthread.c
> +++ b/fs/ocfs2/dlm/dlmthread.c
> @@ -587,13 +587,13 @@ static int dlm_dirty_list_empty(struct dlm_ctxt *dlm)
>
> static void dlm_flush_asts(struct dlm_ctxt *dlm)
> {
> - int ret;
> + int ret = 0;
> struct dlm_lock *lock;
> struct dlm_lock_resource *res;
> u8 hi;
>
> spin_lock(&dlm->ast_lock);
> - while (!list_empty(&dlm->pending_asts)) {
> + while (!list_empty(&dlm->pending_asts) && !ret) {
> lock = list_entry(dlm->pending_asts.next,
> struct dlm_lock, ast_list);
> /* get an extra ref on lock */
> @@ -628,8 +628,20 @@ static void dlm_flush_asts(struct dlm_ctxt *dlm)
> mlog(0, "%s: res %.*s, AST queued while flushing last "
> "one\n", dlm->name, res->lockname.len,
> res->lockname.name);
> - } else
> - lock->ast_pending = 0;
> + } else {
> + if (unlikely(ret < 0)) {
> + /* If this AST is not sent back successfully,
> + * there is no chance that the second lock
> + * request comes.
> + */
> + spin_lock(&res->spinlock);
> + __dlm_lockres_reserve_ast(res);
> + spin_unlock(&res->spinlock);
> + __dlm_queue_ast(dlm, lock);
> + } else {
> + lock->ast_pending = 0;
> + }
> + }
>
> /* drop the extra ref.
> * this may drop it completely. */
> @@ -637,7 +649,9 @@ static void dlm_flush_asts(struct dlm_ctxt *dlm)
> dlm_lockres_release_ast(dlm, res);
> }
>
> - while (!list_empty(&dlm->pending_basts)) {
> + ret = 0;
> +
> + while (!list_empty(&dlm->pending_basts) && !ret) {
> lock = list_entry(dlm->pending_basts.next,
> struct dlm_lock, bast_list);
> /* get an extra ref on lock */
> @@ -650,7 +664,6 @@ static void dlm_flush_asts(struct dlm_ctxt *dlm)
> spin_lock(&lock->spinlock);
> BUG_ON(lock->ml.highest_blocked <= LKM_IVMODE);
> hi = lock->ml.highest_blocked;
> - lock->ml.highest_blocked = LKM_IVMODE;
> spin_unlock(&lock->spinlock);
>
> /* remove from list (including ref) */
> @@ -681,7 +694,19 @@ static void dlm_flush_asts(struct dlm_ctxt *dlm)
> "one\n", dlm->name, res->lockname.len,
> res->lockname.name);
> } else
> - lock->bast_pending = 0;
> + if (unlikely(ret)) {
> + spin_lock(&res->spinlock);
> + __dlm_lockres_reserve_ast(res);
> + spin_unlock(&res->spinlock);
> + __dlm_queue_bast(dlm, lock);
> + } else {
> + lock->bast_pending = 0;
> + /* Set ::highest_blocked to invalid after
> + * sending BAST successfully so that
> + * no more BAST would be queued.
> + */
> + lock->ml.highest_blocked = LKM_IVMODE;
> + }
>
> /* drop the extra ref.
> * this may drop it completely. */
> --
> 1.7.9.5
>
>
> _______________________________________________
> Ocfs2-devel mailing list
> Ocfs2-devel at oss.oracle.com
> https://oss.oracle.com/mailman/listinfo/ocfs2-devel
More information about the Ocfs2-devel
mailing list