[Ocfs2-commits] khackel commits r2026 - trunk/fs/ocfs2/dlm
svn-commits at oss.oracle.com
Mon Mar 21 16:23:35 CST 2005
Author: khackel
Signed-off-by: mfasheh
Date: 2005-03-21 16:23:34 -0600 (Mon, 21 Mar 2005)
New Revision: 2026
Modified:
trunk/fs/ocfs2/dlm/dlmast.c
trunk/fs/ocfs2/dlm/dlmconvert.c
trunk/fs/ocfs2/dlm/dlmlock.c
trunk/fs/ocfs2/dlm/dlmmaster.c
trunk/fs/ocfs2/dlm/dlmmod.c
trunk/fs/ocfs2/dlm/dlmmod.h
trunk/fs/ocfs2/dlm/dlmrecovery.c
trunk/fs/ocfs2/dlm/dlmthread.c
trunk/fs/ocfs2/dlm/dlmunlock.c
Log:
* adds the dlm recovery thread and implements recovery
* adds dlm_migrate_lockres and a /proc entry to test it
externally
* fixes several bugs that prevented clean dismount
* implements shutdown code to migrate locks away to other
nodes in order to complete a dismount
* adds a work queue (currently run by keventd, but can be
directly run by dlm_thread in the future) to dispatch
certain deferrable network handlers which must themselves
send network messages (a sketch of the pattern follows this log)
Signed-off-by: mfasheh
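
The work queue mentioned in the log is used roughly like this. A minimal
sketch against the dlm_work_item API added below in dlmmod.h; the names
dlm_deferred_func and dlm_example_handler are made up for illustration
and are not part of this commit:

static void dlm_deferred_func(dlm_work_item *item, void *data)
{
	/* runs later from keventd: free to sleep and to
	 * send net messages of its own */
}

static int dlm_example_handler(net_msg *msg, u32 len, void *data)
{
	dlm_ctxt *dlm = data;
	dlm_work_item *item;

	item = kmalloc(sizeof(dlm_work_item), GFP_KERNEL);
	if (!item)
		return -ENOMEM;

	/* the work item owns a dlm reference;
	 * dlm_dispatch_work drops it with dlm_put */
	if (!dlm_grab(dlm)) {
		kfree(item);
		return -EINVAL;
	}
	dlm_init_work_item(dlm, item, dlm_deferred_func, NULL);

	spin_lock(&dlm->work_lock);
	list_add_tail(&item->list, &dlm->work_list);
	spin_unlock(&dlm->work_lock);

	/* kick keventd to run dlm_dispatch_work */
	schedule_work(&dlm->dispatched_work);
	return 0;
}
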
Modified: trunk/fs/ocfs2/dlm/dlmast.c
===================================================================
--- trunk/fs/ocfs2/dlm/dlmast.c 2005-03-21 21:17:54 UTC (rev 2025)
+++ trunk/fs/ocfs2/dlm/dlmast.c 2005-03-21 22:23:34 UTC (rev 2026)
@@ -60,7 +60,12 @@
assert_spin_locked(&dlm->spinlock);
DLM_ASSERT(list_empty(&lock->ast_list));
+ DLM_ASSERT(!lock->ast_pending);
+
+ spin_lock(&lock->spinlock);
list_add_tail(&lock->ast_list, &dlm->pending_asts);
+ lock->ast_pending = 1;
+ spin_unlock(&lock->spinlock);
}
void dlm_queue_ast(dlm_ctxt *dlm, dlm_lock *lock)
@@ -71,8 +76,7 @@
DLM_ASSERT(lock);
spin_lock(&dlm->spinlock);
- DLM_ASSERT(list_empty(&lock->ast_list));
- list_add_tail(&lock->ast_list, &dlm->pending_asts);
+ __dlm_queue_ast(dlm, lock);
spin_unlock(&dlm->spinlock);
}
@@ -86,8 +90,12 @@
assert_spin_locked(&dlm->spinlock);
DLM_ASSERT(list_empty(&lock->bast_list));
+ DLM_ASSERT(!lock->bast_pending);
+ spin_lock(&lock->spinlock);
list_add_tail(&lock->bast_list, &dlm->pending_basts);
+ lock->bast_pending = 1;
+ spin_unlock(&lock->spinlock);
}
@@ -244,8 +252,18 @@
dlmprintk("lockres %.*s\n", res->lockname.len, res->lockname.name);
if (!dlm_is_recovery_lock(past->name, past->namelen))
down_read(&dlm->recovery_sem);
+
spin_lock(&res->spinlock);
-
+ if (res->state & DLM_LOCK_RES_RECOVERING) {
+ dlmprintk0("responding with DLM_RECOVERING!\n");
+ ret = DLM_RECOVERING;
+ goto unlock_out;
+ }
+ if (res->state & DLM_LOCK_RES_MIGRATING) {
+ dlmprintk0("responding with DLM_MIGRATING!\n");
+ ret = DLM_MIGRATING;
+ goto unlock_out;
+ }
/* try convert queue for both ast/bast */
head = &res->converting;
lock = NULL;
@@ -271,10 +289,12 @@
"name=%.*s, namelen=%u\n",
past->type == DLM_AST ? "" : "b",
cookie, locklen, name, locklen);
+
+ ret = DLM_NORMAL;
+unlock_out:
spin_unlock(&res->spinlock);
if (!dlm_is_recovery_lock(past->name, past->namelen))
up_read(&dlm->recovery_sem);
- ret = DLM_NORMAL;
goto leave;
do_ast:
@@ -328,6 +348,7 @@
dlm_proxy_ast past;
struct iovec iov[2];
size_t iovlen = 1;
+ int status;
dlmprintk("res %.*s, to=%u, type=%d, blocked_type=%d\n",
res->lockname.len, res->lockname.name, lock->ml.node,
@@ -353,9 +374,27 @@
dlm_proxy_ast_to_net(&past);
ret = net_send_message_iov(DLM_PROXY_AST_MSG, dlm->key, iov, iovlen,
- lock->ml.node, NULL);
+ lock->ml.node, &status);
if (ret < 0)
dlmprintk("(%d) dlm_send_proxy_ast: returning %d\n",
current->pid, ret);
+ else {
+ if (status == DLM_RECOVERING) {
+ dlmprintk("sent AST to node %u, it thinks this "
+ "node is DEAD!\n", lock->ml.node);
+ dlmprintk0("must die now. goodbye!\n");
+ BUG();
+ } else if (status == DLM_MIGRATING) {
+ dlmprintk("sent AST to node %u, it returned "
+ "DLM_MIGRATING! evil!\n", lock->ml.node);
+ dlmprintk0("must die now. goodbye!\n");
+ BUG();
+ } else if (status != DLM_NORMAL) {
+ dlmprintk("AST to node %u returned %d!\n",
+ lock->ml.node, status);
+ /* ignore it */
+ }
+ ret = 0;
+ }
return ret;
}
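
The ast_pending/bast_pending flags added above are consumed on the flush
side in dlmthread.c. A rough sketch of the dequeue half of the pattern
(not the actual dlm_flush_asts body, just the lock ordering these hunks
establish: dlm->spinlock first, then lock->spinlock):

static void dlm_flush_asts_sketch(dlm_ctxt *dlm)
{
	dlm_lock *lock;

	spin_lock(&dlm->spinlock);
	while (!list_empty(&dlm->pending_asts)) {
		lock = list_entry(dlm->pending_asts.next,
				  dlm_lock, ast_list);
		spin_lock(&lock->spinlock);
		list_del_init(&lock->ast_list);
		lock->ast_pending = 0;
		spin_unlock(&lock->spinlock);
		spin_unlock(&dlm->spinlock);

		/* deliver the ast here, with no spinlocks held */

		spin_lock(&dlm->spinlock);
	}
	spin_unlock(&dlm->spinlock);
}
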
Modified: trunk/fs/ocfs2/dlm/dlmconvert.c
===================================================================
--- trunk/fs/ocfs2/dlm/dlmconvert.c 2005-03-21 21:17:54 UTC (rev 2025)
+++ trunk/fs/ocfs2/dlm/dlmconvert.c 2005-03-21 22:23:34 UTC (rev 2026)
@@ -229,7 +229,7 @@
dlmprintk("type=%d, convert_type=%d, busy=%d\n", lock->ml.type,
lock->ml.convert_type, res->state & DLM_LOCK_RES_IN_PROGRESS);
-
+
spin_lock(&res->spinlock);
if (res->state & DLM_LOCK_RES_RECOVERING) {
status = DLM_RECOVERING;
@@ -336,6 +336,15 @@
if (tmpret >= 0) {
// successfully sent and received
ret = status; // this is already a dlm_status
+ if (ret == DLM_RECOVERING) {
+ dlmprintk("node %u returned DLM_RECOVERING "
+ "from convert message!\n",
+ res->owner);
+ } else if (ret == DLM_MIGRATING) {
+ dlmprintk("node %u returned DLM_MIGRATING "
+ "from convert message!\n",
+ res->owner);
+ }
} else {
dlmprintk("error occurred in net_send_message: %d\n",
tmpret);
@@ -397,6 +406,18 @@
goto leave;
spin_lock(&res->spinlock);
+ if (res->state & DLM_LOCK_RES_RECOVERING) {
+ spin_unlock(&res->spinlock);
+ dlmprintk0("returning DLM_RECOVERING\n");
+ status = DLM_RECOVERING;
+ goto leave;
+ }
+ if (res->state & DLM_LOCK_RES_MIGRATING) {
+ spin_unlock(&res->spinlock);
+ dlmprintk0("returning DLM_MIGRATING\n");
+ status = DLM_MIGRATING;
+ goto leave;
+ }
list_for_each(iter, &res->granted) {
lock = list_entry(iter, dlm_lock, list);
if (lock->ml.cookie == cnv->cookie &&
Modified: trunk/fs/ocfs2/dlm/dlmlock.c
===================================================================
--- trunk/fs/ocfs2/dlm/dlmlock.c 2005-03-21 21:17:54 UTC (rev 2025)
+++ trunk/fs/ocfs2/dlm/dlmlock.c 2005-03-21 22:23:34 UTC (rev 2026)
@@ -110,7 +110,15 @@
lock->lksb->status = DLM_NORMAL;
status = DLM_NORMAL;
list_add_tail(&lock->list, &res->granted);
- call_ast = 1;
+
+ /* for the recovery lock, we can't allow the ast
+ * to be queued since the dlmthread is already
+ * frozen. but the recovery lock is always locked
+ * with LKM_NOQUEUE so we do not need the ast in
+ * this special case */
+ if (!dlm_is_recovery_lock(res->lockname.name,
+ res->lockname.len))
+ call_ast = 1;
} else {
/* for NOQUEUE request, unless we get the
* lock right away, return DLM_NOTQUEUED */
@@ -127,7 +135,8 @@
dlm_queue_ast(dlm, lock);
dlm_lockres_calc_usage(dlm, res);
- dlm_kick_thread(dlm, res);
+ if (status == DLM_NORMAL)
+ dlm_kick_thread(dlm, res);
return status;
}
@@ -149,11 +158,6 @@
res->lockname.name, flags);
spin_lock(&res->spinlock);
- if (res->state & DLM_LOCK_RES_RECOVERING) {
- spin_unlock(&res->spinlock);
- status = DLM_RECOVERING;
- goto bail;
- }
/* will exit this call with spinlock held */
__dlm_wait_on_lockres(res);
@@ -238,6 +242,8 @@
newlock->bast = NULL;
newlock->astdata = NULL;
newlock->ml.cookie = cookie;
+ newlock->ast_pending = 0;
+ newlock->bast_pending = 0;
}
/* handler for lock creation net message
@@ -303,10 +309,23 @@
if (!res)
goto leave;
+ spin_lock(&res->spinlock);
+ if (res->state & DLM_LOCK_RES_RECOVERING) {
+ dlmprintk0("returning DLM_RECOVERING\n");
+ status = DLM_RECOVERING;
+ spin_unlock(&res->spinlock);
+ goto leave;
+ }
+ if (res->state & DLM_LOCK_RES_MIGRATING) {
+ dlmprintk0("returning DLM_MIGRATING\n");
+ status = DLM_MIGRATING;
+ spin_unlock(&res->spinlock);
+ goto leave;
+ }
+ spin_unlock(&res->spinlock);
+
newlock->lockres = res;
-
status = dlmlock_master(dlm, res, newlock, create->flags);
-
leave:
if (status != DLM_NORMAL) {
if (newlock)
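
dlm_is_recovery_lock(), used in the hunk above, is just a comparison
against the reserved recovery lockid. A sketch, assuming the "$RECOVERY"
name (the real constant lives in the dlm headers, not in this diff):

/* sketch only: assumes the recovery lockid is "$RECOVERY" */
static inline int dlm_is_recovery_lock_sketch(const char *name,
					      unsigned int len)
{
	return (len == sizeof("$RECOVERY") - 1 &&
		memcmp(name, "$RECOVERY", len) == 0);
}
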
Modified: trunk/fs/ocfs2/dlm/dlmmaster.c
===================================================================
--- trunk/fs/ocfs2/dlm/dlmmaster.c 2005-03-21 21:17:54 UTC (rev 2025)
+++ trunk/fs/ocfs2/dlm/dlmmaster.c 2005-03-21 22:23:34 UTC (rev 2026)
@@ -96,7 +96,7 @@
extern spinlock_t dlm_domain_lock;
extern struct list_head dlm_domains;
-void dlm_dump_all_mles(void)
+void dlm_dump_all_mles(char *data, int len)
{
struct list_head *iter;
dlm_ctxt *dlm;
@@ -121,25 +121,28 @@
const char *name,
unsigned int namelen);
static void dlm_put_mle(dlm_master_list_entry *mle);
+static void __dlm_put_mle(dlm_master_list_entry *mle);
static int dlm_find_mle(dlm_ctxt *dlm, dlm_master_list_entry **mle,
char *name, unsigned int namelen);
static int dlm_do_master_request(dlm_master_list_entry *mle, int to);
-static dlm_lock_resource *dlm_new_lockres(dlm_ctxt *dlm,
- const char *name,
- unsigned int namelen);
-static void dlm_init_lockres(dlm_ctxt *dlm,
- dlm_lock_resource *res,
- const char *name,
- unsigned int namelen);
static int dlm_wait_for_lock_mastery(dlm_ctxt *dlm, dlm_lock_resource *res,
dlm_master_list_entry *mle);
static int dlm_restart_lock_mastery(dlm_ctxt *dlm, dlm_lock_resource *res,
dlm_master_list_entry *mle);
+static void dlm_add_migration_mle(dlm_ctxt *dlm,
+ dlm_lock_resource *res,
+ dlm_master_list_entry *mle,
+ dlm_master_list_entry **oldmle,
+ const char *name, unsigned int namelen,
+ u8 new_master, u8 master);
+static u8 dlm_pick_migration_target(dlm_ctxt *dlm, dlm_lock_resource *res);
+static void dlm_remove_nonlocal_locks(dlm_ctxt *dlm, dlm_lock_resource *res);
+
/*
* MASTER LIST FUNCTIONS
*/
@@ -186,8 +189,22 @@
spin_unlock(&dlm->spinlock);
}
+/* remove from list and free */
+static void __dlm_put_mle(dlm_master_list_entry *mle)
+{
+ dlm_ctxt *dlm;
+ DLM_ASSERT(mle);
+ DLM_ASSERT(mle->dlm);
+ dlm = mle->dlm;
-/* remove from list and free */
+ assert_spin_locked(&dlm->spinlock);
+ assert_spin_locked(&dlm->master_lock);
+
+ kref_put(&mle->mle_refs, dlm_mle_release);
+}
+
+
+/* must not have any spinlocks coming in */
static void dlm_put_mle(dlm_master_list_entry *mle)
{
dlm_ctxt *dlm;
@@ -197,7 +214,7 @@
spin_lock(&dlm->spinlock);
spin_lock(&dlm->master_lock);
- kref_put(&mle->mle_refs, dlm_mle_release);
+ __dlm_put_mle(mle);
spin_unlock(&dlm->master_lock);
spin_unlock(&dlm->spinlock);
}
@@ -228,13 +245,20 @@
kref_init(&mle->mle_refs, dlm_mle_release);
memset(mle->response_map, 0, sizeof(mle->response_map));
mle->master = NM_MAX_NODES;
+ mle->new_master = NM_MAX_NODES;
mle->error = 0;
- if (mle->type == DLM_MLE_MASTER)
+ if (mle->type == DLM_MLE_MASTER) {
+ DLM_ASSERT(res);
mle->u.res = res;
- else {
+ } else if (mle->type == DLM_MLE_BLOCK) {
+ DLM_ASSERT(name);
strncpy(mle->u.name.name, name, namelen);
mle->u.name.len = namelen;
+ } else /* DLM_MLE_MIGRATION */ {
+ DLM_ASSERT(name);
+ strncpy(mle->u.name.name, name, namelen);
+ mle->u.name.len = namelen;
}
/* copy off the node_map and register hb callbacks on our copy */
@@ -350,12 +374,11 @@
* LOCK RESOURCE FUNCTIONS
*/
-static inline void dlm_set_lockres_owner(dlm_ctxt *dlm,
- dlm_lock_resource *res,
- u8 owner)
+void dlm_set_lockres_owner(dlm_ctxt *dlm, dlm_lock_resource *res, u8 owner)
{
assert_spin_locked(&res->spinlock);
+ dlmprintk("setting owner to %u\n", owner);
if (owner == dlm->node_num)
atomic_inc(&dlm->local_resources);
else if (owner == DLM_LOCK_RES_OWNER_UNKNOWN)
@@ -366,9 +389,7 @@
res->owner = owner;
}
-static inline void dlm_change_lockres_owner(dlm_ctxt *dlm,
- dlm_lock_resource *res,
- u8 owner)
+void dlm_change_lockres_owner(dlm_ctxt *dlm, dlm_lock_resource *res, u8 owner)
{
assert_spin_locked(&res->spinlock);
@@ -445,10 +466,8 @@
spin_unlock(&dlm->spinlock);
}
-static void dlm_init_lockres(dlm_ctxt *dlm,
- dlm_lock_resource *res,
- const char *name,
- unsigned int namelen)
+void dlm_init_lockres(dlm_ctxt *dlm, dlm_lock_resource *res,
+ const char *name, unsigned int namelen)
{
char *qname;
@@ -486,9 +505,9 @@
memset(res->lvb, 0, DLM_LVB_LEN);
}
-static dlm_lock_resource *dlm_new_lockres(dlm_ctxt *dlm,
- const char *name,
- unsigned int namelen)
+dlm_lock_resource *dlm_new_lockres(dlm_ctxt *dlm,
+ const char *name,
+ unsigned int namelen)
{
dlm_lock_resource *res;
@@ -599,6 +618,26 @@
dlmprintk0("eek! master entry for nonexistent "
"lock!\n");
BUG();
+ } else if (tmpmle->type == DLM_MLE_MIGRATION) {
+ /* migration is in progress! */
+ /* the good news is that we now know the
+ * "current" master (mle->master). */
+
+ spin_unlock(&dlm->master_lock);
+
+ /* set the lockres owner and hash it */
+ spin_lock(&dlm->spinlock);
+ spin_lock(&res->spinlock);
+ dlm_set_lockres_owner(dlm, res, tmpmle->master);
+ __dlm_insert_lock(dlm, res);
+ spin_unlock(&res->spinlock);
+ spin_unlock(&dlm->spinlock);
+
+ /* master is known, detach */
+ dlm_mle_detach_hb_events(dlm, tmpmle);
+ dlm_put_mle(tmpmle);
+
+ goto wake_waiters;
}
}
if (!blocked) {
@@ -806,6 +845,9 @@
memset(&request, 0, sizeof(request));
request.node_idx = dlm->node_num;
+
+ DLM_ASSERT(mle->type != DLM_MLE_MIGRATION);
+
if (mle->type == DLM_MLE_BLOCK) {
request.namelen = mle->u.name.len;
strncpy(request.name, mle->u.name.name, request.namelen);
@@ -916,6 +958,16 @@
/* take care of the easy cases up front */
spin_lock(&res->spinlock);
+ if (res->state & DLM_LOCK_RES_RECOVERING) {
+ spin_unlock(&res->spinlock);
+ dlmprintk0("returning DLM_MASTER_RESP_ERROR "
+ "since res is being recovered\n");
+ response = DLM_MASTER_RESP_ERROR;
+ if (mle)
+ kfree(mle);
+ goto send_response;
+ }
+
if (res->owner == dlm->node_num) {
spin_unlock(&res->spinlock);
// dlmprintk0("this node is the master\n");
@@ -929,6 +981,7 @@
* caused all nodes up to this one to
* create mles. this node now needs to
* go back and clean those up. */
+#warning this needs to move to the work queue
ret = dlm_do_assert_master(dlm, res->lockname.name,
res->lockname.len,
nodemap);
@@ -964,6 +1017,20 @@
// dlmprintk0("this node is waiting for "
// "lockres to be mastered\n");
response = DLM_MASTER_RESP_NO;
+ } else if (tmpmle->type == DLM_MLE_MIGRATION) {
+ dlmprintk("aha! node %u is master, but trying "
+ "to migrate to node %u.\n",
+ tmpmle->master, tmpmle->new_master);
+ if (tmpmle->master == dlm->node_num) {
+ response = DLM_MASTER_RESP_YES;
+ dlmprintk("no owner on lockres, but this node "
+ "is trying to migrate it to %u?!\n",
+ tmpmle->new_master);
+ BUG();
+ } else {
+ /* the real master can respond on its own */
+ response = DLM_MASTER_RESP_NO;
+ }
} else {
// dlmprintk0("this node is attempting to "
// "master lockres\n");
@@ -1021,7 +1088,17 @@
spin_lock(&tmpmle->spinlock);
if (tmpmle->type == DLM_MLE_BLOCK)
response = DLM_MASTER_RESP_NO;
- else
+ else if (tmpmle->type == DLM_MLE_MIGRATION) {
+ dlmprintk("migration mle was found (%u->%u)\n",
+ tmpmle->master, tmpmle->new_master);
+ if (tmpmle->master == dlm->node_num) {
+ dlmprintk0("no lockres, but migration mle "
+ "says that this node is master!\n");
+ BUG();
+ }
+ /* real master can respond on its own */
+ response = DLM_MASTER_RESP_NO;
+ } else
response = DLM_MASTER_RESP_MAYBE;
set_bit(request->node_idx, tmpmle->maybe_map);
spin_unlock(&tmpmle->spinlock);
@@ -1064,6 +1141,7 @@
/* note that if this nodemap is empty, it returns 0 */
dlm_node_iter_init(nodemap, &iter);
while ((to = dlm_node_iter_next(&iter)) >= 0) {
+ int r = 0;
// dlmprintk("sending assert master to %d\n", to);
memset(&assert, 0, sizeof(assert));
assert.node_idx = dlm->node_num;
@@ -1072,12 +1150,19 @@
dlm_assert_master_to_net(&assert);
tmpret = net_send_message(DLM_ASSERT_MASTER_MSG, dlm->key,
- &assert, sizeof(assert), to, NULL);
+ &assert, sizeof(assert), to, &r);
if (tmpret < 0) {
// TODO
// dlmprintk("assert_master returned %d!\n", tmpret);
ret = tmpret;
break;
+ } else if (r < 0) {
+ /* nothing returns this yet */
+ /* ok, something horribly messed. kill thyself. */
+ dlmprintk("during assert master of %.*s to %u, "
+ "got %d. BYE BYE!\n",
+ namelen, lockname, to, r);
+ BUG();
}
}
@@ -1141,6 +1226,7 @@
res = __dlm_lookup_lock(dlm, name, namelen);
if (res) {
spin_lock(&res->spinlock);
+ DLM_ASSERT(!(res->state & DLM_LOCK_RES_RECOVERING));
if (!mle) {
if (res->owner != assert->node_idx) {
dlmprintk("EEEEeeEEeeEEEK! assert_master from "
@@ -1148,7 +1234,7 @@
assert->node_idx, res->owner);
BUG();
}
- } else {
+ } else if (mle->type != DLM_MLE_MIGRATION) {
if (res->owner != DLM_LOCK_RES_OWNER_UNKNOWN) {
dlmprintk("EEEEEEEEEEEEEEEEEK!!! got "
"assert_master from node %u, but %u "
@@ -1163,6 +1249,16 @@
"in-progress!\n", assert->node_idx);
BUG();
}
+ } else /* mle->type == DLM_MLE_MIGRATION */ {
+ /* should only be getting an assert from new master */
+ if (assert->node_idx != mle->new_master) {
+ dlmprintk("migration: got assert from %u, but "
+ "new master is %u, and old master "
+ "was %u\n", assert->node_idx,
+ mle->new_master, mle->master);
+ BUG();
+ }
+
}
spin_unlock(&res->spinlock);
}
@@ -1176,7 +1272,14 @@
atomic_set(&mle->woken, 1);
wake_up(&mle->wq);
spin_unlock(&mle->spinlock);
-
+
+ if (mle->type == DLM_MLE_MIGRATION && res) {
+ dlmprintk0("finishing off migration of lockres\n");
+ spin_lock(&res->spinlock);
+ res->state &= ~DLM_LOCK_RES_MIGRATING;
+ dlm_change_lockres_owner(dlm, res, mle->new_master);
+ spin_unlock(&res->spinlock);
+ }
/* master is known, detach if not already detached */
dlm_mle_detach_hb_events(dlm, mle);
dlm_put_mle(mle);
@@ -1191,3 +1294,617 @@
dlm_put(dlm);
return 0;
}
+
+
+int dlm_migrate_lockres(dlm_ctxt *dlm, dlm_lock_resource *res, u8 target)
+{
+ dlm_master_list_entry *mle = NULL;
+ dlm_master_list_entry *oldmle = NULL;
+ dlm_migratable_lockres *mres = NULL;
+ int ret = -EINVAL;
+ const char *name;
+ unsigned int namelen;
+ int mle_added = 0;
+ struct list_head *queue, *iter;
+ int i;
+ dlm_lock *lock;
+ int empty = 1;
+
+ if (!dlm_grab(dlm))
+ return -EINVAL;
+
+ name = res->lockname.name;
+ namelen = res->lockname.len;
+
+ dlmprintk("migrating %.*s to %u\n", namelen, name, target);
+
+ spin_lock(&res->spinlock);
+ if (res->owner == DLM_LOCK_RES_OWNER_UNKNOWN) {
+ dlmprintk0("cannot migrate lockres with unknown owner!\n");
+ spin_unlock(&res->spinlock);
+ goto leave;
+ }
+ dlmprintk0("checking queues...\n");
+ queue = &res->granted;
+ for (i=0; i<3; i++) {
+ list_for_each(iter, queue) {
+ lock = list_entry (iter, dlm_lock, list);
+ DLM_ASSERT(lock);
+ empty = 0;
+ if (lock->ml.node == dlm->node_num) {
+ dlmprintk("found a lock owned by this node "
+ "still on the %s queue! will not "
+ "migrate this lockres\n",
+ i==0 ? "granted" :
+ (i==1 ? "converting" : "blocked"));
+ spin_unlock(&res->spinlock);
+ ret = -ENOTEMPTY;
+ goto leave;
+ }
+ }
+ queue++;
+ }
+ dlmprintk0("all locks on this lockres are nonlocal. continuing\n");
+ spin_unlock(&res->spinlock);
+
+ if (empty) {
+ dlmprintk0("no locks were found on this lockres! done!\n");
+ ret = 0;
+ goto leave;
+ }
+
+ /* preallocate.. if this fails, abort */
+ ret = -ENOMEM;
+ mres = (dlm_migratable_lockres *) __get_free_page(GFP_KERNEL);
+ if (!mres) {
+ dlmprintk0("failed to get free page!\n");
+ goto leave;
+ }
+
+ mle = kmalloc(sizeof(dlm_master_list_entry) + namelen, GFP_KERNEL);
+ if (!mle) {
+ ret = -ENOMEM;
+ goto leave;
+ }
+ ret = 0;
+
+ dlmprintk0("picking a migration node\n");
+ spin_lock(&dlm->spinlock);
+ /* pick a new node */
+ if (!test_bit(target, dlm->domain_map) ||
+ target >= NM_MAX_NODES) {
+ target = dlm_pick_migration_target(dlm, res);
+ }
+ dlmprintk("node %u chosen for migration\n", target);
+
+ if (target >= NM_MAX_NODES ||
+ !test_bit(target, dlm->domain_map)) {
+ /* target chosen is not alive */
+ ret = -EINVAL;
+ goto unlock;
+ }
+
+ dlmprintk("continuing with target = %u\n", target);
+
+ /* clear any existing master requests and
+ * add the migration mle to the list */
+ spin_lock(&dlm->master_lock);
+ dlm_add_migration_mle(dlm, res, mle, &oldmle, name, namelen,
+ target, dlm->node_num);
+ mle_added = 1;
+ spin_unlock(&dlm->master_lock);
+
+ /* set migrating flag on lockres */
+ spin_lock(&res->spinlock);
+ res->state |= DLM_LOCK_RES_MIGRATING;
+ spin_unlock(&res->spinlock);
+
+ /* flush the last of the pending asts */
+ ret = dlm_flush_lockres_asts(dlm, res);
+ if (ret < 0) {
+ spin_lock(&res->spinlock);
+ res->state &= ~DLM_LOCK_RES_MIGRATING;
+ spin_unlock(&res->spinlock);
+ }
+
+unlock:
+ spin_unlock(&dlm->spinlock);
+
+ if (oldmle) {
+ /* master is known, detach if not already detached */
+ dlm_mle_detach_hb_events(dlm, oldmle);
+ dlm_put_mle(oldmle);
+ }
+
+ if (ret < 0) {
+ if (mle_added) {
+ dlm_mle_detach_hb_events(dlm, mle);
+ dlm_put_mle(mle);
+ } else if (mle) {
+ kfree(mle);
+ }
+ goto leave;
+ }
+
+ /*
+ * at this point, we have a migration target, an mle
+ * in the master list, and the MIGRATING flag set on
+ * the lockres
+ */
+
+
+ /* notify new node and send all lock state */
+ /* call send_one_lockres with migration flag.
+ * this serves as notice to the target node that a
+ * migration is starting. */
+ ret = dlm_send_one_lockres(dlm, res, mres, target,
+ DLM_MRES_MIGRATION);
+
+ if (ret < 0) {
+ dlmprintk("migration to node %u failed with %d\n",
+ target, ret);
+ /* migration failed, detach and clean up mle */
+ dlm_mle_detach_hb_events(dlm, mle);
+ dlm_put_mle(mle);
+ goto leave;
+ }
+
+ /* at this point, the target sends a message to all nodes,
+ * (using dlm_do_migrate_request). this node is skipped since
+ * we had to put an mle in the list to begin the process. this
+ * node now waits for target to do an assert master. this node
+ * will be the last one notified, ensuring that the migration
+ * is complete everywhere. if the target dies while this is
+ * going on, some nodes could potentially see the target as the
+ * master, so it is important that my recovery finds the migration
+ * mle and sets the master to UNKNOWN. */
+
+
+ /* wait for new node to assert master */
+ while (1) {
+ ret = wait_event_interruptible_timeout(mle->wq,
+ (atomic_read(&mle->woken) == 1),
+ msecs_to_jiffies(5000));
+
+ if (ret >= 0) {
+ if (atomic_read(&mle->woken) == 1 ||
+ res->owner == target)
+ break;
+
+ dlmprintk0("timed out during migration\n");
+ }
+ if (ret == -EINTR) {
+ /* migration failed, detach and clean up mle */
+ dlm_mle_detach_hb_events(dlm, mle);
+ dlm_put_mle(mle);
+ goto leave;
+ }
+ /* TODO: if node died: stop, clean up, return error */
+ }
+
+ /* all done, set the owner, clear the flag */
+ spin_lock(&res->spinlock);
+ dlm_set_lockres_owner(dlm, res, target);
+ res->state &= ~DLM_LOCK_RES_MIGRATING;
+ dlm_remove_nonlocal_locks(dlm, res);
+ spin_unlock(&res->spinlock);
+ wake_up(&res->wq);
+
+ /* master is known, detach if not already detached */
+ dlm_mle_detach_hb_events(dlm, mle);
+ dlm_put_mle(mle);
+ ret = 0;
+
+leave:
+ /* TODO: cleanup */
+ if (mres)
+ free_page((unsigned long)mres);
+
+ dlm_put(dlm);
+ dlmprintk("woo. returning %d\n", ret);
+ return ret;
+}
+EXPORT_SYMBOL(dlm_migrate_lockres);
+
+
+/* last step in the migration process.
+ * original master calls this to free all of the dlm_lock
+ * structures that used to be for other nodes. */
+static void dlm_remove_nonlocal_locks(dlm_ctxt *dlm, dlm_lock_resource *res)
+{
+ struct list_head *iter, *iter2;
+ struct list_head *queue = &res->granted;
+ int i;
+ dlm_lock *lock;
+
+ assert_spin_locked(&res->spinlock);
+
+ DLM_ASSERT(res->owner != dlm->node_num);
+
+ for (i=0; i<3; i++) {
+ list_for_each_safe(iter, iter2, queue) {
+ lock = list_entry (iter, dlm_lock, list);
+ DLM_ASSERT(lock);
+ if (lock->ml.node != dlm->node_num) {
+ dlmprintk("freeing lock for node %u\n",
+ lock->ml.node);
+ list_del(&lock->list);
+ dlm_lockres_put(dlm, lock->lockres);
+ DLM_ASSERT(lock->lksb);
+ kfree(lock->lksb);
+ kfree(lock);
+ }
+ }
+ queue++;
+ }
+}
+
+int dlm_flush_lockres_asts(dlm_ctxt *dlm, dlm_lock_resource *res)
+{
+#warning need to implement dlm_flush_lockres_asts
+ return 0;
+}
+
+/* for now this is not too intelligent. we will
+ * need stats to make this do the right thing.
+ * this just finds the first lock on one of the
+ * queues and uses that node as the target. */
+static u8 dlm_pick_migration_target(dlm_ctxt *dlm, dlm_lock_resource *res)
+{
+ int i;
+ struct list_head *queue = &res->granted;
+ struct list_head *iter;
+ dlm_lock *lock;
+ int nodenum;
+
+ assert_spin_locked(&dlm->spinlock);
+
+ spin_lock(&res->spinlock);
+ for (i=0; i<3; i++) {
+ list_for_each(iter, queue) {
+ /* up to the caller to make sure this node
+ * is alive */
+ lock = list_entry (iter, dlm_lock, list);
+ if (lock->ml.node != dlm->node_num) {
+ spin_unlock(&res->spinlock);
+ return lock->ml.node;
+ }
+ }
+ queue++;
+ }
+ spin_unlock(&res->spinlock);
+ dlmprintk0("have not found a suitable target yet! "
+ "checking domain map\n");
+
+ /* ok now we're getting desperate. pick anyone alive. */
+ nodenum = -1;
+ while (1) {
+ nodenum = find_next_bit(dlm->domain_map,
+ NM_MAX_NODES, nodenum+1);
+ dlmprintk("found %d in domain map\n", nodenum);
+ if (nodenum >= NM_MAX_NODES)
+ break;
+ if (nodenum != dlm->node_num) {
+ dlmprintk("aha. picking %d\n", nodenum);
+ return nodenum;
+ }
+ }
+
+ dlmprintk0("giving up. no master to migrate to\n");
+ return DLM_LOCK_RES_OWNER_UNKNOWN;
+}
+
+
+
+/* this is called by the new master once all lockres
+ * data has been received */
+int dlm_do_migrate_request(dlm_ctxt *dlm, dlm_lock_resource *res,
+ u8 master, u8 new_master, dlm_node_iter *iter)
+{
+ dlm_migrate_request migrate;
+ int ret, status = 0;
+ int nodenum;
+
+ memset(&migrate, 0, sizeof(migrate));
+ migrate.namelen = res->lockname.len;
+ strncpy(migrate.name, res->lockname.name, migrate.namelen);
+ migrate.new_master = new_master;
+ migrate.master = master;
+
+ dlm_migrate_request_to_net(&migrate);
+
+ ret = 0;
+
+ /* send message to all nodes, except the master and myself */
+ while ((nodenum = dlm_node_iter_next(iter)) >= 0) {
+ if (nodenum == master ||
+ nodenum == new_master)
+ continue;
+
+ ret = net_send_message(DLM_MIGRATE_REQUEST_MSG, dlm->key,
+ &migrate, sizeof(migrate), nodenum, &status);
+ if (ret < 0)
+ dlmprintk("net_send_message returned %d!\n", ret);
+ else if (status < 0) {
+ dlmprintk("migrate request (node %u) returned %d!\n",
+ nodenum, status);
+ ret = status;
+ }
+ }
+
+ if (ret < 0)
+ dlmprintk("nasty error occurred. %d\n", ret);
+ dlmprintk("returning ret=%d\n", ret);
+ return ret;
+}
+
+
+/* if there is an existing mle for this lockres, we now know who the master is.
+ * (the one who sent us *this* message), so we can clear it up right away.
+ * since the process that put the mle on the list still has a reference to it,
+ * we can unhash it now, set the master and wake the process. as a result,
+ * we will have no mle in the list to start with. now we can add an mle for
+ * the migration and this should be the only one found for those scanning the
+ * list. */
+int dlm_migrate_request_handler(net_msg *msg, u32 len, void *data)
+{
+ dlm_ctxt *dlm = data;
+ dlm_lock_resource *res;
+ dlm_migrate_request *migrate = (dlm_migrate_request *) msg->buf;
+ dlm_master_list_entry *mle = NULL, *oldmle = NULL;
+ const char *name;
+ unsigned int namelen;
+ int ret = 0;
+
+ if (!dlm_grab(dlm))
+ return -EINVAL;
+
+ dlm_migrate_request_to_host(migrate);
+ name = migrate->name;
+ namelen = migrate->namelen;
+
+ /* preallocate.. if this fails, abort */
+ mle = kmalloc(sizeof(dlm_master_list_entry) + namelen, GFP_KERNEL);
+ if (!mle) {
+ ret = -ENOMEM;
+ goto leave;
+ }
+
+ /* check for pre-existing lock */
+ spin_lock(&dlm->spinlock);
+ res = __dlm_lookup_lock(dlm, name, namelen);
+ spin_lock(&dlm->master_lock);
+
+ if (res) {
+ spin_lock(&res->spinlock);
+ if (res->state & DLM_LOCK_RES_RECOVERING) {
+ /* if all is working ok, this can only mean that we got
+ * a migrate request from a node that we now see as
+ * dead. what can we do here? drop it to the floor? */
+ spin_unlock(&res->spinlock);
+ dlmprintk0("grrrr. got a migrate request, but the "
+ "lockres is marked as recovering!");
+ kfree(mle);
+ ret = -EINVAL; /* need a better solution */
+ goto leave;
+ }
+ res->state |= DLM_LOCK_RES_MIGRATING;
+ spin_unlock(&res->spinlock);
+ }
+
+ dlm_add_migration_mle(dlm, res, mle, &oldmle, name, namelen,
+ migrate->new_master, migrate->master);
+
+ spin_unlock(&dlm->master_lock);
+ spin_unlock(&dlm->spinlock);
+
+ if (oldmle) {
+ /* master is known, detach if not already detached */
+ dlm_mle_detach_hb_events(dlm, oldmle);
+ dlm_put_mle(oldmle);
+ }
+
+leave:
+ dlm_put(dlm);
+ return ret;
+}
+
+/* must be holding dlm->spinlock and dlm->master_lock
+ * when adding a migration mle, we can clear any other mles
+ * in the master list because we know with certainty that
+ * the master is "master". so we remove any old mle from
+ * the list after setting its master field, and then add
+ * the new migration mle. this way we can hold to the rule
+ * of having only one mle for a given lock name at all times. */
+static void dlm_add_migration_mle(dlm_ctxt *dlm,
+ dlm_lock_resource *res,
+ dlm_master_list_entry *mle,
+ dlm_master_list_entry **oldmle,
+ const char *name, unsigned int namelen,
+ u8 new_master, u8 master)
+{
+ int found;
+
+ *oldmle = NULL;
+
+ dlmprintk0("\n");
+
+ found = dlm_find_mle(dlm, oldmle, (char *)name, namelen);
+ if (found) {
+ dlm_master_list_entry *tmp = *oldmle;
+ /* this is essentially what assert_master does */
+ spin_lock(&tmp->spinlock);
+ tmp->master = master;
+ atomic_set(&tmp->woken, 1);
+ wake_up(&tmp->wq);
+ /* remove it from the list so that only one
+ * mle will be found */
+ list_del(&tmp->list);
+ INIT_LIST_HEAD(&tmp->list);
+ spin_unlock(&tmp->spinlock);
+ }
+
+ /* now add a migration mle to the tail of the list */
+ dlm_init_mle(mle, DLM_MLE_MIGRATION, dlm, res, name, namelen);
+ mle->new_master = new_master;
+ mle->master = master;
+ /* do this for consistency with other mle types */
+ set_bit(new_master, mle->maybe_map);
+ list_add(&mle->list, &dlm->master_list);
+}
+
+
+void dlm_clean_master_list(dlm_ctxt *dlm, u8 dead_node)
+{
+ struct list_head *iter, *iter2;
+ int bit;
+ dlm_master_list_entry *mle;
+ dlm_lock_resource *res;
+
+top:
+ assert_spin_locked(&dlm->spinlock);
+
+ /* clean the master list */
+ spin_lock(&dlm->master_lock);
+ list_for_each_safe(iter, iter2, &dlm->master_list) {
+ mle = list_entry(iter, dlm_master_list_entry, list);
+
+ DLM_ASSERT((mle->type == DLM_MLE_BLOCK) ||
+ (mle->type == DLM_MLE_MASTER) ||
+ (mle->type == DLM_MLE_MIGRATION));
+
+ /* MASTER mles are initiated locally. the waiting
+ * process will notice the node map change
+ * shortly. let that happen as normal. */
+ if (mle->type == DLM_MLE_MASTER)
+ continue;
+
+ bit = find_next_bit(mle->maybe_map, NM_MAX_NODES, 0);
+
+ /* BLOCK mles are initiated by other nodes.
+ * need to clean up if the dead node would have
+ * been the master. */
+ if (mle->type == DLM_MLE_BLOCK &&
+ bit != dead_node)
+ continue;
+
+ /* the rule for MIGRATION mles is that the master
+ * becomes UNKNOWN if *either* the original or
+ * the new master dies. all UNKNOWN lockreses
+ * are sent to whichever node becomes the recovery
+ * master. the new master is responsible for
+ * determining if there is still a master for
+ * this lockres, or if he needs to take over
+ * mastery. either way, this node should expect
+ * another message to resolve this. */
+ if (mle->type == DLM_MLE_MIGRATION &&
+ mle->master != dead_node &&
+ mle->new_master != dead_node)
+ continue;
+
+ /* if we have reached this point, this mle needs to
+ * be removed from the list and freed. */
+
+ /* unlinking list_head while in list_for_each_safe */
+ list_del_init(&mle->list);
+ atomic_set(&mle->woken, 1);
+ wake_up(&mle->wq);
+
+ if (mle->type == DLM_MLE_MIGRATION) {
+ dlmprintk("node %u died during migration from "
+ "%u to %u!\n", dead_node,
+ mle->master, mle->new_master);
+ /* if there is a lockres associated with this
+ * mle, find it and set its owner to UNKNOWN */
+ res = __dlm_lookup_lock(dlm, mle->u.name.name,
+ mle->u.name.len);
+ if (res) {
+ /* unfortunately if we hit this rare case, our
+ * lock ordering is messed. we need to drop
+ * the master lock so that we can take the
+ * lockres lock, meaning that we will have to
+ * restart from the head of list. */
+ spin_unlock(&dlm->master_lock);
+
+ /* move lockres onto recovery list */
+ spin_lock(&res->spinlock);
+ dlm_set_lockres_owner(dlm, res,
+ DLM_LOCK_RES_OWNER_UNKNOWN);
+ dlm_move_lockres_to_recovery_list(dlm, res);
+ spin_unlock(&res->spinlock);
+ __dlm_lockres_put(dlm, res);
+
+ /* dump the mle */
+ spin_lock(&dlm->master_lock);
+ __dlm_put_mle(mle);
+ spin_unlock(&dlm->master_lock);
+
+ /* restart */
+ goto top;
+ }
+ }
+
+ /* this may be the last reference */
+ __dlm_put_mle(mle);
+ }
+ spin_unlock(&dlm->master_lock);
+}
+
+
+int dlm_finish_migration(dlm_ctxt *dlm, dlm_lock_resource *res, u8 old_master)
+{
+ dlm_node_iter iter;
+ int ret = 0;
+
+ spin_lock(&dlm->spinlock);
+ dlm_node_iter_init(dlm->domain_map, &iter);
+ clear_bit(old_master, iter.node_map);
+ clear_bit(dlm->node_num, iter.node_map);
+ spin_unlock(&dlm->spinlock);
+
+ dlmprintk0("now time to do a migrate request to other nodes\n");
+ ret = dlm_do_migrate_request(dlm, res, old_master,
+ dlm->node_num, &iter);
+ if (ret < 0) {
+ dlmprintk("error %d\n", ret);
+ goto leave;
+ }
+
+retry:
+ dlmprintk0("doing assert master to all except the original node\n");
+ ret = dlm_do_assert_master(dlm, res->lockname.name,
+ res->lockname.len, iter.node_map);
+ if (ret < 0) {
+ dlmprintk("bad news. assert master returned %d "
+ "while trying to finish migration. retry?\n",
+ ret);
+ /* maybe we can be saved by updating the domain map */
+ spin_lock(&dlm->spinlock);
+ dlm_node_iter_init(dlm->domain_map, &iter);
+ clear_bit(old_master, iter.node_map);
+ clear_bit(dlm->node_num, iter.node_map);
+ spin_unlock(&dlm->spinlock);
+ goto retry;
+ }
+
+ memset(iter.node_map, 0, sizeof(iter.node_map));
+ set_bit(old_master, iter.node_map);
+ dlmprintk("doing assert master back to %u\n", old_master);
+ ret = dlm_do_assert_master(dlm, res->lockname.name,
+ res->lockname.len, iter.node_map);
+ if (ret < 0) {
+ dlmprintk("assert master to original master failed "
+ "with %d.\n", ret);
+ /* the only nonzero status here would be because of
+ * a dead original node. we're done. */
+ }
+
+ /* all done, set the owner, clear the flag */
+ spin_lock(&res->spinlock);
+ dlm_set_lockres_owner(dlm, res, dlm->node_num);
+ res->state &= ~DLM_LOCK_RES_MIGRATING;
+ spin_unlock(&res->spinlock);
+ wake_up(&res->wq);
+leave:
+ return ret;
+}
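
dlm_finish_migration() above leans on the dlm_node_iter helper, whose
typedef moves into dlmmod.h in this commit. The pattern in isolation,
as a sketch (dlm_walk_other_nodes is a made-up name):

static void dlm_walk_other_nodes(dlm_ctxt *dlm)
{
	dlm_node_iter iter;
	int nodenum;

	/* snapshot the domain map under the dlm spinlock... */
	spin_lock(&dlm->spinlock);
	dlm_node_iter_init(dlm->domain_map, &iter);
	clear_bit(dlm->node_num, iter.node_map);	/* skip self */
	spin_unlock(&dlm->spinlock);

	/* ...then walk the private copy with no locks held;
	 * dlm_node_iter_next goes negative once the map is done */
	while ((nodenum = dlm_node_iter_next(&iter)) >= 0) {
		/* send a message to nodenum here */
	}
}
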
Modified: trunk/fs/ocfs2/dlm/dlmmod.c
===================================================================
--- trunk/fs/ocfs2/dlm/dlmmod.c 2005-03-21 21:17:54 UTC (rev 2025)
+++ trunk/fs/ocfs2/dlm/dlmmod.c 2005-03-21 22:23:34 UTC (rev 2026)
@@ -64,10 +64,11 @@
static int dlm_read_params(void);
static void __exit dlm_driver_exit (void);
-static void dlm_dump_all_lock_resources(void);
+static void dlm_dump_all_lock_resources(char *data, int len);
static void dlm_dump_lock_resources(dlm_ctxt *dlm);
static void dlm_dump_purge_list(dlm_ctxt *dlm);
-static void dlm_dump_all_purge_lists(void);
+static void dlm_dump_all_purge_lists(char *data, int len);
+static void dlm_trigger_migration(char *data, int len);
static int dlm_query_join_handler(net_msg *msg, u32 len, void *data);
static int dlm_assert_joined_handler(net_msg *msg, u32 len, void *data);
@@ -85,7 +86,7 @@
static LIST_HEAD(dlm_join_handlers);
-typedef void (dlm_debug_func_t)(void);
+typedef void (dlm_debug_func_t)(char *data, int len);
typedef struct _dlm_debug_funcs
{
@@ -94,17 +95,18 @@
} dlm_debug_funcs;
-
static dlm_debug_funcs dlm_debug_map[] = {
{ 'r', dlm_dump_all_lock_resources },
#ifdef DLM_MLE_DEBUG
{ 'm', dlm_dump_all_mles },
#endif
- { 'p', dlm_dump_all_purge_lists },
+ { 'p', dlm_dump_all_purge_lists },
+ { 'M', dlm_trigger_migration },
};
static int dlm_debug_map_sz = (sizeof(dlm_debug_map) /
sizeof(dlm_debug_funcs));
+
static ssize_t write_dlm_debug(struct file *file, const char __user *buf,
size_t count, loff_t *ppos)
{
@@ -121,10 +123,11 @@
return -EFAULT;
for (i=0; i < dlm_debug_map_sz; i++) {
- if (c == dlm_debug_map[i].key) {
- fn = dlm_debug_map[i].func;
+ dlm_debug_funcs *d = &dlm_debug_map[i];
+ if (c == d->key) {
+ fn = d->func;
if (fn)
- (fn)();
+ (fn)((char *)buf, count);
break;
}
}
@@ -299,13 +302,23 @@
goto error;
}
res = dlm_lockres_grab(dlm, lock->lockres);
+retry_convert:
down_read(&dlm->recovery_sem);
if (res->owner == dlm->node_num)
status = dlmconvert_master(dlm, res, lock, flags, mode);
else
status = dlmconvert_remote(dlm, res, lock, flags, mode);
-
+ if (status == DLM_RECOVERING || status == DLM_MIGRATING) {
+ /* for now, see how this works without sleeping
+ * and just retry right away. I suspect the reco
+ * or migration will complete fast enough that
+ * no waiting will be necessary */
+ dlmprintk0("retrying convert with migration or "
+ "recovery in progress\n");
+ up_read(&dlm->recovery_sem);
+ goto retry_convert;
+ }
} else {
/* LOCK request */
status = DLM_BADARGS;
@@ -335,6 +348,7 @@
dlmprintk("type=%d, flags = 0x%x\n", mode, flags);
dlmprintk("creating lock: lock=%p res=%p\n", lock, res);
+#warning move this into dlm_init_lock
memset(lock, 0, sizeof(dlm_lock));
INIT_LIST_HEAD(&lock->list);
INIT_LIST_HEAD(&lock->ast_list);
@@ -349,9 +363,12 @@
lock->bast = bast;
lock->astdata = data;
lock->lksb = lksb;
+ lock->ast_pending = 0;
+ lock->bast_pending = 0;
dlm_get_next_cookie(lock->ml.node, &lock->ml.cookie);
+retry_lock:
if (flags & LKM_VALBLK) {
dlmprintk("LKM_VALBLK passed by caller\n");
@@ -370,6 +387,14 @@
else
status = dlmlock_remote(dlm, res, lock, flags);
+ if (status == DLM_RECOVERING || status == DLM_MIGRATING) {
+ dlmprintk0("retrying lock with migration or "
+ "recovery in progress\n");
+ up_read(&dlm->recovery_sem);
+ down_read(&dlm->recovery_sem);
+ goto retry_lock;
+ }
+
if (status != DLM_NORMAL) {
lock->lksb->flags &= ~DLM_LKSB_GET_LVB;
goto up_error;
@@ -431,6 +456,7 @@
DLM_ASSERT(lock);
DLM_ASSERT(res);
+retry:
dlmprintk("lock=%p res=%p\n", lock, res);
if (res->owner == dlm->node_num) {
@@ -445,6 +471,12 @@
"call_ast is %d\n", status, call_ast);
}
+ if (status == DLM_RECOVERING ||
+ status == DLM_MIGRATING) {
+ dlmprintk0("retrying unlock due to pending recovery "
+ "or migration\n");
+ goto retry;
+ }
if (call_ast) {
dlmprintk("calling unlockast(%p, %d)\n",
data, lksb->status);
@@ -691,6 +723,7 @@
dlm_unregister_domain_handlers(dlm);
dlm_complete_thread(dlm);
+ dlm_complete_recovery_thread(dlm);
/* We've left the domain. Now we can take ourselves out of the
* list and allow the kref stuff to help us free the
@@ -705,8 +738,37 @@
static void dlm_migrate_all_locks(dlm_ctxt *dlm)
{
+ int i, ret;
+ dlm_lock_resource *res;
+ struct list_head *iter;
+
dlmprintk("Migrating locks from domain %s\n", dlm->name);
- /* TODO: Migrate locks here. */
+ spin_lock(&dlm->spinlock);
+ for (i=0; i<DLM_HASH_SIZE; i++) {
+ while (!list_empty(&dlm->resources[i])) {
+ res = list_entry(dlm->resources[i].next,
+ dlm_lock_resource, list);
+ /* this should unhash the lockres
+ * and exit with dlm->spinlock held */
+ dlmprintk("purging res=%p\n", res);
+ if (res->state & DLM_LOCK_RES_DIRTY ||
+ !list_empty(&res->dirty)) {
+ dlmprintk0("this is probably a bug, dirty\n");
+ /* HACK! this should absolutely go.
+ * need to figure out why some empty
+ * lockreses are still marked dirty */
+ dlm_shuffle_lists(dlm, res);
+ spin_lock(&res->spinlock);
+ list_del_init(&res->dirty);
+ res->state &= ~DLM_LOCK_RES_DIRTY;
+ spin_unlock(&res->spinlock);
+ }
+ dlm_purge_lockres(dlm, res);
+ }
+ }
+ spin_unlock(&dlm->spinlock);
+
+ dlmprintk("DONE Migrating locks from domain %s\n", dlm->name);
}
static int dlm_no_joining_node(dlm_ctxt *dlm)
@@ -1207,6 +1269,8 @@
struct domain_join_ctxt *ctxt;
enum dlm_query_join_response response;
+ dlmprintk0("\n");
+
ctxt = kmalloc(sizeof(struct domain_join_ctxt), GFP_KERNEL);
if (!ctxt) {
dlmprintk("No memory for domain_join_ctxt\n");
@@ -1286,6 +1350,7 @@
kfree(ctxt);
}
+ dlmprintk("returning %d\n", status);
return status;
}
@@ -1361,6 +1426,56 @@
if (status)
goto bail;
+ status = net_register_handler(DLM_MIGRATE_REQUEST_MSG, dlm->key,
+ sizeof(dlm_migrate_request),
+ dlm_migrate_request_handler,
+ dlm, &dlm->dlm_domain_handlers);
+ if (status)
+ goto bail;
+
+ status = net_register_handler(DLM_MIG_LOCKRES_MSG, dlm->key,
+ DLM_MIG_LOCKRES_MAX_LEN,
+ dlm_mig_lockres_handler,
+ dlm, &dlm->dlm_domain_handlers);
+ if (status)
+ goto bail;
+
+ status = net_register_handler(DLM_MASTER_REQUERY_MSG, dlm->key,
+ sizeof(dlm_master_requery),
+ dlm_master_requery_handler,
+ dlm, &dlm->dlm_domain_handlers);
+ if (status)
+ goto bail;
+
+ status = net_register_handler(DLM_LOCK_REQUEST_MSG, dlm->key,
+ sizeof(dlm_lock_request),
+ dlm_request_all_locks_handler,
+ dlm, &dlm->dlm_domain_handlers);
+ if (status)
+ goto bail;
+
+ status = net_register_handler(DLM_RECO_DATA_DONE_MSG, dlm->key,
+ sizeof(dlm_reco_data_done),
+ dlm_reco_data_done_handler,
+ dlm, &dlm->dlm_domain_handlers);
+ if (status)
+ goto bail;
+
+ status = net_register_handler(DLM_BEGIN_RECO_MSG, dlm->key,
+ sizeof(dlm_begin_reco),
+ dlm_begin_reco_handler,
+ dlm, &dlm->dlm_domain_handlers);
+ if (status)
+ goto bail;
+
+ status = net_register_handler(DLM_FINALIZE_RECO_MSG, dlm->key,
+ sizeof(dlm_finalize_reco),
+ dlm_finalize_reco_handler,
+ dlm, &dlm->dlm_domain_handlers);
+ if (status)
+ goto bail;
+
+
bail:
if (status)
dlm_unregister_domain_handlers(dlm);
@@ -1382,12 +1497,16 @@
goto bail;
}
+
+
status = dlm_launch_thread(dlm);
if (status < 0) {
dlmprintk("could not launch dlm thread!\n");
goto bail;
}
+
+
do {
status = dlm_try_to_join_domain(dlm);
@@ -1409,6 +1528,13 @@
goto bail;
}
+ status = dlm_launch_recovery_thread(dlm);
+ if (status < 0) {
+ dlmprintk("could not launch dlm recovery thread!\n");
+ goto bail;
+ }
+
+
spin_lock(&dlm_domain_lock);
dlm->num_joins++;
dlm->dlm_state = DLM_CTXT_JOINED;
@@ -1465,14 +1591,24 @@
INIT_LIST_HEAD(&dlm->dirty_list);
INIT_LIST_HEAD(&dlm->reco.resources);
INIT_LIST_HEAD(&dlm->reco.received);
+ INIT_LIST_HEAD(&dlm->reco.node_data);
INIT_LIST_HEAD(&dlm->purge_list);
INIT_LIST_HEAD(&dlm->dlm_domain_handlers);
INIT_LIST_HEAD(&dlm->pending_asts);
INIT_LIST_HEAD(&dlm->pending_basts);
+ dlmprintk("dlm->recovery_map=%p, &(dlm->recovery_map[0])=%p\n",
+ dlm->recovery_map, &(dlm->recovery_map[0]));
+
+ memset(dlm->recovery_map, 0, sizeof(dlm->recovery_map));
+ memset(dlm->live_nodes_map, 0, sizeof(dlm->live_nodes_map));
+ memset(dlm->domain_map, 0, sizeof(dlm->domain_map));
+
dlm->dlm_thread_task = NULL;
+ dlm->dlm_reco_thread_task = NULL;
init_waitqueue_head(&dlm->dlm_thread_wq);
+ init_waitqueue_head(&dlm->dlm_reco_thread_wq);
INIT_LIST_HEAD(&dlm->master_list);
INIT_LIST_HEAD(&dlm->mle_hb_events);
init_rwsem(&dlm->recovery_sem);
@@ -1482,12 +1618,14 @@
dlm->reco.new_master = NM_INVALID_SLOT_NUM;
dlm->reco.dead_node = NM_INVALID_SLOT_NUM;
- dlm->reco.sending_node = NM_INVALID_SLOT_NUM;
- dlm->reco.next_seq = 0;
atomic_set(&dlm->local_resources, 0);
atomic_set(&dlm->remote_resources, 0);
atomic_set(&dlm->unknown_resources, 0);
+ spin_lock_init(&dlm->work_lock);
+ INIT_LIST_HEAD(&dlm->work_list);
+ INIT_WORK(&dlm->dispatched_work, dlm_dispatch_work, dlm);
+
kref_init(&dlm->dlm_refs, dlm_ctxt_release);
dlm->dlm_state = DLM_CTXT_NEW;
@@ -1498,6 +1636,41 @@
return dlm;
}
+void dlm_dispatch_work(void *data)
+{
+ dlm_ctxt *dlm = (dlm_ctxt *)data;
+ LIST_HEAD(tmp_list);
+ struct list_head *iter, *iter2;
+ dlm_work_item *item;
+ dlm_workfunc_t *workfunc;
+
+ DLM_ASSERT(dlm);
+
+ spin_lock(&dlm->work_lock);
+ list_splice_init(&dlm->work_list, &tmp_list);
+ spin_unlock(&dlm->work_lock);
+
+ list_for_each_safe(iter, iter2, &tmp_list) {
+ item = list_entry(iter, dlm_work_item, list);
+ DLM_ASSERT(item);
+ workfunc = item->func;
+ list_del_init(&item->list);
+
+ /* already have ref on dlm to avoid having
+ * it disappear. just double-check. */
+ DLM_ASSERT(item->dlm == dlm);
+ DLM_ASSERT(workfunc);
+
+ /* this is allowed to sleep and
+ * call network stuff */
+ workfunc(item, item->data);
+
+ dlm_put(dlm);
+ kfree(item);
+ }
+}
+
+
/*
* dlm_register_domain: one-time setup per "domain"
*/
@@ -1599,7 +1772,7 @@
current->state = TASK_RUNNING;
}
-static void dlm_dump_all_lock_resources(void)
+static void dlm_dump_all_lock_resources(char *data, int len)
{
dlm_ctxt *dlm;
struct list_head *iter;
@@ -1695,7 +1868,7 @@
spin_unlock(&dlm->spinlock);
}
-static void dlm_dump_all_purge_lists(void)
+static void dlm_dump_all_purge_lists(char *data, int len)
{
dlm_ctxt *dlm;
struct list_head *iter;
@@ -1708,5 +1881,100 @@
spin_unlock(&dlm_domain_lock);
}
+static void dlm_trigger_migration(char *data, int len)
+{
+ dlm_lock_resource *res;
+ dlm_ctxt *dlm;
+ char *resname;
+ char *domainname;
+ char *tmp, *buf = NULL;
+
+ if (len >= PAGE_SIZE) {
+ printk("user passed too much data: %d bytes\n", len);
+ return;
+ }
+ if (len < 5) {
+ printk("user passed too little data: %d bytes\n", len);
+ return;
+ }
+ buf = kmalloc(len+1, GFP_KERNEL);
+ if (!buf) {
+ printk("could not alloc %d bytes\n", len);
+ return;
+ }
+ if (strncpy_from_user(buf, data, len) < len) {
+ printk("failed to get all user data. done.\n");
+ goto leave;
+ }
+ buf[len]='\0';
+ dlmprintk("got this data from user: %s\n", buf);
+
+ tmp = buf;
+ if (*tmp != 'M') {
+ printk("bad data\n");
+ goto leave;
+ }
+ tmp++;
+ if (*tmp != ' ') {
+ printk("bad data\n");
+ goto leave;
+ }
+ tmp++;
+ domainname = tmp;
+
+ while (*tmp) {
+ if (*tmp == ' ')
+ break;
+ tmp++;
+ }
+ if (!*tmp || !*(tmp+1)) {
+ printk("bad data\n");
+ goto leave;
+ }
+
+ *tmp = '\0'; // null term the domainname
+ tmp++;
+ resname = tmp;
+ while (*tmp) {
+ if (*tmp == '\n' ||
+ *tmp == ' ' ||
+ *tmp == '\r') {
+ *tmp = '\0';
+ break;
+ }
+ tmp++;
+ }
+
+ printk("now looking up domain %s, lockres %s\n",
+ domainname, resname);
+ spin_lock(&dlm_domain_lock);
+ dlm = __dlm_lookup_domain(domainname);
+ spin_unlock(&dlm_domain_lock);
+
+ if (!dlm_grab(dlm)) {
+ printk("bad dlm!\n");
+ goto leave;
+ }
+
+ res = dlm_lookup_lock(dlm, resname, strlen(resname));
+ if (!res) {
+ printk("bad lockres!\n");
+ dlm_put(dlm);
+ goto leave;
+ }
+
+ printk("woo! found dlm=%p, lockres=%p\n", dlm, res);
+ {
+ int ret;
+ ret = dlm_migrate_lockres(dlm, res, NM_MAX_NODES);
+ printk("dlm_migrate_lockres returned %d\n", ret);
+ }
+ dlm_lockres_put(dlm, res);
+ dlm_put(dlm);
+
+leave:
+ kfree(buf);
+}
+
module_init (dlm_driver_entry);
module_exit (dlm_driver_exit);
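
To poke the new 'M' trigger from userspace, write "M <domain> <lockres>"
to the dlm debug proc file. A sketch; the proc path used here is an
assumption, the real one is registered elsewhere in dlmmod.c:

#include <fcntl.h>
#include <stdio.h>
#include <string.h>
#include <unistd.h>

int main(void)
{
	const char *cmd = "M mydomain mylockres\n";
	/* path is a guess for illustration */
	int fd = open("/proc/fs/ocfs2_dlm/debug", O_WRONLY);

	if (fd < 0) {
		perror("open");
		return 1;
	}
	/* one write: write_dlm_debug() sees the leading 'M', then
	 * dlm_trigger_migration() parses the domain and lockres names
	 * and calls dlm_migrate_lockres(dlm, res, NM_MAX_NODES), so
	 * the dlm picks the target node itself */
	if (write(fd, cmd, strlen(cmd)) < 0)
		perror("write");
	close(fd);
	return 0;
}
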
Modified: trunk/fs/ocfs2/dlm/dlmmod.h
===================================================================
--- trunk/fs/ocfs2/dlm/dlmmod.h 2005-03-21 21:17:54 UTC (rev 2025)
+++ trunk/fs/ocfs2/dlm/dlmmod.h 2005-03-21 22:23:34 UTC (rev 2026)
@@ -29,7 +29,10 @@
#include <linux/kref.h>
-#if 0
+// #undef USE_DLMPRINTK
+#define USE_DLMPRINTK 1
+
+#ifndef USE_DLMPRINTK
#define dlmprintk(x, arg...)
#define dlmprintk0(x)
#else
@@ -176,8 +179,10 @@
DLM_NO_CONTROL_DEVICE, /* 38: Cannot set options on opened device */
DLM_MAXSTATS, /* 39: upper limit for return code validation */
- DLM_RECOVERING /* 40: our lame addition to allow caller to fail a lock
+ DLM_RECOVERING, /* 40: our lame addition to allow caller to fail a lock
request if it is being recovered */
+ DLM_MIGRATING, /* 41: our lame addition to allow caller to fail a lock
+ request if it is being migrated */
} dlm_status;
@@ -186,10 +191,9 @@
{
struct list_head resources;
struct list_head received; // list of dlm_reco_lock_infos received from other nodes during recovery
+ struct list_head node_data;
u8 new_master;
u8 dead_node;
- u8 sending_node;
- u32 next_seq;
unsigned long node_map[BITS_TO_LONGS(NM_MAX_NODES)];
} dlm_recovery_ctxt;
@@ -237,11 +241,66 @@
struct hb_callback_func dlm_hb_up;
struct hb_callback_func dlm_hb_down;
struct task_struct *dlm_thread_task;
+ struct task_struct *dlm_reco_thread_task;
wait_queue_head_t dlm_thread_wq;
+ wait_queue_head_t dlm_reco_thread_wq;
+ struct work_struct dispatched_work;
+ struct list_head work_list;
+ spinlock_t work_lock;
struct list_head dlm_domain_handlers;
};
+
+/* these keventd work queue items are for less-frequently
+ * called functions that cannot be directly called from the
+ * net message handlers for some reason, usually because
+ * they need to send net messages of their own. */
+void dlm_dispatch_work(void *data);
+
+typedef struct _dlm_lock_resource dlm_lock_resource;
+typedef struct _dlm_work_item dlm_work_item;
+
+typedef void (dlm_workfunc_t)(dlm_work_item *, void *);
+
+typedef struct _dlm_request_all_locks_priv
+{
+ u8 reco_master;
+ u8 dead_node;
+} dlm_request_all_locks_priv;
+
+typedef struct _dlm_mig_lockres_priv
+{
+ dlm_lock_resource *lockres;
+ u8 real_master;
+} dlm_mig_lockres_priv;
+
+struct _dlm_work_item
+{
+ struct list_head list;
+ dlm_workfunc_t *func;
+ dlm_ctxt *dlm;
+ void *data;
+ union {
+ dlm_request_all_locks_priv ral;
+ dlm_mig_lockres_priv ml;
+ } u;
+};
+
+static inline void dlm_init_work_item(dlm_ctxt *dlm, dlm_work_item *i,
+ dlm_workfunc_t *f, void *data)
+{
+ DLM_ASSERT(i);
+ DLM_ASSERT(f);
+ memset(i, 0, sizeof(dlm_work_item));
+ i->func = f;
+ INIT_LIST_HEAD(&i->list);
+ i->data = data;
+ i->dlm = dlm; /* must have already done a dlm_grab on this! */
+}
+
+
+
static inline void __dlm_set_joining_node(struct _dlm_ctxt *dlm,
u8 node)
{
@@ -256,10 +315,11 @@
#define DLM_LOCK_RES_READY 0x00000004
#define DLM_LOCK_RES_DIRTY 0x00000008
#define DLM_LOCK_RES_IN_PROGRESS 0x00000010
+#define DLM_LOCK_RES_MIGRATING 0x00000020
#define DLM_PURGE_INTERVAL_MS (8 * 1000)
-typedef struct _dlm_lock_resource
+struct _dlm_lock_resource
{
/* WARNING: Please see the comment in dlm_init_lockres before
* adding fields here. */
@@ -286,7 +346,7 @@
u16 state;
struct qstr lockname;
char lvb[DLM_LVB_LEN];
-} dlm_lock_resource;
+};
typedef void (dlm_astlockfunc_t)(void *);
typedef void (dlm_bastlockfunc_t)(void *, int);
@@ -325,6 +385,8 @@
dlm_bastlockfunc_t *bast;
void *astdata;
dlm_lockstatus *lksb;
+ unsigned ast_pending:1,
+ bast_pending:1;
} dlm_lock;
@@ -346,7 +408,8 @@
enum dlm_mle_type {
DLM_MLE_BLOCK,
- DLM_MLE_MASTER
+ DLM_MLE_MASTER,
+ DLM_MLE_MIGRATION
};
typedef struct _dlm_lock_name
@@ -370,6 +433,7 @@
unsigned long response_map[BITS_TO_LONGS(NM_MAX_NODES)];
unsigned long node_map[BITS_TO_LONGS(NM_MAX_NODES)];
u8 master;
+ u8 new_master;
u8 error;
enum dlm_mle_type type; // BLOCK or MASTER
union {
@@ -380,6 +444,11 @@
struct hb_callback_func mle_hb_down;
} dlm_master_list_entry;
+typedef struct _dlm_node_iter
+{
+ unsigned long node_map[BITS_TO_LONGS(NM_MAX_NODES)];
+ int curnode;
+} dlm_node_iter;
#define DLM_MASTER_REQUEST_MSG 500
@@ -389,14 +458,20 @@
#define DLM_CONVERT_LOCK_MSG 504
#define DLM_PROXY_AST_MSG 505
#define DLM_UNLOCK_LOCK_MSG 506
-
-#define DLM_RECO_NODE_DATA_MSG 507
-
+#define DLM_UNUSED_MSG2 507
+#define DLM_MIGRATE_REQUEST_MSG 508
+#define DLM_MIG_LOCKRES_MSG 509
#define DLM_QUERY_JOIN_MSG 510
#define DLM_ASSERT_JOINED_MSG 511
#define DLM_CANCEL_JOIN_MSG 512
#define DLM_EXIT_DOMAIN_MSG 513
+#define DLM_MASTER_REQUERY_MSG 514
+#define DLM_LOCK_REQUEST_MSG 515
+#define DLM_RECO_DATA_DONE_MSG 516
+#define DLM_BEGIN_RECO_MSG 517
+#define DLM_FINALIZE_RECO_MSG 518
+
typedef struct _dlm_reco_node_data
{
int state;
@@ -431,32 +506,71 @@
u8 node_idx;
u8 namelen;
u16 pad1;
- u32 pad2;
+ u32 flags;
u8 name[NM_MAX_NAME_LEN];
} dlm_master_request;
-typedef struct _dlm_master_request_resp
+typedef struct _dlm_assert_master
{
u8 node_idx;
- u8 response;
u8 namelen;
+ u16 pad1;
+ u32 flags;
+
+ u8 name[NM_MAX_NAME_LEN];
+} dlm_assert_master;
+
+typedef struct _dlm_migrate_request
+{
+ u8 master;
+ u8 new_master;
+ u8 namelen;
u8 pad1;
u32 pad2;
-
u8 name[NM_MAX_NAME_LEN];
-} dlm_master_request_resp;
+} dlm_migrate_request;
-typedef struct _dlm_assert_master
+typedef struct _dlm_master_requery
{
+ u8 pad1;
+ u8 pad2;
u8 node_idx;
u8 namelen;
- u16 pad1;
- u32 pad2;
-
+ u32 pad3;
u8 name[NM_MAX_NAME_LEN];
-} dlm_assert_master;
+} dlm_master_requery;
+#define DLM_MRES_RECOVERY 0x01
+#define DLM_MRES_MIGRATION 0x02
+#define DLM_MRES_ALL_DONE 0x04
+
+// NET_MAX_PAYLOAD_BYTES is roughly 4080
+// 240 * 16 = 3840
+// 3840 + 112 = 3952 bytes
+// leaves us about 128 bytes
+#define DLM_MAX_MIGRATABLE_LOCKS 240
+
+typedef struct _dlm_migratable_lockres
+{
+ u8 master;
+ u8 lockname_len;
+ u8 num_locks; // locks sent in this structure
+ u8 flags;
+ u32 total_locks; // locks to be sent for this migration cookie
+ u64 mig_cookie; // cookie for this lockres migration
+ // or zero if not needed
+ // 16 bytes
+ u8 lockname[DLM_LOCKID_NAME_MAX];
+ // 48 bytes
+ u8 lvb[DLM_LVB_LEN];
+ // 112 bytes
+ dlm_migratable_lock ml[0]; // 16 bytes each, begins at byte 112
+} dlm_migratable_lockres;
+#define DLM_MIG_LOCKRES_MAX_LEN (sizeof(dlm_migratable_lockres) + \
+ (sizeof(dlm_migratable_lock) * \
+ DLM_MAX_MIGRATABLE_LOCKS) )
+
typedef struct _dlm_create_lock
{
u64 cookie;
@@ -524,6 +638,36 @@
JOIN_OK_NO_MAP,
};
+typedef struct _dlm_lock_request
+{
+ u8 node_idx;
+ u8 dead_node;
+ u16 pad1;
+ u32 pad2;
+} dlm_lock_request;
+
+typedef struct _dlm_reco_data_done
+{
+ u8 node_idx;
+ u8 dead_node;
+ u16 pad1;
+ u32 pad2;
+
+ /* unused for now */
+ /* eventually we can use this to attempt
+ * lvb recovery based on each node's info */
+ u8 reco_lvb[DLM_LVB_LEN];
+} dlm_reco_data_done;
+
+typedef struct _dlm_begin_reco
+{
+ u8 node_idx;
+ u8 dead_node;
+ u16 pad1;
+ u32 pad2;
+} dlm_begin_reco;
+
+
typedef struct _dlm_query_join_request
{
u8 node_idx;
@@ -554,6 +698,15 @@
u8 pad1[3];
} dlm_exit_domain;
+typedef struct _dlm_finalize_reco
+{
+ u8 node_idx;
+ u8 dead_node;
+ u16 pad1;
+ u32 pad2;
+} dlm_finalize_reco;
+
+
static inline void dlm_query_join_request_to_net(dlm_query_join_request *m)
{
/* do nothing */
@@ -588,27 +741,36 @@
}
static inline void dlm_master_request_to_net(dlm_master_request *m)
{
- /* do nothing */
+ m->flags = htonl(m->flags);
}
static inline void dlm_master_request_to_host(dlm_master_request *m)
{
- /* do nothing */
+ m->flags = ntohl(m->flags);
}
-static inline void dlm_master_request_resp_to_net(dlm_master_request_resp *m)
+static inline void dlm_assert_master_to_net(dlm_assert_master *m)
{
+ m->flags = htonl(m->flags);
+}
+static inline void dlm_assert_master_to_host(dlm_assert_master *m)
+{
+ m->flags = ntohl(m->flags);
+}
+
+static inline void dlm_migrate_request_to_net(dlm_migrate_request *m)
+{
/* do nothing */
}
-static inline void dlm_master_request_resp_to_host(dlm_master_request_resp *m)
+static inline void dlm_migrate_request_to_host(dlm_migrate_request *m)
{
/* do nothing */
}
-static inline void dlm_assert_master_to_net(dlm_assert_master *m)
+static inline void dlm_master_requery_to_net(dlm_master_requery *m)
{
/* do nothing */
}
-static inline void dlm_assert_master_to_host(dlm_assert_master *m)
+static inline void dlm_master_requery_to_host(dlm_master_requery *m)
{
/* do nothing */
}
@@ -656,8 +818,78 @@
a->cookie = be64_to_cpu(a->cookie);
a->flags = ntohl(a->flags);
}
+static inline void dlm_migratable_lock_to_net(dlm_migratable_lock *ml)
+{
+ ml->cookie = cpu_to_be64(ml->cookie);
+}
+static inline void dlm_migratable_lock_to_host(dlm_migratable_lock *ml)
+{
+ ml->cookie = be64_to_cpu(ml->cookie);
+}
+static inline void dlm_lock_request_to_net(dlm_lock_request *r)
+{
+ /* do nothing */
+}
+static inline void dlm_lock_request_to_host(dlm_lock_request *r)
+{
+ /* do nothing */
+}
+static inline void dlm_reco_data_done_to_net(dlm_reco_data_done *r)
+{
+ /* do nothing */
+}
+static inline void dlm_reco_data_done_to_host(dlm_reco_data_done *r)
+{
+ /* do nothing */
+}
+static inline void dlm_begin_reco_to_net(dlm_begin_reco *r)
+{
+ /* do nothing */
+}
+static inline void dlm_begin_reco_to_host(dlm_begin_reco *r)
+{
+ /* do nothing */
+}
+static inline void dlm_finalize_reco_to_net(dlm_finalize_reco *f)
+{
+ /* do nothing */
+}
+static inline void dlm_finalize_reco_to_host(dlm_finalize_reco *f)
+{
+ /* do nothing */
+}
+static inline void dlm_migratable_lockres_to_net(dlm_migratable_lockres *mr)
+{
+ int i, nr = mr->total_locks;
+
+ DLM_ASSERT(nr >= 0);
+ DLM_ASSERT(nr <= DLM_MAX_MIGRATABLE_LOCKS);
+
+ mr->total_locks = htonl(mr->total_locks);
+ mr->mig_cookie = cpu_to_be64(mr->mig_cookie);
+
+ for (i=0; i<nr; i++)
+ dlm_migratable_lock_to_net(&(mr->ml[i]));
+}
+
+static inline void dlm_migratable_lockres_to_host(dlm_migratable_lockres *mr)
+{
+ int i, nr;
+
+ mr->total_locks = ntohl(mr->total_locks);
+ mr->mig_cookie = be64_to_cpu(mr->mig_cookie);
+
+ nr = mr->total_locks;
+ DLM_ASSERT(nr >= 0);
+ DLM_ASSERT(nr <= DLM_MAX_MIGRATABLE_LOCKS);
+
+ for (i=0; i<nr; i++)
+ dlm_migratable_lock_to_host(&(mr->ml[i]));
+}
+
+
int dlm_create_lock_handler(net_msg *msg, u32 len, void *data);
int dlm_convert_lock_handler(net_msg *msg, u32 len, void *data);
int dlm_proxy_ast_handler(net_msg *msg, u32 len, void *data);
@@ -672,7 +904,10 @@
void dlm_thread_run_lock_resources(dlm_ctxt *dlm);
int dlm_launch_thread(dlm_ctxt *dlm);
void dlm_complete_thread(dlm_ctxt *dlm);
+int dlm_launch_recovery_thread(dlm_ctxt *dlm);
+void dlm_complete_recovery_thread(dlm_ctxt *dlm);
void dlm_flush_asts(dlm_ctxt *dlm);
+int dlm_flush_lockres_asts(dlm_ctxt *dlm, dlm_lock_resource *res);
dlm_status dlmlock(dlm_ctxt *dlm,
int mode,
@@ -725,8 +960,11 @@
dlm_ctxt *dlm_grab(dlm_ctxt *dlm);
int dlm_domain_fully_joined(dlm_ctxt *dlm);
+int __dlm_lockres_unused(dlm_lock_resource *res);
+void __dlm_lockres_calc_usage(dlm_ctxt *dlm, dlm_lock_resource *res);
void dlm_lockres_calc_usage(dlm_ctxt *dlm,
dlm_lock_resource *res);
+void dlm_purge_lockres(dlm_ctxt *dlm, dlm_lock_resource *lockres);
void __dlm_lockres_get(dlm_lock_resource *res);
dlm_lock_resource *dlm_lockres_grab(dlm_ctxt *dlm,
@@ -745,9 +983,15 @@
dlm_lock_resource * dlm_lookup_lock(dlm_ctxt *dlm,
const char *name,
unsigned int len);
+
+void dlm_change_lockres_owner(dlm_ctxt *dlm, dlm_lock_resource *res, u8 owner);
+void dlm_set_lockres_owner(dlm_ctxt *dlm, dlm_lock_resource *res, u8 owner);
dlm_lock_resource * dlm_get_lock_resource(dlm_ctxt *dlm,
const char *lockid,
int flags);
+dlm_lock_resource *dlm_new_lockres(dlm_ctxt *dlm,
+ const char *name,
+ unsigned int namelen);
int dlm_lock_owner_broadcast(dlm_ctxt *dlm, dlm_lock_resource *res);
int dlm_refresh_lock_resource(dlm_ctxt *dlm, dlm_lock_resource *res);
@@ -787,19 +1031,38 @@
int dlm_hb_node_dead(dlm_ctxt *dlm, int node);
int __dlm_hb_node_dead(dlm_ctxt *dlm, int node);
+int dlm_migrate_lockres(dlm_ctxt *dlm, dlm_lock_resource *res, u8 target);
+int dlm_finish_migration(dlm_ctxt *dlm, dlm_lock_resource *res, u8 old_master);
+
int dlm_lock_owner_broadcast(dlm_ctxt *dlm, dlm_lock_resource *res);
int dlm_master_request_handler(net_msg *msg, u32 len, void *data);
-int dlm_master_request_resp_handler(net_msg *msg, u32 len, void *data);
int dlm_assert_master_handler(net_msg *msg, u32 len, void *data);
+int dlm_migrate_request_handler(net_msg *msg, u32 len, void *data);
+int dlm_mig_lockres_handler(net_msg *msg, u32 len, void *data);
+int dlm_master_requery_handler(net_msg *msg, u32 len, void *data);
+int dlm_request_all_locks_handler(net_msg *msg, u32 len, void *data);
+int dlm_reco_data_done_handler(net_msg *msg, u32 len, void *data);
+int dlm_begin_reco_handler(net_msg *msg, u32 len, void *data);
+int dlm_finalize_reco_handler(net_msg *msg, u32 len, void *data);
+
+int dlm_send_one_lockres(dlm_ctxt *dlm, dlm_lock_resource *res,
+ dlm_migratable_lockres *mres,
+ u8 send_to, u8 flags);
+void dlm_move_lockres_to_recovery_list(dlm_ctxt *dlm, dlm_lock_resource *res);
+void dlm_init_lockres(dlm_ctxt *dlm, dlm_lock_resource *res,
+ const char *name, unsigned int namelen);
+
/* will exit holding res->spinlock, but may drop in function */
void __dlm_wait_on_lockres_flags(dlm_lock_resource *res, int flags);
/* will exit holding res->spinlock, but may drop in function */
static inline void __dlm_wait_on_lockres(dlm_lock_resource *res)
{
- __dlm_wait_on_lockres_flags(res, DLM_LOCK_RES_IN_PROGRESS);
+ __dlm_wait_on_lockres_flags(res, (DLM_LOCK_RES_IN_PROGRESS|
+ DLM_LOCK_RES_RECOVERING|
+ DLM_LOCK_RES_MIGRATING));
}
void dlm_init_lock(dlm_lock *newlock, int type, u8 node, u64 cookie);
@@ -810,12 +1073,15 @@
struct nm_node *node, int idx);
int dlm_do_assert_master(dlm_ctxt *dlm, const char *lockname,
unsigned int namelen, void *nodemap);
+int dlm_do_migrate_request(dlm_ctxt *dlm, dlm_lock_resource *res,
+ u8 master, u8 new_master, dlm_node_iter *iter);
+void dlm_clean_master_list(dlm_ctxt *dlm, u8 dead_node);
#define DLM_MLE_DEBUG 1
#ifdef DLM_MLE_DEBUG
-void dlm_dump_all_mles(void);
+void dlm_dump_all_mles(char *data, int len);
#endif
@@ -874,7 +1140,8 @@
if (dlm != mle->dlm)
return 0;
- if (mle->type == DLM_MLE_BLOCK) {
+ if (mle->type == DLM_MLE_BLOCK ||
+ mle->type == DLM_MLE_MIGRATION) {
if (namelen != mle->u.name.len ||
strncmp(name, mle->u.name.name, namelen)!=0)
return 0;
@@ -903,11 +1170,6 @@
return ret;
}
-typedef struct _dlm_node_iter
-{
- unsigned long node_map[BITS_TO_LONGS(NM_MAX_NODES)];
- int curnode;
-} dlm_node_iter;
static inline void dlm_node_iter_init(unsigned long *map, dlm_node_iter *iter)
{
Modified: trunk/fs/ocfs2/dlm/dlmrecovery.c
===================================================================
--- trunk/fs/ocfs2/dlm/dlmrecovery.c 2005-03-21 21:17:54 UTC (rev 2025)
+++ trunk/fs/ocfs2/dlm/dlmrecovery.c 2005-03-21 22:23:34 UTC (rev 2026)
@@ -37,6 +37,8 @@
#include <linux/blkdev.h>
#include <linux/socket.h>
#include <linux/inet.h>
+#include <linux/timer.h>
+#include <linux/kthread.h>
#include "cluster/heartbeat.h"
@@ -48,21 +50,1388 @@
static void dlm_do_local_recovery_cleanup(dlm_ctxt *dlm, u8 dead_node);
-int dlm_recovery_thread(void *data);
+static int dlm_recovery_thread(void *data);
void dlm_complete_recovery_thread(dlm_ctxt *dlm);
int dlm_launch_recovery_thread(dlm_ctxt *dlm);
void dlm_kick_recovery_thread(dlm_ctxt *dlm);
-
-u8 dlm_pick_recovery_master(dlm_ctxt *dlm, u8 *new_dead_node);
-static int dlm_remaster_locks_local(dlm_ctxt *dlm);
-int dlm_init_recovery_area(dlm_ctxt *dlm);
+int dlm_do_recovery(dlm_ctxt *dlm);
+
+int dlm_pick_recovery_master(dlm_ctxt *dlm);
+static int dlm_remaster_locks(dlm_ctxt *dlm, u8 dead_node);
+int dlm_init_recovery_area(dlm_ctxt *dlm, u8 dead_node);
int dlm_request_all_locks(dlm_ctxt *dlm, u8 request_from, u8 dead_node);
void dlm_destroy_recovery_area(dlm_ctxt *dlm, u8 dead_node);
-#define DLM_RECOVERY_THREAD_MS 2000
+static inline int dlm_num_locks_in_lockres(dlm_lock_resource *res);
+static void dlm_init_migratable_lockres(dlm_migratable_lockres *mres,
+ const char *lockname, int namelen,
+ int total_locks, u64 cookie,
+ u8 flags, u8 master);
+static int dlm_send_mig_lockres_msg(dlm_ctxt *dlm,
+ dlm_migratable_lockres *mres,
+ u8 send_to,
+ dlm_lock_resource *res,
+ int total_locks);
+static int dlm_lockres_master_requery(dlm_ctxt *dlm, dlm_lock_resource *res,
+ u8 *real_master);
+static int dlm_process_recovery_data(dlm_ctxt *dlm, dlm_lock_resource *res,
+ dlm_migratable_lockres *mres);
+static int dlm_do_master_requery(dlm_ctxt *dlm, dlm_lock_resource *res,
+ u8 nodenum, u8 *real_master);
+static int dlm_send_finalize_reco_message(dlm_ctxt *dlm);
+static int dlm_send_all_done_msg(dlm_ctxt *dlm, u8 dead_node, u8 send_to);
+static int dlm_send_begin_reco_message(dlm_ctxt *dlm, u8 dead_node);
+static void dlm_move_reco_locks_to_list(dlm_ctxt *dlm, struct list_head *list,
+ u8 dead_node);
+static void dlm_finish_local_lockres_recovery(dlm_ctxt *dlm, u8 dead_node,
+ u8 new_master);
+static void dlm_reco_ast(void *astdata);
+static void dlm_reco_bast(void *astdata, int blocked_type);
+static void dlm_reco_unlock_ast(void *astdata, dlm_status st);
+static void dlm_request_all_locks_worker(dlm_work_item *item, void *data);
+static void dlm_mig_lockres_worker(dlm_work_item *item, void *data);
+static u64 dlm_get_next_mig_cookie(void);
+static spinlock_t dlm_reco_state_lock = SPIN_LOCK_UNLOCKED;
+static spinlock_t dlm_mig_cookie_lock = SPIN_LOCK_UNLOCKED;
+static u64 dlm_mig_cookie = 1;
+static u64 dlm_get_next_mig_cookie(void)
+{
+ u64 c;
+ spin_lock(&dlm_mig_cookie_lock);
+ c = dlm_mig_cookie;
+ if (dlm_mig_cookie == (~0ULL))
+ dlm_mig_cookie = 1;
+ else
+ dlm_mig_cookie++;
+ spin_unlock(&dlm_mig_cookie_lock);
+ return c;
+}
+
+static inline void dlm_reset_recovery(dlm_ctxt *dlm)
+{
+ spin_lock(&dlm->spinlock);
+ clear_bit(dlm->reco.dead_node, dlm->recovery_map);
+ dlm->reco.dead_node = NM_INVALID_SLOT_NUM;
+ dlm->reco.new_master = NM_INVALID_SLOT_NUM;
+ spin_unlock(&dlm->spinlock);
+}
+
+
+/*
+ * RECOVERY THREAD
+ */
+
+void dlm_kick_recovery_thread(dlm_ctxt *dlm)
+{
+ /* wake the recovery thread
+ * this will wake the reco thread in one of three places
+ * 1) sleeping with no recovery happening
+ * 2) sleeping with recovery mastered elsewhere
+ * 3) recovery mastered here, waiting on reco data */
+
+ wake_up(&dlm->dlm_reco_thread_wq);
+}
+
+/* Launch the recovery thread */
+int dlm_launch_recovery_thread(dlm_ctxt *dlm)
+{
+ dlmprintk0("starting dlm recovery thread...\n");
+
+ dlm->dlm_reco_thread_task = kthread_run(dlm_recovery_thread, dlm,
+ "dlm_reco_thread");
+	if (IS_ERR(dlm->dlm_reco_thread_task)) {
+		dlmprintk("unable to launch dlm recovery thread, error=%ld\n",
+			  PTR_ERR(dlm->dlm_reco_thread_task));
+		dlm->dlm_reco_thread_task = NULL;
+		return -EINVAL;
+	}
+
+ return 0;
+}
+
+void dlm_complete_recovery_thread(dlm_ctxt *dlm)
+{
+ if (dlm->dlm_reco_thread_task) {
+ dlmprintk0("waiting for dlm recovery thread to exit\n");
+ kthread_stop(dlm->dlm_reco_thread_task);
+ dlm->dlm_reco_thread_task = NULL;
+ }
+}
+
+
+
+/*
+ * this is lame, but here's how recovery works...
+ * 1) all recovery threads cluster wide will work on recovering
+ * ONE node at a time
+ * 2) negotiate who will take over all the locks for the dead node.
+ * that's right... ALL the locks.
+ * 3) once a new master is chosen, everyone scans all locks
+ * and moves aside those mastered by the dead guy
+ * 4) each of these locks should be locked until recovery is done
+ * 5) the new master collects all of the secondary lock queue info
+ * one lock at a time, forcing each node to communicate back
+ * before continuing
+ * 6) each secondary lock queue responds with the full known lock info
+ * 7) once the new master has run all its locks, it sends an ALLDONE!
+ * message to everyone
+ * 8) upon receiving this message, the secondary queue node unlocks
+ * and responds to the ALLDONE
+ * 9) once the new master gets responses from everyone, he unlocks
+ * everything and recovery for this dead node is done
+ *10) go back to 2) while there are still dead nodes
+ *
+ */
+
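As an aside on steps 5 and 6, the new master tracks each live node through the DLM_RECO_NODE_DATA_* states used throughout this file. A minimal sketch of the expected happy-path progression; the helper function itself is hypothetical and exists only to illustrate the flow:

	/* hypothetical helper, for illustration only */
	static int dlm_reco_expected_next_state(int state)
	{
		switch (state) {
		case DLM_RECO_NODE_DATA_INIT:	/* about to send lock request */
			return DLM_RECO_NODE_DATA_REQUESTING;
		case DLM_RECO_NODE_DATA_REQUESTING: /* request acked by node */
			return DLM_RECO_NODE_DATA_REQUESTED;
		case DLM_RECO_NODE_DATA_REQUESTED: /* mig_lockres data arriving */
			return DLM_RECO_NODE_DATA_RECEIVING;
		case DLM_RECO_NODE_DATA_RECEIVING: /* data-done msg received */
			return DLM_RECO_NODE_DATA_DONE;
		case DLM_RECO_NODE_DATA_DONE:	/* finalize message sent */
			return DLM_RECO_NODE_DATA_FINALIZE_SENT;
		default:			/* node death restarts recovery */
			return DLM_RECO_NODE_DATA_DEAD;
		}
	}
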
+
+#define DLM_RECO_THREAD_TIMEOUT_MS (5 * 1000)
+
+static int dlm_recovery_thread(void *data)
+{
+ int status;
+ dlm_ctxt *dlm = data;
+ unsigned long timeout = msecs_to_jiffies(DLM_RECO_THREAD_TIMEOUT_MS);
+
+ dlmprintk("dlm thread running for %s...\n", dlm->name);
+
+ while (!kthread_should_stop()) {
+ status = dlm_do_recovery(dlm);
+ if (status == -EAGAIN) {
+ /* do not sleep, recheck immediately. */
+ continue;
+ }
+ if (status < 0)
+ dlmprintk("dlm_do_recovery returned %d\n", status);
+ wait_event_interruptible_timeout(dlm->dlm_reco_thread_wq,
+ kthread_should_stop(),
+ timeout);
+ }
+
+ dlmprintk0("quitting DLM recovery thread\n");
+ return 0;
+}
+
+
+int dlm_do_recovery(dlm_ctxt *dlm)
+{
+ int status = 0;
+
+ spin_lock(&dlm->spinlock);
+
+ /* check to see if the new master has died */
+ if (dlm->reco.new_master != NM_INVALID_SLOT_NUM &&
+ test_bit(dlm->reco.new_master, dlm->recovery_map)) {
+ dlmprintk("new master %u died while recovering %u!\n",
+ dlm->reco.new_master, dlm->reco.dead_node);
+ /* unset the new_master, leave dead_node */
+ dlm->reco.new_master = NM_INVALID_SLOT_NUM;
+ }
+
+ /* select a target to recover */
+ if (dlm->reco.dead_node == NM_INVALID_SLOT_NUM) {
+ int bit;
+
+		bit = find_next_bit (dlm->recovery_map, NM_MAX_NODES, 0);
+		if (bit >= NM_MAX_NODES)
+ dlm->reco.dead_node = NM_INVALID_SLOT_NUM;
+ else
+ dlm->reco.dead_node = bit;
+ } else if (!test_bit(dlm->reco.dead_node, dlm->recovery_map)) {
+ /* BUG? */
+ dlmprintk("dead_node %u no longer in recovery map!\n",
+ dlm->reco.dead_node);
+ dlm->reco.dead_node = NM_INVALID_SLOT_NUM;
+ }
+
+ if (dlm->reco.dead_node == NM_INVALID_SLOT_NUM) {
+ // dlmprintk0("nothing to recover! sleeping now!\n");
+ spin_unlock(&dlm->spinlock);
+ /* return to main thread loop and sleep. */
+ return 0;
+ }
+ dlmprintk("recovery thread found node %u in the recovery map!\n",
+ dlm->reco.dead_node);
+ spin_unlock(&dlm->spinlock);
+
+ /* take write barrier */
+ /* (stops the list reshuffling thread, proxy ast handling) */
+ down_write(&dlm->recovery_sem);
+
+ if (dlm->reco.new_master == dlm->node_num)
+ goto master_here;
+
+ if (dlm->reco.new_master == NM_INVALID_SLOT_NUM) {
+ /* choose a new master */
+ if (!dlm_pick_recovery_master(dlm)) {
+ /* already notified everyone. go. */
+ dlm->reco.new_master = dlm->node_num;
+ goto master_here;
+ }
+ dlmprintk0("another node will master this "
+ "recovery session. wait.\n");
+ } else {
+ dlmprintk("RECOVERY! new_master=%u, this node=%u, "
+ "dead_node=%u\n", dlm->reco.new_master,
+ dlm->node_num, dlm->reco.dead_node);
+ }
+
+ /* it is safe to start everything back up here
+ * because all of the dead node's lock resources
+ * have been marked as in-recovery */
+ up_write(&dlm->recovery_sem);
+
+ /* sleep out in main dlm_recovery_thread loop. */
+ return 0;
+
+master_here:
+ dlmprintk("RECOVERY! mastering recovery of %u HERE!\n",
+ dlm->reco.dead_node);
+
+ status = dlm_remaster_locks(dlm, dlm->reco.dead_node);
+ if (status < 0) {
+ dlmprintk("error remastering locks for node %u!!!! "
+ "retrying!\n", dlm->reco.dead_node);
+ } else {
+ /* success! see if any other nodes need recovery */
+ dlm_reset_recovery(dlm);
+ }
+ up_write(&dlm->recovery_sem);
+
+ /* continue and look for another dead node */
+ return -EAGAIN;
+}
+
+static int dlm_remaster_locks(dlm_ctxt *dlm, u8 dead_node)
+{
+ int status = 0;
+ dlm_reco_node_data *ndata;
+ struct list_head *iter;
+ int all_nodes_done;
+ int destroy = 0;
+ int pass = 0;
+
+/* +- if this node is the new master, init the temp recovery area */
+/* |- poll each live node for lock state */
+/* |- collect the data from each node until node says it's done, or dead */
+/* +--- if node died, throw away temp recovery area, keep new_master and dead_node, goto "select a target" */
+/* |- apply all temp area changes to real lock */
+/* +- send ALL DONE message to each node */
+
+ status = dlm_init_recovery_area(dlm, dead_node);
+ if (status < 0)
+ goto leave;
+
+ /* safe to access the node data list without a lock, since this
+ * process is the only one to change the list */
+ list_for_each(iter, &dlm->reco.node_data) {
+ ndata = list_entry (iter, dlm_reco_node_data, list);
+ DLM_ASSERT(ndata->state == DLM_RECO_NODE_DATA_INIT);
+ ndata->state = DLM_RECO_NODE_DATA_REQUESTING;
+
+ dlmprintk("requesting lock info from node %u\n",
+ ndata->node_num);
+
+ if (ndata->node_num == dlm->node_num) {
+ ndata->state = DLM_RECO_NODE_DATA_DONE;
+ continue;
+ }
+
+ status = dlm_request_all_locks(dlm, ndata->node_num, dead_node);
+ if (status < 0) {
+ destroy = 1;
+ goto leave;
+ }
+
+ switch (ndata->state) {
+ case DLM_RECO_NODE_DATA_INIT:
+ case DLM_RECO_NODE_DATA_FINALIZE_SENT:
+ case DLM_RECO_NODE_DATA_REQUESTED:
+ DLM_ASSERT(0);
+ break;
+ case DLM_RECO_NODE_DATA_DEAD:
+ dlmprintk("eek. node %u died after requesting recovery info for node %u\n",
+ ndata->node_num, dead_node);
+ // start all over
+ destroy = 1;
+ status = -EAGAIN;
+ goto leave;
+ case DLM_RECO_NODE_DATA_REQUESTING:
+ ndata->state = DLM_RECO_NODE_DATA_REQUESTED;
+ dlmprintk("now receiving recovery data from node %u for dead node %u\n",
+ ndata->node_num, dead_node);
+ break;
+ case DLM_RECO_NODE_DATA_RECEIVING:
+ dlmprintk("already receiving recovery data from node %u for dead node %u\n",
+ ndata->node_num, dead_node);
+ break;
+ case DLM_RECO_NODE_DATA_DONE:
+ dlmprintk("already DONE receiving recovery data from node %u for dead node %u\n",
+ ndata->node_num, dead_node);
+ break;
+ }
+ }
+
+ dlmprintk0("done requesting all lock info\n");
+
+ /* nodes should be sending reco data now
+ * just need to wait */
+
+ while (1) {
+ /* check all the nodes now to see if we are
+ * done, or if anyone died */
+ all_nodes_done = 1;
+ spin_lock(&dlm_reco_state_lock);
+ list_for_each(iter, &dlm->reco.node_data) {
+ ndata = list_entry (iter, dlm_reco_node_data, list);
+
+ dlmprintk("checking reco state of node %u\n",
+ ndata->node_num);
+ switch (ndata->state) {
+ case DLM_RECO_NODE_DATA_INIT:
+ case DLM_RECO_NODE_DATA_REQUESTING:
+ dlmprintk("bad ndata state for node %u:"
+ " state=%d\n",
+ ndata->node_num,
+ ndata->state);
+ BUG();
+ break;
+ case DLM_RECO_NODE_DATA_DEAD:
+ dlmprintk("eek. node %u died after requesting recovery info for node %u\n",
+ ndata->node_num, dead_node);
+ spin_unlock(&dlm_reco_state_lock);
+ // start all over
+ destroy = 1;
+ status = -EAGAIN;
+ goto leave;
+ case DLM_RECO_NODE_DATA_RECEIVING:
+ case DLM_RECO_NODE_DATA_REQUESTED:
+ all_nodes_done = 0;
+ break;
+ case DLM_RECO_NODE_DATA_DONE:
+ break;
+ case DLM_RECO_NODE_DATA_FINALIZE_SENT:
+ break;
+ }
+ }
+ spin_unlock(&dlm_reco_state_lock);
+
+ dlmprintk("pass #%d, all_nodes_done?: %s\n",
+ ++pass, all_nodes_done?"yes":"no");
+ if (all_nodes_done) {
+ int ret;
+
+ /* all nodes are now in DLM_RECO_NODE_DATA_DONE state
+ * just send a finalize message to everyone and
+ * clean up */
+ dlmprintk0("all nodes are done! send finalize\n");
+ ret = dlm_send_finalize_reco_message(dlm);
+ if (ret < 0) {
+ dlmprintk("dlm_send_finalize_reco_message "
+ "returned %d\n", ret);
+ }
+ spin_lock(&dlm->spinlock);
+ dlm_finish_local_lockres_recovery(dlm, dead_node,
+ dlm->node_num);
+ spin_unlock(&dlm->spinlock);
+ dlmprintk0("should be done with recovery!\n");
+ destroy = 1;
+ status = ret;
+ break;
+ }
+ /* wait to be signalled, with periodic timeout
+ * to check for node death */
+ wait_event_interruptible_timeout(dlm->dlm_reco_thread_wq,
+ kthread_should_stop(),
+ msecs_to_jiffies(DLM_RECO_THREAD_TIMEOUT_MS));
+
+ }
+
+leave:
+ if (destroy)
+ dlm_destroy_recovery_area(dlm, dead_node);
+ dlmprintk("returning status=%d\n", status);
+ return status;
+}
+
+int dlm_init_recovery_area(dlm_ctxt *dlm, u8 dead_node)
+{
+ int num=0;
+ dlm_reco_node_data *ndata;
+
+ spin_lock(&dlm->spinlock);
+ memcpy(dlm->reco.node_map, dlm->domain_map, sizeof(dlm->domain_map));
+ /* nodes can only be removed (by dying) after dropping
+ * this lock, and death will be trapped later, so this should do */
+ spin_unlock(&dlm->spinlock);
+
+ while (1) {
+ num = find_next_bit (dlm->reco.node_map, NM_MAX_NODES, num);
+ if (num >= NM_MAX_NODES) {
+ break;
+ }
+ DLM_ASSERT(num != dead_node);
+
+ ndata = kmalloc(sizeof(dlm_reco_node_data), GFP_KERNEL);
+ if (!ndata) {
+ dlm_destroy_recovery_area(dlm, dead_node);
+ return -ENOMEM;
+ }
+ memset(ndata, 0, sizeof(dlm_reco_node_data));
+ ndata->node_num = num;
+ ndata->state = DLM_RECO_NODE_DATA_INIT;
+ INIT_LIST_HEAD(&ndata->granted);
+ INIT_LIST_HEAD(&ndata->converting);
+ INIT_LIST_HEAD(&ndata->blocked);
+ spin_lock(&dlm_reco_state_lock);
+ list_add_tail(&ndata->list, &dlm->reco.node_data);
+ spin_unlock(&dlm_reco_state_lock);
+ num++;
+ }
+
+ return 0;
+}
+
+void dlm_destroy_recovery_area(dlm_ctxt *dlm, u8 dead_node)
+{
+ struct list_head *iter, *iter2;
+ dlm_reco_node_data *ndata;
+ LIST_HEAD(tmplist);
+
+ spin_lock(&dlm_reco_state_lock);
+ list_splice_init(&dlm->reco.node_data, &tmplist);
+ spin_unlock(&dlm_reco_state_lock);
+
+#warning this probably needs to be smarter
+ list_for_each_safe(iter, iter2, &tmplist) {
+ ndata = list_entry (iter, dlm_reco_node_data, list);
+ kfree(ndata);
+ }
+}
+
+int dlm_request_all_locks(dlm_ctxt *dlm, u8 request_from, u8 dead_node)
+{
+ dlm_lock_request lr;
+	int ret;
+
+ dlmprintk0("\n");
+
+
+ dlmprintk("dlm_request_all_locks: dead node is %u, sending request "
+ "to %u\n", dead_node, request_from);
+
+ memset(&lr, 0, sizeof(lr));
+ lr.node_idx = dlm->node_num;
+ lr.dead_node = dead_node;
+
+ // send message
+ dlm_lock_request_to_net(&lr);
+ ret = net_send_message(DLM_LOCK_REQUEST_MSG, dlm->key,
+ &lr, sizeof(lr),
+ request_from, NULL);
+ if (ret < 0)
+ dlmprintk("error occurred in net_send_message: %d\n", ret);
+
+ // return from here, then
+ // sleep until all received or error
+ return ret;
+
+}
+
+int dlm_request_all_locks_handler(net_msg *msg, u32 len, void *data)
+{
+ dlm_ctxt *dlm = data;
+ dlm_lock_request *lr = (dlm_lock_request *)msg->buf;
+ char *buf = NULL;
+ dlm_work_item *item = NULL;
+
+ if (!dlm_grab(dlm))
+ return -EINVAL;
+
+ dlm_lock_request_to_host(lr);
+ DLM_ASSERT(dlm);
+ DLM_ASSERT(lr->dead_node == dlm->reco.dead_node);
+
+ item = (dlm_work_item *)kmalloc(sizeof(dlm_work_item), GFP_KERNEL);
+ if (!item) {
+ dlm_put(dlm);
+ return -ENOMEM;
+ }
+
+ /* this will get freed by dlm_request_all_locks_worker */
+ buf = (char *) __get_free_page(GFP_KERNEL);
+ if (!buf) {
+ kfree(item);
+ dlm_put(dlm);
+ return -ENOMEM;
+ }
+
+ /* queue up work for dlm_request_all_locks_worker */
+ memset(item, 0, sizeof(dlm_work_item));
+ dlm_grab(dlm); /* get an extra ref for the work item */
+ dlm_init_work_item(dlm, item, dlm_request_all_locks_worker, buf);
+ item->u.ral.reco_master = lr->node_idx;
+ item->u.ral.dead_node = lr->dead_node;
+ spin_lock(&dlm->work_lock);
+ list_add_tail(&item->list, &dlm->work_list);
+ spin_unlock(&dlm->work_lock);
+ schedule_work(&dlm->dispatched_work);
+
+ dlm_put(dlm);
+ return 0;
+}
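This handler shows the deferred-dispatch pattern used for handlers that must themselves send network messages: queue a dlm_work_item and let keventd call back into the worker. The consumer side is not part of this hunk; a minimal sketch, assuming dlm_init_work_item() stashes the function pointer and data on the item (the func/data field names are assumptions):

	static void dlm_dispatch_work(void *data)
	{
		dlm_ctxt *dlm = data;
		dlm_work_item *item;
		struct list_head *iter, *tmp;
		LIST_HEAD(tmp_list);

		/* drain the shared list under the lock, run items outside it */
		spin_lock(&dlm->work_lock);
		list_splice_init(&dlm->work_list, &tmp_list);
		spin_unlock(&dlm->work_lock);

		list_for_each_safe(iter, tmp, &tmp_list) {
			item = list_entry(iter, dlm_work_item, list);
			list_del_init(&item->list);
			item->func(item, item->data); /* e.g. dlm_request_all_locks_worker */
			dlm_put(item->dlm);	      /* drop the ref the handler took */
			kfree(item);
		}
	}
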
+
+static void dlm_request_all_locks_worker(dlm_work_item *item, void *data)
+{
+ dlm_migratable_lockres *mres;
+ dlm_lock_resource *res;
+ dlm_ctxt *dlm;
+ LIST_HEAD(resources);
+ struct list_head *iter;
+ int ret;
+ u8 dead_node, reco_master;
+
+ /* do a whole s-load of asserts */
+ DLM_ASSERT(item);
+ dlm = item->dlm;
+ DLM_ASSERT(dlm);
+
+ dead_node = item->u.ral.dead_node;
+ reco_master = item->u.ral.reco_master;
+ DLM_ASSERT(dead_node == dlm->reco.dead_node);
+ DLM_ASSERT(reco_master == dlm->reco.new_master);
+
+ DLM_ASSERT(data);
+ mres = (dlm_migratable_lockres *)data;
+
+ /* lock resources should have already been moved to the
+ * dlm->reco.resources list. now move items from that list
+ * to a temp list if the dead owner matches. note that the
+ * whole cluster recovers only one node at a time, so we
+ * can safely move UNKNOWN lock resources for each recovery
+ * session. */
+ dlm_move_reco_locks_to_list(dlm, &resources, dead_node);
+
+ /* now we can begin blasting lockreses without the dlm lock */
+ list_for_each(iter, &resources) {
+ res = list_entry (iter, dlm_lock_resource, recovering);
+ ret = dlm_send_one_lockres(dlm, res, mres, reco_master,
+ DLM_MRES_RECOVERY);
+ if (ret < 0) {
+ dlmprintk("send_one_lockres returned %d\n",
+ ret);
+ }
+ }
+
+ /* move the resources back to the list */
+ spin_lock(&dlm->spinlock);
+ list_splice_init(&resources, &dlm->reco.resources);
+ spin_unlock(&dlm->spinlock);
+
+ ret = dlm_send_all_done_msg(dlm, dead_node, reco_master);
+ if (ret < 0)
+ dlmprintk("recovery data-done message "
+ "returned %d\n", ret);
+
+ free_page((unsigned long)data);
+}
+
+
+static int dlm_send_all_done_msg(dlm_ctxt *dlm, u8 dead_node, u8 send_to)
+{
+ int ret, tmpret;
+ dlm_reco_data_done *done_msg;
+
+ done_msg = (dlm_reco_data_done *)kmalloc(sizeof(dlm_reco_data_done),
+ GFP_KERNEL);
+ if (!done_msg)
+ return -ENOMEM;
+
+ memset(done_msg, 0, sizeof(dlm_reco_data_done));
+ done_msg->node_idx = dlm->node_num;
+ done_msg->dead_node = dead_node;
+ dlm_reco_data_done_to_net(done_msg);
+
+	ret = net_send_message(DLM_RECO_DATA_DONE_MSG, dlm->key, done_msg,
+			       sizeof(*done_msg), send_to, &tmpret);
+	if (ret >= 0)
+		ret = tmpret;
+	kfree(done_msg);
+	return ret;
+}
+
+
+int dlm_reco_data_done_handler(net_msg *msg, u32 len, void *data)
+{
+ dlm_ctxt *dlm = data;
+ dlm_reco_data_done *done = (dlm_reco_data_done *)msg->buf;
+ struct list_head *iter;
+ dlm_reco_node_data *ndata = NULL;
+ int ret = -EINVAL;
+
+ if (!dlm_grab(dlm))
+ return -EINVAL;
+
+ dlm_reco_data_done_to_host(done);
+ DLM_ASSERT(dlm);
+ DLM_ASSERT(done->dead_node == dlm->reco.dead_node);
+
+ spin_lock(&dlm_reco_state_lock);
+ list_for_each(iter, &dlm->reco.node_data) {
+ ndata = list_entry (iter, dlm_reco_node_data, list);
+ if (ndata->node_num != done->node_idx)
+ continue;
+
+ switch (ndata->state) {
+ case DLM_RECO_NODE_DATA_INIT:
+ case DLM_RECO_NODE_DATA_REQUESTING:
+ case DLM_RECO_NODE_DATA_DEAD:
+ case DLM_RECO_NODE_DATA_DONE:
+ case DLM_RECO_NODE_DATA_FINALIZE_SENT:
+ dlmprintk("bad ndata state for node %u:"
+ " state=%d\n",
+ ndata->node_num,
+ ndata->state);
+ BUG();
+ break;
+ case DLM_RECO_NODE_DATA_RECEIVING:
+ case DLM_RECO_NODE_DATA_REQUESTED:
+ dlmprintk("node %u is DONE sending "
+ "recovery data!\n",
+ ndata->node_num);
+ ndata->state = DLM_RECO_NODE_DATA_DONE;
+ ret = 0;
+ break;
+ }
+ }
+ spin_unlock(&dlm_reco_state_lock);
+
+ /* wake the recovery thread, some node is done */
+ if (!ret)
+ dlm_kick_recovery_thread(dlm);
+
+ if (ret < 0)
+ dlmprintk("failed to find recovery node data for node %u\n",
+ done->node_idx);
+ dlm_put(dlm);
+ return ret;
+}
+
+static void dlm_move_reco_locks_to_list(dlm_ctxt *dlm, struct list_head *list,
+ u8 dead_node)
+{
+ dlm_lock_resource *res;
+ struct list_head *iter, *iter2;
+
+ spin_lock(&dlm->spinlock);
+ list_for_each_safe(iter, iter2, &dlm->reco.resources) {
+ res = list_entry (iter, dlm_lock_resource, recovering);
+ if (dlm_is_recovery_lock(res->lockname.name,
+ res->lockname.len))
+ continue;
+ if (res->owner == dead_node) {
+ dlmprintk("found lockres owned by dead node while "
+ "doing recovery for node %u. sending it.\n",
+ dead_node);
+ list_del(&res->recovering);
+ list_add_tail(&res->recovering, list);
+ } else if (res->owner == DLM_LOCK_RES_OWNER_UNKNOWN) {
+ dlmprintk("found UNKNOWN owner while doing recovery "
+ "for node %u. sending it.\n", dead_node);
+ list_del(&res->recovering);
+ list_add_tail(&res->recovering, list);
+ }
+ }
+ spin_unlock(&dlm->spinlock);
+}
+
+static inline int dlm_num_locks_in_lockres(dlm_lock_resource *res)
+{
+ int total_locks = 0;
+ struct list_head *iter, *queue = &res->granted;
+ int i;
+
+ for (i=0; i<3; i++) {
+ list_for_each(iter, queue)
+ total_locks++;
+ queue++;
+ }
+ return total_locks;
+}
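Note the queue++ walk: it relies on the granted, converting and blocked list_heads sitting consecutively inside dlm_lock_resource, so bumping the pointer moves to the next queue. A sketch of the assumed layout (the real definition lives in dlmmod.h):

	struct _dlm_lock_resource {
		/* ... */
		struct list_head granted;	/* list 0 */
		struct list_head converting;	/* list 1 */
		struct list_head blocked;	/* list 2 */
		/* ... */
	};

dlm_list_num_to_pointer() further down makes the same assumption in the other direction, turning the ml->list index from the wire back into a queue pointer.
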
+
+
+static int dlm_send_mig_lockres_msg(dlm_ctxt *dlm,
+ dlm_migratable_lockres *mres,
+ u8 send_to,
+ dlm_lock_resource *res,
+ int total_locks)
+{
+ u64 mig_cookie = mres->mig_cookie;
+ int mres_total_locks = mres->total_locks;
+ int sz, ret = 0, status = 0;
+ u8 orig_flags = mres->flags,
+ orig_master = mres->master;
+
+ DLM_ASSERT(mres->num_locks <= DLM_MAX_MIGRATABLE_LOCKS);
+ if (!mres->num_locks)
+ return 0;
+
+ sz = sizeof(dlm_migratable_lockres) +
+ (mres->num_locks * sizeof(dlm_migratable_lock));
+
+ /* add an all-done flag if we reached the last lock */
+ DLM_ASSERT(total_locks <= mres_total_locks);
+ if (total_locks == mres_total_locks)
+ mres->flags |= DLM_MRES_ALL_DONE;
+
+ /* convert to net byteorder */
+ dlm_migratable_lockres_to_net(mres);
+
+ /* send it */
+ ret = net_send_message(DLM_MIG_LOCKRES_MSG, dlm->key, mres,
+ sz, send_to, &status);
+ if (ret < 0) {
+ dlmprintk("net_send_message returned %d\n", ret);
+ } else {
+ /* might get an -ENOMEM back here */
+ ret = status;
+ if (ret < 0)
+ dlmprintk("reco data got status=%d\n", ret);
+ }
+
+ /* zero and reinit the message buffer */
+ dlm_init_migratable_lockres(mres, res->lockname.name,
+ res->lockname.len, mres_total_locks,
+ mig_cookie, orig_flags, orig_master);
+ return ret;
+}
+
+static void dlm_init_migratable_lockres(dlm_migratable_lockres *mres,
+ const char *lockname, int namelen,
+ int total_locks, u64 cookie,
+ u8 flags, u8 master)
+{
+ /* mres here is one full page */
+ memset(mres, 0, PAGE_SIZE);
+ mres->lockname_len = namelen;
+ memcpy(mres->lockname, lockname, namelen);
+ mres->num_locks = 0;
+ mres->total_locks = total_locks;
+ mres->mig_cookie = cookie;
+ mres->flags = flags;
+ mres->master = master;
+}
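Since mres is always a full page, the message is effectively a fixed header plus a flexible array of dlm_migratable_lock entries. A sketch of the implied sizing; the macro shown here is an assumption consistent with the asserts in this file, not a definition from this patch:

	/* send-path sizing, as used in dlm_send_mig_lockres_msg() above:
	 *   sz = sizeof(dlm_migratable_lockres) +
	 *        (mres->num_locks * sizeof(dlm_migratable_lock));
	 * assumed derivation of the cap asserted throughout this file: */
	#define DLM_MAX_MIGRATABLE_LOCKS \
		((PAGE_SIZE - sizeof(dlm_migratable_lockres)) / \
		 sizeof(dlm_migratable_lock))
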
+
+
+/* returns 1 if this lock fills the network structure,
+ * 0 otherwise */
+static int dlm_add_lock_to_array(dlm_lock *lock,
+ dlm_migratable_lockres *mres, int queue)
+{
+ dlm_migratable_lock *ml;
+ int lock_num = mres->num_locks;
+
+ ml = &(mres->ml[lock_num]);
+ ml->cookie = lock->ml.cookie;
+ ml->type = lock->ml.type;
+ ml->convert_type = lock->ml.convert_type;
+ ml->highest_blocked = lock->ml.highest_blocked;
+ ml->list = queue;
+ if (lock->lksb) {
+ ml->flags = lock->lksb->flags;
+ if (ml->flags & DLM_LKSB_PUT_LVB) {
+ /* NOTE: because we only support NL, PR and EX locks
+ * there can be only one lock on this lockres with
+ * this flag, and it must be currently an EX.
+ * this means this node had a pending LVB change
+ * when the master died. we should send his lvb
+ * over and attach it to the lksb on the other side */
+ DLM_ASSERT(ml->type == LKM_EXMODE);
+ DLM_ASSERT(mres->lvb[0] == 0);
+ memcpy(mres->lvb, lock->lksb->lvb, DLM_LVB_LEN);
+ }
+ }
+ ml->node = lock->ml.node;
+ mres->num_locks++;
+ /* we reached the max, send this network message */
+ if (mres->num_locks == DLM_MAX_MIGRATABLE_LOCKS)
+ return 1;
+ return 0;
+}
+
+
+int dlm_send_one_lockres(dlm_ctxt *dlm, dlm_lock_resource *res,
+ dlm_migratable_lockres *mres,
+ u8 send_to, u8 flags)
+{
+ struct list_head *queue, *iter;
+ int total_locks, i;
+ u64 mig_cookie = 0;
+ dlm_lock *lock;
+ int ret = 0;
+
+ DLM_ASSERT(flags & (DLM_MRES_RECOVERY|DLM_MRES_MIGRATION));
+
+ dlmprintk("sending to %u\n", send_to);
+
+ total_locks = dlm_num_locks_in_lockres(res);
+ if (total_locks > DLM_MAX_MIGRATABLE_LOCKS) {
+ /* rare, but possible */
+ dlmprintk("argh. lockres has %d locks. this will "
+ "require more than one network packet to "
+ "migrate\n", total_locks);
+ mig_cookie = dlm_get_next_mig_cookie();
+ }
+
+ dlm_init_migratable_lockres(mres, res->lockname.name,
+ res->lockname.len, total_locks,
+ mig_cookie, flags, res->owner);
+
+ total_locks = 0;
+ queue = &res->granted;
+ for (i=0; i<3; i++) {
+ list_for_each(iter, queue) {
+ lock = list_entry (iter, dlm_lock, list);
+
+ /* add another lock. */
+ total_locks++;
+ if (!dlm_add_lock_to_array(lock, mres, i))
+ continue;
+
+ /* this filled the lock message,
+ * we must send it immediately. */
+ ret = dlm_send_mig_lockres_msg(dlm, mres, send_to,
+ res, total_locks);
+ if (ret < 0) {
+ // TODO
+ }
+ }
+ queue++;
+ }
+ /* flush any remaining locks */
+ ret = dlm_send_mig_lockres_msg(dlm, mres, send_to, res, total_locks);
+ if (ret < 0) {
+ // TODO
+ }
+ return ret;
+}
+
+
+/*
+ * this message will contain no more than one page worth of
+ * recovery data, and it will work on only one lockres.
+ * there may be many locks in this page, and we may need to wait
+ * for additional packets to complete all the locks (rare, but
+ * possible).
+ */
+/*
+ * NOTE: the allocation error cases here are scary
+ * we really cannot afford to fail an alloc in recovery
+ * do we spin? returning an error only delays the problem really
+ */
+
+int dlm_mig_lockres_handler(net_msg *msg, u32 len, void *data)
+{
+ dlm_ctxt *dlm = data;
+ dlm_migratable_lockres *mres = (dlm_migratable_lockres *)msg->buf;
+ int ret = 0;
+ u8 real_master;
+ char *buf = NULL;
+ dlm_work_item *item = NULL;
+ dlm_lock_resource *res = NULL;
+
+ if (!dlm_grab(dlm))
+ return -EINVAL;
+
+ dlm_migratable_lockres_to_host(mres);
+
+ DLM_ASSERT(mres->flags & (DLM_MRES_RECOVERY|DLM_MRES_MIGRATION));
+
+ real_master = mres->master;
+ if (real_master == DLM_LOCK_RES_OWNER_UNKNOWN) {
+ /* cannot migrate a lockres with no master */
+ DLM_ASSERT(mres->flags & DLM_MRES_RECOVERY);
+ }
+
+ dlmprintk("%s message received from node %u\n",
+ (mres->flags & DLM_MRES_RECOVERY) ?
+ "recovery" : "migration", mres->master);
+ if (mres->flags & DLM_MRES_ALL_DONE)
+ dlmprintk0("all done flag. all lockres data received!\n");
+
+ ret = -ENOMEM;
+ buf = kmalloc(msg->data_len, GFP_KERNEL);
+ item = (dlm_work_item *)kmalloc(sizeof(dlm_work_item), GFP_KERNEL);
+ if (!buf || !item)
+ goto leave;
+
+ /* lookup the lock to see if we have a secondary queue for this
+ * already... just add the locks in and this will have its owner
+ * and RECOVERY flag changed when it completes. */
+ res = dlm_lookup_lock(dlm, mres->lockname, mres->lockname_len);
+ if (res) {
+ /* this will get a ref on res */
+ /* mark it as recovering/migrating and hash it */
+#warning add checks of existing flags here
+ spin_lock(&res->spinlock);
+ if (mres->flags & DLM_MRES_RECOVERY)
+ res->state |= DLM_LOCK_RES_RECOVERING;
+ else
+ res->state |= DLM_LOCK_RES_MIGRATING;
+ spin_unlock(&res->spinlock);
+ } else {
+ /* need to allocate, just like if it was
+ * mastered here normally */
+ res = dlm_new_lockres(dlm, mres->lockname, mres->lockname_len);
+ if (!res)
+ goto leave;
+
+ /* to match the ref that we would have gotten if
+ * dlm_lookup_lock had succeeded */
+ __dlm_lockres_get(res);
+
+ /* mark it as recovering/migrating and hash it */
+ if (mres->flags & DLM_MRES_RECOVERY)
+ res->state |= DLM_LOCK_RES_RECOVERING;
+ else
+ res->state |= DLM_LOCK_RES_MIGRATING;
+
+ spin_lock(&dlm->spinlock);
+ __dlm_insert_lock(dlm, res);
+ spin_unlock(&dlm->spinlock);
+ }
+
+ /* at this point we have allocated everything we need,
+ * and we have a hashed lockres with an extra ref and
+ * the proper res->state flags. */
+ ret = 0;
+ if (mres->master == DLM_LOCK_RES_OWNER_UNKNOWN) {
+ /* migration cannot have an unknown master */
+ DLM_ASSERT(mres->flags & DLM_MRES_RECOVERY);
+ dlmprintk("recovery has passed me a lockres with an "
+ "unknown owner.. will need to requery: "
+ "%.*s\n", mres->lockname_len, mres->lockname);
+ } else {
+#warning is this the right time to do this?
+ spin_lock(&res->spinlock);
+ dlm_change_lockres_owner(dlm, res, dlm->node_num);
+ spin_unlock(&res->spinlock);
+ }
+
+ /* queue up work for dlm_mig_lockres_worker */
+ memset(item, 0, sizeof(dlm_work_item));
+ dlm_grab(dlm); /* get an extra ref for the work item */
+ memcpy(buf, msg->buf, msg->data_len); /* copy the whole message */
+ dlm_init_work_item(dlm, item, dlm_mig_lockres_worker, buf);
+ item->u.ml.lockres = res; /* already have a ref */
+ item->u.ml.real_master = real_master;
+ spin_lock(&dlm->work_lock);
+ list_add_tail(&item->list, &dlm->work_list);
+ spin_unlock(&dlm->work_lock);
+ schedule_work(&dlm->dispatched_work);
+
+leave:
+ dlm_put(dlm);
+ if (ret < 0) {
+ if (buf)
+ kfree(buf);
+ if (item)
+ kfree(item);
+ }
+ dlmprintk("returning ret=%d\n", ret);
+ return ret;
+}
+
+
+static void dlm_mig_lockres_worker(dlm_work_item *item, void *data)
+{
+	dlm_ctxt *dlm;
+	dlm_migratable_lockres *mres;
+	int ret = 0;
+ dlm_lock_resource *res;
+ u8 real_master;
+
+ DLM_ASSERT(item);
+ dlm = item->dlm;
+ DLM_ASSERT(dlm);
+
+ DLM_ASSERT(data);
+ mres = (dlm_migratable_lockres *)data;
+
+ res = item->u.ml.lockres;
+ DLM_ASSERT(res);
+ real_master = item->u.ml.real_master;
+
+ if (real_master == DLM_LOCK_RES_OWNER_UNKNOWN) {
+ /* this case is super-rare. only occurs if
+ * node death happens during migration. */
+again:
+ ret = dlm_lockres_master_requery(dlm, res, &real_master);
+ if (ret < 0) {
+ dlmprintk("ugh. awful place to fail. ret=%d\n",
+ ret);
+ goto again;
+ }
+ if (real_master == DLM_LOCK_RES_OWNER_UNKNOWN) {
+ dlmprintk("lockres %.*s not claimed. "
+ "this node will take it.\n",
+ res->lockname.len, res->lockname.name);
+ } else {
+ dlmprintk("master need to respond to sender "
+ "that node %u still owns %.*s\n",
+ real_master, res->lockname.len,
+ res->lockname.name);
+ /* cannot touch this lockres */
+ goto leave;
+ }
+ }
+
+ ret = dlm_process_recovery_data(dlm, res, mres);
+ if (ret < 0)
+ dlmprintk("dlm_process_recovery_data returned %d\n", ret);
+ else
+ dlmprintk0("woo dlm_process_recovery_data succeeded\n");
+
+ if ((mres->flags & (DLM_MRES_MIGRATION|DLM_MRES_ALL_DONE)) ==
+ (DLM_MRES_MIGRATION|DLM_MRES_ALL_DONE)) {
+ ret = dlm_finish_migration(dlm, res, mres->master);
+ if (ret < 0)
+ dlmprintk("finish migration returned %d\n", ret);
+ }
+
+leave:
+	kfree(data);
+	dlmprintk("returning ret=%d\n", ret);
+}
+
+
+
+static int dlm_lockres_master_requery(dlm_ctxt *dlm, dlm_lock_resource *res,
+ u8 *real_master)
+{
+ dlm_node_iter iter;
+ int nodenum;
+ int ret = 0;
+
+ *real_master = DLM_LOCK_RES_OWNER_UNKNOWN;
+
+ /* we only reach here if one of the two nodes in a
+ * migration died while the migration was in progress.
+ * at this point we need to requery the master. we
+ * know that the new_master got as far as creating
+ * an mle on at least one node, but we do not know
+ * if any nodes had actually cleared the mle and set
+ * the master to the new_master. the old master
+ * is supposed to set the owner to UNKNOWN in the
+ * event of a new_master death, so the only possible
+ * responses that we can get from nodes here are
+ * that the master is new_master, or that the master
+ * is UNKNOWN.
+ * if all nodes come back with UNKNOWN then we know
+ * the lock needs remastering here.
+ * if any node comes back with a valid master, check
+ * to see if that master is the one that we are
+ * recovering. if so, then the new_master died and
+ * we need to remaster this lock. if not, then the
+ * new_master survived and that node will respond to
+ * other nodes about the owner.
+ * if there is an owner, this node needs to dump this
+ * lockres and alert the sender that this lockres
+ * was rejected. */
+ spin_lock(&dlm->spinlock);
+ dlm_node_iter_init(dlm->domain_map, &iter);
+ spin_unlock(&dlm->spinlock);
+
+ while ((nodenum = dlm_node_iter_next(&iter)) >= 0) {
+ ret = dlm_do_master_requery(dlm, res, nodenum, real_master);
+ if (ret < 0) {
+ dlmprintk("ugh. bad place to fail. ret=%d\n", ret);
+ BUG();
+ /* TODO: need to figure a way to restart this */
+ }
+ if (*real_master != DLM_LOCK_RES_OWNER_UNKNOWN) {
+ dlmprintk("aha! lock master is %u\n",
+ *real_master);
+ break;
+ }
+ }
+ return ret;
+}
+
+
+static int dlm_do_master_requery(dlm_ctxt *dlm, dlm_lock_resource *res,
+ u8 nodenum, u8 *real_master)
+{
+ int ret = -EINVAL;
+ dlm_master_requery req;
+ int status = DLM_LOCK_RES_OWNER_UNKNOWN;
+
+ memset(&req, 0, sizeof(req));
+ req.node_idx = dlm->node_num;
+ req.namelen = res->lockname.len;
+ strncpy(req.name, res->lockname.name, res->lockname.len);
+
+ dlm_master_requery_to_net(&req);
+ ret = net_send_message(DLM_MASTER_REQUERY_MSG, dlm->key,
+ &req, sizeof(req), nodenum, &status);
+ if (ret < 0)
+ dlmprintk("net_send_message returned %d!\n", ret);
+ else {
+ DLM_ASSERT(status >= 0);
+ DLM_ASSERT(status <= DLM_LOCK_RES_OWNER_UNKNOWN);
+ *real_master = (u8) (status & 0xff);
+ dlmprintk("node %u responded to master requery with %u\n",
+ nodenum, *real_master);
+ ret = 0;
+ }
+	return ret;
+}
+
+
+/* this function cannot error, so unless the sending
+ * or receiving of the message failed, the owner can
+ * be trusted */
+int dlm_master_requery_handler(net_msg *msg, u32 len, void *data)
+{
+ dlm_ctxt *dlm = data;
+ dlm_master_requery *req = (dlm_master_requery *)msg->buf;
+ dlm_lock_resource *res = NULL;
+ int master = DLM_LOCK_RES_OWNER_UNKNOWN;
+
+ if (!dlm_grab(dlm)) {
+ /* since the domain has gone away on this
+ * node, the proper response is UNKNOWN */
+ return master;
+ }
+
+ dlm_master_requery_to_host(req);
+
+ spin_lock(&dlm->spinlock);
+ res = __dlm_lookup_lock(dlm, req->name, req->namelen);
+ if (res) {
+ spin_lock(&res->spinlock);
+ master = res->owner;
+ if (master == dlm->node_num) {
+#warning need to broadcast here that i own this
+ dlmprintk0("need to broadcast here that i own this\n");
+ }
+ spin_unlock(&res->spinlock);
+ }
+ spin_unlock(&dlm->spinlock);
+
+ dlm_put(dlm);
+ return master;
+}
+
+static inline struct list_head * dlm_list_num_to_pointer(dlm_lock_resource *res,
+ int list_num)
+{
+ struct list_head *ret;
+ DLM_ASSERT(res);
+ DLM_ASSERT(list_num >= 0);
+ DLM_ASSERT(list_num <= 2);
+ ret = &(res->granted);
+ ret += list_num;
+ return ret;
+}
+/* TODO: do ast flush business
+ * TODO: do MIGRATING and RECOVERING spinning
+ */
+
+/*
+* NOTE about in-flight requests during migration:
+*
+* Before attempting the migrate, the master has marked the lockres as
+* MIGRATING and then flushed all of its pending ASTS. So any in-flight
+* requests either got queued before the MIGRATING flag got set, in which
+* case the lock data will reflect the change and a return message is on
+* the way, or the request failed to get in before MIGRATING got set. In
+* this case, the caller will be told to spin and wait for the MIGRATING
+* flag to be dropped, then recheck the master.
+* This holds true for the convert, cancel and unlock cases, and since lvb
+* updates are tied to these same messages, it applies to lvb updates as
+* well. For the lock case, there is no way a lock can be on the master
+* queue and not be on the secondary queue since the lock is always added
+* locally first. This means that the new target node will never be sent
+* a lock that he doesn't already have on the list.
+* In total, this means that the local lock is correct and should not be
+* updated to match the one sent by the master. Any messages sent back
+* from the master before the MIGRATING flag will bring the lock properly
+* up-to-date, and the change will be ordered properly for the waiter.
+* We will *not* attempt to modify the lock underneath the waiter.
+*/
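The spin-and-wait the NOTE describes is what the widened __dlm_wait_on_lockres() earlier in this patch provides. A rough sketch of the shape a caller takes when a request bounces with DLM_MIGRATING or DLM_RECOVERING; the function and the send call named here are illustrative only, not code from this patch:

	/* illustrative only -- not a function from this patch */
	static dlm_status dlm_wait_out_migration(dlm_ctxt *dlm,
						 dlm_lock_resource *res)
	{
		dlm_status status;
	retry:
		status = dlm_send_remote_request(dlm, res); /* hypothetical */
		if (status == DLM_MIGRATING || status == DLM_RECOVERING) {
			spin_lock(&res->spinlock);
			/* may drop and retake res->spinlock while waiting */
			__dlm_wait_on_lockres(res);
			spin_unlock(&res->spinlock);
			/* owner may have changed; recheck the master */
			goto retry;
		}
		return status;
	}
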
+
+static int dlm_process_recovery_data(dlm_ctxt *dlm, dlm_lock_resource *res,
+ dlm_migratable_lockres *mres)
+{
+ dlm_migratable_lock *ml;
+ struct list_head *queue;
+ dlm_lock *newlock = NULL;
+ dlm_lockstatus *lksb = NULL;
+ int ret = 0;
+ int i;
+ struct list_head *iter;
+ dlm_lock *lock = NULL;
+
+ dlmprintk("running %d locks for this lockres\n", mres->num_locks);
+ for (i=0; i<mres->num_locks; i++) {
+ ml = &(mres->ml[i]);
+ DLM_ASSERT(ml->highest_blocked == LKM_IVMODE);
+ newlock = NULL;
+ lksb = NULL;
+
+ queue = dlm_list_num_to_pointer(res, ml->list);
+
+ /* if the lock is for the local node it needs to
+ * be moved to the proper location within the queue.
+ * do not allocate a new lock structure. */
+ if (ml->node == dlm->node_num) {
+ /* MIGRATION ONLY! */
+ DLM_ASSERT(mres->flags & DLM_MRES_MIGRATION);
+
+ spin_lock(&res->spinlock);
+ list_for_each(iter, queue) {
+ lock = list_entry (iter, dlm_lock, list);
+ if (lock->ml.cookie != ml->cookie)
+ lock = NULL;
+ else
+ break;
+ }
+
+ /* lock is always created locally first, and
+ * destroyed locally last. it must be on the list */
+ if (!lock) {
+ dlmprintk("could not find local lock with "
+ "cookie %llu!\n", ml->cookie);
+ BUG();
+ }
+ DLM_ASSERT(lock->ml.node == ml->node);
+
+ /* see NOTE above about why we do not update
+ * to match the master here */
+
+ /* move the lock to its proper place */
+ list_del(&lock->list);
+ list_add_tail(&lock->list, queue);
+ spin_unlock(&res->spinlock);
+
+ dlmprintk0("just reordered a local lock!\n");
+ continue;
+ }
+
+ /* lock is for another node. */
+ newlock = kmalloc(sizeof(dlm_lock), GFP_KERNEL);
+ lksb = kmalloc(sizeof(dlm_lockstatus), GFP_KERNEL);
+ if (!newlock || !lksb) {
+ ret = -ENOMEM;
+ goto leave;
+ }
+
+ memset(newlock, 0, sizeof(dlm_lock));
+ memset(lksb, 0, sizeof(dlm_lockstatus));
+
+ dlm_init_lock(newlock, ml->type, ml->node, ml->cookie);
+ newlock->lksb = lksb;
+ __dlm_lockres_get(res);
+ newlock->lockres = res;
+ lksb->lockid = newlock;
+ lksb->flags |= DLM_LKSB_KERNEL_ALLOCATED;
+
+ if (ml->convert_type != LKM_IVMODE) {
+ DLM_ASSERT(queue == &res->converting);
+ newlock->ml.convert_type = ml->convert_type;
+ }
+ lksb->flags |= (ml->flags &
+ (DLM_LKSB_PUT_LVB|DLM_LKSB_GET_LVB));
+ if (lksb->flags & DLM_LKSB_PUT_LVB) {
+ /* other node was trying to update
+ * lvb when node died. recreate the
+ * lksb with the updated lvb. */
+ memcpy(lksb->lvb, mres->lvb, DLM_LVB_LEN);
+ }
+
+ /* NOTE:
+ * wrt lock queue ordering and recovery:
+ * 1. order of locks on granted queue is
+ * meaningless.
+ * 2. order of locks on converting queue is
+ * LOST with the node death. sorry charlie.
+ * 3. order of locks on the blocked queue is
+ * also LOST.
+ * order of locks does not affect integrity, it
+ * just means that a lock request may get pushed
+ * back in line as a result of the node death.
+ * also note that for a given node the lock order
+ * for its secondary queue locks is preserved
+ * relative to each other, but clearly *not*
+ * preserved relative to locks from other nodes.
+ */
+ spin_lock(&res->spinlock);
+ list_add_tail(&newlock->list, queue);
+ spin_unlock(&res->spinlock);
+ }
+ dlmprintk0("done running all the locks\n");
+
+leave:
+ if (ret < 0) {
+ dlmprintk("error occurred while processing recovery "
+ "data! %d\n", ret);
+ if (newlock)
+ kfree(newlock);
+ if (lksb)
+ kfree(lksb);
+ }
+ dlmprintk("returning %d\n", ret);
+ return ret;
+}
+
+void dlm_move_lockres_to_recovery_list(dlm_ctxt *dlm, dlm_lock_resource *res)
+{
+ res->state |= DLM_LOCK_RES_RECOVERING;
+ if (!list_empty(&res->recovering))
+ list_del(&res->recovering);
+ list_add_tail(&res->recovering, &dlm->reco.resources);
+}
+
+/* removes all recovered locks from the recovery list.
+ * sets the res->owner to the new master.
+ * unsets the RECOVERY flag and wakes waiters. */
+static void dlm_finish_local_lockres_recovery(dlm_ctxt *dlm, u8 dead_node,
+ u8 new_master)
+{
+ struct list_head *iter, *iter2;
+ dlm_lock_resource *res;
+
+ dlmprintk0("\n");
+
+ assert_spin_locked(&dlm->spinlock);
+
+ list_for_each_safe(iter, iter2, &dlm->reco.resources) {
+ res = list_entry (iter, dlm_lock_resource, recovering);
+ if (res->owner == dead_node) {
+ list_del_init(&res->recovering);
+ spin_lock(&res->spinlock);
+ res->owner = new_master;
+ res->state &= ~DLM_LOCK_RES_RECOVERING;
+ spin_unlock(&res->spinlock);
+ wake_up(&res->wq);
+ }
+
+ }
+}
+
+
#warning may need to change kfree to put_lock and refcounting here
static void dlm_do_local_recovery_cleanup(dlm_ctxt *dlm, u8 dead_node)
{
@@ -71,16 +1440,35 @@
dlm_lock *lock;
int i;
struct list_head *bucket;
-
+
+
+ /* purge any stale mles */
+ dlm_clean_master_list(dlm, dead_node);
+
+ /*
+ * now clean up all lock resources. there are two rules:
+ *
+ * 1) if the dead node was the master, move the lockres
+ * to the recovering list. set the RECOVERING flag.
+ * this lockres needs to be cleaned up before it can
+ * be used further.
+ *
+ * 2) if this node was the master, remove all locks from
+ * each of the lockres queues that were owned by the
+ * dead node. once recovery finishes, the dlm thread
+ * can be kicked again to see if any ASTs or BASTs
+ * need to be fired as a result.
+ */
for (i=0; i<DLM_HASH_SIZE; i++) {
bucket = &(dlm->resources[i]);
list_for_each(iter, bucket) {
res = list_entry (iter, dlm_lock_resource, list);
+ if (dlm_is_recovery_lock(res->lockname.name,
+ res->lockname.len))
+ continue;
spin_lock(&res->spinlock);
if (res->owner == dead_node) {
- res->state |= DLM_LOCK_RES_RECOVERING;
- list_del(&res->recovering);
- list_add_tail(&res->recovering, &dlm->reco.resources);
+ dlm_move_lockres_to_recovery_list(dlm, res);
} else if (res->owner == dlm->node_num) {
list_for_each_safe(iter2, tmpiter, &res->granted) {
lock = list_entry (iter2, dlm_lock, list);
@@ -199,35 +1587,212 @@
return ret;
}
-u8 dlm_pick_recovery_master(dlm_ctxt *dlm, u8 *new_dead_node)
+static void dlm_reco_ast(void *astdata)
{
- u8 master = 0;
-#if 0
+ dlmprintk0("ast for recovery lock fired!\n");
+}
+static void dlm_reco_bast(void *astdata, int blocked_type)
+{
+ dlmprintk0("bast for recovery lock fired!\n");
+}
+static void dlm_reco_unlock_ast(void *astdata, dlm_status st)
+{
+ dlmprintk0("unlockast for recovery lock fired!\n");
+}
+
+
+int dlm_pick_recovery_master(dlm_ctxt *dlm)
+{
dlm_status ret;
dlm_lockstatus lksb;
+ int status = -EINVAL;
+retry:
+ memset(&lksb, 0, sizeof(lksb));
+
ret = dlmlock(dlm, LKM_EXMODE, &lksb, LKM_NOQUEUE|LKM_RECOVERY,
DLM_RECOVERY_LOCK_NAME, dlm_reco_ast, dlm, dlm_reco_bast);
if (ret == DLM_NORMAL) {
- // I am master
- // send message to all nodes saying that I am beginning a recovery session for node XX,
- // then call dlmunlock???
+ /* I am master, send message to all nodes saying
+ * that I am beginning a recovery session */
+ status = dlm_send_begin_reco_message(dlm,
+ dlm->reco.dead_node);
+ /* recovery lock is a special case. ast will not get fired,
+ * so just go ahead and unlock it. */
+ ret = dlmunlock(dlm, &lksb, 0, dlm_reco_unlock_ast, dlm);
+ if (ret != DLM_NORMAL) {
+ /* this would really suck. this could only happen
+ * if there was a network error during the unlock
+ * because of node death. this means the unlock
+ * is actually "done" and the lock structure is
+ * even freed. we can continue, but only
+ * because this specific lock name is special. */
+ dlmprintk("ack! dlmunlock returned %d\n", ret);
+ }
+
+ if (status < 0) {
+ dlmprintk0("failed to send recovery message. "
+ "must retry with new node map.\n");
+ goto retry;
+ }
} else if (ret == DLM_NOTQUEUED) {
- // another node is master
- // wait on reco.new_master != NM_INVALID_SLOT_NUM
+ /* another node is master. wait on
+ * reco.new_master != NM_INVALID_SLOT_NUM */
+ status = -EEXIST;
}
- // at this point, every node in this domain should have reco.new_master and .dead_node set, even
- // if they have not discovered the dead node on their own
- //
- //
- // atomic_set(&dlm->reco.thread.woken, 0);
- // 232 status = util_wait_atomic_eq(&dlm->reco.thread.thread_wq,
- // 233 &dlm->reco.thread.woken,
- // 234 1, DLM_RECOVERY_THREAD_MS);
- //
-#endif
- return master;
+ return status;
}
+
+static int dlm_send_begin_reco_message(dlm_ctxt *dlm, u8 dead_node)
+{
+ dlm_begin_reco br;
+ int ret = 0;
+ dlm_node_iter iter;
+ int nodenum;
+ int status;
+
+ dlmprintk0("\n");
+
+ dlmprintk("dead node is %u\n", dead_node);
+
+ spin_lock(&dlm->spinlock);
+ dlm_node_iter_init(dlm->domain_map, &iter);
+ spin_unlock(&dlm->spinlock);
+
+ clear_bit(dead_node, iter.node_map);
+
+ memset(&br, 0, sizeof(br));
+ br.node_idx = dlm->node_num;
+ br.dead_node = dead_node;
+ dlm_begin_reco_to_net(&br);
+
+ while ((nodenum = dlm_node_iter_next(&iter)) >= 0) {
+ ret = 0;
+ if (nodenum == dead_node) {
+ dlmprintk("not sending begin reco to dead node "
+ "%u\n", dead_node);
+ continue;
+ }
+ if (nodenum == dlm->node_num) {
+ dlmprintk0("not sending begin reco to self\n");
+ continue;
+ }
+
+ ret = -EINVAL;
+ dlmprintk("attempting to send begin reco msg to %d\n",
+ nodenum);
+ ret = net_send_message(DLM_BEGIN_RECO_MSG, dlm->key,
+ &br, sizeof(br),
+ nodenum, &status);
+ if (ret >= 0)
+ ret = status;
+ if (ret < 0) {
+ dlmprintk("error occurred in "
+ "net_send_message: %d\n", ret);
+ break;
+ }
+ }
+
+ return ret;
+}
+
+int dlm_begin_reco_handler(net_msg *msg, u32 len, void *data)
+{
+ dlm_ctxt *dlm = data;
+ dlm_begin_reco *br = (dlm_begin_reco *)msg->buf;
+
+ /* ok to return 0, domain has gone away */
+ if (!dlm_grab(dlm))
+ return 0;
+
+ dlm_begin_reco_to_host(br);
+
+ dlmprintk("node %u wants to recover node %u\n",
+ br->node_idx, br->dead_node);
+ spin_lock(&dlm->spinlock);
+#warning need to do more here
+ if (dlm->reco.new_master != NM_INVALID_SLOT_NUM) {
+ dlmprintk("new_master already set to %u! "
+ "that node had better be dead!!!\n",
+ dlm->reco.new_master);
+ }
+ if (dlm->reco.dead_node != NM_INVALID_SLOT_NUM) {
+ dlmprintk("dead_node already set to %u!\n",
+ dlm->reco.dead_node);
+ }
+ dlm->reco.new_master = br->node_idx;
+ dlm->reco.dead_node = br->dead_node;
+ spin_unlock(&dlm->spinlock);
+
+ dlm_kick_recovery_thread(dlm);
+ dlm_put(dlm);
+ return 0;
+}
+
+static int dlm_send_finalize_reco_message(dlm_ctxt *dlm)
+{
+ int ret = 0;
+ dlm_finalize_reco fr;
+ dlm_node_iter iter;
+ int nodenum;
+ int status;
+
+ dlmprintk("finishing recovery for node %u\n", dlm->reco.dead_node);
+
+ spin_lock(&dlm->spinlock);
+ dlm_node_iter_init(dlm->domain_map, &iter);
+ spin_unlock(&dlm->spinlock);
+
+ memset(&fr, 0, sizeof(fr));
+ fr.node_idx = dlm->node_num;
+ fr.dead_node = dlm->reco.dead_node;
+ dlm_finalize_reco_to_net(&fr);
+
+ while ((nodenum = dlm_node_iter_next(&iter)) >= 0) {
+ ret = net_send_message(DLM_FINALIZE_RECO_MSG, dlm->key,
+ &fr, sizeof(fr),
+ nodenum, &status);
+ if (ret >= 0)
+ ret = status;
+ if (ret < 0) {
+ dlmprintk("error occurred in "
+ "net_send_message: %d\n", ret);
+ break;
+ }
+ }
+
+ return ret;
+}
+
+int dlm_finalize_reco_handler(net_msg *msg, u32 len, void *data)
+{
+ dlm_ctxt *dlm = data;
+ dlm_finalize_reco *fr = (dlm_finalize_reco *)msg->buf;
+
+ /* ok to return 0, domain has gone away */
+ if (!dlm_grab(dlm))
+ return 0;
+
+ dlm_finalize_reco_to_host(fr);
+
+ dlmprintk("node %u finalizing recovery of node %u\n",
+ fr->node_idx, fr->dead_node);
+
+ spin_lock(&dlm->spinlock);
+
+ DLM_ASSERT (dlm->reco.new_master == fr->node_idx);
+ DLM_ASSERT (dlm->reco.dead_node == fr->dead_node);
+
+ dlm_finish_local_lockres_recovery(dlm, fr->dead_node, fr->node_idx);
+
+ spin_unlock(&dlm->spinlock);
+
+ dlm_reset_recovery(dlm);
+
+ dlm_kick_recovery_thread(dlm);
+ dlm_put(dlm);
+ return 0;
+}
Modified: trunk/fs/ocfs2/dlm/dlmthread.c
===================================================================
--- trunk/fs/ocfs2/dlm/dlmthread.c 2005-03-21 21:17:54 UTC (rev 2025)
+++ trunk/fs/ocfs2/dlm/dlmthread.c 2005-03-21 22:23:34 UTC (rev 2026)
@@ -55,9 +55,9 @@
static int dlm_thread(void *data);
struct task_struct *dlm_thread_task;
-#define dlm_lock_is_remote(dlm, lock) ((lock)->ml.node != (dlm)->group_index)
+#define dlm_lock_is_remote(dlm, lock) ((lock)->ml.node != (dlm)->node_num)
-static int __dlm_lockres_unused(dlm_lock_resource *res)
+int __dlm_lockres_unused(dlm_lock_resource *res)
{
if (list_empty(&res->granted) &&
list_empty(&res->converting) &&
@@ -67,12 +67,14 @@
return 0;
}
+
/* Call whenever you may have added or deleted something from one of
* the lockres queue's. This will figure out whether it belongs on the
* unused list or not and does the appropriate thing. */
-static void __dlm_lockres_calc_usage(dlm_ctxt *dlm,
- dlm_lock_resource *res)
+void __dlm_lockres_calc_usage(dlm_ctxt *dlm, dlm_lock_resource *res)
{
+ dlmprintk0("\n");
+
assert_spin_locked(&dlm->spinlock);
assert_spin_locked(&res->spinlock);
@@ -96,6 +98,7 @@
void dlm_lockres_calc_usage(dlm_ctxt *dlm,
dlm_lock_resource *res)
{
+ dlmprintk0("\n");
spin_lock(&dlm->spinlock);
spin_lock(&res->spinlock);
@@ -107,13 +110,12 @@
/* TODO: Eventual API: Called with the dlm spinlock held, may drop it
* to do migration, but will re-acquire before exit. */
-static void dlm_purge_lockres(dlm_ctxt *dlm,
- dlm_lock_resource *lockres)
+void dlm_purge_lockres(dlm_ctxt *dlm, dlm_lock_resource *lockres)
{
int master;
+ int ret;
- /* Since we can't migrate locks yet, for now we only handle
- * non locally mastered locks. */
+ dlmprintk0("\n");
spin_lock(&lockres->spinlock);
master = lockres->owner == dlm->node_num;
spin_unlock(&lockres->spinlock);
@@ -124,9 +126,30 @@
/* Non master is the easy case -- no migration required, just
* quit. */
if (!master)
- __dlm_unhash_lock(dlm, lockres);
+ goto finish;
- /* TODO: Wheee! Migrate lockres here! */
+ /* Wheee! Migrate lockres here! */
+ spin_unlock(&dlm->spinlock);
+again:
+
+ ret = dlm_migrate_lockres(dlm, lockres, NM_MAX_NODES);
+ if (ret == -ENOTEMPTY) {
+ dlmprintk0("lockres still has local locks! for "
+ "now, this will BUG.\n");
+ BUG();
+ } else if (ret < 0) {
+ dlmprintk0("migrate failed, trying it again\n");
+ goto again;
+ }
+
+ spin_lock(&dlm->spinlock);
+
+finish:
+ if (!list_empty(&lockres->purge)) {
+ list_del_init(&lockres->purge);
+ dlm->purge_count--;
+ }
+ __dlm_unhash_lock(dlm, lockres);
}
static void dlm_run_purge_list(dlm_ctxt *dlm)
@@ -172,7 +195,9 @@
/* This may drop and reacquire the dlm spinlock if it
* has to do migration. */
+ dlmprintk0("calling dlm_purge_lockres!\n");
dlm_purge_lockres(dlm, lockres);
+ dlmprintk0("DONE calling dlm_purge_lockres!\n");
}
spin_unlock(&dlm->spinlock);
@@ -185,8 +210,13 @@
struct list_head *head;
int can_grant = 1;
- dlmprintk("shuffle res %.*s\n", res->lockname.len, res->lockname.name);
+ DLM_ASSERT(res);
+ // dlmprintk("res->lockname.len=%d\n", res->lockname.len);
+ // dlmprintk("res->lockname.name=%p\n", res->lockname.name);
+ // dlmprintk("shuffle res %.*s\n", res->lockname.len,
+ // res->lockname.name);
+
spin_lock(&res->spinlock);
converting:
@@ -319,6 +349,7 @@
/* must have NO locks when calling this */
void dlm_kick_thread(dlm_ctxt *dlm, dlm_lock_resource *res)
{
+ dlmprintk("dlm=%p, res=%p\n", dlm, res);
if (res) {
spin_lock(&dlm->spinlock);
spin_lock(&res->spinlock);
@@ -393,6 +424,7 @@
lock = list_entry(iter, dlm_lock, ast_list);
res = lock->lockres;
dlmprintk0("delivering an ast for this lockres\n");
+ DLM_ASSERT(lock->ast_pending);
list_del_init(&lock->ast_list);
if (lock->ml.node != dlm->node_num) {
@@ -400,12 +432,18 @@
dlmprintk("eek\n");
} else
dlm_do_local_ast(dlm, res, lock);
+
+ spin_lock(&lock->spinlock);
+ lock->ast_pending = 0;
+ spin_unlock(&lock->spinlock);
}
list_for_each_safe(iter, iter2, &bast_tmp) {
lock = list_entry(iter, dlm_lock, bast_list);
res = lock->lockres;
+ DLM_ASSERT(lock->bast_pending);
+
/* get the highest blocked lock, and reset */
spin_lock(&lock->spinlock);
DLM_ASSERT(lock->ml.highest_blocked > LKM_IVMODE);
@@ -422,6 +460,10 @@
dlmprintk0("eeek\n");
} else
dlm_do_local_bast(dlm, res, lock, hi);
+
+ spin_lock(&lock->spinlock);
+ lock->bast_pending = 0;
+ spin_unlock(&lock->spinlock);
}
}
@@ -445,12 +487,15 @@
list_for_each_safe(iter, tmpiter, &dlm->dirty_list) {
res = list_entry(iter, dlm_lock_resource, dirty);
+ DLM_ASSERT(res);
spin_lock(&res->spinlock);
list_del_init(&res->dirty);
res->state &= ~DLM_LOCK_RES_DIRTY;
BUG_ON(res->owner != dlm->node_num);
spin_unlock(&res->spinlock);
+ dlmprintk("calling dlm_shuffle_lists with "
+ "dlm=%p, res=%p\n", dlm, res);
dlm_shuffle_lists(dlm, res);
spin_lock(&res->spinlock);
Modified: trunk/fs/ocfs2/dlm/dlmunlock.c
===================================================================
--- trunk/fs/ocfs2/dlm/dlmunlock.c 2005-03-21 21:17:54 UTC (rev 2025)
+++ trunk/fs/ocfs2/dlm/dlmunlock.c 2005-03-21 22:23:34 UTC (rev 2026)
@@ -301,6 +301,17 @@
queue=&res->granted;
found = 0;
spin_lock(&res->spinlock);
+ if (res->state & DLM_LOCK_RES_RECOVERING) {
+ dlmprintk0("returning DLM_RECOVERING\n");
+ status = DLM_RECOVERING;
+ goto leave;
+ }
+ if (res->state & DLM_LOCK_RES_MIGRATING) {
+ dlmprintk0("returning DLM_MIGRATING\n");
+ status = DLM_MIGRATING;
+ goto leave;
+ }
+
for (i=0; i<3; i++) {
list_for_each(iter, queue) {
lock = list_entry(iter, dlm_lock, list);
@@ -345,6 +356,7 @@
status = lksb->status;
}
+leave:
if (res)
dlm_lockres_put(dlm, res);