[Ocfs2-commits] khackel commits r2382 - trunk/fs/ocfs2/dlm

svn-commits at oss.oracle.com svn-commits at oss.oracle.com
Thu Jun 9 20:55:23 CDT 2005


Author: khackel
Signed-off-by: mfasheh
Date: 2005-06-09 20:55:22 -0500 (Thu, 09 Jun 2005)
New Revision: 2382

Modified:
   trunk/fs/ocfs2/dlm/dlmcommon.h
   trunk/fs/ocfs2/dlm/dlmrecovery.c
Log:
* fixes bug 479, bad data was being used for the dlm_reco_data_done
  message type
* fixes bug where very fast recovery message would BUG on
  because master would still expect the node to be in REQUESTING state
* allow a recovery master to manually trigger a "node down" event on
  the local node if that node has not seen the node death yet
* move local recovery cleanup before setting of bit in recovery_map

Signed-off-by: mfasheh



Modified: trunk/fs/ocfs2/dlm/dlmcommon.h
===================================================================
--- trunk/fs/ocfs2/dlm/dlmcommon.h	2005-06-09 18:42:48 UTC (rev 2381)
+++ trunk/fs/ocfs2/dlm/dlmcommon.h	2005-06-10 01:55:22 UTC (rev 2382)
@@ -893,6 +893,7 @@
 
 int dlm_nm_init(dlm_ctxt *dlm);
 int dlm_heartbeat_init(dlm_ctxt *dlm);
+void __dlm_hb_node_down(dlm_ctxt *dlm, int idx);
 void dlm_hb_node_down_cb(struct o2nm_node *node, int idx, void *data);
 void dlm_hb_node_up_cb(struct o2nm_node *node, int idx, void *data);
 int dlm_hb_node_dead(dlm_ctxt *dlm, int node);

Modified: trunk/fs/ocfs2/dlm/dlmrecovery.c
===================================================================
--- trunk/fs/ocfs2/dlm/dlmrecovery.c	2005-06-09 18:42:48 UTC (rev 2381)
+++ trunk/fs/ocfs2/dlm/dlmrecovery.c	2005-06-10 01:55:22 UTC (rev 2382)
@@ -663,14 +663,15 @@
 static int dlm_send_all_done_msg(dlm_ctxt *dlm, u8 dead_node, u8 send_to)
 {
 	int ret, tmpret;	
-	dlm_reco_data_done *done_msg;
+	dlm_reco_data_done done_msg;
 
-	done_msg = kcalloc(1, sizeof(*done_msg), GFP_KERNEL);
-	if (!done_msg)
-		return -ENOMEM;
-	done_msg->node_idx = dlm->node_num;
-	done_msg->dead_node = dead_node;
-	dlm_reco_data_done_to_net(done_msg);
+	memset(&done_msg, 0, sizeof(done_msg));
+	done_msg.node_idx = dlm->node_num;
+	done_msg.dead_node = dead_node;
+	mlog(0, "sending DATA DONE message to %u, "
+	     "my node=%u, dead node=%u\n", send_to, done_msg.node_idx, 
+	     done_msg.dead_node);
+	dlm_reco_data_done_to_net(&done_msg);
 	
 	ret = o2net_send_message(DLM_RECO_DATA_DONE_MSG, dlm->key, &done_msg, 
 				 sizeof(done_msg), send_to, &tmpret);
@@ -692,6 +693,9 @@
 		return -EINVAL;
 
 	dlm_reco_data_done_to_host(done);
+	mlog(0, "got DATA DONE: dead_node=%u, reco.dead_node=%u, "
+	     "node_idx=%u, this node=%u\n", done->dead_node, 
+	     dlm->reco.dead_node, done->node_idx, dlm->node_num);
 	DLM_ASSERT(done->dead_node == dlm->reco.dead_node);
 
 	spin_lock(&dlm_reco_state_lock);
@@ -702,7 +706,6 @@
 
 		switch (ndata->state) {
 			case DLM_RECO_NODE_DATA_INIT:
-			case DLM_RECO_NODE_DATA_REQUESTING:
 			case DLM_RECO_NODE_DATA_DEAD:
 			case DLM_RECO_NODE_DATA_DONE:
 			case DLM_RECO_NODE_DATA_FINALIZE_SENT:
@@ -713,9 +716,11 @@
 				break;
 			case DLM_RECO_NODE_DATA_RECEIVING:
 			case DLM_RECO_NODE_DATA_REQUESTED:
+			case DLM_RECO_NODE_DATA_REQUESTING:
 				mlog(0, "node %u is DONE sending "
 					  "recovery data!\n",
 					  ndata->node_num);
+
 				ndata->state = DLM_RECO_NODE_DATA_DONE;
 				ret = 0;
 				break;
@@ -731,6 +736,8 @@
 		mlog(ML_ERROR, "failed to find recovery node data for node "
 		     "%u\n", done->node_idx);
 	dlm_put(dlm);
+
+	mlog(0, "leaving reco data done handler, ret=%d\n", ret);
 	return ret;
 }
 
@@ -1531,18 +1538,13 @@
 
 }
 
