[Ocfs2-commits] khackel commits r2129 - trunk/fs/ocfs2/dlm
svn-commits at oss.oracle.com
svn-commits at oss.oracle.com
Fri Apr 8 19:16:37 CDT 2005
Author: khackel
Signed-off-by: mfasheh
Date: 2005-04-08 19:16:35 -0500 (Fri, 08 Apr 2005)
New Revision: 2129
Modified:
trunk/fs/ocfs2/dlm/dlmast.c
trunk/fs/ocfs2/dlm/dlmcommon.h
trunk/fs/ocfs2/dlm/dlmconvert.c
trunk/fs/ocfs2/dlm/dlmdomain.c
trunk/fs/ocfs2/dlm/dlmlock.c
trunk/fs/ocfs2/dlm/dlmmaster.c
trunk/fs/ocfs2/dlm/dlmrecovery.c
trunk/fs/ocfs2/dlm/dlmthread.c
trunk/fs/ocfs2/dlm/dlmunlock.c
Log:
* make lockres migration dependent upon AST flushing (no migration
until all ASTs and BASTs are flushed)
* added refcounting for dlm_lock
* ASTs and BASTs now use a reserve/claim/release system
* rework dlm_flush_asts (and added dlm->ast_lock) to do work without
holding the dlm spinlock
* rework dlm_thread dirty_list running to do work without
holding the dlm spinlock
* handle res->state flags more uniformly in several paths
* uniformly attach and detach lockres and lock structures
* unlock is no longer unsafe
Signed-off-by: mfasheh
Modified: trunk/fs/ocfs2/dlm/dlmast.c
===================================================================
--- trunk/fs/ocfs2/dlm/dlmast.c 2005-04-08 21:25:16 UTC (rev 2128)
+++ trunk/fs/ocfs2/dlm/dlmast.c 2005-04-09 00:16:35 UTC (rev 2129)
@@ -58,11 +58,13 @@
DLM_ASSERT(dlm);
DLM_ASSERT(lock);
- assert_spin_locked(&dlm->spinlock);
+ assert_spin_locked(&dlm->ast_lock);
DLM_ASSERT(list_empty(&lock->ast_list));
if (lock->ast_pending)
dlmprintk0("lock has an ast getting flushed right now\n");
+ /* putting lock on list, add a ref */
+ dlm_lock_get(lock);
spin_lock(&lock->spinlock);
list_add_tail(&lock->ast_list, &dlm->pending_asts);
lock->ast_pending = 1;
@@ -76,9 +78,9 @@
DLM_ASSERT(dlm);
DLM_ASSERT(lock);
- spin_lock(&dlm->spinlock);
+ spin_lock(&dlm->ast_lock);
__dlm_queue_ast(dlm, lock);
- spin_unlock(&dlm->spinlock);
+ spin_unlock(&dlm->ast_lock);
}
@@ -88,19 +90,32 @@
DLM_ASSERT(dlm);
DLM_ASSERT(lock);
- assert_spin_locked(&dlm->spinlock);
+ assert_spin_locked(&dlm->ast_lock);
DLM_ASSERT(list_empty(&lock->bast_list));
if (lock->bast_pending)
dlmprintk0("lock has a bast getting flushed right now\n");
+ /* putting lock on list, add a ref */
+ dlm_lock_get(lock);
spin_lock(&lock->spinlock);
list_add_tail(&lock->bast_list, &dlm->pending_basts);
lock->bast_pending = 1;
spin_unlock(&lock->spinlock);
}
+void dlm_queue_bast(dlm_ctxt *dlm, dlm_lock *lock)
+{
+ dlmprintk0("\n");
+ DLM_ASSERT(dlm);
+ DLM_ASSERT(lock);
+
+ spin_lock(&dlm->ast_lock);
+ __dlm_queue_bast(dlm, lock);
+ spin_unlock(&dlm->ast_lock);
+}
+
static void dlm_update_lvb(dlm_ctxt *dlm, dlm_lock_resource *res,
dlm_lock *lock)
{
@@ -302,6 +317,7 @@
do_ast:
ret = DLM_NORMAL;
if (past->type == DLM_AST) {
+ /* do not alter lock refcount. switching lists. */
list_del_init(&lock->list);
list_add_tail(&lock->list, &res->granted);
dlmprintk("ast: adding to granted list... type=%d, "
Modified: trunk/fs/ocfs2/dlm/dlmcommon.h
===================================================================
--- trunk/fs/ocfs2/dlm/dlmcommon.h 2005-04-08 21:25:16 UTC (rev 2128)
+++ trunk/fs/ocfs2/dlm/dlmcommon.h 2005-04-09 00:16:35 UTC (rev 2129)
@@ -105,6 +105,7 @@
struct list_head pending_basts;
unsigned int purge_count;
spinlock_t spinlock;
+ spinlock_t ast_lock;
struct rw_semaphore recovery_sem;
char *name;
u8 node_num;
@@ -239,6 +240,8 @@
struct list_head purge;
unsigned long last_used;
+ unsigned migration_pending:1;
+ atomic_t asts_reserved;
spinlock_t spinlock;
wait_queue_head_t wq;
u8 owner; //node which owns the lock resource, or unknown
@@ -272,6 +275,7 @@
struct list_head bast_list;
dlm_lock_resource *lockres;
spinlock_t spinlock;
+ struct kref lock_refs;
// ast and bast must be callable while holding a spinlock!
dlm_astlockfunc_t *ast;
@@ -772,9 +776,32 @@
dlm_migratable_lock_to_host(&(mr->ml[i]));
}
+static inline dlm_status __dlm_lockres_state_to_status(dlm_lock_resource *res)
+{
+ dlm_status status = DLM_NORMAL;
+ assert_spin_locked(&res->spinlock);
+
+ if (res->state & DLM_LOCK_RES_RECOVERING) {
+ dlmprintk0("returning DLM_RECOVERING\n");
+ status = DLM_RECOVERING;
+ } else if (res->state & DLM_LOCK_RES_MIGRATING) {
+ dlmprintk0("returning DLM_MIGRATING\n");
+ status = DLM_MIGRATING;
+ } else if (res->state & DLM_LOCK_RES_IN_PROGRESS) {
+ dlmprintk0("returning DLM_FORWARD\n");
+ status = DLM_FORWARD;
+ }
+ return status;
+}
+
dlm_lock * dlm_new_lock(int type, u8 node, u64 cookie, dlm_lockstatus *lksb);
+void dlm_lock_get(dlm_lock *lock);
+void dlm_lock_put(dlm_lock *lock);
+void dlm_lock_detach_lockres(dlm_lock *lock);
+void dlm_lock_attach_lockres(dlm_lock *lock, dlm_lock_resource *res);
+
int dlm_create_lock_handler(net_msg *msg, u32 len, void *data);
int dlm_convert_lock_handler(net_msg *msg, u32 len, void *data);
int dlm_proxy_ast_handler(net_msg *msg, u32 len, void *data);
@@ -785,6 +812,7 @@
int dlm_launch_thread(dlm_ctxt *dlm);
void dlm_complete_thread(dlm_ctxt *dlm);
void dlm_flush_asts(dlm_ctxt *dlm);
+int dlm_flush_lockres_asts(dlm_ctxt *dlm, dlm_lock_resource *res);
int dlm_launch_recovery_thread(dlm_ctxt *dlm);
void dlm_complete_recovery_thread(dlm_ctxt *dlm);
@@ -819,6 +847,7 @@
void __dlm_queue_ast(dlm_ctxt *dlm, dlm_lock *lock);
void dlm_queue_ast(dlm_ctxt *dlm, dlm_lock *lock);
void __dlm_queue_bast(dlm_ctxt *dlm, dlm_lock *lock);
+void dlm_queue_bast(dlm_ctxt *dlm, dlm_lock *lock);
void dlm_do_local_ast(dlm_ctxt *dlm, dlm_lock_resource *res, dlm_lock *lock);
int dlm_do_remote_ast(dlm_ctxt *dlm, dlm_lock_resource *res, dlm_lock *lock);
void dlm_do_local_bast(dlm_ctxt *dlm, dlm_lock_resource *res,
@@ -852,6 +881,8 @@
int dlm_migrate_lockres(dlm_ctxt *dlm, dlm_lock_resource *res, u8 target);
int dlm_finish_migration(dlm_ctxt *dlm, dlm_lock_resource *res, u8 old_master);
+void dlm_lockres_release_ast(dlm_lock_resource *res);
+void __dlm_lockres_reserve_ast(dlm_lock_resource *res);
int dlm_master_request_handler(net_msg *msg, u32 len, void *data);
int dlm_assert_master_handler(net_msg *msg, u32 len, void *data);
@@ -878,6 +909,7 @@
/* will exit holding res->spinlock, but may drop in function */
void __dlm_wait_on_lockres_flags(dlm_lock_resource *res, int flags);
+void __dlm_wait_on_lockres_flags_set(dlm_lock_resource *res, int flags);
/* will exit holding res->spinlock, but may drop in function */
static inline void __dlm_wait_on_lockres(dlm_lock_resource *res)
Modified: trunk/fs/ocfs2/dlm/dlmconvert.c
===================================================================
--- trunk/fs/ocfs2/dlm/dlmconvert.c 2005-04-08 21:25:16 UTC (rev 2128)
+++ trunk/fs/ocfs2/dlm/dlmconvert.c 2005-04-09 00:16:35 UTC (rev 2129)
@@ -79,6 +79,7 @@
spin_lock(&res->spinlock);
/* we are not in a network handler, this is fine */
__dlm_wait_on_lockres(res);
+ __dlm_lockres_reserve_ast(res);
res->state |= DLM_LOCK_RES_IN_PROGRESS;
status = __dlmconvert_master(dlm, res, lock, flags, type,
@@ -88,8 +89,11 @@
spin_unlock(&res->spinlock);
wake_up(&res->wq);
+ /* either queue the ast or release it */
if (call_ast)
dlm_queue_ast(dlm, lock);
+ else
+ dlm_lockres_release_ast(res);
if (kick_thread)
dlm_kick_thread(dlm, res);
@@ -216,6 +220,7 @@
res->lockname.name);
lock->ml.convert_type = type;
+ /* do not alter lock refcount. switching lists. */
list_del_init(&lock->list);
list_add_tail(&lock->list, &res->converting);
@@ -252,6 +257,7 @@
res->state |= DLM_LOCK_RES_IN_PROGRESS;
/* move lock to local convert queue */
+ /* do not alter lock refcount. switching lists. */
list_del_init(&lock->list);
list_add_tail(&lock->list, &res->converting);
if (lock->ml.convert_type != LKM_IVMODE) {
@@ -288,6 +294,7 @@
/* if it failed, move it back to granted queue */
if (status != DLM_NORMAL) {
+ /* do not alter lock refcount. switching lists. */
list_del_init(&lock->list);
list_add_tail(&lock->list, &res->granted);
lock->ml.convert_type = LKM_IVMODE;
@@ -389,7 +396,6 @@
dlm_status status = DLM_NORMAL;
u32 flags;
int call_ast = 0, kick_thread = 0;
- int found = 0;
if (!dlm_grab(dlm))
return DLM_REJECTED;
@@ -421,28 +427,17 @@
goto leave;
spin_lock(&res->spinlock);
- if (res->state & DLM_LOCK_RES_RECOVERING) {
- spin_unlock(&res->spinlock);
- dlmprintk0("returning DLM_RECOVERING\n");
- status = DLM_RECOVERING;
- goto leave;
- }
- if (res->state & DLM_LOCK_RES_MIGRATING) {
- spin_unlock(&res->spinlock);
- dlmprintk0("returning DLM_MIGRATING\n");
- status = DLM_MIGRATING;
- goto leave;
- }
list_for_each(iter, &res->granted) {
lock = list_entry(iter, dlm_lock, list);
if (lock->ml.cookie == cnv->cookie &&
lock->ml.node == cnv->node_idx) {
- found = 1;
+ dlm_lock_get(lock);
break;
}
+ lock = NULL;
}
spin_unlock(&res->spinlock);
- if (!found)
+ if (!lock)
goto leave;
/* found the lock */
@@ -461,9 +456,9 @@
}
spin_lock(&res->spinlock);
- if (res->state & DLM_LOCK_RES_IN_PROGRESS) {
- status = DLM_FORWARD;
- } else {
+ status = __dlm_lockres_state_to_status(res);
+ if (status == DLM_NORMAL) {
+ __dlm_lockres_reserve_ast(res);
res->state |= DLM_LOCK_RES_IN_PROGRESS;
status = __dlmconvert_master(dlm, res, lock, flags,
cnv->requested_type,
@@ -480,9 +475,15 @@
if (!lock)
dlmprintk("did not find lock to convert on "
"grant queue! cookie=%llu\n", cnv->cookie);
+ else
+ dlm_lock_put(lock);
+ /* either queue the ast or release it */
if (call_ast)
dlm_queue_ast(dlm, lock);
+ else
+ dlm_lockres_release_ast(res);
+
if (kick_thread)
dlm_kick_thread(dlm, res);
Modified: trunk/fs/ocfs2/dlm/dlmdomain.c
===================================================================
--- trunk/fs/ocfs2/dlm/dlmdomain.c 2005-04-08 21:25:16 UTC (rev 2128)
+++ trunk/fs/ocfs2/dlm/dlmdomain.c 2005-04-09 00:16:35 UTC (rev 2129)
@@ -47,7 +47,13 @@
/*
*
* spinlock lock ordering: if multiple locks are needed, obey this ordering:
- * dlm_domain_lock -> dlm_ctxt -> dlm_lock_resource -> dlm_lock
+ * dlm_domain_lock
+ * dlm_ctxt->spinlock
+ * dlm_lock_resource->spinlock
+ * dlm_ctxt->master_lock
+ * dlm_ctxt->ast_lock
+ * dlm_master_list_entry->spinlock
+ * dlm_lock->spinlock
*
*/
@@ -1153,6 +1159,7 @@
spin_lock_init(&dlm->spinlock);
spin_lock_init(&dlm->master_lock);
+ spin_lock_init(&dlm->ast_lock);
INIT_LIST_HEAD(&dlm->list);
INIT_LIST_HEAD(&dlm->dirty_list);
INIT_LIST_HEAD(&dlm->reco.resources);
Modified: trunk/fs/ocfs2/dlm/dlmlock.c
===================================================================
--- trunk/fs/ocfs2/dlm/dlmlock.c 2005-04-08 21:25:16 UTC (rev 2128)
+++ trunk/fs/ocfs2/dlm/dlmlock.c 2005-04-09 00:16:35 UTC (rev 2129)
@@ -56,6 +56,7 @@
dlm_lock_resource *res,
dlm_lock *lock, int flags);
static void dlm_init_lock(dlm_lock *newlock, int type, u8 node, u64 cookie);
+static void dlm_lock_release(struct kref *kref);
/* Tell us whether we can grant a new lock request.
* locking:
@@ -97,7 +98,7 @@
static dlm_status dlmlock_master(dlm_ctxt *dlm, dlm_lock_resource *res,
dlm_lock *lock, int flags)
{
- int call_ast = 0;
+ int call_ast = 0, kick_thread = 0;
dlm_status status = DLM_NORMAL;
DLM_ASSERT(lock);
@@ -108,13 +109,24 @@
dlmprintk("type=%d\n", lock->ml.type);
spin_lock(&res->spinlock);
+ /* if called from dlm_create_lock_handler, need to
+ * ensure it will not sleep in dlm_wait_on_lockres */
+ status = __dlm_lockres_state_to_status(res);
+ if (status != DLM_NORMAL &&
+ lock->ml.node != dlm->node_num) {
+ /* erf. state changed after lock was dropped. */
+ spin_unlock(&res->spinlock);
+ return status;
+ }
__dlm_wait_on_lockres(res);
+ __dlm_lockres_reserve_ast(res);
if (dlm_can_grant_new_lock(res, lock)) {
dlmprintk("I can grant this lock right away\n");
/* got it right away */
lock->lksb->status = DLM_NORMAL;
status = DLM_NORMAL;
+ dlm_lock_get(lock);
list_add_tail(&lock->list, &res->granted);
/* for the recovery lock, we can't allow the ast
@@ -123,25 +135,33 @@
* with LKM_NOQUEUE so we do not need the ast in
* this special case */
if (!dlm_is_recovery_lock(res->lockname.name,
- res->lockname.len))
+ res->lockname.len)) {
+ kick_thread = 1;
call_ast = 1;
+ }
} else {
/* for NOQUEUE request, unless we get the
* lock right away, return DLM_NOTQUEUED */
if (flags & LKM_NOQUEUE)
status = DLM_NOTQUEUED;
- else
+ else {
+ dlm_lock_get(lock);
list_add_tail(&lock->list, &res->blocked);
+ kick_thread = 1;
+ }
}
spin_unlock(&res->spinlock);
wake_up(&res->wq);
+ /* either queue the ast or release it */
if (call_ast)
dlm_queue_ast(dlm, lock);
+ else
+ dlm_lockres_release_ast(res);
dlm_lockres_calc_usage(dlm, res);
- if (status == DLM_NORMAL)
+ if (kick_thread)
dlm_kick_thread(dlm, res);
return status;
@@ -170,6 +190,7 @@
res->state |= DLM_LOCK_RES_IN_PROGRESS;
/* add lock to local (secondary) queue */
+ dlm_lock_get(lock);
list_add_tail(&lock->list, &res->blocked);
spin_unlock(&res->spinlock);
@@ -183,6 +204,7 @@
/* remove from local queue if it failed */
list_del_init(&lock->list);
lock->lksb->flags &= ~DLM_LKSB_GET_LVB;
+ dlm_lock_put(lock);
}
spin_unlock(&res->spinlock);
@@ -233,7 +255,68 @@
return ret;
}
+void dlm_lock_get(dlm_lock *lock)
+{
+ kref_get(&lock->lock_refs);
+}
+void dlm_lock_put(dlm_lock *lock)
+{
+ kref_put(&lock->lock_refs, dlm_lock_release);
+}
+
+static void dlm_lock_release(struct kref *kref)
+{
+ dlm_lock *lock;
+ dlm_lockstatus *lksb;
+
+ DLM_ASSERT(kref);
+ lock = container_of(kref, dlm_lock, lock_refs);
+
+ lksb = lock->lksb;
+ DLM_ASSERT(lksb);
+ DLM_ASSERT(lksb->lockid == lock);
+ DLM_ASSERT(list_empty(&lock->list));
+ DLM_ASSERT(list_empty(&lock->ast_list));
+ DLM_ASSERT(list_empty(&lock->bast_list));
+ DLM_ASSERT(!lock->ast_pending);
+ DLM_ASSERT(!lock->bast_pending);
+
+ dlm_lock_detach_lockres(lock);
+
+ if (lksb->flags & DLM_LKSB_KERNEL_ALLOCATED) {
+ dlmprintk0("freeing kernel-allocated lksb\n");
+ kfree(lksb);
+ } else {
+ dlmprintk0("clearing lockid pointer on user-allocated lksb\n");
+ lksb->lockid = NULL;
+ }
+ kfree(lock);
+}
+
+/* associate a lock with it's lockres, getting a ref on the lockres */
+void dlm_lock_attach_lockres(dlm_lock *lock, dlm_lock_resource *res)
+{
+ DLM_ASSERT(lock);
+ DLM_ASSERT(res);
+ dlm_lockres_get(res);
+ lock->lockres = res;
+}
+
+/* drop ref on lockres, if there is still one associated with lock */
+void dlm_lock_detach_lockres(dlm_lock *lock)
+{
+ dlm_lock_resource *res;
+
+ DLM_ASSERT(lock);
+ res = lock->lockres;
+ if (res) {
+ lock->lockres = NULL;
+ dlmprintk0("removing lock's lockres reference\n");
+ dlm_lockres_put(res);
+ }
+}
+
static void dlm_init_lock(dlm_lock *newlock, int type, u8 node, u64 cookie)
{
INIT_LIST_HEAD(&newlock->list);
@@ -250,6 +333,7 @@
newlock->ml.cookie = cookie;
newlock->ast_pending = 0;
newlock->bast_pending = 0;
+ kref_init(&newlock->lock_refs, dlm_lock_release);
}
dlm_lock * dlm_new_lock(int type, u8 node, u64 cookie, dlm_lockstatus *lksb)
@@ -334,29 +418,21 @@
goto leave;
spin_lock(&res->spinlock);
- if (res->state & DLM_LOCK_RES_RECOVERING) {
- dlmprintk0("returning DLM_RECOVERING\n");
- status = DLM_RECOVERING;
- spin_unlock(&res->spinlock);
+ status = __dlm_lockres_state_to_status(res);
+ spin_unlock(&res->spinlock);
+
+ if (status != DLM_NORMAL) {
+ dlmprintk("lockres recovering/migrating/in-progress\n");
goto leave;
}
- if (res->state & DLM_LOCK_RES_MIGRATING) {
- dlmprintk0("returning DLM_MIGRATING\n");
- status = DLM_MIGRATING;
- spin_unlock(&res->spinlock);
- goto leave;
- }
- spin_unlock(&res->spinlock);
- newlock->lockres = res;
+ dlm_lock_attach_lockres(newlock, res);
+
status = dlmlock_master(dlm, res, newlock, create->flags);
leave:
- if (status != DLM_NORMAL) {
+ if (status != DLM_NORMAL)
if (newlock)
- kfree(newlock);
- if (lksb)
- kfree(lksb);
- }
+ dlm_lock_put(newlock);
if (res)
dlm_lockres_put(res);
@@ -366,6 +442,7 @@
return status;
}
+
/* fetch next node-local (u8 nodenum + u56 cookie) into u64 */
static inline void dlm_get_next_cookie(u8 node_num, u64 *cookie)
{
@@ -392,6 +469,9 @@
dlm_lock *lock = NULL;
int convert = 0, recovery = 0;
+ /* yes this function is a mess.
+ * TODO: clean this up. lots of common code in the
+ * lock and convert paths, especially in the retry blocks */
if (!lksb)
return DLM_BADARGS;
@@ -405,8 +485,8 @@
convert = (flags & LKM_CONVERT);
recovery = (flags & LKM_RECOVERY);
- if (recovery && (!dlm_is_recovery_lock(name, strlen(name)) ||
- convert) ) {
+ if (recovery &&
+ (!dlm_is_recovery_lock(name, strlen(name)) || convert) ) {
goto error;
}
if (convert && (flags & LKM_LOCAL)) {
@@ -418,9 +498,18 @@
/* CONVERT request */
/* if converting, must pass in a valid dlm_lock */
- if (!lksb->lockid || !lksb->lockid->lockres)
+ lock = lksb->lockid;
+ if (!lock) {
+ dlmerror0("NULL lock pointer in convert request\n");
goto error;
- lock = lksb->lockid;
+ }
+
+ res = lock->lockres;
+ if (!res) {
+ dlmerror0("NULL lockres pointer in convert request\n");
+ goto error;
+ }
+ dlm_lockres_get(res);
/* XXX: for ocfs2 purposes, the ast/bast/astdata/lksb are
* static after the original lock call. convert requests will
@@ -437,8 +526,6 @@
lock->bast, lock->astdata);
goto error;
}
- res = lock->lockres;
- dlm_lockres_get(res);
retry_convert:
down_read(&dlm->recovery_sem);
@@ -455,6 +542,7 @@
dlmprintk0("retrying convert with migration/"
"recovery/in-progress\n");
up_read(&dlm->recovery_sem);
+ yield();
goto retry_convert;
}
} else {
@@ -487,7 +575,7 @@
dlmprintk("type=%d, flags = 0x%x\n", mode, flags);
dlmprintk("creating lock: lock=%p res=%p\n", lock, res);
- lock->lockres = res;
+ dlm_lock_attach_lockres(lock, res);
lock->ast = ast;
lock->bast = bast;
lock->astdata = data;
@@ -511,10 +599,12 @@
else
status = dlmlock_remote(dlm, res, lock, flags);
- if (status == DLM_RECOVERING || status == DLM_MIGRATING) {
- dlmprintk0("retrying lock with migration or "
- "recovery in progress\n");
+ if (status == DLM_RECOVERING || status == DLM_MIGRATING ||
+ status == DLM_FORWARD) {
+ dlmprintk0("retrying lock with migration/"
+ "recovery/in progress\n");
up_read(&dlm->recovery_sem);
+ yield();
down_read(&dlm->recovery_sem);
goto retry_lock;
}
@@ -531,14 +621,14 @@
error:
if (status != DLM_NORMAL) {
- if (lock && !convert) {
- kfree(lock);
- lksb->lockid = NULL;
- }
+ if (lock && !convert)
+ dlm_lock_put(lock);
// this is kind of unnecessary
lksb->status = status;
}
+ /* put lockres ref from the convert path
+ * or from dlm_get_lock_resource */
if (res)
dlm_lockres_put(res);
Modified: trunk/fs/ocfs2/dlm/dlmmaster.c
===================================================================
--- trunk/fs/ocfs2/dlm/dlmmaster.c 2005-04-08 21:25:16 UTC (rev 2128)
+++ trunk/fs/ocfs2/dlm/dlmmaster.c 2005-04-09 00:16:35 UTC (rev 2129)
@@ -132,7 +132,7 @@
dlm_master_list_entry *mle);
static int dlm_restart_lock_mastery(dlm_ctxt *dlm, dlm_lock_resource *res,
dlm_master_list_entry *mle);
-static void dlm_add_migration_mle(dlm_ctxt *dlm,
+static int dlm_add_migration_mle(dlm_ctxt *dlm,
dlm_lock_resource *res,
dlm_master_list_entry *mle,
dlm_master_list_entry **oldmle,
@@ -141,8 +141,10 @@
static u8 dlm_pick_migration_target(dlm_ctxt *dlm, dlm_lock_resource *res);
static void dlm_remove_nonlocal_locks(dlm_ctxt *dlm, dlm_lock_resource *res);
+static void dlm_mark_lockres_migrating(dlm_ctxt *dlm, dlm_lock_resource *res);
+
/*
* MASTER LIST FUNCTIONS
*/
@@ -473,6 +475,8 @@
INIT_LIST_HEAD(&res->dirty);
INIT_LIST_HEAD(&res->recovering);
INIT_LIST_HEAD(&res->purge);
+ atomic_set(&res->asts_reserved, 0);
+ res->migration_pending = 0;
kref_init(&res->refs, dlm_lockres_release);
@@ -1269,13 +1273,6 @@
return 0;
}
-int dlm_flush_lockres_asts(dlm_ctxt *dlm, dlm_lock_resource *res)
-{
- dlm_flush_asts(dlm);
- /* still need to implement dlm_flush_lockres_asts */
- return 0;
-}
-
int dlm_dispatch_assert_master(dlm_ctxt *dlm, dlm_lock_resource *res,
int ignore_higher, u8 request_from)
{
@@ -1386,12 +1383,20 @@
dlmprintk("migrating %.*s to %u\n", namelen, name, target);
+ /*
+ * ensure this lockres is a proper candidate for migration
+ */
spin_lock(&res->spinlock);
if (res->owner == DLM_LOCK_RES_OWNER_UNKNOWN) {
dlmprintk0("cannot migrate lockres with unknown owner!\n");
spin_unlock(&res->spinlock);
goto leave;
}
+ if (res->owner != dlm->node_num) {
+ dlmprintk0("cannot migrate lockres this node doesn't own!\n");
+ spin_unlock(&res->spinlock);
+ goto leave;
+ }
dlmprintk0("checking queues...\n");
queue = &res->granted;
for (i=0; i<3; i++) {
@@ -1415,13 +1420,18 @@
dlmprintk0("all locks on this lockres are nonlocal. continuing\n");
spin_unlock(&res->spinlock);
+ /* no work to do */
if (empty) {
dlmprintk0("no locks were found on this lockres! done!\n");
ret = 0;
goto leave;
}
- /* preallocate.. if this fails, abort */
+ /*
+ * preallocate up front
+ * if this fails, abort
+ */
+
ret = -ENOMEM;
mres = (dlm_migratable_lockres *) __get_free_page(GFP_KERNEL);
if (!mres) {
@@ -1436,6 +1446,10 @@
}
ret = 0;
+ /*
+ * find a node to migrate the lockres to
+ */
+
dlmprintk0("picking a migration node\n");
spin_lock(&dlm->spinlock);
/* pick a new node */
@@ -1453,34 +1467,33 @@
if (ret) {
spin_unlock(&dlm->spinlock);
- goto not_live;
+ goto fail;
}
dlmprintk("continuing with target = %u\n", target);
- /* clear any existing master requests and
- * add the migration mle to the list */
+ /*
+ * clear any existing master requests and
+ * add the migration mle to the list
+ */
spin_lock(&dlm->master_lock);
- dlm_add_migration_mle(dlm, res, mle, &oldmle, name, namelen,
- target, dlm->node_num);
- mle_added = 1;
+ ret = dlm_add_migration_mle(dlm, res, mle, &oldmle, name,
+ namelen, target, dlm->node_num);
spin_unlock(&dlm->master_lock);
spin_unlock(&dlm->spinlock);
- /* set migrating flag on lockres */
- spin_lock(&res->spinlock);
- res->state |= DLM_LOCK_RES_MIGRATING;
- spin_unlock(&res->spinlock);
-
- /* flush the last of the pending asts */
- ret = dlm_flush_lockres_asts(dlm, res);
- if (ret < 0) {
- spin_lock(&res->spinlock);
- res->state &= ~DLM_LOCK_RES_MIGRATING;
- spin_unlock(&res->spinlock);
+ if (ret == -EEXIST) {
+ dlmprintk0("another process is already migrating it\n");
+ goto fail;
}
+ mle_added = 1;
-not_live:
+ /*
+ * set the MIGRATING flag and flush asts
+ */
+ dlm_mark_lockres_migrating(dlm, res);
+
+fail:
if (oldmle) {
/* master is known, detach if not already detached */
dlm_mle_detach_hb_events(dlm, oldmle);
@@ -1503,6 +1516,11 @@
* the lockres
*/
+
+ /* get an extra reference on the mle.
+ * otherwise the assert_master from the new
+ * master will destroy this. */
+ dlm_get_mle(mle);
/* notify new node and send all lock state */
/* call send_one_lockres with migration flag.
@@ -1517,6 +1535,7 @@
/* migration failed, detach and clean up mle */
dlm_mle_detach_hb_events(dlm, mle);
dlm_put_mle(mle);
+ dlm_put_mle(mle);
goto leave;
}
@@ -1548,6 +1567,7 @@
/* migration failed, detach and clean up mle */
dlm_mle_detach_hb_events(dlm, mle);
dlm_put_mle(mle);
+ dlm_put_mle(mle);
goto leave;
}
/* TODO: if node died: stop, clean up, return error */
@@ -1578,6 +1598,39 @@
EXPORT_SYMBOL(dlm_migrate_lockres);
+static void dlm_mark_lockres_migrating(dlm_ctxt *dlm, dlm_lock_resource *res)
+{
+ /* need to set MIGRATING flag on lockres. this is done by
+ * ensuring that all asts have been flushed for this lockres. */
+ spin_lock(&res->spinlock);
+ DLM_ASSERT(!(res->migration_pending));
+ res->migration_pending = 1;
+ /* strategy is to reserve an extra ast then release
+ * it below, letting the release do all of the work */
+ __dlm_lockres_reserve_ast(res);
+ spin_unlock(&res->spinlock);
+
+ /* now flush all the pending asts.. hang out for a bit */
+ dlm_flush_lockres_asts(dlm, res);
+ dlm_lockres_release_ast(res);
+
+ /* if the extra ref we just put was the final one, this
+ * will pass thru immediately. otherwise, we need to wait
+ * for the last ast to finish. */
+ spin_lock(&res->spinlock);
+ __dlm_wait_on_lockres_flags_set(res, DLM_LOCK_RES_MIGRATING);
+ spin_unlock(&res->spinlock);
+
+ /*
+ * at this point:
+ *
+ * o the DLM_LOCK_RES_MIGRATING flag is set
+ * o there are no pending asts on this lockres
+ * o all processes trying to reserve an ast on this
+ * lockres must wait for the MIGRATING flag to clear
+ */
+}
+
/* last step in the migration process.
* original master calls this to free all of the dlm_lock
* structures that used to be for other nodes. */
@@ -1597,13 +1650,15 @@
lock = list_entry (iter, dlm_lock, list);
DLM_ASSERT(lock);
if (lock->ml.node != dlm->node_num) {
- dlmprintk("freeing lock for node %u\n",
+ dlmprintk("putting lock for node %u\n",
lock->ml.node);
+ /* be extra careful */
+ DLM_ASSERT(list_empty(&lock->ast_list));
+ DLM_ASSERT(list_empty(&lock->bast_list));
+ DLM_ASSERT(!lock->ast_pending);
+ DLM_ASSERT(!lock->bast_pending);
list_del_init(&lock->list);
- dlm_lockres_put(lock->lockres);
- DLM_ASSERT(lock->lksb);
- kfree(lock->lksb);
- kfree(lock);
+ dlm_lock_put(lock);
}
}
queue++;
@@ -1757,8 +1812,11 @@
spin_unlock(&res->spinlock);
}
- dlm_add_migration_mle(dlm, res, mle, &oldmle, name, namelen,
- migrate->new_master, migrate->master);
+ /* ignore status. only nonzero status would BUG. */
+ ret = dlm_add_migration_mle(dlm, res, mle, &oldmle,
+ name, namelen,
+ migrate->new_master,
+ migrate->master);
unlock:
spin_unlock(&dlm->master_lock);
@@ -1784,7 +1842,7 @@
* the list after setting it's master field, and then add
* the new migration mle. this way we can hold with the rule
* of having only one mle for a given lock name at all times. */
-static void dlm_add_migration_mle(dlm_ctxt *dlm,
+static int dlm_add_migration_mle(dlm_ctxt *dlm,
dlm_lock_resource *res,
dlm_master_list_entry *mle,
dlm_master_list_entry **oldmle,
@@ -1792,6 +1850,7 @@
u8 new_master, u8 master)
{
int found;
+ int ret = 0;
*oldmle = NULL;
@@ -1800,18 +1859,38 @@
assert_spin_locked(&dlm->spinlock);
assert_spin_locked(&dlm->master_lock);
+ /* caller is responsible for any ref taken here on oldmle */
found = dlm_find_mle(dlm, oldmle, (char *)name, namelen);
if (found) {
dlm_master_list_entry *tmp = *oldmle;
- /* this is essentially what assert_master does */
spin_lock(&tmp->spinlock);
- tmp->master = master;
- atomic_set(&tmp->woken, 1);
- wake_up(&tmp->wq);
- /* remove it from the list so that only one
- * mle will be found */
- list_del_init(&tmp->list);
- INIT_LIST_HEAD(&tmp->list);
+ if (tmp->type == DLM_MLE_MIGRATION) {
+ if (master == dlm->node_num) {
+ /* ah another process raced me to it */
+ dlmprintk("tried to migrate %.*s, but some "
+ "process beat me to it\n",
+ namelen, name);
+ ret = -EEXIST;
+ } else {
+ /* bad. 2 NODES are trying to migrate! */
+ dlmerror("migration error. mle: master=%u "
+ "new_master=%u // request: "
+ "master=%u new_master=%u // "
+ "lockres=%.*s\n",
+ tmp->master, tmp->new_master,
+ master, new_master,
+ namelen, name);
+ BUG();
+ }
+ } else {
+ /* this is essentially what assert_master does */
+ tmp->master = master;
+ atomic_set(&tmp->woken, 1);
+ wake_up(&tmp->wq);
+ /* remove it from the list so that only one
+ * mle will be found */
+ list_del_init(&tmp->list);
+ }
spin_unlock(&tmp->spinlock);
}
@@ -1822,6 +1901,8 @@
/* do this for consistency with other mle types */
set_bit(new_master, mle->maybe_map);
list_add(&mle->list, &dlm->master_list);
+
+ return ret;
}
@@ -1979,3 +2060,50 @@
leave:
return ret;
}
+
+/*
+ * LOCKRES AST REFCOUNT
+ * this is integral to migration
+ */
+
+/* for future intent to call an ast, reserve one ahead of time.
+ * this should be called only after waiting on the lockres
+ * with dlm_wait_on_lockres, and while still holding the
+ * spinlock after the call. */
+void __dlm_lockres_reserve_ast(dlm_lock_resource *res)
+{
+ assert_spin_locked(&res->spinlock);
+ DLM_ASSERT(!(res->state & DLM_LOCK_RES_MIGRATING));
+
+ atomic_inc(&res->asts_reserved);
+}
+
+/*
+ * used to drop the reserved ast, either because it went unused,
+ * or because the ast/bast was actually called.
+ *
+ * also, if there is a pending migration on this lockres,
+ * and this was the last pending ast on the lockres,
+ * atomically set the MIGRATING flag before we drop the lock.
+ * this is how we ensure that migration can proceed with no
+ * asts in progress. note that it is ok if the state of the
+ * queues is such that a lock should be granted in the future
+ * or that a bast should be fired, because the new master will
+ * shuffle the lists on this lockres as soon as it is migrated.
+ */
+void dlm_lockres_release_ast(dlm_lock_resource *res)
+{
+ if (!atomic_dec_and_lock(&res->asts_reserved, &res->spinlock))
+ return;
+
+ if (!res->migration_pending) {
+ spin_unlock(&res->spinlock);
+ return;
+ }
+
+ DLM_ASSERT(!(res->state & DLM_LOCK_RES_MIGRATING));
+ res->migration_pending = 0;
+ res->state |= DLM_LOCK_RES_MIGRATING;
+ spin_unlock(&res->spinlock);
+ wake_up(&res->wq);
+}
Modified: trunk/fs/ocfs2/dlm/dlmrecovery.c
===================================================================
--- trunk/fs/ocfs2/dlm/dlmrecovery.c 2005-04-08 21:25:16 UTC (rev 2128)
+++ trunk/fs/ocfs2/dlm/dlmrecovery.c 2005-04-09 00:16:35 UTC (rev 2129)
@@ -1028,6 +1028,7 @@
"%.*s, but marked as recovering!\n",
mres->lockname_len, mres->lockname);
ret = -EFAULT;
+ spin_unlock(&res->spinlock);
goto leave;
}
res->state |= DLM_LOCK_RES_MIGRATING;
@@ -1369,6 +1370,7 @@
* to match the master here */
/* move the lock to its proper place */
+ /* do not alter lock refcount. switching lists. */
list_del_init(&lock->list);
list_add_tail(&lock->list, queue);
spin_unlock(&res->spinlock);
@@ -1384,8 +1386,7 @@
goto leave;
}
lksb = newlock->lksb;
- dlm_lockres_get(res);
- newlock->lockres = res;
+ dlm_lock_attach_lockres(newlock, res);
if (ml->convert_type != LKM_IVMODE) {
DLM_ASSERT(queue == &res->converting);
@@ -1417,6 +1418,7 @@
* preserved relative to locks from other nodes.
*/
spin_lock(&res->spinlock);
+ dlm_lock_get(newlock);
list_add_tail(&newlock->list, queue);
spin_unlock(&res->spinlock);
}
@@ -1427,9 +1429,7 @@
dlmprintk("error occurred while processing recovery "
"data! %d\n", ret);
if (newlock)
- kfree(newlock);
- if (lksb)
- kfree(lksb);
+ dlm_lock_put(newlock);
}
dlmprintk("returning %d\n", ret);
return ret;
@@ -1483,21 +1483,21 @@
lock = list_entry (iter, dlm_lock, list);
if (lock->ml.node == dead_node) {
list_del_init(&lock->list);
- kfree(lock);
+ dlm_lock_put(lock);
}
}
list_for_each_safe(iter, tmpiter, &res->converting) {
lock = list_entry (iter, dlm_lock, list);
if (lock->ml.node == dead_node) {
list_del_init(&lock->list);
- kfree(lock);
+ dlm_lock_put(lock);
}
}
list_for_each_safe(iter, tmpiter, &res->blocked) {
lock = list_entry (iter, dlm_lock, list);
if (lock->ml.node == dead_node) {
list_del_init(&lock->list);
- kfree(lock);
+ dlm_lock_put(lock);
}
}
}
Modified: trunk/fs/ocfs2/dlm/dlmthread.c
===================================================================
--- trunk/fs/ocfs2/dlm/dlmthread.c 2005-04-08 21:25:16 UTC (rev 2128)
+++ trunk/fs/ocfs2/dlm/dlmthread.c 2005-04-09 00:16:35 UTC (rev 2129)
@@ -58,6 +58,7 @@
#define dlm_lock_is_remote(dlm, lock) ((lock)->ml.node != (dlm)->node_num)
/* will exit holding res->spinlock, but may drop in function */
+/* waits until flags are cleared on res->state */
void __dlm_wait_on_lockres_flags(dlm_lock_resource *res, int flags)
{
DECLARE_WAITQUEUE(wait, current);
@@ -77,6 +78,26 @@
current->state = TASK_RUNNING;
}
+/* opposite of the above, waits until flags are SET */
+void __dlm_wait_on_lockres_flags_set(dlm_lock_resource *res, int flags)
+{
+ DECLARE_WAITQUEUE(wait, current);
+
+ assert_spin_locked(&res->spinlock);
+
+ add_wait_queue(&res->wq, &wait);
+repeat:
+ set_current_state(TASK_UNINTERRUPTIBLE);
+ if ((res->state & flags) != flags) {
+ spin_unlock(&res->spinlock);
+ schedule();
+ spin_lock(&res->spinlock);
+ goto repeat;
+ }
+ remove_wait_queue(&res->wq, &wait);
+ current->state = TASK_RUNNING;
+}
+
static int __dlm_lockres_unused(dlm_lock_resource *res)
{
if (list_empty(&res->granted) &&
@@ -236,7 +257,14 @@
// dlmprintk("shuffle res %.*s\n", res->lockname.len,
// res->lockname.name);
- spin_lock(&res->spinlock);
+ /* because this function is called with the lockres
+ * spinlock, and because we know that it is not migrating/
+ * recovering/in-progress, it is fine to reserve asts and
+ * basts right before queueing them all throughout */
+ assert_spin_locked(&res->spinlock);
+ DLM_ASSERT(!(res->state & (DLM_LOCK_RES_MIGRATING|
+ DLM_LOCK_RES_RECOVERING|
+ DLM_LOCK_RES_IN_PROGRESS)));
converting:
if (list_empty(&res->converting))
@@ -255,14 +283,18 @@
lock = list_entry(iter, dlm_lock, list);
if (lock==target)
continue;
- if (!dlm_lock_compatible(lock->ml.type, target->ml.convert_type)) {
+ if (!dlm_lock_compatible(lock->ml.type,
+ target->ml.convert_type)) {
can_grant = 0;
/* queue the BAST if not already */
- if (lock->ml.highest_blocked == LKM_IVMODE)
- __dlm_queue_bast(dlm, lock);
+ if (lock->ml.highest_blocked == LKM_IVMODE) {
+ __dlm_lockres_reserve_ast(res);
+ dlm_queue_bast(dlm, lock);
+ }
/* update the highest_blocked if needed */
if (lock->ml.highest_blocked < target->ml.convert_type)
- lock->ml.highest_blocked = target->ml.convert_type;
+ lock->ml.highest_blocked =
+ target->ml.convert_type;
}
}
head = &res->converting;
@@ -270,12 +302,16 @@
lock = list_entry(iter, dlm_lock, list);
if (lock==target)
continue;
- if (!dlm_lock_compatible(lock->ml.type, target->ml.convert_type)) {
+ if (!dlm_lock_compatible(lock->ml.type,
+ target->ml.convert_type)) {
can_grant = 0;
- if (lock->ml.highest_blocked == LKM_IVMODE)
- __dlm_queue_bast(dlm, lock);
+ if (lock->ml.highest_blocked == LKM_IVMODE) {
+ __dlm_lockres_reserve_ast(res);
+ dlm_queue_bast(dlm, lock);
+ }
if (lock->ml.highest_blocked < target->ml.convert_type)
- lock->ml.highest_blocked = target->ml.convert_type;
+ lock->ml.highest_blocked =
+ target->ml.convert_type;
}
}
@@ -299,7 +335,8 @@
spin_unlock(&target->spinlock);
- __dlm_queue_ast(dlm, target);
+ __dlm_lockres_reserve_ast(res);
+ dlm_queue_ast(dlm, target);
/* go back and check for more */
goto converting;
}
@@ -316,8 +353,10 @@
continue;
if (!dlm_lock_compatible(lock->ml.type, target->ml.type)) {
can_grant = 0;
- if (lock->ml.highest_blocked == LKM_IVMODE)
- __dlm_queue_bast(dlm, lock);
+ if (lock->ml.highest_blocked == LKM_IVMODE) {
+ __dlm_lockres_reserve_ast(res);
+ dlm_queue_bast(dlm, lock);
+ }
if (lock->ml.highest_blocked < target->ml.type)
lock->ml.highest_blocked = target->ml.type;
}
@@ -330,8 +369,10 @@
continue;
if (!dlm_lock_compatible(lock->ml.type, target->ml.type)) {
can_grant = 0;
- if (lock->ml.highest_blocked == LKM_IVMODE)
- __dlm_queue_bast(dlm, lock);
+ if (lock->ml.highest_blocked == LKM_IVMODE) {
+ __dlm_lockres_reserve_ast(res);
+ dlm_queue_bast(dlm, lock);
+ }
if (lock->ml.highest_blocked < target->ml.type)
lock->ml.highest_blocked = target->ml.type;
}
@@ -356,13 +397,14 @@
spin_unlock(&target->spinlock);
- __dlm_queue_ast(dlm, target);
+ __dlm_lockres_reserve_ast(res);
+ dlm_queue_ast(dlm, target);
/* go back and check for more */
goto converting;
}
leave:
- spin_unlock(&res->spinlock);
+ return;
}
/* must have NO locks when calling this */
@@ -423,29 +465,37 @@
return empty;
}
+
+int dlm_flush_lockres_asts(dlm_ctxt *dlm, dlm_lock_resource *res)
+{
+ dlm_flush_asts(dlm);
+ /* still need to implement dlm_flush_lockres_asts */
+ return 0;
+}
+
+
+
void dlm_flush_asts(dlm_ctxt *dlm)
{
- struct list_head *iter, *iter2;
dlm_lock *lock;
dlm_lock_resource *res;
- LIST_HEAD(ast_tmp);
- LIST_HEAD(bast_tmp);
u8 hi;
- spin_lock(&dlm->spinlock);
- list_splice_init(&dlm->pending_asts, &ast_tmp);
- list_splice_init(&dlm->pending_basts, &bast_tmp);
- spin_unlock(&dlm->spinlock);
-
- list_for_each_safe(iter, iter2, &ast_tmp) {
- lock = list_entry(iter, dlm_lock, ast_list);
+ spin_lock(&dlm->ast_lock);
+ while (!list_empty(&dlm->pending_asts)) {
+ lock = list_entry(dlm->pending_asts.next,
+ dlm_lock, ast_list);
+ /* get an extra ref on lock */
+ dlm_lock_get(lock);
res = lock->lockres;
dlmprintk0("delivering an ast for this lockres\n");
+
DLM_ASSERT(lock->ast_pending);
- spin_lock(&lock->spinlock);
+ /* remove from list (including ref) */
list_del_init(&lock->ast_list);
- spin_unlock(&lock->spinlock);
+ dlm_lock_put(lock);
+ spin_unlock(&dlm->ast_lock);
if (lock->ml.node != dlm->node_num) {
if (dlm_do_remote_ast(dlm, res, lock) < 0)
@@ -453,7 +503,8 @@
} else
dlm_do_local_ast(dlm, res, lock);
- spin_lock(&lock->spinlock);
+ spin_lock(&dlm->ast_lock);
+
/* possible that another ast was queued while
* we were delivering the last one */
if (!list_empty(&lock->ast_list)) {
@@ -462,11 +513,18 @@
"keep the ast_pending flag set.\n");
} else
lock->ast_pending = 0;
- spin_unlock(&lock->spinlock);
+
+ /* drop the extra ref.
+ * this may drop it completely. */
+ dlm_lock_put(lock);
+ dlm_lockres_release_ast(res);
}
-
- list_for_each_safe(iter, iter2, &bast_tmp) {
- lock = list_entry(iter, dlm_lock, bast_list);
+
+ while (!list_empty(&dlm->pending_basts)) {
+ lock = list_entry(dlm->pending_basts.next,
+ dlm_lock, bast_list);
+ /* get an extra ref on lock */
+ dlm_lock_get(lock);
res = lock->lockres;
DLM_ASSERT(lock->bast_pending);
@@ -476,19 +534,24 @@
DLM_ASSERT(lock->ml.highest_blocked > LKM_IVMODE);
hi = lock->ml.highest_blocked;
lock->ml.highest_blocked = LKM_IVMODE;
+ spin_unlock(&lock->spinlock);
+ /* remove from list (including ref) */
list_del_init(&lock->bast_list);
- spin_unlock(&lock->spinlock);
+ dlm_lock_put(lock);
+ spin_unlock(&dlm->ast_lock);
dlmprintk("delivering a bast for this lockres "
"(blocked = %d\n", hi);
+
if (lock->ml.node != dlm->node_num) {
if (dlm_send_proxy_bast(dlm, res, lock, hi) < 0)
dlmprintk0("eeek\n");
} else
dlm_do_local_bast(dlm, res, lock, hi);
- spin_lock(&lock->spinlock);
+ spin_lock(&dlm->ast_lock);
+
/* possible that another bast was queued while
* we were delivering the last one */
if (!list_empty(&lock->bast_list)) {
@@ -497,15 +560,22 @@
"keep the bast_pending flag set.\n");
} else
lock->bast_pending = 0;
- spin_unlock(&lock->spinlock);
+
+ /* drop the extra ref.
+ * this may drop it completely. */
+ dlm_lock_put(lock);
+ dlm_lockres_release_ast(res);
}
+ spin_unlock(&dlm->ast_lock);
}
+
#define DLM_THREAD_TIMEOUT_MS (4 * 1000)
+#define DLM_THREAD_MAX_DIRTY 100
+#define DLM_THREAD_MAX_ASTS 10
static int dlm_thread(void *data)
{
- struct list_head *iter, *tmpiter;
dlm_lock_resource *res;
dlm_ctxt *dlm = data;
unsigned long timeout = msecs_to_jiffies(DLM_THREAD_TIMEOUT_MS);
@@ -513,33 +583,91 @@
dlmprintk("dlm thread running for %s...\n", dlm->name);
while (!kthread_should_stop()) {
+ int n = DLM_THREAD_MAX_DIRTY;
dlm_run_purge_list(dlm);
down_read(&dlm->recovery_sem);
+
+ /* this will now do the dlm_shuffle_lists
+ * while the dlm->spinlock is unlocked */
spin_lock(&dlm->spinlock);
- list_for_each_safe(iter, tmpiter, &dlm->dirty_list) {
- res = list_entry(iter, dlm_lock_resource, dirty);
-
+ while (!list_empty(&dlm->dirty_list)) {
+ int delay = 0;
+ res = list_entry(dlm->dirty_list.next,
+ dlm_lock_resource, dirty);
+
+ /* peel a lockres off, remove it from the list,
+ * unset the dirty flag and drop the dlm lock */
DLM_ASSERT(res);
+ dlm_lockres_get(res);
+
spin_lock(&res->spinlock);
+ res->state &= ~DLM_LOCK_RES_DIRTY;
list_del_init(&res->dirty);
- res->state &= ~DLM_LOCK_RES_DIRTY;
- BUG_ON(res->owner != dlm->node_num);
spin_unlock(&res->spinlock);
+ spin_unlock(&dlm->spinlock);
+
+ spin_lock(&res->spinlock);
+ DLM_ASSERT(!(res->state & DLM_LOCK_RES_MIGRATING));
+ DLM_ASSERT(!(res->state & DLM_LOCK_RES_RECOVERING));
+ DLM_ASSERT(res->owner == dlm->node_num);
+
+ if (res->state & DLM_LOCK_RES_IN_PROGRESS) {
+ /* move it to the tail and keep going */
+ spin_unlock(&res->spinlock);
+ dlmprintk("delaying list shuffling for in-"
+ "progress lockres %.*s\n",
+ res->lockname.len,
+ res->lockname.name);
+ delay = 1;
+ goto in_progress;
+ }
+
+ /* at this point the lockres is not migrating/
+ * recovering/in-progress. we have the lockres
+ * spinlock and do NOT have the dlm lock.
+ * safe to reserve/queue asts and run the lists. */
+
dlmprintk("calling dlm_shuffle_lists with "
- "dlm=%p, res=%p\n", dlm, res);
+ "dlm=%p, res=%p\n", dlm, res);
+
+ /* called while holding lockres lock */
dlm_shuffle_lists(dlm, res);
+ spin_unlock(&res->spinlock);
- spin_lock(&res->spinlock);
- __dlm_lockres_calc_usage(dlm, res);
- spin_unlock(&res->spinlock);
+ dlm_lockres_calc_usage(dlm, res);
+
+in_progress:
+
+ spin_lock(&dlm->spinlock);
+ /* if the lock was in-progress, stick
+ * it on the back of the list */
+ if (delay) {
+ spin_lock(&res->spinlock);
+ list_add_tail(&res->dirty, &dlm->dirty_list);
+ res->state |= DLM_LOCK_RES_DIRTY;
+ spin_unlock(&res->spinlock);
+ }
+ dlm_lockres_put(res);
+
+ /* unlikely, but we may need to give time to
+ * other tasks */
+ if (!--n) {
+ dlmprintk0("throttling dlm_thread\n");
+ break;
+ }
}
+
spin_unlock(&dlm->spinlock);
dlm_flush_asts(dlm);
up_read(&dlm->recovery_sem);
+ /* no need to sleep if we know there is more work to do */
+ if (!n)
+ continue;
+
wait_event_interruptible_timeout(dlm->dlm_thread_wq,
!dlm_dirty_list_empty(dlm) ||
kthread_should_stop(),
Modified: trunk/fs/ocfs2/dlm/dlmunlock.c
===================================================================
--- trunk/fs/ocfs2/dlm/dlmunlock.c 2005-04-08 21:25:16 UTC (rev 2128)
+++ trunk/fs/ocfs2/dlm/dlmunlock.c 2005-04-09 00:16:35 UTC (rev 2129)
@@ -76,6 +76,7 @@
* taken: res->spinlock and lock->spinlock taken and dropped
* held on exit: none
* returns: DLM_NORMAL, DLM_NOLOCKMGR, status from network
+ * all callers should have taken an extra ref on lock coming in
*/
static dlm_status dlmunlock_common(dlm_ctxt *dlm, dlm_lock_resource *res,
dlm_lock *lock, dlm_lockstatus *lksb,
@@ -160,10 +161,21 @@
spin_lock(&lock->spinlock);
}
- if (actions & DLM_UNLOCK_REMOVE_LOCK)
+ /* get an extra ref on lock. if we are just switching
+ * lists here, we don't want the lock to go away. */
+ dlm_lock_get(lock);
+
+ if (actions & DLM_UNLOCK_REMOVE_LOCK) {
list_del_init(&lock->list);
- if (actions & DLM_UNLOCK_REGRANT_LOCK)
+ dlm_lock_put(lock);
+ }
+ if (actions & DLM_UNLOCK_REGRANT_LOCK) {
+ dlm_lock_get(lock);
list_add_tail(&lock->list, &res->granted);
+ }
+
+ /* remove the extra ref on lock */
+ dlm_lock_put(lock);
leave:
res->state &= ~DLM_LOCK_RES_IN_PROGRESS;
@@ -171,13 +183,15 @@
spin_unlock(&res->spinlock);
wake_up(&res->wq);
+ /* let the caller's final dlm_lock_put handle the actual kfree
+ * NOTE: this silly block and the FREE_LOCK constant
+ * can go once the lock refcounting stuff is tested */
if (actions & DLM_UNLOCK_FREE_LOCK) {
-#warning this can corrupt memory!
- /* XXX If this lock has a bast pending, then we've
- * just free'd memory that the dlmthread will be
- * referencing... BAAAAD! */
- kfree(lock);
- lksb->lockid = NULL;
+ /* this should always be coupled with list removal */
+ DLM_ASSERT(actions & DLM_UNLOCK_REMOVE_LOCK);
+ dlmprintk("lock %llu should be gone now! refs=%d\n",
+ lock->ml.cookie,
+ atomic_read(&lock->lock_refs.refcount));
}
if (actions & DLM_UNLOCK_CALL_AST)
*call_ast = 1;
@@ -358,6 +372,7 @@
lock = list_entry(iter, dlm_lock, list);
if (lock->ml.cookie == unlock->cookie &&
lock->ml.node == unlock->node_idx) {
+ dlm_lock_get(lock);
found = 1;
break;
}
@@ -399,6 +414,7 @@
else {
/* send the lksb->status back to the other node */
status = lksb->status;
+ dlm_lock_put(lock);
}
leave:
@@ -498,6 +514,8 @@
lock = lksb->lockid;
DLM_ASSERT(lock);
+ dlm_lock_get(lock);
+
res = lock->lockres;
DLM_ASSERT(res);
dlm_lockres_get(res);
@@ -531,6 +549,8 @@
* may be happening on another node. Perhaps the
* proper solution is to queue up requests on the
* other end? */
+
+ /* do we want to yield(); ?? */
msleep(50);
dlmprintk0("retrying unlock due to pending recovery/"
@@ -551,6 +571,7 @@
dlm_lockres_calc_usage(dlm, res);
dlm_lockres_put(res);
+ dlm_lock_put(lock);
dlmprintk("returning status=%d!\n", status);
return status;
More information about the Ocfs2-commits
mailing list