[Ocfs2-commits] jlbec commits r2790 - in branches/ocfs2-1.2-cert: . patches
svn-commits at oss.oracle.com
Mon Mar 27 16:29:14 CST 2006
Author: jlbec
Date: 2006-03-27 16:29:12 -0600 (Mon, 27 Mar 2006)
New Revision: 2790
Added:
branches/ocfs2-1.2-cert/patches/debug-mastery.patch
branches/ocfs2-1.2-cert/patches/dlm-eloop.patch
branches/ocfs2-1.2-cert/patches/dlm-kurt-mar20-2.patch
branches/ocfs2-1.2-cert/patches/em-silence-eexist.patch
branches/ocfs2-1.2-cert/patches/extra-em-trace3.patch
branches/ocfs2-1.2-cert/patches/fix-purge-lockres.patch
branches/ocfs2-1.2-cert/patches/hold-recovery-ref.patch
branches/ocfs2-1.2-cert/patches/leave-other-dead-nodes-on-recovery-list.patch
branches/ocfs2-1.2-cert/patches/lockres-release-info.patch
branches/ocfs2-1.2-cert/patches/lvb-recovery-fix.patch
branches/ocfs2-1.2-cert/patches/mar20-full-3.patch
branches/ocfs2-1.2-cert/patches/mar24-create-lock-handler.patch
branches/ocfs2-1.2-cert/patches/mastery-restart-recovery.patch
branches/ocfs2-1.2-cert/patches/ocfs2-disable_lvbs1.patch
branches/ocfs2-1.2-cert/patches/ocfs2-extend_file_more_info_on_oops.patch
branches/ocfs2-1.2-cert/patches/ocfs2-journal_start_stop_msgs.patch
branches/ocfs2-1.2-cert/patches/ocfs2-reco_nofs.patch
branches/ocfs2-1.2-cert/patches/ocfs2-validate_lvb_contents1.patch
branches/ocfs2-1.2-cert/patches/ocfs2_dlm-do_lvb_puts_inline2.patch
branches/ocfs2-1.2-cert/patches/series
branches/ocfs2-1.2-cert/patches/two-stage-finalize.patch
Modified:
branches/ocfs2-1.2-cert/
Log:
o Added quilt patches as of 2006.03.27 14:28
Signed-off-by: mfasheh
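
The patches/ directory is a quilt tree: patches/series lists the patch
files in application order, and the new ".pc" entry in svn:ignore below
is quilt's working-state directory. A typical way to drive the stack
from the top of the checked-out tree (paths illustrative):

    quilt push -a      # apply every patch in patches/series, in order
    quilt applied      # list the patches currently applied
    quilt pop -a       # unwind the entire stack again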
Property changes on: branches/ocfs2-1.2-cert
___________________________________________________________________
Name: svn:ignore
- configure
Config.make
config.cache
config.log
config.status
autom4te.cache
*.rpm
*.tar.gz
.*.sw?
+ configure
Config.make
config.cache
config.log
config.status
autom4te.cache
*.rpm
*.tar.gz
.*.sw?
.pc
Added: branches/ocfs2-1.2-cert/patches/debug-mastery.patch
===================================================================
--- branches/ocfs2-1.2-cert/patches/debug-mastery.patch 2006-03-27 22:25:16 UTC (rev 2789)
+++ branches/ocfs2-1.2-cert/patches/debug-mastery.patch 2006-03-27 22:29:12 UTC (rev 2790)
@@ -0,0 +1,46 @@
+Index: ocfs2-1.2/fs/ocfs2/dlm/dlmmaster.c
+===================================================================
+--- ocfs2-1.2.orig/fs/ocfs2/dlm/dlmmaster.c 2006-03-21 13:11:58.618620000 -0800
++++ ocfs2-1.2/fs/ocfs2/dlm/dlmmaster.c 2006-03-21 13:19:43.675820000 -0800
+@@ -1621,6 +1621,8 @@ again:
+ dlm_node_iter_init(nodemap, &iter);
+ while ((to = dlm_node_iter_next(&iter)) >= 0) {
+ int r = 0;
++ struct dlm_master_list_entry *mle = NULL;
++
+ mlog(0, "sending assert master to %d (%.*s)\n", to,
+ namelen, lockname);
+ memset(&assert, 0, sizeof(assert));
+@@ -1645,7 +1647,16 @@ again:
+ /* ok, something horribly messed. kill thyself. */
+ mlog(ML_ERROR,"during assert master of %.*s to %u, "
+ "got %d.\n", namelen, lockname, to, r);
+- dlm_dump_lock_resources(dlm);
++ spin_lock(&dlm->spinlock);
++ spin_lock(&dlm->master_lock);
++ if (dlm_find_mle(dlm, &mle, (char *)lockname,
++ namelen)) {
++ dlm_print_one_mle(mle);
++ __dlm_put_mle(mle);
++ }
++ spin_unlock(&dlm->master_lock);
++ spin_unlock(&dlm->spinlock);
++ // dlm_dump_lock_resources(dlm);
+ BUG();
+ } else if (r == EAGAIN) {
+ mlog(0, "%.*s: node %u create mles on other "
+@@ -1909,12 +1920,13 @@ done:
+
+ kill:
+ /* kill the caller! */
++ __dlm_print_one_lock_resource(res);
+ spin_unlock(&res->spinlock);
+ spin_unlock(&dlm->spinlock);
+ dlm_lockres_put(res);
+ mlog(ML_ERROR, "Bad message received from another node. Dumping state "
+ "and killing the other node now! This node is OK and can continue.\n");
+- dlm_dump_lock_resources(dlm);
++ // dlm_dump_lock_resources(dlm);
+ dlm_put(dlm);
+ return -EINVAL;
+ }
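
What debug-mastery.patch changes: on the assert-master failure paths it
stops dumping every lock resource in the domain (the old
dlm_dump_lock_resources() calls are commented out) and prints only the
master list entry actually involved. The MLE lookup follows the usual
o2dlm lock ordering; restated outside the diff for readability:

    struct dlm_master_list_entry *mle = NULL;

    spin_lock(&dlm->spinlock);
    spin_lock(&dlm->master_lock);     /* master_lock nests inside dlm->spinlock */
    if (dlm_find_mle(dlm, &mle, (char *)lockname, namelen)) {
            dlm_print_one_mle(mle);   /* dump just the relevant entry */
            __dlm_put_mle(mle);       /* drop the reference dlm_find_mle took */
    }
    spin_unlock(&dlm->master_lock);
    spin_unlock(&dlm->spinlock);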
Added: branches/ocfs2-1.2-cert/patches/dlm-eloop.patch
===================================================================
--- branches/ocfs2-1.2-cert/patches/dlm-eloop.patch 2006-03-27 22:25:16 UTC (rev 2789)
+++ branches/ocfs2-1.2-cert/patches/dlm-eloop.patch 2006-03-27 22:29:12 UTC (rev 2790)
@@ -0,0 +1,25 @@
+Index: fs/ocfs2/dlm/dlmmaster.c
+===================================================================
+--- fs/ocfs2/dlm/dlmmaster.c.orig 2006-03-22 14:36:02.303204000 -0800
++++ fs/ocfs2/dlm/dlmmaster.c 2006-03-23 18:44:36.290960000 -0800
+@@ -983,12 +983,14 @@ recheck:
+ spin_unlock(&res->spinlock);
+ /* this will cause the master to re-assert across
+ * the whole cluster, freeing up mles */
+- ret = dlm_do_master_request(mle, res->owner);
+- if (ret < 0) {
+- /* give recovery a chance to run */
+- mlog(ML_ERROR, "link to %u went down?: %d\n", res->owner, ret);
+- msleep(500);
+- goto recheck;
++ if (res->owner != dlm->node_num) {
++ ret = dlm_do_master_request(mle, res->owner);
++ if (ret < 0) {
++ /* give recovery a chance to run */
++ mlog(ML_ERROR, "link to %u went down?: %d\n", res->owner, ret);
++ msleep(500);
++ goto recheck;
++ }
+ }
+ ret = 0;
+ goto leave;
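
dlm-eloop.patch guards the "recheck" path in dlmmaster.c: once the
resource has become locally owned, re-sending a master request to
res->owner would mean messaging ourselves and spinning through recheck
forever. Condensed control flow after the patch:

    if (res->owner != dlm->node_num) {
            /* owner is remote: ask it to re-assert across the cluster */
            ret = dlm_do_master_request(mle, res->owner);
            if (ret < 0) {
                    /* give recovery a chance to run */
                    msleep(500);
                    goto recheck;
            }
    }
    ret = 0;          /* owner is local (or the request succeeded) */
    goto leave;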
Added: branches/ocfs2-1.2-cert/patches/dlm-kurt-mar20-2.patch
===================================================================
--- branches/ocfs2-1.2-cert/patches/dlm-kurt-mar20-2.patch 2006-03-27 22:25:16 UTC (rev 2789)
+++ branches/ocfs2-1.2-cert/patches/dlm-kurt-mar20-2.patch 2006-03-27 22:29:12 UTC (rev 2790)
@@ -0,0 +1,326 @@
+diff -u dlmthread.c dlmthread.c
+--- fs/ocfs2/dlm/dlmthread.c (working copy)
++++ fs/ocfs2/dlm/dlmthread.c (working copy)
+@@ -39,6 +39,7 @@
+ #include <linux/inet.h>
+ #include <linux/timer.h>
+ #include <linux/kthread.h>
++#include <linux/delay.h>
+
+
+ #include "cluster/heartbeat.h"
+@@ -166,6 +167,7 @@
+ } else if (ret < 0) {
+ mlog(ML_NOTICE, "lockres %.*s: migrate failed, retrying\n",
+ lockres->lockname.len, lockres->lockname.name);
++ msleep(100);
+ goto again;
+ }
+
+diff -u dlmmaster.c dlmmaster.c
+--- fs/ocfs2/dlm/dlmmaster.c (working copy)
++++ fs/ocfs2/dlm/dlmmaster.c (working copy)
+@@ -1519,15 +1519,12 @@
+ mlog_errno(-ENOMEM);
+ goto send_response;
+ }
+- spin_lock(&dlm->spinlock);
+- dlm_init_mle(mle, DLM_MLE_BLOCK, dlm, NULL,
+- name, namelen);
+- spin_unlock(&dlm->spinlock);
+ goto way_up_top;
+ }
+
+ // mlog(0, "this is second time thru, already allocated, "
+ // "add the block.\n");
++ dlm_init_mle(mle, DLM_MLE_BLOCK, dlm, NULL, name, namelen);
+ set_bit(request->node_idx, mle->maybe_map);
+ list_add(&mle->list, &dlm->master_list);
+ response = DLM_MASTER_RESP_NO;
+@@ -1700,7 +1697,7 @@
+ if (bit >= O2NM_MAX_NODES) {
+ /* not necessarily an error, though less likely.
+ * could be master just re-asserting. */
+- mlog(ML_ERROR, "no bits set in the maybe_map, but %u "
++ mlog(ML_NOTICE, "no bits set in the maybe_map, but %u "
+ "is asserting! (%.*s)\n", assert->node_idx,
+ namelen, name);
+ } else if (bit != assert->node_idx) {
+@@ -1712,13 +1709,30 @@
+ * number winning the mastery will respond
+ * YES to mastery requests, but this node
+ * had no way of knowing. let it pass. */
+- mlog(ML_ERROR, "%u is the lowest node, "
++ mlog(ML_NOTICE, "%u is the lowest node, "
+ "%u is asserting. (%.*s) %u must "
+ "have begun after %u won.\n", bit,
+ assert->node_idx, namelen, name, bit,
+ assert->node_idx);
+ }
+ }
++ if (mle->type == DLM_MLE_MIGRATION) {
++ if (flags & DLM_ASSERT_MASTER_MLE_CLEANUP) {
++ mlog(ML_NOTICE, "%s:%.*s: got cleanup assert"
++ " from %u for migration\n",
++ dlm->name, namelen, name,
++ assert->node_idx);
++ } else if (!(flags & DLM_ASSERT_MASTER_FINISH_MIGRATION)) {
++ mlog(ML_NOTICE, "%s:%.*s: got unrelated assert"
++ " from %u for migration, ignoring\n",
++ dlm->name, namelen, name,
++ assert->node_idx);
++ __dlm_put_mle(mle);
++ spin_unlock(&dlm->master_lock);
++ spin_unlock(&dlm->spinlock);
++ goto done;
++ }
++ }
+ }
+ spin_unlock(&dlm->master_lock);
+
+@@ -2228,6 +2242,9 @@
+ dlm_mle_detach_hb_events(dlm, mle);
+ dlm_put_mle(mle);
+ dlm_put_mle_inuse(mle);
++ spin_lock(&res->spinlock);
++ res->state &= ~DLM_LOCK_RES_MIGRATING;
++ spin_unlock(&res->spinlock);
+ goto leave;
+ }
+
+@@ -2257,8 +2274,8 @@
+ /* avoid hang during shutdown when migrating lockres
+ * to a node which also goes down */
+ if (dlm_is_node_dead(dlm, target)) {
+- mlog(0, "%s:%.*s: expected migration target %u "
+- "is no longer up. restarting.\n",
++ mlog(ML_NOTICE, "%s:%.*s: expected migration "
++ "target %u is no longer up, restarting\n",
+ dlm->name, res->lockname.len,
+ res->lockname.name, target);
+ ret = -ERESTARTSYS;
+@@ -2269,6 +2286,9 @@
+ dlm_mle_detach_hb_events(dlm, mle);
+ dlm_put_mle(mle);
+ dlm_put_mle_inuse(mle);
++ spin_lock(&res->spinlock);
++ res->state &= ~DLM_LOCK_RES_MIGRATING;
++ spin_unlock(&res->spinlock);
+ goto leave;
+ }
+ /* TODO: if node died: stop, clean up, return error */
+@@ -2671,6 +2691,7 @@
+ /* remove it from the list so that only one
+ * mle will be found */
+ list_del_init(&tmp->list);
++ __dlm_mle_detach_hb_events(dlm, mle);
+ }
+ spin_unlock(&tmp->spinlock);
+ }
+@@ -2764,14 +2785,15 @@
+
+ /* remove from the list early. NOTE: unlinking
+ * list_head while in list_for_each_safe */
++ __dlm_mle_detach_hb_events(dlm, mle);
+ spin_lock(&mle->spinlock);
+ list_del_init(&mle->list);
+ atomic_set(&mle->woken, 1);
+ spin_unlock(&mle->spinlock);
+ wake_up(&mle->wq);
+
+- mlog(0, "node %u died during migration from "
+- "%u to %u!\n", dead_node,
++ mlog(ML_NOTICE, "%s: node %u died during migration from "
++ "%u to %u!\n", dlm->name, dead_node,
+ mle->master, mle->new_master);
+ /* if there is a lockres associated with this
+ * mle, find it and set its owner to UNKNOWN */
+diff -u dlmrecovery.c dlmrecovery.c
+--- fs/ocfs2/dlm/dlmrecovery.c (working copy)
++++ fs/ocfs2/dlm/dlmrecovery.c (working copy)
+@@ -835,6 +835,7 @@
+ struct list_head *iter;
+ int ret;
+ u8 dead_node, reco_master;
++ int skip_all_done = 0;
+
+ dlm = item->dlm;
+ dead_node = item->u.ral.dead_node;
+@@ -874,12 +875,21 @@
+ dlm_move_reco_locks_to_list(dlm, &resources, dead_node);
+
+ /* now we can begin blasting lockreses without the dlm lock */
++
++ /* any errors returned will be due to the new_master dying,
++ * the dlm_reco_thread should detect this */
+ list_for_each(iter, &resources) {
+ res = list_entry (iter, struct dlm_lock_resource, recovering);
+ ret = dlm_send_one_lockres(dlm, res, mres, reco_master,
+ DLM_MRES_RECOVERY);
+- if (ret < 0)
++ if (ret < 0) {
+ mlog_errno(ret);
++ mlog(ML_ERROR, "%s: node %u went down while sending "
++ "recovery state for dead node %u\n", dlm->name,
++ reco_master, dead_node);
++ skip_all_done = 1;
++ break;
++ }
+ }
+
+ /* move the resources back to the list */
+@@ -887,9 +897,15 @@
+ list_splice_init(&resources, &dlm->reco.resources);
+ spin_unlock(&dlm->spinlock);
+
+- ret = dlm_send_all_done_msg(dlm, dead_node, reco_master);
+- if (ret < 0)
+- mlog_errno(ret);
++	if (!skip_all_done) {
++ ret = dlm_send_all_done_msg(dlm, dead_node, reco_master);
++ if (ret < 0) {
++ mlog_errno(ret);
++ mlog(ML_ERROR, "%s: node %u went down while sending "
++ "recovery all-done for dead node %u\n", dlm->name,
++ reco_master, dead_node);
++ }
++ }
+
+ free_page((unsigned long)data);
+ }
+@@ -909,8 +925,14 @@
+
+ ret = o2net_send_message(DLM_RECO_DATA_DONE_MSG, dlm->key, &done_msg,
+ sizeof(done_msg), send_to, &tmpret);
+- /* negative status is ignored by the caller */
+- if (ret >= 0)
++ if (ret < 0) {
++ if (!dlm_is_host_down(ret)) {
++ mlog_errno(ret);
++ mlog(ML_ERROR, "%s: unknown error sending data-done "
++ "to %u\n", dlm->name, send_to);
++ BUG();
++ }
++ } else
+ ret = tmpret;
+ return ret;
+ }
+@@ -1136,8 +1158,9 @@
+ ml->type == LKM_PRMODE) {
+ /* if it is already set, this had better be a PR
+ * and it has to match */
+- if (mres->lvb[0] && (ml->type == LKM_EXMODE ||
+- memcmp(mres->lvb, lock->lksb->lvb, DLM_LVB_LEN))) {
++ if (!dlm_lvb_is_empty(mres->lvb) &&
++ (ml->type == LKM_EXMODE ||
++ memcmp(mres->lvb, lock->lksb->lvb, DLM_LVB_LEN))) {
+ mlog(ML_ERROR, "mismatched lvbs!\n");
+ __dlm_print_one_lock_resource(lock->lockres);
+ BUG();
+@@ -1196,22 +1219,25 @@
+ * we must send it immediately. */
+ ret = dlm_send_mig_lockres_msg(dlm, mres, send_to,
+ res, total_locks);
+- if (ret < 0) {
+- // TODO
+- mlog(ML_ERROR, "dlm_send_mig_lockres_msg "
+- "returned %d, TODO\n", ret);
+- BUG();
+- }
++ if (ret < 0)
++ goto error;
+ }
+ }
+ /* flush any remaining locks */
+ ret = dlm_send_mig_lockres_msg(dlm, mres, send_to, res, total_locks);
+- if (ret < 0) {
+- // TODO
+- mlog(ML_ERROR, "dlm_send_mig_lockres_msg returned %d, "
+- "TODO\n", ret);
++ if (ret < 0)
++ goto error;
++ return ret;
++
++error:
++ mlog(ML_ERROR, "%s: dlm_send_mig_lockres_msg returned %d\n",
++ dlm->name, ret);
++ if (!dlm_is_host_down(ret))
+ BUG();
+- }
++ mlog(ML_NOTICE, "%s: node %u went down while sending %s "
++ "lockres %.*s\n", dlm->name, send_to,
++ flags & DLM_MRES_RECOVERY ? "recovery" : "migration",
++ res->lockname.len, res->lockname.name);
+ return ret;
+ }
+
+@@ -1560,6 +1586,7 @@
+ ret += list_num;
+ return ret;
+ }
++
+ /* TODO: do ast flush business
+ * TODO: do MIGRATING and RECOVERING spinning
+ */
+@@ -1667,7 +1694,7 @@
+ lksb->flags |= (ml->flags &
+ (DLM_LKSB_PUT_LVB|DLM_LKSB_GET_LVB));
+
+- if (mres->lvb[0]) {
++ if (!dlm_lvb_is_empty(mres->lvb)) {
+ if (lksb->flags & DLM_LKSB_PUT_LVB) {
+ /* other node was trying to update
+ * lvb when node died. recreate the
+@@ -1678,8 +1705,9 @@
+ * most recent valid lvb info */
+ BUG_ON(ml->type != LKM_EXMODE &&
+ ml->type != LKM_PRMODE);
+- if (res->lvb[0] && (ml->type == LKM_EXMODE ||
+- memcmp(res->lvb, mres->lvb, DLM_LVB_LEN))) {
++ if (!dlm_lvb_is_empty(res->lvb) &&
++ (ml->type == LKM_EXMODE ||
++ memcmp(res->lvb, mres->lvb, DLM_LVB_LEN))) {
+ mlog(ML_ERROR, "received bad lvb!\n");
+ __dlm_print_one_lock_resource(res);
+ BUG();
+only in patch2:
+unchanged:
+--- fs/ocfs2/dlm/dlmunlock.c (revision 2787)
++++ fs/ocfs2/dlm/dlmunlock.c (working copy)
+@@ -318,6 +318,16 @@ static enum dlm_status dlm_send_remote_u
+ size_t veclen = 1;
+
+ mlog_entry("%.*s\n", res->lockname.len, res->lockname.name);
++
++ if (owner == dlm->node_num) {
++ /* ended up trying to contact ourself. this means
++ * that the lockres had been remote but became local
++ * via a migration. just retry it, now as local */
++ mlog(0, "%s:%.*s: this node became the master due to a "
++ "migration, re-evaluate now\n", dlm->name,
++ res->lockname.len, res->lockname.name);
++ return DLM_FORWARD;
++ }
+
+ memset(&unlock, 0, sizeof(unlock));
+ unlock.node_idx = dlm->node_num;
+only in patch2:
+unchanged:
+--- fs/ocfs2/dlm/dlmcommon.h (revision 2787)
++++ fs/ocfs2/dlm/dlmcommon.h (working copy)
+@@ -300,6 +300,15 @@ enum dlm_lockres_list {
+ DLM_BLOCKED_LIST
+ };
+
++static inline int dlm_lvb_is_empty(char *lvb)
++{
++ int i;
++ for (i=0; i<DLM_LVB_LEN; i++)
++ if (lvb[i])
++ return 0;
++ return 1;
++}
++
+ static inline struct list_head *
+ dlm_list_idx_to_ptr(struct dlm_lock_resource *res, enum dlm_lockres_list idx)
+ {
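
The dlm_lvb_is_empty() helper added to dlmcommon.h replaces the old
one-byte test (mres->lvb[0]) used by the recovery checks above: an LVB
whose value merely begins with a zero byte was wrongly treated as empty,
defeating the mismatched-lvb sanity checks. A stand-alone user-space
illustration (assuming DLM_LVB_LEN of 64, as in the o2dlm headers):

    #include <stdio.h>
    #include <string.h>

    #define DLM_LVB_LEN 64

    static int dlm_lvb_is_empty(const char *lvb)
    {
            int i;
            for (i = 0; i < DLM_LVB_LEN; i++)
                    if (lvb[i])
                            return 0;
            return 1;
    }

    int main(void)
    {
            char lvb[DLM_LVB_LEN];

            memset(lvb, 0, sizeof(lvb));
            lvb[8] = 0x42;    /* real payload, but the first byte stays zero */
            printf("lvb[0] test: %s\n", lvb[0] ? "non-empty" : "empty");
            printf("full scan:   %s\n",
                   dlm_lvb_is_empty(lvb) ? "empty" : "non-empty");
            return 0;
    }

The old test reports "empty" here; the full scan correctly does not.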
Added: branches/ocfs2-1.2-cert/patches/em-silence-eexist.patch
===================================================================
--- branches/ocfs2-1.2-cert/patches/em-silence-eexist.patch 2006-03-27 22:25:16 UTC (rev 2789)
+++ branches/ocfs2-1.2-cert/patches/em-silence-eexist.patch 2006-03-27 22:29:12 UTC (rev 2790)
@@ -0,0 +1,73 @@
+Index: fs/ocfs2/extent_map.c
+===================================================================
+--- fs/ocfs2/extent_map.c (revision 2787)
++++ fs/ocfs2/extent_map.c (working copy)
+@@ -296,7 +296,7 @@ static int ocfs2_extent_map_find_leaf(st
+
+ ret = ocfs2_extent_map_insert(inode, rec,
+ le16_to_cpu(el->l_tree_depth));
+- if (ret) {
++ if (ret && (ret != -EEXIST)) {
+ mlog_errno(ret);
+ goto out_free;
+ }
+@@ -425,6 +425,11 @@ static int ocfs2_extent_map_insert_entry
+ /*
+ * Simple rule: on any return code other than -EAGAIN, anything left
+ * in the insert_context will be freed.
++ *
++ * Simple rule #2: A return code of -EEXIST from this function or
++ * its calls to ocfs2_extent_map_insert_entry() signifies that another
++ * thread beat us to the insert. It is not an actual error, but it
++ * tells the caller we have no more work to do.
+ */
+ static int ocfs2_extent_map_try_insert(struct inode *inode,
+ struct ocfs2_extent_rec *rec,
+@@ -446,23 +451,32 @@ static int ocfs2_extent_map_try_insert(s
+ goto out_unlock;
+ }
+
++ /* Since insert_entry failed, the map MUST have old_ent */
+ old_ent = ocfs2_extent_map_lookup(em, le32_to_cpu(rec->e_cpos),
+- le32_to_cpu(rec->e_clusters), NULL,
+- NULL);
+-
++ le32_to_cpu(rec->e_clusters),
++ NULL, NULL);
+ if (!old_ent)
+ BUG();
+
+- ret = -EEXIST;
+- if (old_ent->e_tree_depth < tree_depth)
++ if (old_ent->e_tree_depth < tree_depth) {
++ /* Another thread beat us to the lower tree_depth */
++ ret = -EEXIST;
+ goto out_unlock;
++ }
+
+ if (old_ent->e_tree_depth == tree_depth) {
++ /*
++ * Another thread beat us to this tree_depth.
++ * Let's make sure we agree with that thread (the
++ * extent_rec should be identical).
++ */
+ if (!memcmp(rec, &old_ent->e_rec,
+ sizeof(struct ocfs2_extent_rec)))
+ ret = 0;
++ else
++ /* FIXME: Should this be ESRCH/EBADR??? */
++ ret = -EEXIST;
+
+- /* FIXME: Should this be ESRCH/EBADR??? */
+ goto out_unlock;
+ }
+
+@@ -597,7 +611,7 @@ int ocfs2_extent_map_insert(struct inode
+ tree_depth, &ctxt);
+ } while (ret == -EAGAIN);
+
+- if (ret < 0)
++ if ((ret < 0) && (ret != -EEXIST))
+ mlog_errno(ret);
+
+ if (ctxt.left_ent)
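
em-silence-eexist.patch makes -EEXIST a first-class benign result: as
the comment block added above says, it means another thread won the race
to insert the record, so the caller has no work left and should not log
it. The resulting caller shape in ocfs2_extent_map_insert(), condensed
from the hunk:

    do {
            ret = ocfs2_extent_map_try_insert(inode, rec, tree_depth, &ctxt);
    } while (ret == -EAGAIN);      /* retry while the race is still live */

    if ((ret < 0) && (ret != -EEXIST))
            mlog_errno(ret);       /* -EEXIST is no longer logged as an error */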
Added: branches/ocfs2-1.2-cert/patches/extra-em-trace3.patch
===================================================================
--- branches/ocfs2-1.2-cert/patches/extra-em-trace3.patch 2006-03-27 22:25:16 UTC (rev 2789)
+++ branches/ocfs2-1.2-cert/patches/extra-em-trace3.patch 2006-03-27 22:29:12 UTC (rev 2790)
@@ -0,0 +1,74 @@
+diff --git a/fs/ocfs2/extent_map.c b/fs/ocfs2/extent_map.c
+index e6f207e..3b8e393 100644
+--- a/fs/ocfs2/extent_map.c
++++ b/fs/ocfs2/extent_map.c
+@@ -455,13 +455,36 @@ static int ocfs2_extent_map_try_insert(s
+ BUG_ON(!old_ent);
+
+ ret = -EEXIST;
+- if (old_ent->e_tree_depth < tree_depth)
++ if (old_ent->e_tree_depth < tree_depth) {
++ mlog(0, "Trying to add an extent record at tree depth"
++ " %d for inode %llu, but the extent map already"
++ " contains a record at tree depth %d\n",
++ tree_depth,
++ (unsigned long long)OCFS2_I(inode)->ip_blkno,
++ old_ent->e_tree_depth);
++ mlog(0,
++ "old_ent"
++ " (depth %d, e_cpos %u, e_clusters %u, e_blkno %llu),"
++ " new_ent"
++ " (depth %d, e_cpos %u, e_clusters %u, e_blkno %llu)\n",
++ old_ent->e_tree_depth,
++ le32_to_cpu(old_ent->e_rec.e_cpos),
++ le32_to_cpu(old_ent->e_rec.e_clusters),
++ (unsigned long long)le64_to_cpu(old_ent->e_rec.e_blkno),
++ tree_depth,
++ le32_to_cpu(rec->e_cpos),
++ le32_to_cpu(rec->e_clusters),
++ (unsigned long long)le64_to_cpu(rec->e_blkno));
++ mlog_errno(ret);
+ goto out_unlock;
++ }
+
+ if (old_ent->e_tree_depth == tree_depth) {
+ if (!memcmp(rec, &old_ent->e_rec,
+ sizeof(struct ocfs2_extent_rec)))
+ ret = 0;
++ else
++ mlog_errno(ret);
+
+ /* FIXME: Should this be ESRCH/EBADR??? */
+ goto out_unlock;
+@@ -511,16 +534,20 @@ static int ocfs2_extent_map_try_insert(s
+ if (ctxt->need_left) {
+ ret = ocfs2_extent_map_insert_entry(em,
+ ctxt->left_ent);
+- if (ret)
++ if (ret) {
++ mlog_errno(ret);
+ goto out_unlock;
++ }
+ ctxt->left_ent = NULL;
+ }
+
+ if (ctxt->need_right) {
+ ret = ocfs2_extent_map_insert_entry(em,
+ ctxt->right_ent);
+- if (ret)
++ if (ret) {
++ mlog_errno(ret);
+ goto out_unlock;
++ }
+ ctxt->right_ent = NULL;
+ }
+
+@@ -528,6 +555,8 @@ static int ocfs2_extent_map_try_insert(s
+
+ if (!ret)
+ ctxt->new_ent = NULL;
++ else
++ mlog_errno(ret);
+
+ out_unlock:
+ spin_unlock(&OCFS2_I(inode)->ip_lock);
Added: branches/ocfs2-1.2-cert/patches/fix-purge-lockres.patch
===================================================================
--- branches/ocfs2-1.2-cert/patches/fix-purge-lockres.patch 2006-03-27 22:25:16 UTC (rev 2789)
+++ branches/ocfs2-1.2-cert/patches/fix-purge-lockres.patch 2006-03-27 22:29:12 UTC (rev 2790)
@@ -0,0 +1,64 @@
+Index: fs/ocfs2/dlm/dlmthread.c
+===================================================================
+--- fs/ocfs2/dlm/dlmthread.c.orig 2006-03-22 14:36:02.385122000 -0800
++++ fs/ocfs2/dlm/dlmthread.c 2006-03-22 23:50:11.592040000 -0800
+@@ -57,6 +57,8 @@ extern spinlock_t dlm_domain_lock;
+ extern struct list_head dlm_domains;
+
+ static int dlm_thread(void *data);
++static void dlm_purge_lockres_now(struct dlm_ctxt *dlm,
++ struct dlm_lock_resource *lockres);
+
+ #define dlm_lock_is_remote(dlm, lock) ((lock)->ml.node != (dlm)->node_num)
+
+@@ -112,10 +114,23 @@ void __dlm_lockres_calc_usage(struct dlm
+ res->last_used = jiffies;
+ list_add_tail(&res->purge, &dlm->purge_list);
+ dlm->purge_count++;
++
++ /* if this node is not the owner, there is
++ * no way to keep track of who the owner could be.
++ * unhash it to avoid serious problems. */
++ if (res->owner != dlm->node_num) {
++ mlog(0, "%s:%.*s: doing immediate "
++ "purge of lockres owned by %u\n",
++ dlm->name, res->lockname.len,
++ res->lockname.name, res->owner);
++
++ dlm_purge_lockres_now(dlm, res);
++ }
+ }
+ } else if (!list_empty(&res->purge)) {
+- mlog(0, "removing lockres %.*s from purge list\n",
+- res->lockname.len, res->lockname.name);
++ mlog(0, "removing lockres %.*s from purge list, "
++ "owner=%u\n", res->lockname.len, res->lockname.name,
++ res->owner);
+
+ list_del_init(&res->purge);
+ dlm->purge_count--;
+@@ -181,6 +196,24 @@ finish:
+ __dlm_unhash_lockres(lockres);
+ }
+
++/* make an unused lockres go away immediately.
++ * as soon as the dlm spinlock is dropped, this lockres
++ * will not be found. kfree still happens on last put. */
++static void dlm_purge_lockres_now(struct dlm_ctxt *dlm,
++ struct dlm_lock_resource *lockres)
++{
++ assert_spin_locked(&dlm->spinlock);
++ assert_spin_locked(&lockres->spinlock);
++
++ BUG_ON(!__dlm_lockres_unused(lockres));
++
++ if (!list_empty(&lockres->purge)) {
++ list_del_init(&lockres->purge);
++ dlm->purge_count--;
++ }
++ __dlm_unhash_lockres(lockres);
++}
++
+ static void dlm_run_purge_list(struct dlm_ctxt *dlm,
+ int purge_now)
+ {
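
fix-purge-lockres.patch covers unused lock resources that this node does
not own: as the new comment notes, there is no way to track who the
owner becomes while such a resource idles on the purge list, so it is
unhashed immediately instead of waiting for dlm_run_purge_list(). The
helper's contract, restated with its locking preconditions spelled out:

    /* caller holds dlm->spinlock and lockres->spinlock; the lockres must
     * be unused (per __dlm_lockres_unused: empty lock queues) */
    static void dlm_purge_lockres_now(struct dlm_ctxt *dlm,
                                      struct dlm_lock_resource *lockres)
    {
            assert_spin_locked(&dlm->spinlock);
            assert_spin_locked(&lockres->spinlock);
            BUG_ON(!__dlm_lockres_unused(lockres));

            if (!list_empty(&lockres->purge)) {
                    list_del_init(&lockres->purge);
                    dlm->purge_count--;
            }
            /* invisible to lookups from here on; kfree on last put */
            __dlm_unhash_lockres(lockres);
    }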
Added: branches/ocfs2-1.2-cert/patches/hold-recovery-ref.patch
===================================================================
--- branches/ocfs2-1.2-cert/patches/hold-recovery-ref.patch 2006-03-27 22:25:16 UTC (rev 2789)
+++ branches/ocfs2-1.2-cert/patches/hold-recovery-ref.patch 2006-03-27 22:29:12 UTC (rev 2790)
@@ -0,0 +1,73 @@
+Index: ocfs2-1.2/fs/ocfs2/dlm/dlmrecovery.c
+===================================================================
+--- ocfs2-1.2.orig/fs/ocfs2/dlm/dlmrecovery.c 2006-03-21 19:30:46.726473000 -0800
++++ ocfs2-1.2/fs/ocfs2/dlm/dlmrecovery.c 2006-03-22 13:20:39.267067000 -0800
+@@ -1783,8 +1783,14 @@ void dlm_move_lockres_to_recovery_list(s
+ struct dlm_lock *lock;
+
+ res->state |= DLM_LOCK_RES_RECOVERING;
+- if (!list_empty(&res->recovering))
++ if (!list_empty(&res->recovering)) {
++ mlog(ML_NOTICE,
++ "Recovering res %s:%.*s, is already on recovery list!\n",
++ dlm->name, res->lockname.len, res->lockname.name);
+ list_del_init(&res->recovering);
++ }
++ /* We need to hold a reference while on the recovery list */
++ dlm_lockres_get(res);
+ list_add_tail(&res->recovering, &dlm->reco.resources);
+
+ /* find any pending locks and put them back on proper list */
+@@ -1873,9 +1879,11 @@ static void dlm_finish_local_lockres_rec
+ spin_lock(&res->spinlock);
+ dlm_change_lockres_owner(dlm, res, new_master);
+ res->state &= ~DLM_LOCK_RES_RECOVERING;
+- __dlm_dirty_lockres(dlm, res);
++ if (!__dlm_lockres_unused(res))
++ __dlm_dirty_lockres(dlm, res);
+ spin_unlock(&res->spinlock);
+ wake_up(&res->wq);
++ dlm_lockres_put(res);
+ }
+ }
+
+@@ -1908,11 +1916,13 @@ static void dlm_finish_local_lockres_rec
+ dlm->name, res->lockname.len,
+ res->lockname.name, res->owner);
+ list_del_init(&res->recovering);
++ dlm_lockres_put(res);
+ }
+ spin_lock(&res->spinlock);
+ dlm_change_lockres_owner(dlm, res, new_master);
+ res->state &= ~DLM_LOCK_RES_RECOVERING;
+- __dlm_dirty_lockres(dlm, res);
++ if (!__dlm_lockres_unused(res))
++ __dlm_dirty_lockres(dlm, res);
+ spin_unlock(&res->spinlock);
+ wake_up(&res->wq);
+ }
+Index: ocfs2-1.2/fs/ocfs2/dlm/dlmcommon.h
+===================================================================
+--- ocfs2-1.2.orig/fs/ocfs2/dlm/dlmcommon.h 2006-03-21 19:30:46.711489000 -0800
++++ ocfs2-1.2/fs/ocfs2/dlm/dlmcommon.h 2006-03-22 11:59:46.979544000 -0800
+@@ -858,6 +858,7 @@ int dlm_lock_basts_flushed(struct dlm_ct
+
+
+ int dlm_dump_all_mles(const char __user *data, unsigned int len);
++int __dlm_lockres_unused(struct dlm_lock_resource *res);
+
+
+ static inline const char * dlm_lock_mode_name(int mode)
+Index: ocfs2-1.2/fs/ocfs2/dlm/dlmthread.c
+===================================================================
+--- ocfs2-1.2.orig/fs/ocfs2/dlm/dlmthread.c 2006-03-22 11:58:41.192580000 -0800
++++ ocfs2-1.2/fs/ocfs2/dlm/dlmthread.c 2006-03-22 11:59:07.989988000 -0800
+@@ -82,7 +82,7 @@ repeat:
+ }
+
+
+-static int __dlm_lockres_unused(struct dlm_lock_resource *res)
++int __dlm_lockres_unused(struct dlm_lock_resource *res)
+ {
+ if (list_empty(&res->granted) &&
+ list_empty(&res->converting) &&
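
hold-recovery-ref.patch is a straight reference-count pairing: a lockres
takes a dlm_lockres_get() when it is placed on dlm->reco.resources and a
dlm_lockres_put() wherever it comes off, so recovery can never be left
holding a pointer to a freed resource. In outline:

    /* dlm_move_lockres_to_recovery_list(): going onto the list */
    dlm_lockres_get(res);                  /* ref held while listed */
    list_add_tail(&res->recovering, &dlm->reco.resources);

    /* dlm_finish_local_lockres_recovery(): coming back off */
    list_del_init(&res->recovering);
    dlm_lockres_put(res);                  /* balances the get above */

The patch also exports __dlm_lockres_unused() so the finish path only
re-dirties resources that still have locks queued.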
Added: branches/ocfs2-1.2-cert/patches/leave-other-dead-nodes-on-recovery-list.patch
===================================================================
--- branches/ocfs2-1.2-cert/patches/leave-other-dead-nodes-on-recovery-list.patch 2006-03-27 22:25:16 UTC (rev 2789)
+++ branches/ocfs2-1.2-cert/patches/leave-other-dead-nodes-on-recovery-list.patch 2006-03-27 22:29:12 UTC (rev 2790)
@@ -0,0 +1,39 @@
+Index: ocfs2-1.2/fs/ocfs2/dlm/dlmrecovery.c
+===================================================================
+--- ocfs2-1.2.orig/fs/ocfs2/dlm/dlmrecovery.c 2006-03-24 16:47:23.774339000 -0800
++++ ocfs2-1.2/fs/ocfs2/dlm/dlmrecovery.c 2006-03-24 16:48:04.772260000 -0800
+@@ -1992,14 +1992,23 @@ static void dlm_finish_local_lockres_rec
+ bucket = &(dlm->lockres_hash[i]);
+ hlist_for_each_entry(res, hash_iter, bucket, hash_node) {
+ if (res->state & DLM_LOCK_RES_RECOVERING) {
++ mlog(ML_NOTICE,
++ "Continue! owner = %u, dead_node = %u, this = %u, name = %.*s\n",
++ res->owner, dead_node,
++ dlm->node_num,
++ res->lockname.len,
++ res->lockname.name);
++ continue;
++
++ /* The rest of this is a bug */
+ if (res->owner == dead_node) {
+- mlog(0, "(this=%u) res %.*s owner=%u "
++ mlog(ML_NOTICE, "(this=%u) res %.*s owner=%u "
+ "was not on recovering list, but "
+ "clearing state anyway\n",
+ dlm->node_num, res->lockname.len,
+ res->lockname.name, new_master);
+ } else if (res->owner == dlm->node_num) {
+- mlog(0, "(this=%u) res %.*s owner=%u "
++ mlog(ML_NOTICE, "(this=%u) res %.*s owner=%u "
+ "was not on recovering list, "
+ "owner is THIS node, clearing\n",
+ dlm->node_num, res->lockname.len,
+@@ -2008,7 +2017,7 @@ static void dlm_finish_local_lockres_rec
+ continue;
+
+ if (!list_empty(&res->recovering)) {
+- mlog(0, "%s:%.*s: lockres was "
++ mlog(ML_NOTICE, "%s:%.*s: lockres was "
+ "marked RECOVERING, owner=%u\n",
+ dlm->name, res->lockname.len,
+ res->lockname.name, res->owner);
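
Note the shape of this patch: the unconditional "continue" now
short-circuits every resource still marked DLM_LOCK_RES_RECOVERING,
leaving resources owned by some other dead node on the recovery list for
the pass that actually recovers that node. The old owner-checking
branches below it are deliberately retained as unreachable reference
code ("The rest of this is a bug"). Effective loop after the patch:

    hlist_for_each_entry(res, hash_iter, bucket, hash_node) {
            if (res->state & DLM_LOCK_RES_RECOVERING) {
                    /* owned by a (possibly different) dead node: leave
                     * it listed for that node's own recovery pass */
                    continue;
            }
            /* only non-RECOVERING resources are processed below */
    }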
Added: branches/ocfs2-1.2-cert/patches/lockres-release-info.patch
===================================================================
--- branches/ocfs2-1.2-cert/patches/lockres-release-info.patch 2006-03-27 22:25:16 UTC (rev 2789)
+++ branches/ocfs2-1.2-cert/patches/lockres-release-info.patch 2006-03-27 22:29:12 UTC (rev 2790)
@@ -0,0 +1,25 @@
+Index: ocfs2-1.2/fs/ocfs2/dlm/dlmmaster.c
+===================================================================
+--- ocfs2-1.2.orig/fs/ocfs2/dlm/dlmmaster.c 2006-03-20 17:08:31.633920000 -0800
++++ ocfs2-1.2/fs/ocfs2/dlm/dlmmaster.c 2006-03-21 13:09:51.785260000 -0800
+@@ -613,6 +613,20 @@ static void dlm_lockres_release(struct k
+ mlog(0, "destroying lockres %.*s\n", res->lockname.len,
+ res->lockname.name);
+
++ if (!hlist_unhashed(&res->hash_node) ||
++ !list_empty(&res->granted) ||
++ !list_empty(&res->converting) ||
++ !list_empty(&res->blocked) ||
++ !list_empty(&res->dirty) ||
++ !list_empty(&res->recovering) ||
++ !list_empty(&res->purge)) {
++ mlog(ML_ERROR,
++ "Going to BUG for resource %.*s."
++ " We're on a list!\n",
++ res->lockname.len, res->lockname.name);
++ dlm_print_one_lock_resource(res);
++ }
++
+ /* By the time we're ready to blow this guy away, we shouldn't
+ * be on any lists. */
+ BUG_ON(!hlist_unhashed(&res->hash_node));
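
lockres-release-info.patch front-loads diagnostics: the existing BUG_ON
checks report only a file and line, so the release path now repeats the
seven list-membership tests first and, if any would trip, logs the
resource name and dumps its full state while it still can. A generic
sketch of the pattern (about_to_bug is a hypothetical condition standing
in for the combined list checks):

    if (about_to_bug) {
            mlog(ML_ERROR, "Going to BUG for resource %.*s. We're on a list!\n",
                 res->lockname.len, res->lockname.name);
            dlm_print_one_lock_resource(res);   /* capture state pre-crash */
    }
    BUG_ON(about_to_bug);                       /* the assertion is unchanged */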
Added: branches/ocfs2-1.2-cert/patches/lvb-recovery-fix.patch
===================================================================
--- branches/ocfs2-1.2-cert/patches/lvb-recovery-fix.patch 2006-03-27 22:25:16 UTC (rev 2789)
+++ branches/ocfs2-1.2-cert/patches/lvb-recovery-fix.patch 2006-03-27 22:29:12 UTC (rev 2790)
@@ -0,0 +1,139 @@
+Index: fs/ocfs2/dlm/dlmrecovery.c
+===================================================================
+--- fs/ocfs2/dlm/dlmrecovery.c.orig 2006-03-22 14:36:11.772185000 -0800
++++ fs/ocfs2/dlm/dlmrecovery.c 2006-03-23 18:31:54.877480000 -0800
+@@ -1606,6 +1606,66 @@ dlm_list_num_to_pointer(struct dlm_lock_
+ * TODO: do MIGRATING and RECOVERING spinning
+ */
+
++#define DLM_OCFS2_SEC_SHIFT (64 - 34)
++#define DLM_OCFS2_NSEC_MASK ((1ULL << DLM_OCFS2_SEC_SHIFT) - 1)
++
++struct floo {
++ __be32 lvb_old_seq;
++ __be32 lvb_version;
++ __be32 lvb_iclusters;
++ __be32 lvb_iuid;
++ __be32 lvb_igid;
++ __be16 lvb_imode;
++ __be16 lvb_inlink;
++ __be64 lvb_iatime_packed;
++ __be64 lvb_ictime_packed;
++ __be64 lvb_imtime_packed;
++ __be64 lvb_isize;
++ __be32 lvb_reserved[2];
++};
++
++// OLDSEQ-- VERSION- CLUSTERS IUID---- IGID---- MODE NLNK ATIMEPACKED----- CTIMEPACKED----- MTIMEPACKED----- ISIZE----------- RESERVED--------
++// 00000000 00000001 00000001 0000c09f 00000262 41ff 0006 10f45a50844bfa5b 110885ed11acf024 110885ed11acf024 0000000000003000 0000000000000000
++static inline void dlm_print_ocfs2_lvb(unsigned char *lvb)
++{
++ struct floo *raw = (struct floo *)lvb;
++ u32 clusters, uid, gid, oldseq, vers;
++ u16 mode, nlink;
++ u64 isize, atime, mtime, ctime;
++ /* just do some lame decoding, doesn't need to be too
++ * accurate, just cut the encoded value into smaller values */
++
++
++ oldseq = be32_to_cpu(raw->lvb_old_seq);
++ vers = be32_to_cpu(raw->lvb_version);
++ clusters= be32_to_cpu(raw->lvb_iclusters);
++ isize = be64_to_cpu(raw->lvb_isize);
++ uid = be32_to_cpu(raw->lvb_iuid);
++ gid = be32_to_cpu(raw->lvb_igid);
++ mode = be16_to_cpu(raw->lvb_imode);
++ nlink = be16_to_cpu(raw->lvb_inlink);
++ /* just print out the tv_sec portion */
++ atime = be64_to_cpu(raw->lvb_iatime_packed) >> DLM_OCFS2_SEC_SHIFT;
++ mtime = be64_to_cpu(raw->lvb_imtime_packed) >> DLM_OCFS2_SEC_SHIFT;
++ ctime = be64_to_cpu(raw->lvb_ictime_packed) >> DLM_OCFS2_SEC_SHIFT;
++ printk("[%u:%u:%u:%llu:%u:%u:%u:%u:%llu:%llu:%llu]", oldseq, vers,
++ clusters, (unsigned long long)isize, uid, gid, mode,
++ nlink, (unsigned long long)atime,
++ (unsigned long long)mtime, (unsigned long long)ctime);
++}
++
++static inline void dlm_print_lvb(unsigned char *lvb)
++{
++#if 0
++ int i;
++ for (i=0; i<DLM_LVB_LEN; i++)
++ printk("%02x", (unsigned char)lvb[i]);
++#endif
++
++ dlm_print_ocfs2_lvb(lvb);
++}
++
++
+ /*
+ * NOTE about in-flight requests during migration:
+ *
+@@ -1708,13 +1768,21 @@ static int dlm_process_recovery_data(str
+ }
+ lksb->flags |= (ml->flags &
+ (DLM_LKSB_PUT_LVB|DLM_LKSB_GET_LVB));
+-
++
++ if (ml->type == LKM_NLMODE)
++ goto skip_lvb;
++
+ if (!dlm_lvb_is_empty(mres->lvb)) {
+ if (lksb->flags & DLM_LKSB_PUT_LVB) {
+ /* other node was trying to update
+ * lvb when node died. recreate the
+ * lksb with the updated lvb. */
+ memcpy(lksb->lvb, mres->lvb, DLM_LVB_LEN);
++ /* the lock resource lvb update must happen
++ * NOW, before the spinlock is dropped.
++ * we no longer wait for the AST to update
++ * the lvb. */
++ memcpy(res->lvb, mres->lvb, DLM_LVB_LEN);
+ } else {
+ /* otherwise, the node is sending its
+ * most recent valid lvb info */
+@@ -1723,17 +1791,19 @@ static int dlm_process_recovery_data(str
+ if (!dlm_lvb_is_empty(res->lvb) &&
+ (ml->type == LKM_EXMODE ||
+ memcmp(res->lvb, mres->lvb, DLM_LVB_LEN))) {
+- int i;
++ u64 c = be64_to_cpu(lock->ml.cookie);
+ mlog(ML_ERROR, "%s:%.*s: received bad "
+- "lvb! type=%d\n", dlm->name,
+- res->lockname.len,
+- res->lockname.name, ml->type);
++ "lvb! type=%d, convtype=%d, "
++ "node=%u, cookie=%u:%llu\n",
++ dlm->name, res->lockname.len,
++ res->lockname.name, ml->type,
++ ml->convert_type, ml->node,
++ dlm_get_lock_cookie_node(c),
++ dlm_get_lock_cookie_seq(c));
+ printk("lockres lvb=[");
+- for (i=0; i<DLM_LVB_LEN; i++)
+- printk("%02x", res->lvb[i]);
++ dlm_print_lvb(res->lvb);
+ printk("]\nmigrated lvb=[");
+- for (i=0; i<DLM_LVB_LEN; i++)
+- printk("%02x", mres->lvb[i]);
++ dlm_print_lvb(mres->lvb);
+ printk("]\n");
+ dlm_print_one_lock_resource(res);
+ BUG();
+@@ -1741,7 +1811,7 @@ static int dlm_process_recovery_data(str
+ memcpy(res->lvb, mres->lvb, DLM_LVB_LEN);
+ }
+ }
+-
++skip_lvb:
+
+ /* NOTE:
+ * wrt lock queue ordering and recovery:
+@@ -1762,6 +1832,7 @@ static int dlm_process_recovery_data(str
+ bad = 0;
+ spin_lock(&res->spinlock);
+ list_for_each_entry(lock, queue, list) {
++#warning does this need be64_to_cpu conversion?
+ if (lock->ml.cookie == ml->cookie) {
+ u64 c = lock->ml.cookie;
+ mlog(ML_ERROR, "%s:%.*s: %u:%llu: lock already "
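
The decoder added here recovers only tv_sec: ocfs2 packs a timespec into
one big-endian u64 as (sec << DLM_OCFS2_SEC_SHIFT) | nsec, where the
shift of 64 - 34 = 30 leaves 30 low bits for nanoseconds (10^9 < 2^30)
and 34 high bits for seconds. A user-space round trip with the patch's
constants (pack_time is a hypothetical helper inferred from the
shift/mask definitions):

    #include <stdio.h>
    #include <stdint.h>

    #define DLM_OCFS2_SEC_SHIFT  (64 - 34)                          /* 30 */
    #define DLM_OCFS2_NSEC_MASK  ((1ULL << DLM_OCFS2_SEC_SHIFT) - 1)

    static uint64_t pack_time(uint64_t sec, uint32_t nsec)
    {
            return (sec << DLM_OCFS2_SEC_SHIFT) | (nsec & DLM_OCFS2_NSEC_MASK);
    }

    int main(void)
    {
            uint64_t packed = pack_time(1143498552ULL, 123456789);

            printf("sec=%llu nsec=%llu\n",
                   (unsigned long long)(packed >> DLM_OCFS2_SEC_SHIFT),
                   (unsigned long long)(packed & DLM_OCFS2_NSEC_MASK));
            return 0;
    }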
Added: branches/ocfs2-1.2-cert/patches/mar20-full-3.patch
===================================================================
--- branches/ocfs2-1.2-cert/patches/mar20-full-3.patch 2006-03-27 22:25:16 UTC (rev 2789)
+++ branches/ocfs2-1.2-cert/patches/mar20-full-3.patch 2006-03-27 22:29:12 UTC (rev 2790)
@@ -0,0 +1,1094 @@
+Index: fs/ocfs2/dlm/dlmthread.c
+===================================================================
+--- fs/ocfs2/dlm/dlmthread.c (revision 2787)
++++ fs/ocfs2/dlm/dlmthread.c (working copy)
+@@ -39,6 +39,7 @@
+ #include <linux/inet.h>
+ #include <linux/timer.h>
+ #include <linux/kthread.h>
++#include <linux/delay.h>
+
+
+ #include "cluster/heartbeat.h"
+@@ -166,6 +167,7 @@ again:
+ } else if (ret < 0) {
+ mlog(ML_NOTICE, "lockres %.*s: migrate failed, retrying\n",
+ lockres->lockname.len, lockres->lockname.name);
++ msleep(100);
+ goto again;
+ }
+
+@@ -658,8 +660,9 @@ static int dlm_thread(void *data)
+ * spinlock and do NOT have the dlm lock.
+ * safe to reserve/queue asts and run the lists. */
+
+- mlog(0, "calling dlm_shuffle_lists with dlm=%p, "
+- "res=%p\n", dlm, res);
++ mlog(0, "calling dlm_shuffle_lists with dlm=%s, "
++ "res=%.*s\n", dlm->name,
++ res->lockname.len, res->lockname.name);
+
+ /* called while holding lockres lock */
+ dlm_shuffle_lists(dlm, res);
+Index: fs/ocfs2/dlm/dlmmaster.c
+===================================================================
+--- fs/ocfs2/dlm/dlmmaster.c (revision 2787)
++++ fs/ocfs2/dlm/dlmmaster.c (working copy)
+@@ -73,6 +73,7 @@ struct dlm_master_list_entry
+ wait_queue_head_t wq;
+ atomic_t woken;
+ struct kref mle_refs;
++ int inuse;
+ unsigned long maybe_map[BITS_TO_LONGS(O2NM_MAX_NODES)];
+ unsigned long vote_map[BITS_TO_LONGS(O2NM_MAX_NODES)];
+ unsigned long response_map[BITS_TO_LONGS(O2NM_MAX_NODES)];
+@@ -123,15 +124,30 @@ static inline int dlm_mle_equal(struct d
+ return 1;
+ }
+
++#define dlm_print_nodemap(m) _dlm_print_nodemap(m,#m)
++void _dlm_print_nodemap(unsigned long *map, const char *mapname)
++{
++ int i;
++ printk("%s=[ ", mapname);
++ for (i=0; i<O2NM_MAX_NODES; i++)
++ if (test_bit(i, map))
++ printk("%d ", i);
++ printk("]");
++}
++
+ void dlm_print_one_mle(struct dlm_master_list_entry *mle)
+ {
+- int i = 0, refs;
++ int refs;
+ char *type;
+ char attached;
+ u8 master;
+ unsigned int namelen;
+ const char *name;
+ struct kref *k;
++ unsigned long *maybe = mle->maybe_map,
++ *vote = mle->vote_map,
++ *resp = mle->response_map,
++ *node = mle->node_map;
+
+ k = &mle->mle_refs;
+ if (mle->type == DLM_MLE_BLOCK)
+@@ -151,10 +167,19 @@ void dlm_print_one_mle(struct dlm_master
+ namelen = mle->u.res->lockname.len;
+ name = mle->u.res->lockname.name;
+ }
+-
+- mlog(ML_NOTICE, " #%3d: %3s %3d %3u %3u %c (%d)%.*s\n",
+- i, type, refs, master, mle->new_master, attached,
+- namelen, namelen, name);
++
++ mlog(ML_NOTICE, "%.*s: %3s refs=%3d mas=%3u new=%3u evt=%c inuse=%d ",
++ namelen, name, type, refs, master, mle->new_master, attached,
++ mle->inuse);
++ dlm_print_nodemap(maybe);
++ printk(", ");
++ dlm_print_nodemap(vote);
++ printk(", ");
++ dlm_print_nodemap(resp);
++ printk(", ");
++ dlm_print_nodemap(node);
++ printk(", ");
++ printk("\n");
+ }
+
+
+@@ -166,7 +191,6 @@ static void dlm_dump_mles(struct dlm_ctx
+ struct list_head *iter;
+
+ mlog(ML_NOTICE, "dumping all mles for domain %s:\n", dlm->name);
+- mlog(ML_NOTICE, " ####: type refs owner new events? lockname nodemap votemap respmap maybemap\n");
+ spin_lock(&dlm->master_lock);
+ list_for_each(iter, &dlm->master_list) {
+ mle = list_entry(iter, struct dlm_master_list_entry, list);
+@@ -310,6 +334,31 @@ static inline void dlm_mle_detach_hb_eve
+ __dlm_mle_detach_hb_events(dlm, mle);
+ spin_unlock(&dlm->spinlock);
+ }
++
++static void dlm_get_mle_inuse(struct dlm_master_list_entry *mle)
++{
++ struct dlm_ctxt *dlm;
++ dlm = mle->dlm;
++
++ assert_spin_locked(&dlm->spinlock);
++ assert_spin_locked(&dlm->master_lock);
++ mle->inuse++;
++ kref_get(&mle->mle_refs);
++}
++
++static void dlm_put_mle_inuse(struct dlm_master_list_entry *mle)
++{
++ struct dlm_ctxt *dlm;
++ dlm = mle->dlm;
++
++ spin_lock(&dlm->spinlock);
++ spin_lock(&dlm->master_lock);
++ mle->inuse--;
++ __dlm_put_mle(mle);
++ spin_unlock(&dlm->master_lock);
++ spin_unlock(&dlm->spinlock);
++
++}
+
+ /* remove from list and free */
+ static void __dlm_put_mle(struct dlm_master_list_entry *mle)
+@@ -319,9 +368,13 @@ static void __dlm_put_mle(struct dlm_mas
+
+ assert_spin_locked(&dlm->spinlock);
+ assert_spin_locked(&dlm->master_lock);
+- BUG_ON(!atomic_read(&mle->mle_refs.refcount));
+-
+- kref_put(&mle->mle_refs, dlm_mle_release);
++ if (!atomic_read(&mle->mle_refs.refcount)) {
++ /* this may or may not crash, but who cares.
++ * it's a BUG. */
++ mlog(ML_ERROR, "bad mle: %p\n", mle);
++ dlm_print_one_mle(mle);
++ } else
++ kref_put(&mle->mle_refs, dlm_mle_release);
+ }
+
+
+@@ -364,6 +417,7 @@ static void dlm_init_mle(struct dlm_mast
+ memset(mle->response_map, 0, sizeof(mle->response_map));
+ mle->master = O2NM_MAX_NODES;
+ mle->new_master = O2NM_MAX_NODES;
++ mle->inuse = 0;
+
+ if (mle->type == DLM_MLE_MASTER) {
+ BUG_ON(!res);
+@@ -784,7 +838,7 @@ lookup:
+ * if so, the creator of the BLOCK may try to put the last
+ * ref at this time in the assert master handler, so we
+ * need an extra one to keep from a bad ptr deref. */
+- dlm_get_mle(mle);
++ dlm_get_mle_inuse(mle);
+ spin_unlock(&dlm->master_lock);
+ spin_unlock(&dlm->spinlock);
+
+@@ -806,6 +860,7 @@ lookup:
+ }
+
+ dlm_kick_recovery_thread(dlm);
++ msleep(100);
+ dlm_wait_for_recovery(dlm);
+
+ spin_lock(&dlm->spinlock);
+@@ -873,7 +928,7 @@ wait:
+ dlm_mle_detach_hb_events(dlm, mle);
+ dlm_put_mle(mle);
+ /* put the extra ref */
+- dlm_put_mle(mle);
++ dlm_put_mle_inuse(mle);
+
+ wake_waiters:
+ spin_lock(&res->spinlock);
+@@ -955,6 +1010,12 @@ recheck:
+ "rechecking now\n", dlm->name, res->lockname.len,
+ res->lockname.name);
+ goto recheck;
++ } else {
++ if (!voting_done) {
++ mlog(0, "map not changed and voting not done "
++ "for %s:%.*s\n", dlm->name, res->lockname.len,
++ res->lockname.name);
++ }
+ }
+
+ if (m != O2NM_MAX_NODES) {
+@@ -1458,15 +1519,12 @@ way_up_top:
+ mlog_errno(-ENOMEM);
+ goto send_response;
+ }
+- spin_lock(&dlm->spinlock);
+- dlm_init_mle(mle, DLM_MLE_BLOCK, dlm, NULL,
+- name, namelen);
+- spin_unlock(&dlm->spinlock);
+ goto way_up_top;
+ }
+
+ // mlog(0, "this is second time thru, already allocated, "
+ // "add the block.\n");
++ dlm_init_mle(mle, DLM_MLE_BLOCK, dlm, NULL, name, namelen);
+ set_bit(request->node_idx, mle->maybe_map);
+ list_add(&mle->list, &dlm->master_list);
+ response = DLM_MASTER_RESP_NO;
+@@ -1639,7 +1697,7 @@ int dlm_assert_master_handler(struct o2n
+ if (bit >= O2NM_MAX_NODES) {
+ /* not necessarily an error, though less likely.
+ * could be master just re-asserting. */
+- mlog(ML_ERROR, "no bits set in the maybe_map, but %u "
++ mlog(ML_NOTICE, "no bits set in the maybe_map, but %u "
+ "is asserting! (%.*s)\n", assert->node_idx,
+ namelen, name);
+ } else if (bit != assert->node_idx) {
+@@ -1651,13 +1709,30 @@ int dlm_assert_master_handler(struct o2n
+ * number winning the mastery will respond
+ * YES to mastery requests, but this node
+ * had no way of knowing. let it pass. */
+- mlog(ML_ERROR, "%u is the lowest node, "
++ mlog(ML_NOTICE, "%u is the lowest node, "
+ "%u is asserting. (%.*s) %u must "
+ "have begun after %u won.\n", bit,
+ assert->node_idx, namelen, name, bit,
+ assert->node_idx);
+ }
+ }
++ if (mle->type == DLM_MLE_MIGRATION) {
++ if (flags & DLM_ASSERT_MASTER_MLE_CLEANUP) {
++ mlog(ML_NOTICE, "%s:%.*s: got cleanup assert"
++ " from %u for migration\n",
++ dlm->name, namelen, name,
++ assert->node_idx);
++ } else if (!(flags & DLM_ASSERT_MASTER_FINISH_MIGRATION)) {
++ mlog(ML_NOTICE, "%s:%.*s: got unrelated assert"
++ " from %u for migration, ignoring\n",
++ dlm->name, namelen, name,
++ assert->node_idx);
++ __dlm_put_mle(mle);
++ spin_unlock(&dlm->master_lock);
++ spin_unlock(&dlm->spinlock);
++ goto done;
++ }
++ }
+ }
+ spin_unlock(&dlm->master_lock);
+
+@@ -1672,7 +1747,8 @@ int dlm_assert_master_handler(struct o2n
+ goto kill;
+ }
+ if (!mle) {
+- if (res->owner != assert->node_idx) {
++ if (res->owner != DLM_LOCK_RES_OWNER_UNKNOWN &&
++ res->owner != assert->node_idx) {
+ mlog(ML_ERROR, "assert_master from "
+ "%u, but current owner is "
+ "%u! (%.*s)\n",
+@@ -1725,6 +1801,7 @@ ok:
+ if (mle) {
+ int extra_ref = 0;
+ int nn = -1;
++ int rr, err = 0;
+
+ spin_lock(&mle->spinlock);
+ if (mle->type == DLM_MLE_BLOCK || mle->type == DLM_MLE_MIGRATION)
+@@ -1744,27 +1821,64 @@ ok:
+ wake_up(&mle->wq);
+ spin_unlock(&mle->spinlock);
+
+- if (mle->type == DLM_MLE_MIGRATION && res) {
+- mlog(0, "finishing off migration of lockres %.*s, "
+- "from %u to %u\n",
+- res->lockname.len, res->lockname.name,
+- dlm->node_num, mle->new_master);
++ if (res) {
+ spin_lock(&res->spinlock);
+- res->state &= ~DLM_LOCK_RES_MIGRATING;
+- dlm_change_lockres_owner(dlm, res, mle->new_master);
+- BUG_ON(res->state & DLM_LOCK_RES_DIRTY);
++ if (mle->type == DLM_MLE_MIGRATION) {
++ mlog(0, "finishing off migration of lockres %.*s, "
++ "from %u to %u\n",
++ res->lockname.len, res->lockname.name,
++ dlm->node_num, mle->new_master);
++ res->state &= ~DLM_LOCK_RES_MIGRATING;
++ dlm_change_lockres_owner(dlm, res, mle->new_master);
++ BUG_ON(res->state & DLM_LOCK_RES_DIRTY);
++ } else {
++ dlm_change_lockres_owner(dlm, res, mle->master);
++ }
+ spin_unlock(&res->spinlock);
+ }
+- /* master is known, detach if not already detached */
+- dlm_mle_detach_hb_events(dlm, mle);
+- dlm_put_mle(mle);
+
++ /* master is known, detach if not already detached.
++ * ensures that only one assert_master call will happen
++ * on this mle. */
++ spin_lock(&dlm->spinlock);
++ spin_lock(&dlm->master_lock);
++
++ rr = atomic_read(&mle->mle_refs.refcount);
++ if (mle->inuse > 0) {
++ if (extra_ref && rr < 3)
++ err = 1;
++ else if (!extra_ref && rr < 2)
++ err = 1;
++ } else {
++ if (extra_ref && rr < 2)
++ err = 1;
++ else if (!extra_ref && rr < 1)
++ err = 1;
++ }
++ if (err) {
++ mlog(ML_ERROR, "%s:%.*s: got assert master from %u "
++ "that will mess up this node, refs=%d, extra=%d, "
++ "inuse=%d\n", dlm->name, namelen, name,
++ assert->node_idx, rr, extra_ref, mle->inuse);
++ dlm_print_one_mle(mle);
++ }
++ list_del_init(&mle->list);
++ __dlm_mle_detach_hb_events(dlm, mle);
++ __dlm_put_mle(mle);
+ if (extra_ref) {
+ /* the assert master message now balances the extra
+ * ref given by the master / migration request message.
+ * if this is the last put, it will be removed
+ * from the list. */
+- dlm_put_mle(mle);
++ __dlm_put_mle(mle);
++ }
++ spin_unlock(&dlm->master_lock);
++ spin_unlock(&dlm->spinlock);
++ } else if (res) {
++ if (res->owner != assert->node_idx) {
++ mlog(ML_NOTICE, "assert_master from %u, but current "
++ "owner is %u (%.*s), no mle\n", assert->node_idx,
++ res->owner, namelen, name);
+ }
+ }
+
+@@ -2110,7 +2224,7 @@ fail:
+ * take both dlm->spinlock and dlm->master_lock */
+ spin_lock(&dlm->spinlock);
+ spin_lock(&dlm->master_lock);
+- dlm_get_mle(mle);
++ dlm_get_mle_inuse(mle);
+ spin_unlock(&dlm->master_lock);
+ spin_unlock(&dlm->spinlock);
+
+@@ -2127,7 +2241,10 @@ fail:
+ /* migration failed, detach and clean up mle */
+ dlm_mle_detach_hb_events(dlm, mle);
+ dlm_put_mle(mle);
+- dlm_put_mle(mle);
++ dlm_put_mle_inuse(mle);
++ spin_lock(&res->spinlock);
++ res->state &= ~DLM_LOCK_RES_MIGRATING;
++ spin_unlock(&res->spinlock);
+ goto leave;
+ }
+
+@@ -2157,8 +2274,8 @@ fail:
+ /* avoid hang during shutdown when migrating lockres
+ * to a node which also goes down */
+ if (dlm_is_node_dead(dlm, target)) {
+- mlog(0, "%s:%.*s: expected migration target %u "
+- "is no longer up. restarting.\n",
++ mlog(ML_NOTICE, "%s:%.*s: expected migration "
++ "target %u is no longer up, restarting\n",
+ dlm->name, res->lockname.len,
+ res->lockname.name, target);
+ ret = -ERESTARTSYS;
+@@ -2168,7 +2285,10 @@ fail:
+ /* migration failed, detach and clean up mle */
+ dlm_mle_detach_hb_events(dlm, mle);
+ dlm_put_mle(mle);
+- dlm_put_mle(mle);
++ dlm_put_mle_inuse(mle);
++ spin_lock(&res->spinlock);
++ res->state &= ~DLM_LOCK_RES_MIGRATING;
++ spin_unlock(&res->spinlock);
+ goto leave;
+ }
+ /* TODO: if node died: stop, clean up, return error */
+@@ -2184,7 +2304,7 @@ fail:
+
+ /* master is known, detach if not already detached */
+ dlm_mle_detach_hb_events(dlm, mle);
+- dlm_put_mle(mle);
++ dlm_put_mle_inuse(mle);
+ ret = 0;
+
+ dlm_lockres_calc_usage(dlm, res);
+@@ -2571,6 +2691,7 @@ static int dlm_add_migration_mle(struct
+ /* remove it from the list so that only one
+ * mle will be found */
+ list_del_init(&tmp->list);
++ __dlm_mle_detach_hb_events(dlm, mle);
+ }
+ spin_unlock(&tmp->spinlock);
+ }
+@@ -2664,14 +2785,15 @@ top:
+
+ /* remove from the list early. NOTE: unlinking
+ * list_head while in list_for_each_safe */
++ __dlm_mle_detach_hb_events(dlm, mle);
+ spin_lock(&mle->spinlock);
+ list_del_init(&mle->list);
+ atomic_set(&mle->woken, 1);
+ spin_unlock(&mle->spinlock);
+ wake_up(&mle->wq);
+
+- mlog(0, "node %u died during migration from "
+- "%u to %u!\n", dead_node,
++ mlog(ML_NOTICE, "%s: node %u died during migration from "
++ "%u to %u!\n", dlm->name, dead_node,
+ mle->master, mle->new_master);
+ /* if there is a lockres associated with this
+ * mle, find it and set its owner to UNKNOWN */
+Index: fs/ocfs2/dlm/dlmunlock.c
+===================================================================
+--- fs/ocfs2/dlm/dlmunlock.c (revision 2787)
++++ fs/ocfs2/dlm/dlmunlock.c (working copy)
+@@ -318,6 +318,16 @@ static enum dlm_status dlm_send_remote_u
+ size_t veclen = 1;
+
+ mlog_entry("%.*s\n", res->lockname.len, res->lockname.name);
++
++ if (owner == dlm->node_num) {
++ /* ended up trying to contact ourself. this means
++ * that the lockres had been remote but became local
++ * via a migration. just retry it, now as local */
++ mlog(0, "%s:%.*s: this node became the master due to a "
++ "migration, re-evaluate now\n", dlm->name,
++ res->lockname.len, res->lockname.name);
++ return DLM_FORWARD;
++ }
+
+ memset(&unlock, 0, sizeof(unlock));
+ unlock.node_idx = dlm->node_num;
+Index: fs/ocfs2/dlm/dlmcommon.h
+===================================================================
+--- fs/ocfs2/dlm/dlmcommon.h (revision 2787)
++++ fs/ocfs2/dlm/dlmcommon.h (working copy)
+@@ -300,6 +300,15 @@ enum dlm_lockres_list {
+ DLM_BLOCKED_LIST
+ };
+
++static inline int dlm_lvb_is_empty(char *lvb)
++{
++ int i;
++ for (i=0; i<DLM_LVB_LEN; i++)
++ if (lvb[i])
++ return 0;
++ return 1;
++}
++
+ static inline struct list_head *
+ dlm_list_idx_to_ptr(struct dlm_lock_resource *res, enum dlm_lockres_list idx)
+ {
+Index: fs/ocfs2/dlm/dlmrecovery.c
+===================================================================
+--- fs/ocfs2/dlm/dlmrecovery.c (revision 2787)
++++ fs/ocfs2/dlm/dlmrecovery.c (working copy)
+@@ -115,12 +115,31 @@ static u64 dlm_get_next_mig_cookie(void)
+ return c;
+ }
+
++static inline void dlm_set_reco_dead_node(struct dlm_ctxt *dlm,
++ u8 dead_node)
++{
++ assert_spin_locked(&dlm->spinlock);
++ if (dlm->reco.dead_node != dead_node)
++ mlog(ML_NOTICE, "%s: changing dead_node from %u to %u\n",
++ dlm->name, dlm->reco.dead_node, dead_node);
++ dlm->reco.dead_node = dead_node;
++}
++
++static inline void dlm_set_reco_master(struct dlm_ctxt *dlm,
++ u8 master)
++{
++ assert_spin_locked(&dlm->spinlock);
++ mlog(ML_NOTICE, "%s: changing new_master from %u to %u\n",
++ dlm->name, dlm->reco.new_master, master);
++ dlm->reco.new_master = master;
++}
++
+ static inline void dlm_reset_recovery(struct dlm_ctxt *dlm)
+ {
+ spin_lock(&dlm->spinlock);
+ clear_bit(dlm->reco.dead_node, dlm->recovery_map);
+- dlm->reco.dead_node = O2NM_INVALID_NODE_NUM;
+- dlm->reco.new_master = O2NM_INVALID_NODE_NUM;
++ dlm_set_reco_dead_node(dlm, O2NM_INVALID_NODE_NUM);
++ dlm_set_reco_master(dlm, O2NM_INVALID_NODE_NUM);
+ spin_unlock(&dlm->spinlock);
+ }
+
+@@ -220,6 +239,53 @@ void dlm_complete_recovery_thread(struct
+ *
+ */
+
++static void dlm_print_reco_junk(struct dlm_ctxt *dlm)
++{
++ struct dlm_reco_node_data *ndata;
++ struct dlm_lock_resource *res;
++
++ mlog(ML_NOTICE, "%s(%d): recovery info, state=%s, "
++ "dead=%u, master=%u\n", dlm->name,
++ dlm->dlm_reco_thread_task->pid,
++ dlm->reco.state & DLM_RECO_STATE_ACTIVE ? "ACTIVE" : "inactive",
++ dlm->reco.dead_node, dlm->reco.new_master);
++
++ list_for_each_entry(ndata, &dlm->reco.node_data, list) {
++ char *st = "unknown";
++ switch (ndata->state) {
++ case DLM_RECO_NODE_DATA_INIT:
++ st = "init";
++ break;
++ case DLM_RECO_NODE_DATA_REQUESTING:
++ st = "requesting";
++ break;
++ case DLM_RECO_NODE_DATA_DEAD:
++ st = "dead";
++ break;
++ case DLM_RECO_NODE_DATA_RECEIVING:
++ st = "receiving";
++ break;
++ case DLM_RECO_NODE_DATA_REQUESTED:
++ st = "requested";
++ break;
++ case DLM_RECO_NODE_DATA_DONE:
++ st = "done";
++ break;
++ case DLM_RECO_NODE_DATA_FINALIZE_SENT:
++ st = "finalize-sent";
++ break;
++ default:
++ st = "bad";
++ break;
++ }
++ mlog(ML_NOTICE, "%s: reco state, node %u, state=%s\n",
++ dlm->name, ndata->node_num, st);
++ }
++ list_for_each_entry(res, &dlm->reco.resources, recovering) {
++ mlog(ML_NOTICE, "%s: lockres %.*s on recovering list\n",
++ dlm->name, res->lockname.len, res->lockname.name);
++ }
++}
+
+ #define DLM_RECO_THREAD_TIMEOUT_MS (5 * 1000)
+
+@@ -267,7 +333,7 @@ int dlm_is_node_dead(struct dlm_ctxt *dl
+ {
+ int dead;
+ spin_lock(&dlm->spinlock);
+- dead = test_bit(node, dlm->domain_map);
++ dead = !test_bit(node, dlm->domain_map);
+ spin_unlock(&dlm->spinlock);
+ return dead;
+ }
+@@ -308,7 +374,28 @@ static int dlm_in_recovery(struct dlm_ct
+
+ void dlm_wait_for_recovery(struct dlm_ctxt *dlm)
+ {
+- wait_event(dlm->reco.event, !dlm_in_recovery(dlm));
++ if (dlm_in_recovery(dlm)) {
++ mlog(ML_NOTICE, "%s: reco thread %d in recovery: "
++ "state=%d, master=%u, dead=%u\n",
++ dlm->name, dlm->dlm_reco_thread_task->pid,
++ dlm->reco.state, dlm->reco.new_master,
++ dlm->reco.dead_node);
++ dlm_print_reco_junk(dlm);
++ }
++
++ while (1) {
++ if (wait_event_timeout(dlm->reco.event,
++ !dlm_in_recovery(dlm),
++ msecs_to_jiffies(5000)))
++ break;
++ mlog(ML_NOTICE, "%s: reco thread %d still in recovery: "
++ "state=%d, master=%u, dead=%u\n",
++ dlm->name, dlm->dlm_reco_thread_task->pid,
++ dlm->reco.state, dlm->reco.new_master,
++ dlm->reco.dead_node);
++ dlm_print_reco_junk(dlm);
++ }
++ // wait_event(dlm->reco.event, !dlm_in_recovery(dlm));
+ }
+
+ static void dlm_begin_recovery(struct dlm_ctxt *dlm)
+@@ -341,7 +428,7 @@ int dlm_do_recovery(struct dlm_ctxt *dlm
+ mlog(0, "new master %u died while recovering %u!\n",
+ dlm->reco.new_master, dlm->reco.dead_node);
+ /* unset the new_master, leave dead_node */
+- dlm->reco.new_master = O2NM_INVALID_NODE_NUM;
++ dlm_set_reco_master(dlm, O2NM_INVALID_NODE_NUM);
+ }
+
+ /* select a target to recover */
+@@ -350,14 +437,14 @@ int dlm_do_recovery(struct dlm_ctxt *dlm
+
+ bit = find_next_bit (dlm->recovery_map, O2NM_MAX_NODES+1, 0);
+ if (bit >= O2NM_MAX_NODES || bit < 0)
+- dlm->reco.dead_node = O2NM_INVALID_NODE_NUM;
++ dlm_set_reco_dead_node(dlm, O2NM_INVALID_NODE_NUM);
+ else
+- dlm->reco.dead_node = bit;
++ dlm_set_reco_dead_node(dlm, bit);
+ } else if (!test_bit(dlm->reco.dead_node, dlm->recovery_map)) {
+ /* BUG? */
+ mlog(ML_ERROR, "dead_node %u no longer in recovery map!\n",
+ dlm->reco.dead_node);
+- dlm->reco.dead_node = O2NM_INVALID_NODE_NUM;
++ dlm_set_reco_dead_node(dlm, O2NM_INVALID_NODE_NUM);
+ }
+
+ if (dlm->reco.dead_node == O2NM_INVALID_NODE_NUM) {
+@@ -366,7 +453,8 @@ int dlm_do_recovery(struct dlm_ctxt *dlm
+ /* return to main thread loop and sleep. */
+ return 0;
+ }
+- mlog(0, "recovery thread found node %u in the recovery map!\n",
++ mlog(ML_NOTICE, "%s(%d):recovery thread found node %u in the recovery map!\n",
++ dlm->name, dlm->dlm_reco_thread_task->pid,
+ dlm->reco.dead_node);
+ spin_unlock(&dlm->spinlock);
+
+@@ -389,8 +477,8 @@ int dlm_do_recovery(struct dlm_ctxt *dlm
+ }
+ mlog(0, "another node will master this recovery session.\n");
+ }
+- mlog(0, "dlm=%s, new_master=%u, this node=%u, dead_node=%u\n",
+- dlm->name, dlm->reco.new_master,
++ mlog(ML_NOTICE, "dlm=%s (%d), new_master=%u, this node=%u, dead_node=%u\n",
++ dlm->name, dlm->dlm_reco_thread_task->pid, dlm->reco.new_master,
+ dlm->node_num, dlm->reco.dead_node);
+
+ /* it is safe to start everything back up here
+@@ -402,7 +490,8 @@ int dlm_do_recovery(struct dlm_ctxt *dlm
+ return 0;
+
+ master_here:
+- mlog(0, "mastering recovery of %s:%u here(this=%u)!\n",
++ mlog(ML_NOTICE, "(%d) mastering recovery of %s:%u here(this=%u)!\n",
++ dlm->dlm_reco_thread_task->pid,
+ dlm->name, dlm->reco.dead_node, dlm->node_num);
+
+ status = dlm_remaster_locks(dlm, dlm->reco.dead_node);
+@@ -414,7 +503,7 @@ master_here:
+ msleep(100);
+ } else {
+ /* success! see if any other nodes need recovery */
+- mlog(0, "DONE mastering recovery of %s:%u here(this=%u)!\n",
++ mlog(ML_NOTICE, "DONE mastering recovery of %s:%u here(this=%u)!\n",
+ dlm->name, dlm->reco.dead_node, dlm->node_num);
+ dlm_reset_recovery(dlm);
+ }
+@@ -544,11 +633,19 @@ static int dlm_remaster_locks(struct dlm
+ goto leave;
+ case DLM_RECO_NODE_DATA_RECEIVING:
+ case DLM_RECO_NODE_DATA_REQUESTED:
++ mlog(0, "%s: node %u still in state %s\n",
++ dlm->name, ndata->node_num,
++ ndata->state==DLM_RECO_NODE_DATA_RECEIVING ?
++ "receiving" : "requested");
+ all_nodes_done = 0;
+ break;
+ case DLM_RECO_NODE_DATA_DONE:
++ mlog(0, "%s: node %u state is done\n",
++ dlm->name, ndata->node_num);
+ break;
+ case DLM_RECO_NODE_DATA_FINALIZE_SENT:
++ mlog(0, "%s: node %u state is finalize\n",
++ dlm->name, ndata->node_num);
+ break;
+ }
+ }
+@@ -573,7 +670,7 @@ static int dlm_remaster_locks(struct dlm
+ spin_unlock(&dlm->spinlock);
+ mlog(0, "should be done with recovery!\n");
+
+- mlog(0, "finishing recovery of %s at %lu, "
++ mlog(ML_NOTICE, "finishing recovery of %s at %lu, "
+ "dead=%u, this=%u, new=%u\n", dlm->name,
+ jiffies, dlm->reco.dead_node,
+ dlm->node_num, dlm->reco.new_master);
+@@ -690,6 +787,15 @@ int dlm_request_all_locks_handler(struct
+ if (!dlm_grab(dlm))
+ return -EINVAL;
+
++ if (lr->dead_node != dlm->reco.dead_node) {
++ mlog(ML_ERROR, "%s: node %u sent dead_node=%u, but local "
++ "dead_node is %u\n", dlm->name, lr->node_idx,
++ lr->dead_node, dlm->reco.dead_node);
++ dlm_print_reco_junk(dlm);
++ /* this is a hack */
++ dlm_put(dlm);
++ return -ENOMEM;
++ }
+ BUG_ON(lr->dead_node != dlm->reco.dead_node);
+
+ item = kcalloc(1, sizeof(*item), GFP_KERNEL);
+@@ -729,12 +835,16 @@ static void dlm_request_all_locks_worker
+ struct list_head *iter;
+ int ret;
+ u8 dead_node, reco_master;
++ int skip_all_done = 0;
+
+ dlm = item->dlm;
+ dead_node = item->u.ral.dead_node;
+ reco_master = item->u.ral.reco_master;
+ mres = (struct dlm_migratable_lockres *)data;
+
++ mlog(ML_NOTICE, "%s: recovery worker started, dead=%u, master=%u\n",
++ dlm->name, dead_node, reco_master);
++
+ if (dead_node != dlm->reco.dead_node ||
+ reco_master != dlm->reco.new_master) {
+ /* show extra debug info if the recovery state is messed */
+@@ -765,12 +875,21 @@ static void dlm_request_all_locks_worker
+ dlm_move_reco_locks_to_list(dlm, &resources, dead_node);
+
+ /* now we can begin blasting lockreses without the dlm lock */
++
++ /* any errors returned will be due to the new_master dying,
++ * the dlm_reco_thread should detect this */
+ list_for_each(iter, &resources) {
+ res = list_entry (iter, struct dlm_lock_resource, recovering);
+ ret = dlm_send_one_lockres(dlm, res, mres, reco_master,
+ DLM_MRES_RECOVERY);
+- if (ret < 0)
++ if (ret < 0) {
+ mlog_errno(ret);
++ mlog(ML_ERROR, "%s: node %u went down while sending "
++ "recovery state for dead node %u\n", dlm->name,
++ reco_master, dead_node);
++ skip_all_done = 1;
++ break;
++ }
+ }
+
+ /* move the resources back to the list */
+@@ -778,9 +897,15 @@ static void dlm_request_all_locks_worker
+ list_splice_init(&resources, &dlm->reco.resources);
+ spin_unlock(&dlm->spinlock);
+
+- ret = dlm_send_all_done_msg(dlm, dead_node, reco_master);
+- if (ret < 0)
+- mlog_errno(ret);
++ if (!skip_all_done) {
++ ret = dlm_send_all_done_msg(dlm, dead_node, reco_master);
++ if (ret < 0) {
++ mlog_errno(ret);
++ mlog(ML_ERROR, "%s: node %u went down while sending "
++ "recovery all-done for dead node %u\n", dlm->name,
++ reco_master, dead_node);
++ }
++ }
+
+ free_page((unsigned long)data);
+ }
+@@ -794,14 +919,20 @@ static int dlm_send_all_done_msg(struct
+ memset(&done_msg, 0, sizeof(done_msg));
+ done_msg.node_idx = dlm->node_num;
+ done_msg.dead_node = dead_node;
+- mlog(0, "sending DATA DONE message to %u, "
++ mlog(ML_NOTICE, "sending DATA DONE message to %u, "
+ "my node=%u, dead node=%u\n", send_to, done_msg.node_idx,
+ done_msg.dead_node);
+
+ ret = o2net_send_message(DLM_RECO_DATA_DONE_MSG, dlm->key, &done_msg,
+ sizeof(done_msg), send_to, &tmpret);
+- /* negative status is ignored by the caller */
+- if (ret >= 0)
++ if (ret < 0) {
++ if (!dlm_is_host_down(ret)) {
++ mlog_errno(ret);
++ mlog(ML_ERROR, "%s: unknown error sending data-done "
++ "to %u\n", dlm->name, send_to);
++ BUG();
++ }
++ } else
+ ret = tmpret;
+ return ret;
+ }
+@@ -821,6 +952,11 @@ int dlm_reco_data_done_handler(struct o2
+ mlog(0, "got DATA DONE: dead_node=%u, reco.dead_node=%u, "
+ "node_idx=%u, this node=%u\n", done->dead_node,
+ dlm->reco.dead_node, done->node_idx, dlm->node_num);
++ if (done->dead_node != dlm->reco.dead_node) {
++ mlog(ML_ERROR, "got DATA DONE: dead_node=%u, reco.dead_node=%u, "
++ "node_idx=%u, this node=%u\n", done->dead_node,
++ dlm->reco.dead_node, done->node_idx, dlm->node_num);
++ }
+ BUG_ON(done->dead_node != dlm->reco.dead_node);
+
+ spin_lock(&dlm_reco_state_lock);
+@@ -1022,8 +1158,9 @@ static int dlm_add_lock_to_array(struct
+ ml->type == LKM_PRMODE) {
+ /* if it is already set, this had better be a PR
+ * and it has to match */
+- if (mres->lvb[0] && (ml->type == LKM_EXMODE ||
+- memcmp(mres->lvb, lock->lksb->lvb, DLM_LVB_LEN))) {
++ if (!dlm_lvb_is_empty(mres->lvb) &&
++ (ml->type == LKM_EXMODE ||
++ memcmp(mres->lvb, lock->lksb->lvb, DLM_LVB_LEN))) {
+ mlog(ML_ERROR, "mismatched lvbs!\n");
+ __dlm_print_one_lock_resource(lock->lockres);
+ BUG();
+@@ -1082,22 +1219,25 @@ int dlm_send_one_lockres(struct dlm_ctxt
+ * we must send it immediately. */
+ ret = dlm_send_mig_lockres_msg(dlm, mres, send_to,
+ res, total_locks);
+- if (ret < 0) {
+- // TODO
+- mlog(ML_ERROR, "dlm_send_mig_lockres_msg "
+- "returned %d, TODO\n", ret);
+- BUG();
+- }
++ if (ret < 0)
++ goto error;
+ }
+ }
+ /* flush any remaining locks */
+ ret = dlm_send_mig_lockres_msg(dlm, mres, send_to, res, total_locks);
+- if (ret < 0) {
+- // TODO
+- mlog(ML_ERROR, "dlm_send_mig_lockres_msg returned %d, "
+- "TODO\n", ret);
++ if (ret < 0)
++ goto error;
++ return ret;
++
++error:
++ mlog(ML_ERROR, "%s: dlm_send_mig_lockres_msg returned %d\n",
++ dlm->name, ret);
++ if (!dlm_is_host_down(ret))
+ BUG();
+- }
++ mlog(ML_NOTICE, "%s: node %u went down while sending %s "
++ "lockres %.*s\n", dlm->name, send_to,
++ flags & DLM_MRES_RECOVERY ? "recovery" : "migration",
++ res->lockname.len, res->lockname.name);
+ return ret;
+ }
+
+@@ -1446,6 +1586,7 @@ dlm_list_num_to_pointer(struct dlm_lock_
+ ret += list_num;
+ return ret;
+ }
++
+ /* TODO: do ast flush business
+ * TODO: do MIGRATING and RECOVERING spinning
+ */
+@@ -1482,7 +1623,7 @@ static int dlm_process_recovery_data(str
+ struct dlm_lock *newlock = NULL;
+ struct dlm_lockstatus *lksb = NULL;
+ int ret = 0;
+- int i;
++ int i, bad;
+ struct list_head *iter;
+ struct dlm_lock *lock = NULL;
+
+@@ -1553,7 +1694,7 @@ static int dlm_process_recovery_data(str
+ lksb->flags |= (ml->flags &
+ (DLM_LKSB_PUT_LVB|DLM_LKSB_GET_LVB));
+
+- if (mres->lvb[0]) {
++ if (!dlm_lvb_is_empty(mres->lvb)) {
+ if (lksb->flags & DLM_LKSB_PUT_LVB) {
+ /* other node was trying to update
+ * lvb when node died. recreate the
+@@ -1564,8 +1705,9 @@ static int dlm_process_recovery_data(str
+ * most recent valid lvb info */
+ BUG_ON(ml->type != LKM_EXMODE &&
+ ml->type != LKM_PRMODE);
+- if (res->lvb[0] && (ml->type == LKM_EXMODE ||
+- memcmp(res->lvb, mres->lvb, DLM_LVB_LEN))) {
++ if (!dlm_lvb_is_empty(res->lvb) &&
++ (ml->type == LKM_EXMODE ||
++ memcmp(res->lvb, mres->lvb, DLM_LVB_LEN))) {
+ mlog(ML_ERROR, "received bad lvb!\n");
+ __dlm_print_one_lock_resource(res);
+ BUG();
+@@ -1591,9 +1733,33 @@ static int dlm_process_recovery_data(str
+ * relative to each other, but clearly *not*
+ * preserved relative to locks from other nodes.
+ */
++ bad = 0;
+ spin_lock(&res->spinlock);
+- dlm_lock_get(newlock);
+- list_add_tail(&newlock->list, queue);
++ list_for_each_entry(lock, queue, list) {
++ if (lock->ml.cookie == ml->cookie) {
++ u64 c = lock->ml.cookie;
++ mlog(ML_ERROR, "%s:%.*s: %u:%llu: lock already "
++ "exists on this lockres!\n", dlm->name,
++ res->lockname.len, res->lockname.name,
++ dlm_get_lock_cookie_node(c),
++ dlm_get_lock_cookie_seq(c));
++
++ mlog(ML_NOTICE, "sent lock: type=%d, conv=%d, "
++ "node=%u, cookie=%u:%llu, queue=%d\n",
++ ml->type, ml->convert_type, ml->node,
++ dlm_get_lock_cookie_node(ml->cookie),
++ dlm_get_lock_cookie_seq(ml->cookie),
++ ml->list);
++
++ __dlm_print_one_lock_resource(res);
++ bad = 1;
++ break;
++ }
++ }
++ if (!bad) {
++ dlm_lock_get(newlock);
++ list_add_tail(&newlock->list, queue);
++ }
+ spin_unlock(&res->spinlock);
+ }
+ mlog(0, "done running all the locks\n");
+@@ -2048,7 +2214,7 @@ int dlm_pick_recovery_master(struct dlm_
+ struct dlm_lockstatus lksb;
+ int status = -EINVAL;
+
+- mlog(0, "starting recovery of %s at %lu, dead=%u, this=%u\n",
++ mlog(ML_NOTICE, "starting recovery of %s at %lu, dead=%u, this=%u\n",
+ dlm->name, jiffies, dlm->reco.dead_node, dlm->node_num);
+ again:
+ memset(&lksb, 0, sizeof(lksb));
+@@ -2056,17 +2222,17 @@ again:
+ ret = dlmlock(dlm, LKM_EXMODE, &lksb, LKM_NOQUEUE|LKM_RECOVERY,
+ DLM_RECOVERY_LOCK_NAME, dlm_reco_ast, dlm, dlm_reco_bast);
+
+- mlog(0, "%s: dlmlock($RECOVERY) returned %d, lksb=%d\n",
++ mlog(ML_NOTICE, "%s: dlmlock($RECOVERY) returned %d, lksb=%d\n",
+ dlm->name, ret, lksb.status);
+
+ if (ret == DLM_NORMAL) {
+- mlog(0, "dlm=%s dlmlock says I got it (this=%u)\n",
++ mlog(ML_NOTICE, "dlm=%s dlmlock says I got it (this=%u)\n",
+ dlm->name, dlm->node_num);
+
+ /* got the EX lock. check to see if another node
+ * just became the reco master */
+ if (dlm_reco_master_ready(dlm)) {
+- mlog(0, "%s: got reco EX lock, but %u will "
++ mlog(ML_NOTICE, "%s: got reco EX lock, but %u will "
+ "do the recovery\n", dlm->name,
+ dlm->reco.new_master);
+ status = -EEXIST;
+@@ -2077,7 +2243,7 @@ again:
+ spin_lock(&dlm->spinlock);
+ if (dlm->reco.dead_node == O2NM_INVALID_NODE_NUM) {
+ status = -EINVAL;
+- mlog(0, "%s: got reco EX lock, but "
++ mlog(ML_NOTICE, "%s: got reco EX lock, but "
+ "node got recovered already\n", dlm->name);
+ if (dlm->reco.new_master != O2NM_INVALID_NODE_NUM) {
+ mlog(ML_ERROR, "%s: new master is %u "
+@@ -2092,7 +2258,7 @@ again:
+ /* if this node has actually become the recovery master,
+ * set the master and send the messages to begin recovery */
+ if (!status) {
+- mlog(0, "%s: dead=%u, this=%u, sending "
++ mlog(ML_NOTICE, "%s: dead=%u, this=%u, sending "
+ "begin_reco now\n", dlm->name,
+ dlm->reco.dead_node, dlm->node_num);
+ status = dlm_send_begin_reco_message(dlm,
+@@ -2102,7 +2268,7 @@ again:
+
+ /* set the new_master to this node */
+ spin_lock(&dlm->spinlock);
+- dlm->reco.new_master = dlm->node_num;
++ dlm_set_reco_master(dlm, dlm->node_num);
+ spin_unlock(&dlm->spinlock);
+ }
+
+@@ -2123,7 +2289,7 @@ again:
+ mlog(ML_ERROR, "dlmunlock returned %d\n", ret);
+ }
+ } else if (ret == DLM_NOTQUEUED) {
+- mlog(0, "dlm=%s dlmlock says another node got it (this=%u)\n",
++ mlog(ML_NOTICE, "dlm=%s dlmlock says another node got it (this=%u)\n",
+ dlm->name, dlm->node_num);
+ /* another node is master. wait on
+ * reco.new_master != O2NM_INVALID_NODE_NUM
+@@ -2132,12 +2298,12 @@ again:
+ dlm_reco_master_ready(dlm),
+ msecs_to_jiffies(1000));
+ if (!dlm_reco_master_ready(dlm)) {
+- mlog(0, "%s: reco master taking awhile\n",
++ mlog(ML_NOTICE, "%s: reco master taking awhile\n",
+ dlm->name);
+ goto again;
+ }
+ /* another node has informed this one that it is reco master */
+- mlog(0, "%s: reco master %u is ready to recover %u\n",
++ mlog(ML_NOTICE, "%s: reco master %u is ready to recover %u\n",
+ dlm->name, dlm->reco.new_master, dlm->reco.dead_node);
+ status = -EEXIST;
+ } else {
+@@ -2171,7 +2337,7 @@ static int dlm_send_begin_reco_message(s
+
+ mlog_entry("%u\n", dead_node);
+
+- mlog(0, "dead node is %u\n", dead_node);
++ mlog(ML_NOTICE, "%s: dead node is %u\n", dlm->name, dead_node);
+
+ spin_lock(&dlm->spinlock);
+ dlm_node_iter_init(dlm->domain_map, &iter);
+@@ -2244,8 +2410,9 @@ int dlm_begin_reco_handler(struct o2net_
+ if (!dlm_grab(dlm))
+ return 0;
+
+- mlog(0, "node %u wants to recover node %u\n",
+- br->node_idx, br->dead_node);
++ mlog(ML_NOTICE, "%s: node %u wants to recover node %u (%u:%u)\n",
++ dlm->name, br->node_idx, br->dead_node,
++ dlm->reco.dead_node, dlm->reco.new_master);
+
+ dlm_fire_domain_eviction_callbacks(dlm, br->dead_node);
+
+@@ -2267,8 +2434,8 @@ int dlm_begin_reco_handler(struct o2net_
+ "node %u changing it to %u\n", dlm->name,
+ dlm->reco.dead_node, br->node_idx, br->dead_node);
+ }
+- dlm->reco.new_master = br->node_idx;
+- dlm->reco.dead_node = br->dead_node;
++ dlm_set_reco_master(dlm, br->node_idx);
++ dlm_set_reco_dead_node(dlm, br->dead_node);
+ if (!test_bit(br->dead_node, dlm->recovery_map)) {
+ mlog(0, "recovery master %u sees %u as dead, but this "
+ "node has not yet. marking %u as dead\n",
+@@ -2287,6 +2454,11 @@ int dlm_begin_reco_handler(struct o2net_
+ spin_unlock(&dlm->spinlock);
+
+ dlm_kick_recovery_thread(dlm);
++
++ mlog(ML_NOTICE, "%s: recovery started by node %u, for %u (%u:%u)\n",
++ dlm->name, br->node_idx, br->dead_node,
++ dlm->reco.dead_node, dlm->reco.new_master);
++
+ dlm_put(dlm);
+ return 0;
+ }
+@@ -2299,7 +2471,7 @@ static int dlm_send_finalize_reco_messag
+ int nodenum;
+ int status;
+
+- mlog(0, "finishing recovery for node %s:%u\n",
++ mlog(ML_NOTICE, "finishing recovery for node %s:%u\n",
+ dlm->name, dlm->reco.dead_node);
+
+ spin_lock(&dlm->spinlock);
+@@ -2344,8 +2516,9 @@ int dlm_finalize_reco_handler(struct o2n
+ if (!dlm_grab(dlm))
+ return 0;
+
+- mlog(0, "node %u finalizing recovery of node %u\n",
+- fr->node_idx, fr->dead_node);
++ mlog(ML_NOTICE, "%s: node %u finalizing recovery of node %u (%u:%u)\n",
++ dlm->name, fr->node_idx, fr->dead_node,
++ dlm->reco.dead_node, dlm->reco.new_master);
+
+ spin_lock(&dlm->spinlock);
+
+@@ -2369,6 +2542,9 @@ int dlm_finalize_reco_handler(struct o2n
+ dlm_reset_recovery(dlm);
+
+ dlm_kick_recovery_thread(dlm);
++ mlog(ML_NOTICE, "%s: recovery done, reco master was %u, dead now %u, master now %u\n",
++ dlm->name, fr->node_idx, dlm->reco.dead_node, dlm->reco.new_master);
++
+ dlm_put(dlm);
+ return 0;
+ }
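Several hunks above replace the old single-byte mres->lvb[0] / res->lvb[0] tests with dlm_lvb_is_empty(). The helper itself is introduced elsewhere in this patch stack; a minimal sketch of what it has to do, assuming the same DLM_LVB_LEN used in the memcmp() calls above:

static inline int dlm_lvb_is_empty(char *lvb)
{
        int i;

        /* an LVB only counts as empty if every byte is zero,
         * not just the first one */
        for (i = 0; i < DLM_LVB_LEN; i++)
                if (lvb[i])
                        return 0;
        return 1;
}

Checking the whole buffer closes the hole where a valid LVB that happens to begin with a zero byte was mistaken for an empty one.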
Added: branches/ocfs2-1.2-cert/patches/mar24-create-lock-handler.patch
===================================================================
--- branches/ocfs2-1.2-cert/patches/mar24-create-lock-handler.patch 2006-03-27 22:25:16 UTC (rev 2789)
+++ branches/ocfs2-1.2-cert/patches/mar24-create-lock-handler.patch 2006-03-27 22:29:12 UTC (rev 2790)
@@ -0,0 +1,39 @@
+Index: fs/ocfs2/dlm/dlmlock.c
+===================================================================
+--- fs/ocfs2/dlm/dlmlock.c.orig 2006-03-16 18:17:21.358926000 -0800
++++ fs/ocfs2/dlm/dlmlock.c 2006-03-24 15:20:26.372564000 -0800
+@@ -280,6 +280,14 @@ static enum dlm_status dlm_send_remote_l
+ if (tmpret >= 0) {
+ // successfully sent and received
+ ret = status; // this is already a dlm_status
++ if (ret == DLM_RECOVERING) {
++ mlog(ML_ERROR, "%s:%.*s: BUG. this is a stale lockres "
++ "no longer owned by %u. that node is coming back "
++ "up currently.\n", dlm->name, create.namelen,
++ create.name, res->owner);
++ dlm_print_one_lock_resource(res);
++ BUG();
++ }
+ } else {
+ mlog_errno(tmpret);
+ if (dlm_is_host_down(tmpret)) {
+@@ -428,11 +436,16 @@ int dlm_create_lock_handler(struct o2net
+ if (!dlm_grab(dlm))
+ return DLM_REJECTED;
+
+- mlog_bug_on_msg(!dlm_domain_fully_joined(dlm),
+- "Domain %s not fully joined!\n", dlm->name);
+-
+ name = create->name;
+ namelen = create->namelen;
++ status = DLM_RECOVERING;
++ if (!dlm_domain_fully_joined(dlm)) {
++ mlog(ML_ERROR, "Domain %s not fully joined, but node %u is "
++ "sending a create_lock message for lock %.*s!\n",
++ dlm->name, create->node_idx, namelen, name);
++ dlm_error(status);
++ goto leave;
++ }
+
+ status = DLM_IVBUFLEN;
+ if (namelen > DLM_LOCKID_NAME_MAX) {
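The handler-side gate above relies on dlm_domain_fully_joined(). For orientation, that check is roughly the following (a sketch from memory of dlmdomain.c, not the verbatim definition):

int dlm_domain_fully_joined(struct dlm_ctxt *dlm)
{
        int ret;

        /* a node is "fully joined" once the join handshake with every
         * domain member has completed (or the domain is shutting down) */
        spin_lock(&dlm_domain_lock);
        ret = (dlm->dlm_state == DLM_CTXT_JOINED) ||
              (dlm->dlm_state == DLM_CTXT_IN_SHUTDOWN);
        spin_unlock(&dlm_domain_lock);
        return ret;
}

The net effect of the two hunks is to move the BUG from the half-joined receiver (the old mlog_bug_on_msg()) to the sender, which can now dump the stale lockres before dying.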
Added: branches/ocfs2-1.2-cert/patches/mastery-restart-recovery.patch
===================================================================
--- branches/ocfs2-1.2-cert/patches/mastery-restart-recovery.patch 2006-03-27 22:25:16 UTC (rev 2789)
+++ branches/ocfs2-1.2-cert/patches/mastery-restart-recovery.patch 2006-03-27 22:29:12 UTC (rev 2790)
@@ -0,0 +1,120 @@
+Index: fs/ocfs2/dlm/dlmmaster.c
+===================================================================
+--- fs/ocfs2/dlm/dlmmaster.c.orig 2006-03-27 14:23:30.045812000 -0800
++++ fs/ocfs2/dlm/dlmmaster.c 2006-03-27 14:23:30.283574000 -0800
+@@ -856,6 +856,7 @@ lookup:
+ spin_unlock(&dlm->master_lock);
+ spin_unlock(&dlm->spinlock);
+
++redo_request:
+ while (wait_on_recovery) {
+ /* any cluster changes that occurred after dropping the
+ * dlm spinlock would be detectable be a change on the mle,
+@@ -893,7 +894,6 @@ lookup:
+ if (blocked)
+ goto wait;
+
+-redo_request:
+ ret = -EINVAL;
+ dlm_node_iter_init(mle->vote_map, &iter);
+ while ((nodenum = dlm_node_iter_next(&iter)) >= 0) {
+@@ -918,7 +918,8 @@ wait:
+ /* keep going until the response map includes all nodes */
+ ret = dlm_wait_for_lock_mastery(dlm, res, mle, &blocked);
+ if (ret < 0) {
+- mlog(0, "%s:%.*s: node map changed, redo the "
++ wait_on_recovery = 1;
++ mlog(ML_NOTICE, "%s:%.*s: node map changed, redo the "
+ "master request now, blocked=%d\n",
+ dlm->name, res->lockname.len,
+ res->lockname.name, blocked);
+@@ -1199,7 +1200,60 @@ static int dlm_restart_lock_mastery(stru
+ set_bit(node, mle->vote_map);
+ } else {
+ mlog(ML_ERROR, "node down! %d\n", node);
++ if (blocked) {
++ int lowest = find_next_bit(mle->maybe_map,
++ O2NM_MAX_NODES, 0);
++
++ /* act like it was never there */
++ clear_bit(node, mle->maybe_map);
+
++ if (node == lowest) {
++ mlog(ML_ERROR, "expected master %u died"
++ " while this node was blocked "
++ "waiting on it!\n", node);
++ lowest = find_next_bit(mle->maybe_map,
++ O2NM_MAX_NODES,
++ lowest+1);
++ if (lowest < O2NM_MAX_NODES) {
++ mlog(ML_NOTICE, "%s:%.*s:still "
++ "blocked. waiting on %u "
++ "now\n", dlm->name,
++ res->lockname.len,
++ res->lockname.name,
++ lowest);
++ } else {
++ /* mle is an MLE_BLOCK, but
++ * there is now nothing left to
++ * block on. we need to return
++ * all the way back out and try
++ * again with an MLE_MASTER.
++ * dlm_do_local_recovery_cleanup
++ * has already run, so the mle
++ * refcount is ok */
++ mlog(ML_NOTICE, "%s:%.*s: no "
++ "longer blocking. try to "
++ "master this here\n",
++ dlm->name,
++ res->lockname.len,
++ res->lockname.name);
++ mle->type = DLM_MLE_MASTER;
++ mle->u.res = res;
++ }
++ }
++ }
++
++ /* now blank out everything, as if we had never
++ * contacted anyone */
++ memset(mle->maybe_map, 0, sizeof(mle->maybe_map));
++ memset(mle->response_map, 0, sizeof(mle->response_map));
++ /* reset the vote_map to the current node_map */
++ memcpy(mle->vote_map, mle->node_map,
++ sizeof(mle->node_map));
++ /* put myself into the maybe map */
++ if (mle->type != DLM_MLE_BLOCK)
++ set_bit(dlm->node_num, mle->maybe_map);
++
++#if 0
+ /* if the node wasn't involved in mastery skip it,
+ * but clear it out from the maps so that it will
+ * not affect mastery of this lockres */
+@@ -1207,7 +1261,6 @@ static int dlm_restart_lock_mastery(stru
+ clear_bit(node, mle->vote_map);
+ if (!test_bit(node, mle->maybe_map))
+ goto next;
+-
+ /* if we're already blocked on lock mastery, and the
+ * dead node wasn't the expected master, or there is
+ * another node in the maybe_map, keep waiting */
+@@ -1253,7 +1306,6 @@ static int dlm_restart_lock_mastery(stru
+ ret = -EAGAIN;
+ goto next;
+ }
+-
+ clear_bit(node, mle->maybe_map);
+ if (node > dlm->node_num)
+ goto next;
+@@ -1263,9 +1315,12 @@ static int dlm_restart_lock_mastery(stru
+ * in the vote_map, removing this node. */
+ memset(mle->response_map, 0,
+ sizeof(mle->response_map));
++#endif
+ }
+ ret = -EAGAIN;
++#if 0
+ next:
++#endif
+ node = dlm_bitmap_diff_iter_next(&bdi, &sc);
+ }
+ return ret;
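The map surgery above uses the standard kernel bitmap walk on the mle node maps (each is an unsigned long array sized by BITS_TO_LONGS(O2NM_MAX_NODES)). For reference, the find_next_bit() pattern for visiting every set node in one of these maps:

        int node;

        node = find_next_bit(mle->maybe_map, O2NM_MAX_NODES, 0);
        while (node < O2NM_MAX_NODES) {
                /* ... act on this node ... */
                node = find_next_bit(mle->maybe_map, O2NM_MAX_NODES,
                                     node + 1);
        }

find_next_bit() returns the size argument when no further bits are set, which is why the "lowest < O2NM_MAX_NODES" test above is sufficient to detect that nothing is left to block on.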
Added: branches/ocfs2-1.2-cert/patches/ocfs2-disable_lvbs1.patch
===================================================================
--- branches/ocfs2-1.2-cert/patches/ocfs2-disable_lvbs1.patch 2006-03-27 22:25:16 UTC (rev 2789)
+++ branches/ocfs2-1.2-cert/patches/ocfs2-disable_lvbs1.patch 2006-03-27 22:29:12 UTC (rev 2790)
@@ -0,0 +1,13 @@
+Index: fs/ocfs2/dlmglue.c
+===================================================================
+--- fs/ocfs2/dlmglue.c (revision 2786)
++++ fs/ocfs2/dlmglue.c (working copy)
+@@ -1312,6 +1312,8 @@ static inline int ocfs2_meta_lvb_is_trus
+ {
+ struct ocfs2_meta_lvb *lvb = (struct ocfs2_meta_lvb *) lockres->l_lksb.lvb;
+
++ return 0;
++
+ /* Old OCFS2 versions stored a "sequence" in the lvb to
+ * determine whether the information could be trusted. We
+ * don't want to use an lvb populated from a node running the
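The bare return 0; above is the whole patch: it unconditionally distrusts every LVB and leaves the rest of the function as dead code (some compilers will warn about the unreachable statements). If this kill switch were meant to outlive the cert branch, a guarded form would keep the function warning-free; a sketch, where ocfs2_trust_lvbs and ocfs2_meta_lvb_seq_valid() are hypothetical stand-ins, not existing symbols:

static int ocfs2_trust_lvbs; /* hypothetical knob: 0 = never trust LVBs */

static inline int ocfs2_meta_lvb_is_trustable(struct ocfs2_lock_res *lockres)
{
        if (!ocfs2_trust_lvbs)
                return 0;
        /* hypothetical: stands in for the existing sequence checks */
        return ocfs2_meta_lvb_seq_valid(lockres);
}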
Added: branches/ocfs2-1.2-cert/patches/ocfs2-extend_file_more_info_on_oops.patch
===================================================================
--- branches/ocfs2-1.2-cert/patches/ocfs2-extend_file_more_info_on_oops.patch 2006-03-27 22:25:16 UTC (rev 2789)
+++ branches/ocfs2-1.2-cert/patches/ocfs2-extend_file_more_info_on_oops.patch 2006-03-27 22:29:12 UTC (rev 2790)
@@ -0,0 +1,40 @@
+Index: fs/ocfs2/file.c
+===================================================================
+--- fs/ocfs2/file.c (revision 2787)
++++ fs/ocfs2/file.c (working copy)
+@@ -778,18 +778,23 @@ restart_all:
+ status = -EIO;
+ goto leave;
+ }
+- mlog_bug_on_msg(i_size_read(inode) !=
+- (le64_to_cpu(fe->i_size) - *bytes_extended),
+- "Inode %"MLFu64" i_size = %lld, dinode i_size "
+- "= %"MLFu64", bytes_extended = %"MLFu64", new_i_size "
+- "= %"MLFu64"\n", OCFS2_I(inode)->ip_blkno,
+- i_size_read(inode), le64_to_cpu(fe->i_size),
+- *bytes_extended, new_i_size);
+- mlog_bug_on_msg(new_i_size < i_size_read(inode),
+- "Inode %"MLFu64", i_size = %lld, new sz = %"MLFu64"\n",
+- OCFS2_I(inode)->ip_blkno, i_size_read(inode),
+- new_i_size);
+-
++ if (i_size_read(inode) != (le64_to_cpu(fe->i_size) - *bytes_extended)) {
++ mlog(ML_ERROR, "Inode %"MLFu64" i_size = %lld, dinode i_size "
++ "= %"MLFu64", bytes_extended = %"MLFu64", new_i_size "
++ "= %"MLFu64"\n", OCFS2_I(inode)->ip_blkno,
++ i_size_read(inode), le64_to_cpu(fe->i_size),
++ *bytes_extended, new_i_size);
++ mlog_meta_lvb(ML_ERROR, &OCFS2_I(inode)->ip_meta_lockres);
++ BUG();
++ }
++ if (new_i_size < i_size_read(inode)) {
++ mlog(ML_ERROR,
++ "Inode %"MLFu64", i_size = %lld, new sz = %"MLFu64"\n",
++ OCFS2_I(inode)->ip_blkno, i_size_read(inode),
++ new_i_size);
++ mlog_meta_lvb(ML_ERROR, &OCFS2_I(inode)->ip_meta_lockres);
++ BUG();
++ }
+ if (i_size_read(inode) == new_i_size)
+ goto leave;
+
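The conversion from mlog_bug_on_msg() to open-coded checks exists only so mlog_meta_lvb() can dump the cached LVB before the BUG() fires. For reference, mlog_bug_on_msg() from the cluster masklog header expands to roughly:

#define mlog_bug_on_msg(cond, fmt, args...) do {                        \
        if (cond) {                                                     \
                mlog(ML_ERROR, "bug expression: " #cond "\n");          \
                mlog(ML_ERROR, fmt, ##args);                            \
                BUG();                                                  \
        }                                                               \
} while (0)

so there is no hook between the message and the BUG() to emit extra state, which is exactly what debugging this oops needed.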
Added: branches/ocfs2-1.2-cert/patches/ocfs2-journal_start_stop_msgs.patch
===================================================================
--- branches/ocfs2-1.2-cert/patches/ocfs2-journal_start_stop_msgs.patch 2006-03-27 22:25:16 UTC (rev 2789)
+++ branches/ocfs2-1.2-cert/patches/ocfs2-journal_start_stop_msgs.patch 2006-03-27 22:29:12 UTC (rev 2790)
@@ -0,0 +1,37 @@
+Index: fs/ocfs2/journal.c
+===================================================================
+--- fs/ocfs2/journal.c (revision 2787)
++++ fs/ocfs2/journal.c (working copy)
+@@ -1029,6 +1031,8 @@ static int __ocfs2_recovery_thread(void
+ }
+
+ restart:
++
++ mlog(ML_NOTICE, "Begin recovery pass on volume %s\n", osb->uuid_str);
+ status = ocfs2_super_lock(osb, 1);
+ if (status < 0) {
+ mlog_errno(status);
+@@ -1043,6 +1047,7 @@ restart:
+ break;
+ }
+
++ mlog(ML_NOTICE, "Try to recover node %d\n", node_num);
+ status = ocfs2_recover_node(osb, node_num);
+ if (status < 0) {
+ mlog(ML_ERROR,
+@@ -1052,11 +1057,13 @@ restart:
+ mlog(ML_ERROR, "Volume requires unmount.\n");
+ continue;
+ }
+-
++ mlog(ML_NOTICE, "Remove %d from recovery map\n", node_num);
+ ocfs2_recovery_map_clear(osb, node_num);
+ }
+ ocfs2_super_unlock(osb, 1);
+
++ mlog(ML_NOTICE, "Complete recovery pass on volume %s\n", osb->uuid_str);
++
+ /* We always run recovery on our own orphan dir - the dead
+ * node(s) may have voted "no" on an inode delete earlier. A
+ * revote is therefore required. */
+
Added: branches/ocfs2-1.2-cert/patches/ocfs2-reco_nofs.patch
===================================================================
--- branches/ocfs2-1.2-cert/patches/ocfs2-reco_nofs.patch 2006-03-27 22:25:16 UTC (rev 2789)
+++ branches/ocfs2-1.2-cert/patches/ocfs2-reco_nofs.patch 2006-03-27 22:29:12 UTC (rev 2790)
@@ -0,0 +1,40 @@
+Index: fs/ocfs2/journal.c
+===================================================================
+--- fs/ocfs2/journal.c (revision 2787)
++++ fs/ocfs2/journal.c (working copy)
+@@ -858,9 +858,11 @@ static int ocfs2_force_read_journal(stru
+ if (p_blocks > CONCURRENT_JOURNAL_FILL)
+ p_blocks = CONCURRENT_JOURNAL_FILL;
+
++ /* We are reading journal data which should not
++ * be put in the uptodate cache */
+ status = ocfs2_read_blocks(OCFS2_SB(inode->i_sb),
+ p_blkno, p_blocks, bhs, 0,
+- inode);
++ NULL);
+ if (status < 0) {
+ mlog_errno(status);
+ goto bail;
+Index: fs/ocfs2/uptodate.c
+===================================================================
+--- fs/ocfs2/uptodate.c (revision 2787)
++++ fs/ocfs2/uptodate.c (working copy)
+@@ -335,7 +335,7 @@ static void __ocfs2_set_buffer_uptodate(
+ mlog(0, "Inode %"MLFu64", block %llu, expand = %d\n",
+ oi->ip_blkno, (unsigned long long) block, expand_tree);
+
+- new = kmem_cache_alloc(ocfs2_uptodate_cachep, GFP_KERNEL);
++ new = kmem_cache_alloc(ocfs2_uptodate_cachep, GFP_NOFS);
+ if (!new) {
+ mlog_errno(-ENOMEM);
+ return;
+@@ -347,7 +347,7 @@ static void __ocfs2_set_buffer_uptodate(
+ * has no way of tracking that. */
+ for(i = 0; i < OCFS2_INODE_MAX_CACHE_ARRAY; i++) {
+ tree[i] = kmem_cache_alloc(ocfs2_uptodate_cachep,
+- GFP_KERNEL);
++ GFP_NOFS);
+ if (!tree[i]) {
+ mlog_errno(-ENOMEM);
+ goto out_free;
+
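Both hunks enforce the same rule: nothing on the journal-recovery path may recurse back into the filesystem, since a reclaim-triggered writeback could block on locks that recovery already holds. Passing a NULL inode to ocfs2_read_blocks() keeps the journal blocks out of the per-inode uptodate cache, and GFP_NOFS covers the allocation side; in miniature:

        struct ocfs2_meta_cache_item *new;

        /* GFP_NOFS may still reclaim memory, but is forbidden from
         * re-entering filesystem code to do it */
        new = kmem_cache_alloc(ocfs2_uptodate_cachep, GFP_NOFS);
        if (!new) {
                mlog_errno(-ENOMEM);
                return;
        }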
Added: branches/ocfs2-1.2-cert/patches/ocfs2-validate_lvb_contents1.patch
===================================================================
--- branches/ocfs2-1.2-cert/patches/ocfs2-validate_lvb_contents1.patch 2006-03-27 22:25:16 UTC (rev 2789)
+++ branches/ocfs2-1.2-cert/patches/ocfs2-validate_lvb_contents1.patch 2006-03-27 22:29:12 UTC (rev 2790)
@@ -0,0 +1,62 @@
+Index: fs/ocfs2/dlmglue.c
+===================================================================
+--- fs/ocfs2/dlmglue.c (revision 2787)
++++ fs/ocfs2/dlmglue.c (working copy)
+@@ -1377,6 +1377,32 @@ static inline void ocfs2_complete_lock_r
+ mlog_exit_void();
+ }
+
++static void ocfs2_validate_lvb(struct ocfs2_lock_res *lockres,
++ struct ocfs2_dinode *di)
++{
++ struct ocfs2_meta_lvb *lvb;
++
++ lvb = (struct ocfs2_meta_lvb *) lockres->l_lksb.lvb;
++ if (be32_to_cpu(lvb->lvb_iclusters) != le32_to_cpu(di->i_clusters)
++ || be64_to_cpu(lvb->lvb_isize) != le64_to_cpu(di->i_size)
++ || be32_to_cpu(lvb->lvb_iuid) != le32_to_cpu(di->i_uid)
++ || be32_to_cpu(lvb->lvb_igid) != le32_to_cpu(di->i_gid)
++ || be16_to_cpu(lvb->lvb_imode) != le16_to_cpu(di->i_mode)
++ || be16_to_cpu(lvb->lvb_inlink) != le16_to_cpu(di->i_links_count)) {
++ mlog(ML_ERROR, "LVB and disk information for inode %llu don't "
++ "match!\n", (unsigned long long)le64_to_cpu(di->i_blkno));
++ mlog_meta_lvb(ML_ERROR, lockres);
++ mlog(ML_ERROR, "Dinode info: clusters %u, size %llu, uid %u, "
++ "gid %u, mode %u, links count %u\n",
++ le32_to_cpu(di->i_clusters),
++ (unsigned long long)le64_to_cpu(di->i_size),
++ le32_to_cpu(di->i_uid), le32_to_cpu(di->i_gid),
++ le16_to_cpu(di->i_mode), le16_to_cpu(di->i_links_count));
++ mlog(ML_ERROR, "Lockres flags: 0x%lx level %d ro %u ex %u requested %u blocking %u\n", lockres->l_flags, lockres->l_level, lockres->l_ro_holders ,lockres->l_ex_holders,lockres->l_requested,lockres->l_blocking);
++ BUG();
++ }
++}
++
+ /* may or may not return a bh if it went to disk. */
+ static int ocfs2_meta_lock_update(struct inode *inode,
+ struct buffer_head **bh)
+@@ -1412,7 +1438,8 @@ static int ocfs2_meta_lock_update(struct
+ * map (directories, bitmap files, etc) */
+ ocfs2_extent_map_trunc(inode, 0);
+
+- if (ocfs2_meta_lvb_is_trustable(lockres)) {
++// if (ocfs2_meta_lvb_is_trustable(lockres)) {
++ if (0) {
+ mlog(0, "Trusting LVB on inode %"MLFu64"\n",
+ oi->ip_blkno);
+ ocfs2_refresh_inode_from_lvb(inode);
+@@ -1453,7 +1480,13 @@ static int ocfs2_meta_lock_update(struct
+ le64_to_cpu(fe->i_dtime),
+ le32_to_cpu(fe->i_flags));
+
+- ocfs2_refresh_inode(inode, fe);
++ if (ocfs2_meta_lvb_is_trustable(lockres)) {
++ /* Refresh from lvb to maintain the same behavior */
++ ocfs2_validate_lvb(lockres, fe);
++ ocfs2_refresh_inode_from_lvb(inode);
++ } else {
++ ocfs2_refresh_inode(inode, fe);
++ }
+ }
+
+ #ifdef OCFS2_DELETE_INODE_WORKAROUND
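The validation compares big-endian LVB fields (network byte order) against the little-endian on-disk dinode, so both sides are converted to CPU order before comparing. For orientation, the LVB layout referenced above is approximately the following (field order and the elided members are recalled from dlmglue.h and should be treated as a sketch, not the authoritative definition):

struct ocfs2_meta_lvb {
        __be32 lvb_old_seq;
        __be32 lvb_version;
        __be32 lvb_iclusters;
        __be32 lvb_iuid;
        __be32 lvb_igid;
        __be64 lvb_isize;
        __be16 lvb_imode;
        __be16 lvb_inlink;
        /* packed atime/ctime/mtime and reserved words elided */
};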
Added: branches/ocfs2-1.2-cert/patches/ocfs2_dlm-do_lvb_puts_inline2.patch
===================================================================
--- branches/ocfs2-1.2-cert/patches/ocfs2_dlm-do_lvb_puts_inline2.patch 2006-03-27 22:25:16 UTC (rev 2789)
+++ branches/ocfs2-1.2-cert/patches/ocfs2_dlm-do_lvb_puts_inline2.patch 2006-03-27 22:29:12 UTC (rev 2790)
@@ -0,0 +1,40 @@
+Index: fs/ocfs2/dlm/dlmconvert.c
+===================================================================
+--- fs/ocfs2/dlm/dlmconvert.c (revision 2787)
++++ fs/ocfs2/dlm/dlmconvert.c (working copy)
+@@ -214,6 +214,9 @@ grant:
+ if (lock->ml.node == dlm->node_num)
+ mlog(0, "doing in-place convert for nonlocal lock\n");
+ lock->ml.type = type;
++ if (lock->lksb->flags & DLM_LKSB_PUT_LVB)
++ memcpy(res->lvb, lock->lksb->lvb, DLM_LVB_LEN);
++
+ status = DLM_NORMAL;
+ *call_ast = 1;
+ goto unlock_exit;
+Index: fs/ocfs2/dlm/dlmast.c
+===================================================================
+--- fs/ocfs2/dlm/dlmast.c (revision 2787)
++++ fs/ocfs2/dlm/dlmast.c (working copy)
+@@ -197,12 +197,20 @@ static void dlm_update_lvb(struct dlm_ct
+ lock->ml.node == dlm->node_num ? "master" :
+ "remote");
+ memcpy(lksb->lvb, res->lvb, DLM_LVB_LEN);
+- } else if (lksb->flags & DLM_LKSB_PUT_LVB) {
++ }
++ /* Do nothing for lvb put requests - they should be
++ * done in place when the lock is downconverted -
++ * otherwise we risk racing gets and puts which could
++ * result in old lvb data being propagated. We leave
++ * the flag set however ... */
++#if 0
++ else if (lksb->flags & DLM_LKSB_PUT_LVB) {
+ mlog(0, "setting lvb from lockres for %s node\n",
+ lock->ml.node == dlm->node_num ? "master" :
+ "remote");
+ memcpy(res->lvb, lksb->lvb, DLM_LVB_LEN);
+ }
++#endif
+ spin_unlock(&res->spinlock);
+ }
+
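From the caller's side nothing changes: LKM_VALBLK still asks the dlm to carry lksb.lvb, and on a downconvert the core sets DLM_LKSB_PUT_LVB itself; only the point where res->lvb is written moves into the in-place convert/grant path. A hedged usage sketch (lockname, my_ast, my_bast, astdata and new_value are placeholders):

        /* lock is currently held EX; publish data and drop to PR */
        memcpy(lksb.lvb, new_value, DLM_LVB_LEN);
        status = dlmlock(dlm, LKM_PRMODE, &lksb, LKM_CONVERT | LKM_VALBLK,
                         lockname, my_ast, astdata, my_bast);

Doing the put at grant time instead of AST-delivery time is what closes the get/put race described in the comment above.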
Added: branches/ocfs2-1.2-cert/patches/series
===================================================================
--- branches/ocfs2-1.2-cert/patches/series 2006-03-27 22:25:16 UTC (rev 2789)
+++ branches/ocfs2-1.2-cert/patches/series 2006-03-27 22:29:12 UTC (rev 2790)
@@ -0,0 +1,16 @@
+em-silence-eexist.patch -p0
+mar20-full-3.patch -p0
+ocfs2-extend_file_more_info_on_oops.patch -p0
+ocfs2-journal_start_stop_msgs.patch -p0
+ocfs2-reco_nofs.patch -p0
+ocfs2_dlm-do_lvb_puts_inline2.patch -p0
+lockres-release-info.patch
+debug-mastery.patch
+hold-recovery-ref.patch
+two-stage-finalize.patch -p0
+fix-purge-lockres.patch -p0
+dlm-eloop.patch -p0
+lvb-recovery-fix.patch -p0
+mar24-create-lock-handler.patch -p0
+mastery-restart-recovery.patch -p0
+leave-other-dead-nodes-on-recovery-list.patch
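The series file drives quilt: one patch per line, applied top to bottom, with the trailing -p0 overriding quilt's default -p1 strip level for the patches whose headers carry no directory prefix. The stack is applied from the branch root with quilt push -a and unwound with quilt pop -a; quilt keeps its working state under .pc/.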
Added: branches/ocfs2-1.2-cert/patches/two-stage-finalize.patch
===================================================================
--- branches/ocfs2-1.2-cert/patches/two-stage-finalize.patch 2006-03-27 22:25:16 UTC (rev 2789)
+++ branches/ocfs2-1.2-cert/patches/two-stage-finalize.patch 2006-03-27 22:29:12 UTC (rev 2790)
@@ -0,0 +1,332 @@
+Index: fs/ocfs2/dlm/dlmcommon.h
+===================================================================
+--- fs/ocfs2/dlm/dlmcommon.h.orig 2006-03-22 14:36:02.379128000 -0800
++++ fs/ocfs2/dlm/dlmcommon.h 2006-03-22 14:36:02.448059000 -0800
+@@ -61,7 +61,8 @@ static inline int dlm_is_recovery_lock(c
+ return 0;
+ }
+
+-#define DLM_RECO_STATE_ACTIVE 0x0001
++#define DLM_RECO_STATE_ACTIVE 0x0001
++#define DLM_RECO_STATE_FINALIZE 0x0002
+
+ struct dlm_recovery_ctxt
+ {
+@@ -618,7 +619,8 @@ struct dlm_finalize_reco
+ {
+ u8 node_idx;
+ u8 dead_node;
+- __be16 pad1;
++ u8 flags;
++ u8 pad1;
+ __be32 pad2;
+ };
+
+Index: fs/ocfs2/dlm/dlmrecovery.c
+===================================================================
+--- fs/ocfs2/dlm/dlmrecovery.c.orig 2006-03-22 14:36:02.372135000 -0800
++++ fs/ocfs2/dlm/dlmrecovery.c 2006-03-22 14:36:02.462056000 -0800
+@@ -134,12 +134,18 @@ static inline void dlm_set_reco_master(s
+ dlm->reco.new_master = master;
+ }
+
+-static inline void dlm_reset_recovery(struct dlm_ctxt *dlm)
++static inline void __dlm_reset_recovery(struct dlm_ctxt *dlm)
+ {
+- spin_lock(&dlm->spinlock);
++ assert_spin_locked(&dlm->spinlock);
+ clear_bit(dlm->reco.dead_node, dlm->recovery_map);
+ dlm_set_reco_dead_node(dlm, O2NM_INVALID_NODE_NUM);
+ dlm_set_reco_master(dlm, O2NM_INVALID_NODE_NUM);
++}
++
++static inline void dlm_reset_recovery(struct dlm_ctxt *dlm)
++{
++ spin_lock(&dlm->spinlock);
++ __dlm_reset_recovery(dlm);
+ spin_unlock(&dlm->spinlock);
+ }
+
+@@ -380,7 +386,7 @@ void dlm_wait_for_recovery(struct dlm_ct
+ dlm->name, dlm->dlm_reco_thread_task->pid,
+ dlm->reco.state, dlm->reco.new_master,
+ dlm->reco.dead_node);
+- dlm_print_reco_junk(dlm);
++ //dlm_print_reco_junk(dlm);
+ }
+
+ while (1) {
+@@ -393,7 +399,7 @@ void dlm_wait_for_recovery(struct dlm_ct
+ dlm->name, dlm->dlm_reco_thread_task->pid,
+ dlm->reco.state, dlm->reco.new_master,
+ dlm->reco.dead_node);
+- dlm_print_reco_junk(dlm);
++ //dlm_print_reco_junk(dlm);
+ }
+ // wait_event(dlm->reco.event, !dlm_in_recovery(dlm));
+ }
+@@ -429,6 +435,7 @@ int dlm_do_recovery(struct dlm_ctxt *dlm
+ dlm->reco.new_master, dlm->reco.dead_node);
+ /* unset the new_master, leave dead_node */
+ dlm_set_reco_master(dlm, O2NM_INVALID_NODE_NUM);
++#warning need to make a decision here whether to move lockreses off of dlm->reco.resources or leave them
+ }
+
+ /* select a target to recover */
+@@ -521,6 +528,7 @@ static int dlm_remaster_locks(struct dlm
+ int all_nodes_done;
+ int destroy = 0;
+ int pass = 0;
++ unsigned long long mlg;
+
+ status = dlm_init_recovery_area(dlm, dead_node);
+ if (status < 0)
+@@ -559,9 +567,9 @@ static int dlm_remaster_locks(struct dlm
+ BUG();
+ break;
+ case DLM_RECO_NODE_DATA_DEAD:
+- mlog(0, "node %u died after requesting "
+- "recovery info for node %u\n",
+- ndata->node_num, dead_node);
++ mlog(ML_ERROR, "%s:node %u died after "
++ "requesting recovery info for node %u\n",
++ dlm->name, ndata->node_num, dead_node);
+ // start all over
+ destroy = 1;
+ status = -EAGAIN;
+@@ -593,6 +601,7 @@ static int dlm_remaster_locks(struct dlm
+ while (1) {
+ /* check all the nodes now to see if we are
+ * done, or if anyone died */
++ pass++;
+ all_nodes_done = 1;
+ spin_lock(&dlm_reco_state_lock);
+ list_for_each(iter, &dlm->reco.node_data) {
+@@ -633,7 +642,13 @@ static int dlm_remaster_locks(struct dlm
+ goto leave;
+ case DLM_RECO_NODE_DATA_RECEIVING:
+ case DLM_RECO_NODE_DATA_REQUESTED:
+- mlog(0, "%s: node %u still in state %s\n",
++ if (pass % 1000 == 0)
++ mlg = ML_ERROR;
++ else if (pass % 100 == 0)
++ mlg = ML_NOTICE;
++ else
++ mlg = 0;
++ mlog(mlg, "%s: node %u still in state %s\n",
+ dlm->name, ndata->node_num,
+ ndata->state==DLM_RECO_NODE_DATA_RECEIVING ?
+ "receiving" : "requested");
+@@ -651,7 +666,7 @@ static int dlm_remaster_locks(struct dlm
+ }
+ spin_unlock(&dlm_reco_state_lock);
+
+- mlog(0, "pass #%d, all_nodes_done?: %s\n", ++pass,
++ mlog(0, "pass #%d, all_nodes_done?: %s\n", pass,
+ all_nodes_done?"yes":"no");
+ if (all_nodes_done) {
+ int ret;
+@@ -1708,8 +1723,19 @@ static int dlm_process_recovery_data(str
+ if (!dlm_lvb_is_empty(res->lvb) &&
+ (ml->type == LKM_EXMODE ||
+ memcmp(res->lvb, mres->lvb, DLM_LVB_LEN))) {
+- mlog(ML_ERROR, "received bad lvb!\n");
+- __dlm_print_one_lock_resource(res);
++ int i;
++ mlog(ML_ERROR, "%s:%.*s: received bad "
++ "lvb! type=%d\n", dlm->name,
++ res->lockname.len,
++ res->lockname.name, ml->type);
++ printk("lockres lvb=[");
++ for (i=0; i<DLM_LVB_LEN; i++)
++ printk("%02x", res->lvb[i]);
++ printk("]\nmigrated lvb=[");
++ for (i=0; i<DLM_LVB_LEN; i++)
++ printk("%02x", mres->lvb[i]);
++ printk("]\n");
++ dlm_print_one_lock_resource(res);
+ BUG();
+ }
+ memcpy(res->lvb, mres->lvb, DLM_LVB_LEN);
+@@ -2099,6 +2125,20 @@ void __dlm_hb_node_down(struct dlm_ctxt
+ {
+ assert_spin_locked(&dlm->spinlock);
+
++ if (dlm->reco.new_master == idx) {
++ mlog(ML_NOTICE, "%s: recovery master %d just died\n",
++ dlm->name, idx);
++ if (dlm->reco.state & DLM_RECO_STATE_FINALIZE) {
++ /* finalize1 was reached, so it is safe to clear
++ * the new_master and dead_node. that recovery
++ * is complete. */
++ mlog(ML_NOTICE, "%s: dead master %d had reached "
++ "finalize1 state, clearing\n", dlm->name, idx);
++ dlm->reco.state &= ~DLM_RECO_STATE_FINALIZE;
++ __dlm_reset_recovery(dlm);
++ }
++ }
++
+ /* check to see if the node is already considered dead */
+ if (!test_bit(idx, dlm->live_nodes_map)) {
+ mlog(0, "for domain %s, node %d is already dead. "
+@@ -2405,6 +2445,13 @@ retry:
+ * another ENOMEM */
+ msleep(100);
+ goto retry;
++ } else if (ret == EAGAIN) {
++ mlog(ML_NOTICE, "%s: trying to start recovery of node "
++ "%u, but node %u is waiting for last recovery "
++ "to complete, backoff for a bit\n", dlm->name,
++ dead_node, nodenum);
++ msleep(100);
++ goto retry;
+ }
+ }
+
+@@ -2419,6 +2466,18 @@ int dlm_begin_reco_handler(struct o2net_
+ /* ok to return 0, domain has gone away */
+ if (!dlm_grab(dlm))
+ return 0;
++
++ spin_lock(&dlm->spinlock);
++ if (dlm->reco.state & DLM_RECO_STATE_FINALIZE) {
++ mlog(ML_NOTICE, "%s: node %u wants to recover node %u (%u:%u) "
++ "but this node is in finalize state, waiting on finalize2\n",
++ dlm->name, br->node_idx, br->dead_node,
++ dlm->reco.dead_node, dlm->reco.new_master);
++ spin_unlock(&dlm->spinlock);
++ dlm_put(dlm); /* drop the dlm_grab() reference */
++ return EAGAIN;
++ }
++ spin_unlock(&dlm->spinlock);
+
+ mlog(ML_NOTICE, "%s: node %u wants to recover node %u (%u:%u)\n",
+ dlm->name, br->node_idx, br->dead_node,
+@@ -2473,6 +2531,7 @@ int dlm_begin_reco_handler(struct o2net_
+ return 0;
+ }
+
++#define DLM_FINALIZE_STAGE2 0x01
+ static int dlm_send_finalize_reco_message(struct dlm_ctxt *dlm)
+ {
+ int ret = 0;
+@@ -2480,25 +2539,31 @@ static int dlm_send_finalize_reco_messag
+ struct dlm_node_iter iter;
+ int nodenum;
+ int status;
++ int stage = 1;
+
+- mlog(ML_NOTICE, "finishing recovery for node %s:%u\n",
+- dlm->name, dlm->reco.dead_node);
++ mlog(ML_NOTICE, "finishing recovery for node %s:%u, "
++ "stage %d\n", dlm->name, dlm->reco.dead_node, stage);
+
+ spin_lock(&dlm->spinlock);
+ dlm_node_iter_init(dlm->domain_map, &iter);
+ spin_unlock(&dlm->spinlock);
+
++stage2:
+ memset(&fr, 0, sizeof(fr));
+ fr.node_idx = dlm->node_num;
+ fr.dead_node = dlm->reco.dead_node;
++ if (stage == 2)
++ fr.flags |= DLM_FINALIZE_STAGE2;
+
+ while ((nodenum = dlm_node_iter_next(&iter)) >= 0) {
+ if (nodenum == dlm->node_num)
+ continue;
+ ret = o2net_send_message(DLM_FINALIZE_RECO_MSG, dlm->key,
+ &fr, sizeof(fr), nodenum, &status);
+- if (ret >= 0) {
++ if (ret >= 0)
+ ret = status;
++ if (ret < 0) {
++ mlog_errno(ret);
+ if (dlm_is_host_down(ret)) {
+ /* this has no effect on this recovery
+ * session, so set the status to zero to
+@@ -2507,12 +2572,15 @@ static int dlm_send_finalize_reco_messag
+ "node finished recovery.\n", nodenum);
+ ret = 0;
+ }
+- }
+- if (ret < 0) {
+- mlog_errno(ret);
+ break;
+ }
+ }
++ if (stage == 1) {
++ /* reset the node_iter back to the top and send finalize2 */
++ iter.curnode = -1;
++ stage = 2;
++ goto stage2;
++ }
+
+ return ret;
+ }
+@@ -2521,14 +2589,18 @@ int dlm_finalize_reco_handler(struct o2n
+ {
+ struct dlm_ctxt *dlm = data;
+ struct dlm_finalize_reco *fr = (struct dlm_finalize_reco *)msg->buf;
++ int stage = 1;
+
+ /* ok to return 0, domain has gone away */
+ if (!dlm_grab(dlm))
+ return 0;
+
+- mlog(ML_NOTICE, "%s: node %u finalizing recovery of node %u (%u:%u)\n",
+- dlm->name, fr->node_idx, fr->dead_node,
+- dlm->reco.dead_node, dlm->reco.new_master);
++ if (fr->flags & DLM_FINALIZE_STAGE2)
++ stage = 2;
++
++ mlog(ML_NOTICE, "%s: node %u finalizing recovery stage%d of "
++ "node %u (%u:%u)\n", dlm->name, fr->node_idx, stage,
++ fr->dead_node, dlm->reco.dead_node, dlm->reco.new_master);
+
+ spin_lock(&dlm->spinlock);
+
+@@ -2545,13 +2617,38 @@ int dlm_finalize_reco_handler(struct o2n
+ BUG();
+ }
+
+- dlm_finish_local_lockres_recovery(dlm, fr->dead_node, fr->node_idx);
+-
+- spin_unlock(&dlm->spinlock);
+-
+- dlm_reset_recovery(dlm);
++ switch (stage) {
++ case 1:
++ dlm_finish_local_lockres_recovery(dlm, fr->dead_node, fr->node_idx);
++ if (dlm->reco.state & DLM_RECO_STATE_FINALIZE) {
++ mlog(ML_ERROR, "%s: received finalize1 from "
++ "new master %u for dead node %u, but "
++ "this node has already received it!\n",
++ dlm->name, fr->node_idx, fr->dead_node);
++ dlm_print_reco_junk(dlm);
++ BUG();
++ }
++ dlm->reco.state |= DLM_RECO_STATE_FINALIZE;
++ spin_unlock(&dlm->spinlock);
++ break;
++ case 2:
++ if (!(dlm->reco.state & DLM_RECO_STATE_FINALIZE)) {
++ mlog(ML_ERROR, "%s: received finalize2 from "
++ "new master %u for dead node %u, but "
++ "this node did not have finalize1!\n",
++ dlm->name, fr->node_idx, fr->dead_node);
++ dlm_print_reco_junk(dlm);
++ BUG();
++ }
++ dlm->reco.state &= ~DLM_RECO_STATE_FINALIZE;
++ spin_unlock(&dlm->spinlock);
++ dlm_reset_recovery(dlm);
++ dlm_kick_recovery_thread(dlm);
++ break;
++ default:
++ BUG();
++ }
+
+- dlm_kick_recovery_thread(dlm);
+ mlog(ML_NOTICE, "%s: recovery done, reco master was %u, dead now %u, master now %u\n",
+ dlm->name, fr->node_idx, dlm->reco.dead_node, dlm->reco.new_master);
+