[Ocfs2-commits] khackel commits r2384 - trunk/fs/ocfs2/dlm
svn-commits at oss.oracle.com
svn-commits at oss.oracle.com
Fri Jun 10 01:25:09 CDT 2005
Author: khackel
Signed-off-by: mfasheh
Date: 2005-06-10 01:25:08 -0500 (Fri, 10 Jun 2005)
New Revision: 2384
Modified:
trunk/fs/ocfs2/dlm/dlmcommon.h
trunk/fs/ocfs2/dlm/dlmmaster.c
trunk/fs/ocfs2/dlm/dlmrecovery.c
trunk/fs/ocfs2/dlm/dlmthread.c
Log:
* fixes bug 379: fully implements lock mastery recovery after another
node dies while the local node is attempting to master a lock
* adds a nolock version of dlm_kick_thread to be called at the end
of recovery while still holding dlm and lockres locks
* simplify some of the error paths in lock mastery so that we can move
forward in safe cases that do not require special handling
* remove the error field in the master list entry
* added slightly more fine-grained checking of node up/down events
during lock mastery, to avoid many costly error paths
Signed-off-by: mfasheh
Modified: trunk/fs/ocfs2/dlm/dlmcommon.h
===================================================================
--- trunk/fs/ocfs2/dlm/dlmcommon.h 2005-06-10 02:28:10 UTC (rev 2383)
+++ trunk/fs/ocfs2/dlm/dlmcommon.h 2005-06-10 06:25:08 UTC (rev 2384)
@@ -312,7 +312,6 @@
unsigned long node_map[BITS_TO_LONGS(O2NM_MAX_NODES)];
u8 master;
u8 new_master;
- u8 error;
enum dlm_mle_type type; // BLOCK or MASTER
struct o2hb_callback_func mle_hb_up;
struct o2hb_callback_func mle_hb_down;
@@ -890,6 +889,8 @@
u8 dlm_nm_this_node(dlm_ctxt *dlm);
void dlm_kick_thread(dlm_ctxt *dlm, dlm_lock_resource *res);
+void __dlm_kick_thread(dlm_ctxt *dlm, dlm_lock_resource *res);
+
int dlm_nm_init(dlm_ctxt *dlm);
int dlm_heartbeat_init(dlm_ctxt *dlm);
Modified: trunk/fs/ocfs2/dlm/dlmmaster.c
===================================================================
--- trunk/fs/ocfs2/dlm/dlmmaster.c 2005-06-10 02:28:10 UTC (rev 2383)
+++ trunk/fs/ocfs2/dlm/dlmmaster.c 2005-06-10 06:25:08 UTC (rev 2384)
@@ -38,6 +38,7 @@
#include <linux/socket.h>
#include <linux/inet.h>
#include <linux/spinlock.h>
+#include <linux/delay.h>
#include "cluster/heartbeat.h"
@@ -59,13 +60,13 @@
struct list_head *iter;
int i = 0, refs;
char *type;
- char err, attached;
+ char attached;
u8 master;
unsigned int namelen;
const char *name;
mlog(ML_NOTICE, "dumping all mles for domain %s:\n", dlm->name);
- mlog(ML_NOTICE, " ####: type refs owner events? err? lockname\n");
+ mlog(ML_NOTICE, " ####: type refs owner events? lockname\n");
spin_lock(&dlm->master_lock);
list_for_each(iter, &dlm->master_list) {
@@ -74,7 +75,6 @@
k = &mle->mle_refs;
type = (mle->type == DLM_MLE_BLOCK ? "BLK" : "MAS");
- err = (mle->error ? 'Y' : 'N');
refs = atomic_read(&k->refcount);
master = mle->master;
attached = (list_empty(&mle->hb_events) ? 'N' : 'Y');
@@ -87,8 +87,8 @@
name = mle->u.res->lockname.name;
}
- mlog(ML_NOTICE, " #%3d: %3s %3d %3u %c %c (%d)%.*s\n",
- i, type, refs, master, attached, err,
+ mlog(ML_NOTICE, " #%3d: %3s %3d %3u %c (%d)%.*s\n",
+ i, type, refs, master, attached,
namelen, namelen, name);
}
@@ -137,7 +137,7 @@
dlm_master_list_entry *mle,
int blocked);
static int dlm_restart_lock_mastery(dlm_ctxt *dlm, dlm_lock_resource *res,
- dlm_master_list_entry *mle);
+ dlm_master_list_entry *mle, int blocked);
static int dlm_add_migration_mle(dlm_ctxt *dlm,
dlm_lock_resource *res,
dlm_master_list_entry *mle,
@@ -150,7 +150,28 @@
static void dlm_mark_lockres_migrating(dlm_ctxt *dlm, dlm_lock_resource *res);
+static int dlm_is_host_down(int errno)
+{
+ switch (errno) {
+ case -EBADF:
+ case -ECONNREFUSED:
+ case -ENOTCONN:
+ case -ECONNRESET:
+ case -EPIPE:
+ case -EHOSTDOWN:
+ case -EHOSTUNREACH:
+ case -ETIMEDOUT:
+ case -ECONNABORTED:
+ case -ENETDOWN:
+ case -ENETUNREACH:
+ case -ENETRESET:
+ case -ESHUTDOWN:
+ return 1;
+ }
+ return 0;
+}
+
/*
* MASTER LIST FUNCTIONS
*/
@@ -249,7 +270,6 @@
memset(mle->response_map, 0, sizeof(mle->response_map));
mle->master = O2NM_MAX_NODES;
mle->new_master = O2NM_MAX_NODES;
- mle->error = 0;
if (mle->type == DLM_MLE_MASTER) {
DLM_ASSERT(res);
@@ -661,6 +681,7 @@
goto wait;
}
+redo_request:
ret = -EINVAL;
dlm_node_iter_init(mle->vote_map, &iter);
while ((nodenum = dlm_node_iter_next(&iter)) >= 0) {
@@ -675,18 +696,26 @@
wait:
/* keep going until the response map includes all nodes */
- ret = -EAGAIN;
- while (ret == -EAGAIN) {
- ret = dlm_wait_for_lock_mastery(dlm, res, mle, blocked);
- if (ret == -EINVAL) {
- mlog(ML_ERROR, "some error occurred. restarting "
- "lock mastery!\n");
- /* TODO: figure out how restart this */
+ ret = dlm_wait_for_lock_mastery(dlm, res, mle, blocked);
+ if (ret < 0) {
+ if (blocked) {
+ if (mle->type == DLM_MLE_MASTER) {
+ mlog(0, "mle changed to a MASTER due "
+ "to node death. restart.\n");
+ goto redo_request;
+ }
+ /* should never happen for a BLOCK */
+ mlog(ML_ERROR, "mle type=%d\n", mle->type);
BUG();
}
+ mlog(0, "node map changed, redo the "
+ "master request now\n");
+ goto redo_request;
}
- if (ret == 0)
- mlog(0, "lockres mastered by %u\n", res->owner);
+
+ mlog(0, "lockres mastered by %u\n", res->owner);
+ /* make sure we never continue without this */
+ DLM_ASSERT(res->owner != O2NM_MAX_NODES);
/* master is known, detach if not already detached */
dlm_mle_detach_hb_events(dlm, mle);
@@ -707,10 +736,14 @@
int blocked)
{
u8 m;
- int ret = 0, tmpret, bit;
- int map_changed = 0, voting_done = 0;
- int assert = 0, sleep;
+ int ret, bit;
+ int map_changed, voting_done;
+ int assert, sleep;
+recheck:
+ ret = 0;
+ assert = 0;
+
/* check if another node has already become the owner */
spin_lock(&res->spinlock);
if (res->owner != DLM_LOCK_RES_OWNER_UNKNOWN) {
@@ -727,20 +760,16 @@
sizeof(mle->vote_map)) == 0);
/* restart if we hit any errors */
- if (mle->error || map_changed) {
- if (mle->error) {
- mlog(0, "another node got error %d, restarting\n",
- mle->error);
- mle->error = 0;
+ if (map_changed) {
+ mlog(0, "node map changed, restarting\n");
+ ret = dlm_restart_lock_mastery(dlm, res, mle, blocked);
+ spin_unlock(&mle->spinlock);
+ if (ret < 0) {
+ mlog_errno(ret);
+ goto leave;
}
- if (map_changed)
- mlog(0, "node map changed, restarting\n");
- spin_unlock(&mle->spinlock);
- tmpret = dlm_restart_lock_mastery(dlm, res, mle);
- if (tmpret < 0)
- mlog_errno(tmpret);
- ret = -EINVAL;
- goto leave;
+ mlog(0, "restart lock mastery succeeded, rechecking now\n");
+ goto recheck;
}
if (m != O2NM_MAX_NODES) {
@@ -770,19 +799,13 @@
/* sleep if we haven't finished voting yet */
if (sleep) {
atomic_set(&mle->woken, 0);
- ret = wait_event_interruptible_timeout(mle->wq,
- (atomic_read(&mle->woken) == 1),
- msecs_to_jiffies(5000));
-
- if (ret >= 0 && !atomic_read(&mle->woken)) {
- mlog(0, "timed out during lock mastery: "
- "vote_map=%0lx, response_map=%0lx\n",
- mle->vote_map[0], mle->response_map[0]);
+ wait_event(mle->wq, (atomic_read(&mle->woken) == 1));
+ if (res->owner == O2NM_MAX_NODES) {
+ mlog(0, "waiting again\n");
+ goto recheck;
}
- /* unless we are aborting, need to recheck and
- * maybe sleep again */
- if (ret != -ERESTARTSYS)
- ret = -EAGAIN;
+ mlog(0, "done waiting, master is %u\n", res->owner);
+ ret = 0;
goto leave;
}
@@ -792,14 +815,16 @@
ret = dlm_do_assert_master(dlm, res->lockname.name,
res->lockname.len, mle->vote_map, 0);
if (ret) {
+ /* This is a failure in the network path,
+ * not in the response to the assert_master
+ * (any nonzero response is a BUG on this node).
+ * Most likely a socket just got disconnected
+ * due to node death. */
mlog_errno(ret);
-
- tmpret = dlm_restart_lock_mastery(dlm, res, mle);
- if (tmpret < 0)
- mlog_errno(tmpret);
- ret = -EINVAL;
- goto leave;
}
+ /* no longer need to restart lock mastery.
+ * all living nodes have been contacted. */
+ ret = 0;
}
/* set the lockres owner */
@@ -811,24 +836,186 @@
return ret;
}
+struct dlm_bitmap_diff_iter
+{
+ int curnode;
+ unsigned long *orig_bm;
+ unsigned long *cur_bm;
+ unsigned long diff_bm[BITS_TO_LONGS(O2NM_MAX_NODES)];
+};
+
+enum dlm_node_state_change
+{
+ NODE_DOWN = -1,
+ NODE_NO_CHANGE = 0,
+ NODE_UP
+};
+
+static void dlm_bitmap_diff_iter_init(struct dlm_bitmap_diff_iter *iter,
+ unsigned long *orig_bm,
+ unsigned long *cur_bm)
+{
+ unsigned long p1, p2;
+ int i;
+
+ iter->curnode = -1;
+ iter->orig_bm = orig_bm;
+ iter->cur_bm = cur_bm;
+
+ for (i = 0; i < BITS_TO_LONGS(O2NM_MAX_NODES); i++) {
+ p1 = *(iter->orig_bm + i);
+ p2 = *(iter->cur_bm + i);
+ iter->diff_bm[i] = (p1 & ~p2) | (p2 & ~p1);
+ }
+}
+
+static int dlm_bitmap_diff_iter_next(struct dlm_bitmap_diff_iter *iter,
+ enum dlm_node_state_change *state)
+{
+ int bit;
+
+ if (iter->curnode >= O2NM_MAX_NODES)
+ return -ENOENT;
+
+ bit = find_next_bit(iter->diff_bm, O2NM_MAX_NODES,
+ iter->curnode+1);
+ if (bit >= O2NM_MAX_NODES) {
+ iter->curnode = O2NM_MAX_NODES;
+ return -ENOENT;
+ }
+
+ /* if it was there in the original then this node died */
+ if (test_bit(bit, iter->orig_bm))
+ *state = NODE_DOWN;
+ else
+ *state = NODE_UP;
+
+ iter->curnode = bit;
+ return bit;
+}
+
+
static int dlm_restart_lock_mastery(dlm_ctxt *dlm, dlm_lock_resource *res,
- dlm_master_list_entry *mle)
+ dlm_master_list_entry *mle, int blocked)
{
- mlog(0, "something happened such that the whole master process needs "
- "to be restarted!\n");
- return 0;
+ struct dlm_bitmap_diff_iter bdi;
+ enum dlm_node_state_change sc;
+ int node;
+ int ret = 0;
+
+ mlog(0, "something happened such that the "
+ "master process may need to be restarted!\n");
+
+ assert_spin_locked(&mle->spinlock);
+
+ dlm_bitmap_diff_iter_init(&bdi, mle->vote_map, mle->node_map);
+ node = dlm_bitmap_diff_iter_next(&bdi, &sc);
+ while (node >= 0) {
+ if (sc == NODE_UP) {
+ /* a node came up. easy. might not even need
+ * to talk to it if its node number is higher
+ * or if we are already blocked. */
+ mlog(0, "node up! %d\n", node);
+ if (blocked)
+ goto next;
+
+ if (node > dlm->node_num) {
+ mlog(0, "node > this node. skipping.\n");
+ goto next;
+ }
+
+ /* redo the master request, but only for the new node */
+ mlog(0, "sending request to new node\n");
+ clear_bit(node, mle->response_map);
+ set_bit(node, mle->vote_map);
+ } else {
+ mlog(ML_ERROR, "node down! %d\n", node);
+
+ /* if the node wasn't involved in mastery skip it */
+ if (!test_bit(node, mle->maybe_map))
+ goto next;
+
+ /* if we're already blocked on lock mastery, and the
+ * dead node wasn't the expected master, or there is
+ * another node in the maybe_map, keep waiting */
+ if (blocked) {
+ int lowest = find_next_bit(mle->maybe_map,
+ O2NM_MAX_NODES, 0);
+
+ /* act like it was never there */
+ clear_bit(node, mle->maybe_map);
+ clear_bit(node, mle->vote_map);
+ clear_bit(node, mle->response_map);
+
+ if (node != lowest)
+ goto next;
+
+ mlog(ML_ERROR, "expected master %u died while "
+ "this node was blocked waiting on it!\n",
+ node);
+ lowest = find_next_bit(mle->maybe_map,
+ O2NM_MAX_NODES,
+ lowest+1);
+ if (lowest < O2NM_MAX_NODES) {
+ mlog(0, "still blocked. waiting "
+ "on %u now\n", lowest);
+ goto next;
+ }
+
+ /* mle is an MLE_BLOCK, but there is now
+ * nothing left to block on. we need to return
+ * all the way back out and try again with
+ * an MLE_MASTER. dlm_do_local_recovery_cleanup
+ * has already run, so the mle refcount is ok */
+ mlog(0, "no longer blocking. we can "
+ "try to master this here\n");
+ mle->type = DLM_MLE_MASTER;
+ memset(mle->maybe_map, 0,
+ sizeof(mle->maybe_map));
+ memset(mle->response_map, 0,
+ sizeof(mle->maybe_map));
+ memcpy(mle->vote_map, mle->node_map,
+ sizeof(mle->node_map));
+ mle->u.res = res;
+
+ ret = -EAGAIN;
+ goto next;
+ }
+
+ if (node > dlm->node_num)
+ goto next;
+
+ mlog(0, "dead node in map!\n");
+ /* yuck. go back and re-contact all nodes
+ * in the vote_map, removing this node. */
+ clear_bit(node, mle->maybe_map);
+ clear_bit(node, mle->vote_map);
+ memset(mle->response_map, 0,
+ sizeof(mle->response_map));
+ }
+ ret = -EAGAIN;
+next:
+ node = dlm_bitmap_diff_iter_next(&bdi, &sc);
+ }
+ return ret;
}
/*
* DLM_MASTER_REQUEST_MSG
+ *
+ * returns: 0 on success,
+ * -errno on a network error
+ *
+ * on error, the caller should assume the target node is "dead"
+ *
*/
static int dlm_do_master_request(dlm_master_list_entry *mle, int to)
{
dlm_ctxt *dlm = mle->dlm;
dlm_master_request request;
- int ret, response=0;
+ int ret, response=0, resend;
memset(&request, 0, sizeof(request));
request.node_idx = dlm->node_num;
@@ -845,43 +1032,68 @@
}
dlm_master_request_to_net(&request);
+again:
ret = o2net_send_message(DLM_MASTER_REQUEST_MSG, dlm->key, &request,
sizeof(request), to, &response);
if (ret < 0) {
- mlog_errno(ret);
+ if (ret == -ESRCH) {
+ /* should never happen */
+ mlog(ML_ERROR, "TCP stack not ready!\n");
+ BUG();
+ } else if (ret == -EINVAL) {
+ mlog(ML_ERROR, "bad args passed to o2net!\n");
+ BUG();
+ } else if (ret == -ENOMEM) {
+ mlog(ML_ERROR, "out of memory while trying to send "
+ "network message! retrying\n");
+ /* this is totally crude */
+ msleep(50);
+ goto again;
+ } else if (!dlm_is_host_down(ret)) {
+ /* not a network error. bad. */
+ mlog_errno(ret);
+ mlog(ML_ERROR, "unhandled error!");
+ BUG();
+ }
+ /* all other errors should be network errors,
+ * and likely indicate node death */
+ mlog(ML_ERROR, "link to %d went down!\n", to);
goto out;
}
+ ret = 0;
+ resend = 0;
spin_lock(&mle->spinlock);
switch (response) {
case DLM_MASTER_RESP_YES:
set_bit(to, mle->response_map);
- // mlog(0, "woot! node %u is the "
- // "master!\n", to);
+ mlog(0, "node %u is the master, response=YES\n", to);
mle->master = to;
break;
case DLM_MASTER_RESP_NO:
- // mlog(0, "node %u is not the "
- // "master, not in-progress\n", to);
+ mlog(0, "node %u not master, response=NO\n", to);
set_bit(to, mle->response_map);
break;
case DLM_MASTER_RESP_MAYBE:
- // mlog(0, "node %u is not the "
- // "master, but IS in-progress\n", to);
+ mlog(0, "node %u not master, response=MAYBE\n", to);
set_bit(to, mle->response_map);
set_bit(to, mle->maybe_map);
break;
case DLM_MASTER_RESP_ERROR:
- mlog(0, "node %u hit an -ENOMEM! try everything "
- "again\n", to);
- mle->error = 1;
+ mlog(0, "node %u hit an error, resending\n", to);
+ resend = 1;
+ response = 0;
break;
default:
- mlog(0, "bad response! %u\n", response);
- ret = -EINVAL;
- break;
+ mlog(ML_ERROR, "bad response! %u\n", response);
+ BUG();
}
spin_unlock(&mle->spinlock);
+ if (resend) {
+ /* this is also totally crude */
+ msleep(50);
+ goto again;
+ }
out:
return ret;
@@ -1112,9 +1324,6 @@
int ret = 0;
DLM_ASSERT(namelen <= O2NM_MAX_NAME_LEN);
- DLM_ASSERT(dlm);
- DLM_ASSERT(lockname);
- DLM_ASSERT(nodemap);
/* note that if this nodemap is empty, it returns 0 */
dlm_node_iter_init(nodemap, &iter);
@@ -1132,10 +1341,15 @@
tmpret = o2net_send_message(DLM_ASSERT_MASTER_MSG, dlm->key,
&assert, sizeof(assert), to, &r);
if (tmpret < 0) {
- // TODO
- // mlog(0, "assert_master returned %d!\n", tmpret);
- ret = tmpret;
- break;
+ mlog(ML_ERROR, "assert_master returned %d!\n", tmpret);
+ if (!dlm_is_host_down(tmpret)) {
+ mlog(ML_ERROR, "unhandled error!\n");
+ BUG();
+ }
+ /* a node died. finish out the rest of the nodes. */
+ mlog(ML_ERROR, "link to %d went down!\n", to);
+ /* any nonzero status return will do */
+ ret = tmpret;
} else if (r < 0) {
/* ok, something horribly messed. kill thyself. */
mlog(ML_ERROR,"during assert master of %.*s to %u, "
@@ -1351,36 +1565,35 @@
request_from = item->u.am.request_from;
flags = item->u.am.flags;
- do {
- spin_lock(&dlm->spinlock);
- memcpy(nodemap, dlm->domain_map, sizeof(nodemap));
- spin_unlock(&dlm->spinlock);
+ spin_lock(&dlm->spinlock);
+ memcpy(nodemap, dlm->domain_map, sizeof(nodemap));
+ spin_unlock(&dlm->spinlock);
- clear_bit(dlm->node_num, nodemap);
- if (ignore_higher) {
- /* if is this just to clear up mles for nodes below
- * this node, do not send the message to the original
- * caller or any node number higher than this */
- clear_bit(request_from, nodemap);
- bit = dlm->node_num;
- while (1) {
- bit = find_next_bit(nodemap, O2NM_MAX_NODES,
- bit+1);
- if (bit >= O2NM_MAX_NODES)
- break;
- clear_bit(bit, nodemap);
- }
+ clear_bit(dlm->node_num, nodemap);
+ if (ignore_higher) {
+ /* if is this just to clear up mles for nodes below
+ * this node, do not send the message to the original
+ * caller or any node number higher than this */
+ clear_bit(request_from, nodemap);
+ bit = dlm->node_num;
+ while (1) {
+ bit = find_next_bit(nodemap, O2NM_MAX_NODES,
+ bit+1);
+ if (bit >= O2NM_MAX_NODES)
+ break;
+ clear_bit(bit, nodemap);
}
+ }
- ret = dlm_do_assert_master(dlm, res->lockname.name,
- res->lockname.len,
- nodemap, flags);
- if (ret < 0) {
- /* no choice but to try again.
- * maybe a node died. */
- mlog_errno(ret);
- }
- } while (ret < 0);
+ /* this call now finishes out the nodemap
+ * even if one or more nodes die */
+ ret = dlm_do_assert_master(dlm, res->lockname.name,
+ res->lockname.len,
+ nodemap, flags);
+ if (ret < 0) {
+ /* no need to restart, we are done */
+ mlog_errno(ret);
+ }
dlm_lockres_put(res);
@@ -1944,10 +2157,10 @@
void dlm_clean_master_list(dlm_ctxt *dlm, u8 dead_node)
{
struct list_head *iter, *iter2;
- int bit;
dlm_master_list_entry *mle;
dlm_lock_resource *res;
+ mlog_entry("dlm=%s, dead node=%u\n", dlm->name, dead_node);
top:
assert_spin_locked(&dlm->spinlock);
@@ -1966,15 +2179,38 @@
if (mle->type == DLM_MLE_MASTER)
continue;
- bit = find_next_bit(mle->maybe_map, O2NM_MAX_NODES, 0);
/* BLOCK mles are initiated by other nodes.
* need to clean up if the dead node would have
* been the master. */
- if (mle->type == DLM_MLE_BLOCK &&
- bit != dead_node)
+ if (mle->type == DLM_MLE_BLOCK) {
+ int bit;
+
+ spin_lock(&mle->spinlock);
+ bit = find_next_bit(mle->maybe_map, O2NM_MAX_NODES, 0);
+ if (bit != dead_node) {
+ mlog(0, "mle found, but dead node %u would "
+ "not have been master\n", dead_node);
+ spin_unlock(&mle->spinlock);
+ } else {
+ /* must drop the refcount by one since the
+ * assert_master will never arrive. this
+ * may result in the mle being unlinked and
+ * freed, but there may still be a process
+ * waiting in the dlmlock path which is fine. */
+ mlog(ML_ERROR, "node %u was expected master\n",
+ dead_node);
+ atomic_set(&mle->woken, 1);
+ spin_unlock(&mle->spinlock);
+ wake_up(&mle->wq);
+ /* final put will take care of list removal */
+ __dlm_put_mle(mle);
+ }
continue;
+ }
+ /* everything else is a MIGRATION mle */
+
/* the rule for MIGRATION mles is that the master
* becomes UNKNOWN if *either* the original or
* the new master dies. all UNKNOWN lockreses
@@ -1984,51 +2220,51 @@
* this lockres, or if he needs to take over
* mastery. either way, this node should expect
* another message to resolve this. */
- if (mle->type == DLM_MLE_MIGRATION &&
- mle->master != dead_node &&
+ if (mle->master != dead_node &&
mle->new_master != dead_node)
continue;
/* if we have reached this point, this mle needs to
* be removed from the list and freed. */
- /* unlinking list_head while in list_for_each_safe */
+ /* remove from the list early. NOTE: unlinking
+ * list_head while in list_for_each_safe */
+ spin_lock(&mle->spinlock);
list_del_init(&mle->list);
atomic_set(&mle->woken, 1);
+ spin_unlock(&mle->spinlock);
wake_up(&mle->wq);
- if (mle->type == DLM_MLE_MIGRATION) {
- mlog(0, "node %u died during migration from "
- "%u to %u!\n", dead_node,
- mle->master, mle->new_master);
- /* if there is a lockres associated with this
- * mle, find it and set its owner to UNKNOWN */
- res = __dlm_lookup_lockres(dlm, mle->u.name.name,
- mle->u.name.len);
- if (res) {
- /* unfortunately if we hit this rare case, our
- * lock ordering is messed. we need to drop
- * the master lock so that we can take the
- * lockres lock, meaning that we will have to
- * restart from the head of list. */
- spin_unlock(&dlm->master_lock);
-
- /* move lockres onto recovery list */
- spin_lock(&res->spinlock);
- dlm_set_lockres_owner(dlm, res,
- DLM_LOCK_RES_OWNER_UNKNOWN);
- dlm_move_lockres_to_recovery_list(dlm, res);
- spin_unlock(&res->spinlock);
- dlm_lockres_put(res);
+ mlog(0, "node %u died during migration from "
+ "%u to %u!\n", dead_node,
+ mle->master, mle->new_master);
+ /* if there is a lockres associated with this
+ * mle, find it and set its owner to UNKNOWN */
+ res = __dlm_lookup_lockres(dlm, mle->u.name.name,
+ mle->u.name.len);
+ if (res) {
+ /* unfortunately if we hit this rare case, our
+ * lock ordering is messed. we need to drop
+ * the master lock so that we can take the
+ * lockres lock, meaning that we will have to
+ * restart from the head of list. */
+ spin_unlock(&dlm->master_lock);
+
+ /* move lockres onto recovery list */
+ spin_lock(&res->spinlock);
+ dlm_set_lockres_owner(dlm, res,
+ DLM_LOCK_RES_OWNER_UNKNOWN);
+ dlm_move_lockres_to_recovery_list(dlm, res);
+ spin_unlock(&res->spinlock);
+ dlm_lockres_put(res);
- /* dump the mle */
- spin_lock(&dlm->master_lock);
- __dlm_put_mle(mle);
- spin_unlock(&dlm->master_lock);
+ /* dump the mle */
+ spin_lock(&dlm->master_lock);
+ __dlm_put_mle(mle);
+ spin_unlock(&dlm->master_lock);
- /* restart */
- goto top;
- }
+ /* restart */
+ goto top;
}
/* this may be the last reference */
@@ -2057,21 +2293,16 @@
goto leave;
}
-retry:
mlog(0, "doing assert master to all except the original node\n");
+ /* this call now finishes out the nodemap
+ * even if one or more nodes die */
ret = dlm_do_assert_master(dlm, res->lockname.name,
res->lockname.len, iter.node_map,
DLM_ASSERT_MASTER_FINISH_MIGRATION);
if (ret < 0) {
+ /* no longer need to retry. all living nodes contacted. */
mlog_errno(ret);
-
- /* maybe we can be saved by updating the domain map */
- spin_lock(&dlm->spinlock);
- dlm_node_iter_init(dlm->domain_map, &iter);
- clear_bit(old_master, iter.node_map);
- clear_bit(dlm->node_num, iter.node_map);
- spin_unlock(&dlm->spinlock);
- goto retry;
+ ret = 0;
}
memset(iter.node_map, 0, sizeof(iter.node_map));
@@ -2085,6 +2316,7 @@
"with %d.\n", ret);
/* the only nonzero status here would be because of
* a dead original node. we're done. */
+ ret = 0;
}
/* all done, set the owner, clear the flag */
Modified: trunk/fs/ocfs2/dlm/dlmrecovery.c
===================================================================
--- trunk/fs/ocfs2/dlm/dlmrecovery.c 2005-06-10 02:28:10 UTC (rev 2383)
+++ trunk/fs/ocfs2/dlm/dlmrecovery.c 2005-06-10 06:25:08 UTC (rev 2384)
@@ -1456,6 +1456,7 @@
spin_lock(&res->spinlock);
res->owner = new_master;
res->state &= ~DLM_LOCK_RES_RECOVERING;
+ __dlm_kick_thread(dlm, res);
spin_unlock(&res->spinlock);
wake_up(&res->wq);
}
Modified: trunk/fs/ocfs2/dlm/dlmthread.c
===================================================================
--- trunk/fs/ocfs2/dlm/dlmthread.c 2005-06-10 02:28:10 UTC (rev 2383)
+++ trunk/fs/ocfs2/dlm/dlmthread.c 2005-06-10 06:25:08 UTC (rev 2384)
@@ -414,28 +414,39 @@
return;
}
-/* must have NO locks when calling this */
+/* must have NO locks when calling this with res !=NULL * */
void dlm_kick_thread(dlm_ctxt *dlm, dlm_lock_resource *res)
{
mlog_entry("dlm=%p, res=%p\n", dlm, res);
if (res) {
spin_lock(&dlm->spinlock);
spin_lock(&res->spinlock);
+ __dlm_kick_thread(dlm, res);
+ spin_unlock(&res->spinlock);
+ spin_unlock(&dlm->spinlock);
+ } else
+ wake_up(&dlm->dlm_thread_wq);
+}
+void __dlm_kick_thread(dlm_ctxt *dlm, dlm_lock_resource *res)
+{
+ mlog_entry("dlm=%p, res=%p\n", dlm, res);
+ if (res) {
+ assert_spin_locked(&dlm->spinlock);
+ assert_spin_locked(&res->spinlock);
+
/* don't shuffle secondary queues */
if ((res->owner == dlm->node_num) &&
!(res->state & DLM_LOCK_RES_DIRTY)) {
list_add_tail(&res->dirty, &dlm->dirty_list);
res->state |= DLM_LOCK_RES_DIRTY;
}
-
- spin_unlock(&res->spinlock);
- spin_unlock(&dlm->spinlock);
}
wake_up(&dlm->dlm_thread_wq);
}
+
/* Launch the NM thread for the mounted volume */
int dlm_launch_thread(dlm_ctxt *dlm)
{
More information about the Ocfs2-commits
mailing list