[Ocfs2-devel] [PATCH 1/2] ocfs2 fix o2dlm dlm run purgelist
Srinivas Eeda
srinivas.eeda at oracle.com
Tue Jun 15 21:43:02 PDT 2010
There are two problems in dlm_run_purge_list:

1. If a lockres is found to be in use, dlm_run_purge_list keeps retrying the
same lockres instead of moving on to the next one on the purge list.

2. When a lockres is found unused, dlm_run_purge_list releases the lockres
spinlock before setting DLM_LOCK_RES_DROPPING_REF and then calls
dlm_purge_lockres. The spinlock is reacquired there, but in that window the
lockres can get reused, which leads to a BUG().
This patch modifies dlm_run_purge_list to skip a lockres that is in use and
purge the next one instead. It also sets DLM_LOCK_RES_DROPPING_REF before
releasing the lockres spinlock, protecting the lockres from getting reused.
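
As an illustration of the second fix, here is a minimal userspace sketch of
how setting the flag before the unlock closes the window; the struct, flag,
and function names below are simplified stand-ins for the kernel's dlm
objects, not the actual ocfs2 code:

#include <pthread.h>
#include <stdbool.h>
#include <stdio.h>

/* Simplified stand-in for struct dlm_lock_resource. */
struct lockres {
	pthread_mutex_t lock;   /* plays the role of res->spinlock */
	bool dropping_ref;      /* plays the role of DLM_LOCK_RES_DROPPING_REF */
	int users;
};

/* A racing user: must back off once the flag is visible. */
static bool lockres_try_use(struct lockres *res)
{
	bool ok;

	pthread_mutex_lock(&res->lock);
	ok = !res->dropping_ref;
	if (ok)
		res->users++;
	pthread_mutex_unlock(&res->lock);
	return ok;
}

/* The purge path after the fix: the flag is set while the lock is still
 * held, so nothing can grab the lockres between the unused check and the
 * actual teardown. */
static void lockres_start_purge(struct lockres *res)
{
	pthread_mutex_lock(&res->lock);
	if (res->users == 0)
		res->dropping_ref = true;   /* reuse window closed here */
	pthread_mutex_unlock(&res->lock);
	/* the deref message round-trip happens without the lock held */
}

int main(void)
{
	struct lockres res = { PTHREAD_MUTEX_INITIALIZER, false, 0 };

	lockres_start_purge(&res);
	printf("reuse allowed after purge started: %s\n",
	       lockres_try_use(&res) ? "yes (bug)" : "no (fixed)");
	return 0;
}

Built with gcc -pthread, this prints "no (fixed)": a user racing in after the
purge decision sees the flag and backs off, which is exactly the window the
patch closes by setting DLM_LOCK_RES_DROPPING_REF before the unlock.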
Signed-off-by: Srinivas Eeda <srinivas.eeda at oracle.com>
---
fs/ocfs2/dlm/dlmthread.c | 125 +++++++++++++++++++++++-----------------------
1 files changed, 63 insertions(+), 62 deletions(-)
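
For reference before the diff itself, the traversal change boils down to the
pattern below: record the next entry before the current one can be freed,
and skip busy entries rather than retrying them. Plain pointers stand in for
the kernel's list_head/list_entry machinery, and purge_one() is a
hypothetical placeholder for dlm_purge_lockres():

#include <stdio.h>
#include <stdlib.h>

/* Plain pointers standing in for the kernel's purge list. */
struct res {
	struct res *next;
	int in_use;
	const char *name;
};

/* Hypothetical placeholder for dlm_purge_lockres(); frees the entry. */
static void purge_one(struct res *r)
{
	printf("purged %s\n", r->name);
	free(r);
}

static struct res *run_purge_list(struct res *head)
{
	struct res *cur = head, *prev = NULL;

	while (cur) {
		struct res *nxt = cur->next; /* saved before cur can go away */

		if (cur->in_use) {           /* busy: skip it, don't retry */
			prev = cur;
			cur = nxt;
			continue;
		}
		if (prev)                    /* unlink, then purge */
			prev->next = nxt;
		else
			head = nxt;
		purge_one(cur);              /* cur is freed; nxt still valid */
		cur = nxt;
	}
	return head;
}

int main(void)
{
	struct res *a = calloc(1, sizeof(*a));
	struct res *b = calloc(1, sizeof(*b));

	a->name = "A"; a->in_use = 1;        /* busy: must survive */
	b->name = "B";                       /* unused: must be purged */
	a->next = b;

	a = run_purge_list(a);
	free(a);                             /* only A is left */
	return 0;
}

The actual patch additionally restarts from the head of the purge list
whenever cond_resched_lock() drops the dlm spinlock, since the list may have
been modified in the meantime.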
diff --git a/fs/ocfs2/dlm/dlmthread.c b/fs/ocfs2/dlm/dlmthread.c
index 11a6d1f..fb0be6c 100644
--- a/fs/ocfs2/dlm/dlmthread.c
+++ b/fs/ocfs2/dlm/dlmthread.c
@@ -158,39 +158,17 @@ static int dlm_purge_lockres(struct dlm_ctxt *dlm,
int master;
int ret = 0;
- spin_lock(&res->spinlock);
- if (!__dlm_lockres_unused(res)) {
- mlog(0, "%s:%.*s: tried to purge but not unused\n",
- dlm->name, res->lockname.len, res->lockname.name);
- __dlm_print_one_lock_resource(res);
- spin_unlock(&res->spinlock);
- BUG();
- }
-
- if (res->state & DLM_LOCK_RES_MIGRATING) {
- mlog(0, "%s:%.*s: Delay dropref as this lockres is "
- "being remastered\n", dlm->name, res->lockname.len,
- res->lockname.name);
- /* Re-add the lockres to the end of the purge list */
- if (!list_empty(&res->purge)) {
- list_del_init(&res->purge);
- list_add_tail(&res->purge, &dlm->purge_list);
- }
- spin_unlock(&res->spinlock);
- return 0;
- }
-
master = (res->owner == dlm->node_num);
if (!master)
res->state |= DLM_LOCK_RES_DROPPING_REF;
- spin_unlock(&res->spinlock);
mlog(0, "purging lockres %.*s, master = %d\n", res->lockname.len,
res->lockname.name, master);
if (!master) {
/* drop spinlock... retake below */
+ spin_unlock(&res->spinlock);
spin_unlock(&dlm->spinlock);
spin_lock(&res->spinlock);
@@ -208,48 +186,37 @@ static int dlm_purge_lockres(struct dlm_ctxt *dlm,
mlog(0, "%s:%.*s: dlm_deref_lockres returned %d\n",
dlm->name, res->lockname.len, res->lockname.name, ret);
spin_lock(&dlm->spinlock);
+ spin_lock(&res->spinlock);
}
- spin_lock(&res->spinlock);
- if (!list_empty(&res->purge)) {
- mlog(0, "removing lockres %.*s:%p from purgelist, "
- "master = %d\n", res->lockname.len, res->lockname.name,
- res, master);
- list_del_init(&res->purge);
- spin_unlock(&res->spinlock);
- dlm_lockres_put(res);
- dlm->purge_count--;
- } else
- spin_unlock(&res->spinlock);
-
- __dlm_unhash_lockres(res);
-
/* lockres is not in the hash now. drop the flag and wake up
* any processes waiting in dlm_get_lock_resource. */
- if (!master) {
- spin_lock(&res->spinlock);
+ if (!master)
res->state &= ~DLM_LOCK_RES_DROPPING_REF;
- spin_unlock(&res->spinlock);
- wake_up(&res->wq);
- }
return 0;
}
static void dlm_run_purge_list(struct dlm_ctxt *dlm,
int purge_now)
{
- unsigned int run_max, unused;
+ unsigned int run_max;
unsigned long purge_jiffies;
struct dlm_lock_resource *lockres;
+ struct dlm_lock_resource *nextres;
spin_lock(&dlm->spinlock);
run_max = dlm->purge_count;
- while(run_max && !list_empty(&dlm->purge_list)) {
- run_max--;
+ if (list_empty(&dlm->purge_list)) {
+ spin_unlock(&dlm->spinlock);
+ return;
+ }
+
+ lockres = list_entry(dlm->purge_list.next,
+ struct dlm_lock_resource, purge);
- lockres = list_entry(dlm->purge_list.next,
- struct dlm_lock_resource, purge);
+ while(run_max && lockres && !list_empty(&dlm->purge_list)) {
+ run_max--;
/* Status of the lockres *might* change so double
* check. If the lockres is unused, holding the dlm
@@ -257,15 +224,12 @@ static void dlm_run_purge_list(struct dlm_ctxt *dlm,
* refs on it -- there's no need to keep the lockres
* spinlock. */
spin_lock(&lockres->spinlock);
- unused = __dlm_lockres_unused(lockres);
- spin_unlock(&lockres->spinlock);
-
- if (!unused)
- continue;
purge_jiffies = lockres->last_used +
msecs_to_jiffies(DLM_PURGE_INTERVAL_MS);
+ mlog(0, "purging lockres %.*s\n", lockres->lockname.len,
+ lockres->lockname.name);
/* Make sure that we want to be processing this guy at
* this time. */
if (!purge_now && time_after(purge_jiffies, jiffies)) {
@@ -273,20 +237,57 @@ static void dlm_run_purge_list(struct dlm_ctxt *dlm,
* in tail order, we can stop at the first
* unpurgable resource -- anyone added after
* him will have a greater last_used value */
+ spin_unlock(&lockres->spinlock);
break;
}
- dlm_lockres_get(lockres);
-
+ /* If lockres is being used, or migrating purge next lockres */
+ if (!__dlm_lockres_unused(lockres) ||
+ (lockres->state & DLM_LOCK_RES_MIGRATING)) {
+ if (!list_is_last(&lockres->purge, &dlm->purge_list))
+ nextres = list_entry(lockres->purge.next,
+ struct dlm_lock_resource, purge);
+ else
+ nextres = NULL;
+ spin_unlock(&lockres->spinlock);
+ lockres = nextres;
+ continue;
+ }
+
/* This may drop and reacquire the dlm spinlock if it
* has to do migration. */
- if (dlm_purge_lockres(dlm, lockres))
- BUG();
-
- dlm_lockres_put(lockres);
-
- /* Avoid adding any scheduling latencies */
- cond_resched_lock(&dlm->spinlock);
+ dlm_purge_lockres(dlm, lockres);
+
+ /* before we free the lockres we get the next lockres */
+ if (list_empty(&lockres->purge))
+ /* Shouldn't be in this state. Start from beginning */
+ nextres = list_entry(dlm->purge_list.next,
+ struct dlm_lock_resource, purge);
+ else if (!list_is_last(&lockres->purge, &dlm->purge_list))
+ nextres = list_entry(lockres->purge.next,
+ struct dlm_lock_resource, purge);
+ else
+ nextres = NULL;
+
+ if (__dlm_lockres_unused(lockres)) {
+ if (!list_empty(&lockres->purge)) {
+ list_del_init(&lockres->purge);
+ dlm->purge_count--;
+ }
+ __dlm_unhash_lockres(lockres);
+ spin_unlock(&lockres->spinlock);
+ wake_up(&lockres->wq);
+ dlm_lockres_put(lockres);
+ } else
+ spin_unlock(&lockres->spinlock);
+ lockres = nextres;
+
+ /* Avoid adding any scheduling latencies. If dlm spinlock is
+ * dropped, retry again from the beginning as purgelist could
+ * have been modified */
+ if (cond_resched_lock(&dlm->spinlock))
+ lockres = list_entry(dlm->purge_list.next,
+ struct dlm_lock_resource, purge);
}
spin_unlock(&dlm->spinlock);
@@ -733,7 +734,7 @@ in_progress:
/* unlikely, but we may need to give time to
* other tasks */
if (!--n) {
- mlog(0, "throttling dlm_thread\n");
+ mlog(0, "throttling dlm_thread n=%d\n", n);
break;
}
}
--
1.5.6.5