[Ocfs2-commits] mfasheh commits r1959 - trunk/fs/ocfs2/dlm
svn-commits at oss.oracle.com
svn-commits at oss.oracle.com
Mon Mar 7 18:55:02 CST 2005
Author: mfasheh
Signed-off-by: khackel
Date: 2005-03-07 18:55:01 -0600 (Mon, 07 Mar 2005)
New Revision: 1959
Modified:
trunk/fs/ocfs2/dlm/dlmlock.c
trunk/fs/ocfs2/dlm/dlmmaster.c
trunk/fs/ocfs2/dlm/dlmmod.c
trunk/fs/ocfs2/dlm/dlmmod.h
trunk/fs/ocfs2/dlm/dlmthread.c
trunk/fs/ocfs2/dlm/dlmunlock.c
Log:
* implement a 'purge' list of unused lock resources
* change dlmthread to wake up periodically and run the purge list
* teach all paths (dlmlock, dlmunlock, dlmthread) which may add or remove a
lock from a resource how to put that resource on / remove it from the purge list.
* have dlmlock take a ref on the resource it's dealing with (required for
this purge list stuff to work)
* while we're there, clean up the exit path -- this made dropping our ref
a lot easier.
* move the code for not shuffling non-local locks into dlm_kick_thread so
that they never get queued in the first place
Signed-off-by: khackel
Modified: trunk/fs/ocfs2/dlm/dlmlock.c
===================================================================
--- trunk/fs/ocfs2/dlm/dlmlock.c 2005-03-07 23:29:15 UTC (rev 1958)
+++ trunk/fs/ocfs2/dlm/dlmlock.c 2005-03-08 00:55:01 UTC (rev 1959)
@@ -130,6 +130,7 @@
if (dlm_do_ast(dlm, res, lock) < 0)
dlmprintk0("eek\n");
+ dlm_lockres_calc_usage(dlm, res);
dlm_kick_thread(dlm, res);
return status;
@@ -153,6 +154,7 @@
spin_lock(&res->spinlock);
if (res->state & DLM_LOCK_RES_RECOVERING) {
+ spin_unlock(&res->spinlock);
status = DLM_RECOVERING;
goto bail;
}
@@ -175,8 +177,11 @@
list_del(&lock->list);
lock->lksb->flags &= ~DLM_LKSB_GET_LVB;
}
+ spin_unlock(&res->spinlock);
+
+ dlm_lockres_calc_usage(dlm, res);
bail:
- spin_unlock(&res->spinlock);
+
wake_up(&res->wq);
return status;
}
Modified: trunk/fs/ocfs2/dlm/dlmmaster.c
===================================================================
--- trunk/fs/ocfs2/dlm/dlmmaster.c 2005-03-07 23:29:15 UTC (rev 1958)
+++ trunk/fs/ocfs2/dlm/dlmmaster.c 2005-03-08 00:55:01 UTC (rev 1959)
@@ -385,6 +385,7 @@
BUG_ON(!list_empty(&res->blocked));
BUG_ON(!list_empty(&res->dirty));
BUG_ON(!list_empty(&res->recovering));
+ BUG_ON(!list_empty(&res->purge));
kfree(res->lockname.name);
@@ -396,11 +397,27 @@
kref_get(&res->refs);
}
+dlm_lock_resource *dlm_lockres_grab(dlm_ctxt *dlm,
+ dlm_lock_resource *res)
+{
+ spin_lock(&dlm->spinlock);
+ __dlm_lockres_get(res);
+ spin_unlock(&dlm->spinlock);
+
+ return res;
+}
+
+void __dlm_lockres_put(dlm_ctxt *dlm,
+ dlm_lock_resource *res)
+{
+ kref_put(&res->refs, dlm_lockres_release);
+}
+
void dlm_lockres_put(dlm_ctxt *dlm,
dlm_lock_resource *res)
{
spin_lock(&dlm->spinlock);
- kref_put(&res->refs, dlm_lockres_release);
+ __dlm_lockres_put(dlm, res);
spin_unlock(&dlm->spinlock);
}
@@ -429,6 +446,7 @@
INIT_LIST_HEAD(&res->blocked);
INIT_LIST_HEAD(&res->dirty);
INIT_LIST_HEAD(&res->recovering);
+ INIT_LIST_HEAD(&res->purge);
kref_init(&res->refs, dlm_lockres_release);
@@ -439,6 +457,8 @@
res->state = DLM_LOCK_RES_IN_PROGRESS;
+ res->last_used = 0;
+
memset(res->lvb, 0, DLM_LVB_LEN);
}
Modified: trunk/fs/ocfs2/dlm/dlmmod.c
===================================================================
--- trunk/fs/ocfs2/dlm/dlmmod.c 2005-03-07 23:29:15 UTC (rev 1958)
+++ trunk/fs/ocfs2/dlm/dlmmod.c 2005-03-08 00:55:01 UTC (rev 1959)
@@ -211,7 +211,7 @@
dlm_bastlockfunc_t *bast)
{
dlm_status status;
- dlm_lock_resource *res;
+ dlm_lock_resource *res = NULL;
dlm_lock *lock = NULL;
int convert = 0, recovery = 0;
@@ -220,21 +220,21 @@
status = DLM_BADPARAM;
if (mode != LKM_EXMODE && mode != LKM_PRMODE && mode != LKM_NLMODE)
- goto error_status;
+ goto error;
if (flags & ~LKM_VALID_FLAGS)
- goto error_status;
+ goto error;
convert = (flags & LKM_CONVERT);
recovery = (flags & LKM_RECOVERY);
if (recovery && (!dlm_is_recovery_lock(name, strlen(name)) ||
convert) ) {
- goto error_status;
+ goto error;
}
if (convert && (flags & LKM_LOCAL)) {
dlmprintk0("strange LOCAL convert request!\n");
- goto error_status;
+ goto error;
}
if (convert) {
@@ -242,7 +242,7 @@
/* if converting, must pass in a valid dlm_lock */
if (!lksb->lockid || !lksb->lockid->lockres)
- goto error_status;
+ goto error;
lock = lksb->lockid;
/* XXX: for ocfs2 purposes, the ast/bast/astdata/lksb are
@@ -258,9 +258,9 @@
dlmprintk(" orig args: lksb=%p, ast=%p, bast=%p, "
"astdata=%p\n", lock->lksb, lock->ast,
lock->bast, lock->astdata);
- goto error_status;
+ goto error;
}
- res = lock->lockres;
+ res = dlm_lockres_grab(dlm, lock->lockres);
down_read(&dlm->recovery_sem);
if (res->owner == dlm->group_index)
@@ -337,22 +337,23 @@
}
}
+up_error:
if (!recovery)
up_read(&dlm->recovery_sem);
- return status;
-up_error:
- if (!recovery)
- up_read(&dlm->recovery_sem);
error:
- if (lock && !convert) {
- kfree(lock);
- lksb->lockid = NULL;
+ if (status != DLM_NORMAL) {
+ if (lock && !convert) {
+ kfree(lock);
+ lksb->lockid = NULL;
+ }
+ // this is kind of unnecessary
+ lksb->status = status;
}
-error_status:
- // this is kind of unnecessary
- lksb->status = status;
+ if (res)
+ dlm_lockres_put(dlm, res);
+
return status;
}
EXPORT_SYMBOL(dlmlock);
@@ -369,7 +370,7 @@
dlm_lock_resource *res;
dlm_lock *lock = NULL;
int call_ast = 0;
-
+
dlmprintk0("\n");
if (!lksb)
@@ -387,8 +388,8 @@
return DLM_BADPARAM;
lock = lksb->lockid;
- res = lock->lockres;
-
+ res = dlm_lockres_grab(dlm, lock->lockres);
+
DLM_ASSERT(lock);
DLM_ASSERT(res);
dlmprintk("lock=%p res=%p\n", lock, res);
@@ -415,11 +416,22 @@
dlmprintk("kicking the thread\n");
dlm_kick_thread(dlm, res);
}
+
+ dlm_lockres_calc_usage(dlm, res);
+ dlm_lockres_put(dlm, res);
+
dlmprintk("returning status=%d!\n", status);
return status;
}
EXPORT_SYMBOL(dlmunlock);
+void __dlm_unhash_lock(dlm_ctxt *dlm,
+ dlm_lock_resource *lockres)
+{
+ list_del_init(&lockres->list);
+ __dlm_lockres_put(dlm, lockres);
+}
+
void __dlm_insert_lock(dlm_ctxt *dlm,
dlm_lock_resource *res)
{
@@ -710,6 +722,8 @@
INIT_LIST_HEAD(&dlm->dirty_list);
INIT_LIST_HEAD(&dlm->reco.resources);
INIT_LIST_HEAD(&dlm->reco.received);
+ INIT_LIST_HEAD(&dlm->purge_list);
+
dlm->dlm_thread_task = NULL;
init_waitqueue_head(&dlm->dlm_thread_wq);
INIT_LIST_HEAD(&dlm->master_list);
Modified: trunk/fs/ocfs2/dlm/dlmmod.h
===================================================================
--- trunk/fs/ocfs2/dlm/dlmmod.h 2005-03-07 23:29:15 UTC (rev 1958)
+++ trunk/fs/ocfs2/dlm/dlmmod.h 2005-03-08 00:55:01 UTC (rev 1959)
@@ -205,6 +205,8 @@
struct list_head list;
struct list_head *resources;
struct list_head dirty_list;
+ struct list_head purge_list;
+ unsigned int purge_count;
spinlock_t spinlock;
struct rw_semaphore recovery_sem;
char *name;
@@ -239,6 +241,8 @@
#define DLM_LOCK_RES_DIRTY 0x00000008
#define DLM_LOCK_RES_IN_PROGRESS 0x00000010
+#define DLM_PURGE_INTERVAL_MS (8 * 1000)
+
typedef struct _dlm_lock_resource
{
/* WARNING: Please see the comment in dlm_init_lockres before
@@ -249,11 +253,17 @@
/* please keep these next 3 in this order
* some funcs want to iterate over all lists */
struct list_head granted;
- struct list_head converting;
+ struct list_head converting;
struct list_head blocked;
struct list_head dirty;
struct list_head recovering; // dlm_recovery_ctxt.resources list
+
+ /* unused lock resources have their last_used stamped and are
+ * put on a list for the dlm thread to run. */
+ struct list_head purge;
+ unsigned long last_used;
+
spinlock_t spinlock;
wait_queue_head_t wq;
u8 owner; //node which owns the lock resource, or unknown
@@ -593,21 +603,26 @@
dlm_status dlmconvert_remote(dlm_ctxt *dlm, dlm_lock_resource *res,
dlm_lock *lock, int flags, int type);
-dlm_status dlmunlock(dlm_ctxt *dlm, dlm_lockstatus *lksb, int flags,
+dlm_status dlmunlock(dlm_ctxt *dlm, dlm_lockstatus *lksb, int flags,
dlm_astunlockfunc_t *unlockast, void *data);
-dlm_status dlmunlock_common(dlm_ctxt *dlm, dlm_lock_resource *res,
- dlm_lock *lock, dlm_lockstatus *lksb,
+dlm_status dlmunlock_common(dlm_ctxt *dlm, dlm_lock_resource *res,
+ dlm_lock *lock, dlm_lockstatus *lksb,
int flags, int *call_ast, int master_node);
-static inline dlm_status dlmunlock_master(dlm_ctxt *dlm, dlm_lock_resource *res,
- dlm_lock *lock, dlm_lockstatus *lksb,
- int flags, int *call_ast)
+static inline dlm_status dlmunlock_master(dlm_ctxt *dlm,
+ dlm_lock_resource *res,
+ dlm_lock *lock,
+ dlm_lockstatus *lksb,
+ int flags,
+ int *call_ast)
{
return dlmunlock_common(dlm, res, lock, lksb, flags, call_ast, 1);
}
-static inline dlm_status dlmunlock_remote(dlm_ctxt *dlm, dlm_lock_resource *res,
- dlm_lock *lock, dlm_lockstatus *lksb,
- int flags, int *call_ast)
+static inline dlm_status dlmunlock_remote(dlm_ctxt *dlm,
+ dlm_lock_resource *res,
+ dlm_lock *lock,
+ dlm_lockstatus *lksb,
+ int flags, int *call_ast)
{
return dlmunlock_common(dlm, res, lock, lksb, flags, call_ast, 0);
}
@@ -620,9 +635,18 @@
void dlm_put(dlm_ctxt *dlm);
dlm_ctxt *dlm_grab(dlm_ctxt *dlm);
+void dlm_lockres_calc_usage(dlm_ctxt *dlm,
+ dlm_lock_resource *res);
void __dlm_lockres_get(dlm_lock_resource *res);
+
+dlm_lock_resource *dlm_lockres_grab(dlm_ctxt *dlm,
+ dlm_lock_resource *res);
+void __dlm_lockres_put(dlm_ctxt *dlm,
+ dlm_lock_resource *res);
void dlm_lockres_put(dlm_ctxt *dlm,
dlm_lock_resource *res);
+void __dlm_unhash_lock(dlm_ctxt *dlm,
+ dlm_lock_resource *res);
void __dlm_insert_lock(dlm_ctxt *dlm,
dlm_lock_resource *res);
dlm_lock_resource * __dlm_lookup_lock(dlm_ctxt *dlm,
Modified: trunk/fs/ocfs2/dlm/dlmthread.c
===================================================================
--- trunk/fs/ocfs2/dlm/dlmthread.c 2005-03-07 23:29:15 UTC (rev 1958)
+++ trunk/fs/ocfs2/dlm/dlmthread.c 2005-03-08 00:55:01 UTC (rev 1959)
@@ -61,10 +61,127 @@
#define dlm_lock_is_remote(dlm, lock) ((lock)->ml.node != (dlm)->group_index)
-/*
- * DLM THREAD
- */
+static int __dlm_lockres_unused(dlm_lock_resource *res)
+{
+ if (list_empty(&res->granted) &&
+ list_empty(&res->converting) &&
+ list_empty(&res->blocked) &&
+ list_empty(&res->dirty))
+ return 1;
+ return 0;
+}
+/* Call whenever you may have added or deleted something from one of
+ * the lockres queue's. This will figure out whether it belongs on the
+ * unused list or not and does the appropriate thing. */
+static void __dlm_lockres_calc_usage(dlm_ctxt *dlm,
+ dlm_lock_resource *res)
+{
+ assert_spin_locked(&dlm->spinlock);
+ assert_spin_locked(&res->spinlock);
+
+ if (__dlm_lockres_unused(res)) {
+ dlmprintk("putting lockres %.*s from purge list\n",
+ res->lockname.len, res->lockname.name);
+
+ res->last_used = jiffies;
+ list_add_tail(&res->purge,
+ &dlm->purge_list);
+ dlm->purge_count++;
+ } else if (!list_empty(&res->purge)) {
+ dlmprintk("removing lockres %.*s from purge list\n",
+ res->lockname.len, res->lockname.name);
+
+ list_del_init(&res->purge);
+ dlm->purge_count--;
+ }
+}
+
+void dlm_lockres_calc_usage(dlm_ctxt *dlm,
+ dlm_lock_resource *res)
+{
+ spin_lock(&dlm->spinlock);
+ spin_lock(&res->spinlock);
+
+ __dlm_lockres_calc_usage(dlm, res);
+
+ spin_unlock(&res->spinlock);
+ spin_unlock(&dlm->spinlock);
+}
+
+/* TODO: Eventual API: Called with the dlm spinlock held, may drop it
+ * to do migration, but will re-acquire before exit. */
+static void dlm_purge_lockres(dlm_ctxt *dlm,
+ dlm_lock_resource *lockres)
+{
+ int master;
+
+ /* Since we can't migrate locks yet, for now we only handle
+ * non locally mastered locks. */
+ spin_lock(&lockres->spinlock);
+ master = lockres->owner == dlm->group_index;
+ spin_unlock(&lockres->spinlock);
+
+ dlmprintk("purging lockres %.*s, master = %d\n", lockres->lockname.len,
+ lockres->lockname.name, master);
+
+ /* Non master is the easy case -- no migration required, just
+ * quit. */
+ if (!master)
+ __dlm_unhash_lock(dlm, lockres);
+
+ /* TODO: Wheee! Migrate lockres here! */
+}
+
+static void dlm_run_purge_list(dlm_ctxt *dlm)
+{
+ unsigned int run_max, unused;
+ unsigned long purge_jiffies;
+ dlm_lock_resource *lockres;
+
+ spin_lock(&dlm->spinlock);
+ run_max = dlm->purge_count;
+
+ while(run_max && !list_empty(&dlm->purge_list)) {
+ run_max--;
+
+ lockres = list_entry(dlm->purge_list.next,
+ dlm_lock_resource, purge);
+ list_del_init(&lockres->purge);
+ dlm->purge_count--;
+
+ /* Status of the lockres *might* change so double
+ * check. If the lockres is unused, holding the dlm
+ * spinlock will prevent people from getting and more
+ * refs on it -- there's no need to keep the lockres
+ * spinlock. */
+ spin_lock(&lockres->spinlock);
+ unused = __dlm_lockres_unused(lockres);
+ spin_unlock(&lockres->spinlock);
+
+ if (!unused)
+ continue;
+
+ purge_jiffies = lockres->last_used +
+ msecs_to_jiffies(DLM_PURGE_INTERVAL_MS);
+
+ /* Make sure that we want to be processing this guy at
+ * this time. */
+ if (time_after(purge_jiffies, jiffies)) {
+ /* re-add to be processed at a later date. */
+ list_add_tail(&lockres->purge, &dlm->purge_list);
+ dlm->purge_count++;
+ continue;
+ }
+
+ /* This may drop and reacquire the dlm spinlock if it
+ * has to do migration. */
+ dlm_purge_lockres(dlm, lockres);
+ }
+
+ spin_unlock(&dlm->spinlock);
+}
+
void dlm_shuffle_lists(dlm_ctxt *dlm, dlm_lock_resource *res)
{
dlm_lock *lock, *target;
@@ -232,17 +349,20 @@
spin_unlock(&res->spinlock);
}
-
/* must have NO locks when calling this */
void dlm_kick_thread(dlm_ctxt *dlm, dlm_lock_resource *res)
{
if (res) {
spin_lock(&dlm->spinlock);
spin_lock(&res->spinlock);
- if (!(res->state & DLM_LOCK_RES_DIRTY)) {
+
+ /* don't shuffle secondary queues */
+ if ((res->owner == dlm->group_index) &&
+ !(res->state & DLM_LOCK_RES_DIRTY)) {
list_add_tail(&res->dirty, &dlm->dirty_list);
res->state |= DLM_LOCK_RES_DIRTY;
}
+
spin_unlock(&res->spinlock);
spin_unlock(&dlm->spinlock);
}
@@ -286,36 +406,45 @@
return empty;
}
+#define DLM_THREAD_TIMEOUT_MS (4 * 1000)
+
static int dlm_thread(void *data)
{
struct list_head *iter, *tmpiter;
dlm_lock_resource *res;
dlm_ctxt *dlm = data;
+ unsigned long timeout = msecs_to_jiffies(DLM_THREAD_TIMEOUT_MS);
dlmprintk("dlm thread running for %s...\n", dlm->name);
while (!kthread_should_stop()) {
+ dlm_run_purge_list(dlm);
+
down_read(&dlm->recovery_sem);
spin_lock(&dlm->spinlock);
list_for_each_safe(iter, tmpiter, &dlm->dirty_list) {
res = list_entry(iter, dlm_lock_resource, dirty);
- /* don't shuffle secondary queues */
- if (res->owner != dlm->group_index)
- continue;
+
spin_lock(&res->spinlock);
- list_del(&res->dirty);
+ list_del_init(&res->dirty);
res->state &= ~DLM_LOCK_RES_DIRTY;
+ BUG_ON(res->owner != dlm->group_index);
spin_unlock(&res->spinlock);
dlm_shuffle_lists(dlm, res);
+
+ spin_lock(&res->spinlock);
+ __dlm_lockres_calc_usage(dlm, res);
+ spin_unlock(&res->spinlock);
}
spin_unlock(&dlm->spinlock);
up_read(&dlm->recovery_sem);
- wait_event_interruptible(dlm->dlm_thread_wq,
- !dlm_dirty_list_empty(dlm) ||
- kthread_should_stop());
+ wait_event_interruptible_timeout(dlm->dlm_thread_wq,
+ !dlm_dirty_list_empty(dlm) ||
+ kthread_should_stop(),
+ timeout);
}
dlmprintk0("quitting DLM thread\n");
Modified: trunk/fs/ocfs2/dlm/dlmunlock.c
===================================================================
--- trunk/fs/ocfs2/dlm/dlmunlock.c 2005-03-07 23:29:15 UTC (rev 1958)
+++ trunk/fs/ocfs2/dlm/dlmunlock.c 2005-03-08 00:55:01 UTC (rev 1959)
@@ -146,13 +146,12 @@
spin_lock(&res->spinlock);
spin_lock(&lock->spinlock);
}
-
+
if (actions & DLM_UNLOCK_REMOVE_LOCK)
list_del(&lock->list);
if (actions & DLM_UNLOCK_REGRANT_LOCK)
list_add_tail(&lock->list, &res->granted);
-
leave:
res->state &= ~DLM_LOCK_RES_IN_PROGRESS;
spin_unlock(&lock->spinlock);
More information about the Ocfs2-commits
mailing list