[Ocfs2-commits] khackel commits r2520 - trunk/fs/ocfs2/dlm
svn-commits at oss.oracle.com
Wed Aug 17 15:25:56 CDT 2005
Author: khackel
Date: 2005-08-17 15:25:55 -0500 (Wed, 17 Aug 2005)
New Revision: 2520
Modified:
   trunk/fs/ocfs2/dlm/dlmrecovery.c
Log:
* fix a problem with lvb recovery reported in bug 527
* add dlm_revalidate_lvb to dlm_do_local_recovery_cleanup in the
  recovery path
Signed-off-by: mfasheh
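
The rule the migration path now enforces: only EX and PR locks carry an
lvb, the first such lock's lvb is copied into the migration message, and
every later PR holder must carry an identical copy (two EX holders cannot
coexist, so hitting a second EX is always a bug). A minimal user-space
sketch of that check, with illustrative names (merge_lvb is not a kernel
function; LVB_LEN stands in for DLM_LVB_LEN):

    #include <string.h>

    #define LVB_LEN 64
    enum lock_mode { LKM_NLMODE, LKM_PRMODE, LKM_EXMODE };

    /* Fold one lock's lvb into the migration buffer mres_lvb.
     * Returns 0 on success, -1 on the mismatch the kernel BUG()s on. */
    static int merge_lvb(char mres_lvb[LVB_LEN], enum lock_mode type,
                         const char lock_lvb[LVB_LEN])
    {
            if (type != LKM_EXMODE && type != LKM_PRMODE)
                    return 0;   /* NL locks carry no lvb */
            /* if already set, it had better be a matching PR copy */
            if (mres_lvb[0] && (type == LKM_EXMODE ||
                memcmp(mres_lvb, lock_lvb, LVB_LEN)))
                    return -1;  /* the "mismatched lvbs!" case */
            memcpy(mres_lvb, lock_lvb, LVB_LEN);
            return 0;
    }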
Modified: trunk/fs/ocfs2/dlm/dlmrecovery.c
===================================================================
--- trunk/fs/ocfs2/dlm/dlmrecovery.c 2005-08-16 23:15:28 UTC (rev 2519)
+++ trunk/fs/ocfs2/dlm/dlmrecovery.c 2005-08-17 20:25:55 UTC (rev 2520)
@@ -912,15 +912,17 @@
 	ml->list = queue;
 	if (lock->lksb) {
 		ml->flags = lock->lksb->flags;
-		if (ml->flags & DLM_LKSB_PUT_LVB) {
-			/* NOTE: because we only support NL, PR and EX locks
-			 * there can be only one lock on this lockres with
-			 * this flag, and it must be currently an EX.
-			 * this means this node had a pending LVB change
-			 * when the master died. we should send his lvb
-			 * over and attach it to the lksb on the other side */
-			BUG_ON(ml->type != LKM_EXMODE);
-			BUG_ON(mres->lvb[0] != 0);
+		/* send our current lvb */
+		if (ml->type == LKM_EXMODE ||
+		    ml->type == LKM_PRMODE) {
+			/* if it is already set, this had better be a PR
+			 * and it has to match */
+			if (mres->lvb[0] && (ml->type == LKM_EXMODE ||
+			    memcmp(mres->lvb, lock->lksb->lvb, DLM_LVB_LEN))) {
+				mlog(ML_ERROR, "mismatched lvbs!\n");
+				__dlm_print_one_lock_resource(lock->lockres);
+				BUG();
+			}
 			memcpy(mres->lvb, lock->lksb->lvb, DLM_LVB_LEN);
 		}
 	}
@@ -1428,13 +1430,29 @@
 			}
 			lksb->flags |= (ml->flags &
 					(DLM_LKSB_PUT_LVB|DLM_LKSB_GET_LVB));
-			if (lksb->flags & DLM_LKSB_PUT_LVB) {
-				/* other node was trying to update
-				 * lvb when node died. recreate the
-				 * lksb with the updated lvb. */
-				memcpy(lksb->lvb, mres->lvb, DLM_LVB_LEN);
+
+			if (mres->lvb[0]) {
+				if (lksb->flags & DLM_LKSB_PUT_LVB) {
+					/* other node was trying to update
+					 * lvb when node died. recreate the
+					 * lksb with the updated lvb. */
+					memcpy(lksb->lvb, mres->lvb, DLM_LVB_LEN);
+				} else {
+					/* otherwise, the node is sending its
+					 * most recent valid lvb info */
+					BUG_ON(ml->type != LKM_EXMODE &&
+					       ml->type != LKM_PRMODE);
+					if (res->lvb[0] && (ml->type == LKM_EXMODE ||
+					    memcmp(res->lvb, mres->lvb, DLM_LVB_LEN))) {
+						mlog(ML_ERROR, "received bad lvb!\n");
+						__dlm_print_one_lock_resource(res);
+						BUG();
+					}
+					memcpy(res->lvb, mres->lvb, DLM_LVB_LEN);
+				}
 			}
+
 			/* NOTE:
 			 * wrt lock queue ordering and recovery:
 			 * 1. order of locks on granted queue is
@@ -1605,12 +1623,71 @@
 		}
 	}
 
+static inline int dlm_lvb_needs_invalidation(struct dlm_lock *lock, int local)
+{
+	if (local) {
+		if (lock->ml.type != LKM_EXMODE &&
+		    lock->ml.type != LKM_PRMODE)
+			return 1;
+	} else if (lock->ml.type == LKM_EXMODE)
+		return 1;
+	return 0;
+}
+
+static void dlm_revalidate_lvb(struct dlm_ctxt *dlm,
+			       struct dlm_lock_resource *res, u8 dead_node)
+{
+	struct list_head *iter, *queue;
+	struct dlm_lock *lock;
+	int blank_lvb = 0, local = 0;
+	int i;
+	u8 search_node;
+
+	assert_spin_locked(&dlm->spinlock);
+	assert_spin_locked(&res->spinlock);
+
+	if (res->owner == dlm->node_num)
+		/* if this node owned the lockres, and if the dead node
+		 * had an EX when he died, blank out the lvb */
+		search_node = dead_node;
+	else {
+		/* if this is a secondary lockres, and we had no EX or PR
+		 * locks granted, we can no longer trust the lvb */
+		search_node = dlm->node_num;
+		local = 1;  /* check local state for valid lvb */
+	}
+
+	for (i=DLM_GRANTED_LIST; i<=DLM_CONVERTING_LIST; i++) {
+		queue = dlm_list_idx_to_ptr(res, i);
+		list_for_each(iter, queue) {
+			lock = list_entry (iter, struct dlm_lock, list);
+			if (lock->ml.node == search_node) {
+				if (dlm_lvb_needs_invalidation(lock, local)) {
+					/* zero the lksb lvb and lockres lvb */
+					blank_lvb = 1;
+					memset(lock->lksb->lvb, 0, DLM_LVB_LEN);
+				}
+			}
+		}
+	}
+
+	if (blank_lvb) {
+		mlog(0, "clearing %.*s lvb, dead node %u had EX\n",
+		     res->lockname.len, res->lockname.name, dead_node);
+		memset(res->lvb, 0, DLM_LVB_LEN);
+	}
+}
+
 static void dlm_free_dead_locks(struct dlm_ctxt *dlm,
 				struct dlm_lock_resource *res, u8 dead_node)
 {
 	struct list_head *iter, *tmpiter;
 	struct dlm_lock *lock;
 
+	/* this node is the lockres master:
+	 * 1) remove any stale locks for the dead node
+	 * 2) if the dead node had an EX when he died, blank out the lvb
+	 */
 	assert_spin_locked(&dlm->spinlock);
 	assert_spin_locked(&res->spinlock);
 
@@ -1641,6 +1718,13 @@
 	__dlm_dirty_lockres(dlm, res);
 }
 
+/* if this node is the recovery master, and there are no
+ * locks for a given lockres owned by this node that are in
+ * either PR or EX mode, zero out the lvb before requesting.
+ *
+ */
+
+
 static void dlm_do_local_recovery_cleanup(struct dlm_ctxt *dlm, u8 dead_node)
 {
 	struct list_head *iter;
@@ -1673,7 +1757,10 @@
 			if (dlm_is_recovery_lock(res->lockname.name,
 						 res->lockname.len))
 				continue;
+
 			spin_lock(&res->spinlock);
+			/* zero the lvb if necessary */
+			dlm_revalidate_lvb(dlm, res, dead_node);
 			if (res->owner == dead_node)
 				dlm_move_lockres_to_recovery_list(dlm, res);
 			else if (res->owner == dlm->node_num) {
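
In short, dlm_do_local_recovery_cleanup now revalidates the lvb of every
lockres, under both the dlm and lockres spinlocks, before deciding whether
the resource moves to the recovery list. The per-lock decision is a
condensed restatement of dlm_lvb_needs_invalidation above; the user-space
bool wrapper here is illustrative, not the kernel's:

    #include <stdbool.h>

    enum lock_mode { LKM_NLMODE, LKM_PRMODE, LKM_EXMODE };

    /* Master path (local == false): the dead node can have changed the
     * lvb only while holding EX, so its EX lock taints the lvb.
     * Non-master path (local == true): our cached lvb stays valid only
     * while we hold EX or PR ourselves. */
    static bool lvb_needs_invalidation(enum lock_mode type, bool local)
    {
            if (local)
                    return type != LKM_EXMODE && type != LKM_PRMODE;
            return type == LKM_EXMODE;
    }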