-
-void dlm_hb_node_down_cb(struct o2nm_node *node, int idx, void *data)
+void __dlm_hb_node_down(dlm_ctxt *dlm, int idx)
 {
-	dlm_ctxt *dlm = data;
 	dlm_master_list_entry *mle;
 	struct list_head *iter;
 
-	if (!dlm_grab(dlm))
-		return;
+	assert_spin_locked(&dlm->spinlock);
 
-	spin_lock(&dlm->spinlock);
-
 	clear_bit(idx, dlm->live_nodes_map);
 
 	/* Clean up join state on node death. */
@@ -1551,17 +1553,21 @@
 		__dlm_set_joining_node(dlm, DLM_LOCK_RES_OWNER_UNKNOWN);
 	}
 
+	/* make sure local cleanup occurs before the heartbeat events */
+	if (!test_bit(idx, dlm->recovery_map))
+		dlm_do_local_recovery_cleanup(dlm, idx);
+
 	/* notify any mles attached to the heartbeat events */
 	list_for_each(iter, &dlm->mle_hb_events) {
 		mle = list_entry(iter, dlm_master_list_entry, hb_events);
-		dlm_mle_node_down(dlm, mle, node, idx);
+		dlm_mle_node_down(dlm, mle, NULL, idx);
 	}
 
 	if (!test_bit(idx, dlm->domain_map)) {
 		/* This also catches the case that we get a node down
 		 * but haven't joined the domain yet. */
 		mlog(0, "node %u already removed from domain!\n", idx);
-		goto bail;
+		return;
 	}
 
 	mlog(0, "node %u being removed from domain map!\n", idx);
@@ -1569,12 +1575,19 @@
 
 	if (test_bit(idx, dlm->recovery_map))
 		mlog(0, "node %u already added to recovery map!\n", idx);
-	else {
+	else
 		set_bit(idx, dlm->recovery_map);
-		dlm_do_local_recovery_cleanup(dlm, idx);
-	}
+}
 
-bail:
+void dlm_hb_node_down_cb(struct o2nm_node *node, int idx, void *data)
+{
+	dlm_ctxt *dlm = data;
+
+	if (!dlm_grab(dlm))
+		return;
+
+	spin_lock(&dlm->spinlock);
+	__dlm_hb_node_down(dlm, idx);
 	spin_unlock(&dlm->spinlock);
 
 	dlm_put(dlm);
@@ -1754,6 +1767,12 @@
 	}
 	dlm->reco.new_master = br->node_idx;
 	dlm->reco.dead_node = br->dead_node;
+	if (!test_bit(br->dead_node, dlm->recovery_map)) {
+		mlog(ML_ERROR, "recovery master %u sees %u as dead, but this "
+		     "node has not yet.  marking %u as dead\n",
+		     br->node_idx, br->dead_node, br->dead_node);
+		__dlm_hb_node_down(dlm, br->dead_node);
+	}
 	spin_unlock(&dlm->spinlock);
 
 	dlm_kick_recovery_thread(dlm);



More information about the Ocfs2-commits mailing list