[Ocfs2-commits] zab commits r2422 - in trunk/fs/ocfs2: cluster dlm

svn-commits at oss.oracle.com svn-commits at oss.oracle.com
Tue Jun 28 16:07:32 CDT 2005


Author: zab
Signed-off-by: mfasheh
Date: 2005-06-23 20:43:58 -0500 (Thu, 23 Jun 2005)
New Revision: 2422

Modified:
   trunk/fs/ocfs2/cluster/heartbeat.c
   trunk/fs/ocfs2/cluster/heartbeat.h
   trunk/fs/ocfs2/cluster/masklog.c
   trunk/fs/ocfs2/cluster/masklog.h
   trunk/fs/ocfs2/cluster/nodemanager.c
   trunk/fs/ocfs2/cluster/tcp.c
   trunk/fs/ocfs2/cluster/tcp.h
   trunk/fs/ocfs2/cluster/tcp_internal.h
   trunk/fs/ocfs2/dlm/dlmdomain.c
Log:
Keep track of node connectivity and fence nodes who find out that they
can't talk to a majority of the heartbeating nodes in the cluster.  Also
add the net protocol version handshake.

o turn on some tcp options to send keepalives and timeout after 10s
o count connected and heartbeating nodes.  on disconnect, after hb timeout,
  if we aren't connected to a majority of the cluster we panic.
o only reconnect until we have a valid connection.. if it fails hb must
  go down and back up to connect again.
o get rid of the complicated reconnecting backoff; just reconnect every 2s
o add a timeout that triggers if we don't connect/accept in time after hb up
o enotconn is now fatal in the dlm join path
o trade version numbers on connect and fail if they don't match
o move to static hb timeouts so we can derive net timeouts from them
o turn down hb timeout to 14 seconds to make our 45 sec deadline
o fix up bad eof/handshake handling in rx_until_empty that could livelock
o add hb helpers which don't use the sem for calling while holding it
o give net the ability to stop all hb regions
o avoid infinite recursion by not flushing the workqueue from within it

Signed-off-by: mfasheh



Modified: trunk/fs/ocfs2/cluster/heartbeat.c
===================================================================
--- trunk/fs/ocfs2/cluster/heartbeat.c	2005-06-24 01:00:45 UTC (rev 2421)
+++ trunk/fs/ocfs2/cluster/heartbeat.c	2005-06-24 01:43:58 UTC (rev 2422)
@@ -57,18 +57,14 @@
 static DECLARE_WAIT_QUEUE_HEAD(o2hb_steady_queue);
 static u64 o2hb_generation;
 
+static LIST_HEAD(o2hb_all_regions);
+
 static struct o2hb_callback {
 	struct list_head list;
 } o2hb_callbacks[O2HB_NUM_CB];
 
 static struct o2hb_callback *hbcall_from_type(enum o2hb_callback_type type);
 
-#define O2HB_REGION_MIN_TIMEOUT_MS	500
-#define O2HB_REGION_MAX_TIMEOUT_MS	jiffies_to_msecs(MAX_JIFFY_OFFSET)
-
-/* number of changes to be seen as live */ 
-#define O2HB_LIVE_THRESHOLD	   2
-
 #define O2HB_DEFAULT_BLOCK_BITS       9
 
 struct o2hb_node_event {
@@ -93,6 +89,9 @@
 struct o2hb_region {
 	struct config_item	hr_item;
 
+	struct list_head	hr_all_item;
+	unsigned		hr_unclean_stop:1;
+
 	/* protected by the hr_callback_sem */
 	struct task_struct 	*hr_task;
 
@@ -791,7 +790,7 @@
 
 	mlog(ML_HEARTBEAT|ML_KTHREAD, "hb thread running\n");
 
-	while (!kthread_should_stop()) {
+	while (!kthread_should_stop() && !reg->hr_unclean_stop) {
 		o2hb_do_disk_heartbeat(reg);
 
 		/* the kthread api has blocked signals for us so no
@@ -799,7 +798,8 @@
 		msleep_interruptible(reg->hr_timeout_ms);
 	}
 
-	for(i = 0; i < reg->hr_blocks; i++)
+	/* unclean stop is only used in very bad situations */
+	for(i = 0; !reg->hr_unclean_stop && i < reg->hr_blocks; i++)
 		o2hb_shutdown_slot(&reg->hr_slots[i]);
 
 	mlog(ML_HEARTBEAT|ML_KTHREAD, "hb thread exiting\n");
@@ -827,18 +827,24 @@
 		get_random_bytes(&o2hb_generation, sizeof(o2hb_generation));
 }
 
+/* if we're already in a callback then we're already serialized by the sem */
+void o2hb_fill_node_map_from_callback(unsigned long *map, unsigned bytes)
+{
+	BUG_ON(bytes < (BITS_TO_LONGS(O2NM_MAX_NODES) * sizeof(unsigned long)));
+
+	memcpy(map, &o2hb_live_node_bitmap, bytes);
+}
+
 /*
  * get a map of all nodes that are heartbeating in any regions
  */
 void o2hb_fill_node_map(unsigned long *map, unsigned bytes)
 {
-	BUG_ON(bytes < (BITS_TO_LONGS(O2NM_MAX_NODES) * sizeof(unsigned long)));
-
 	/* callers want to serialize this map and callbacks so that they
 	 * can trust that they don't miss nodes coming to the party */
 	down_read(&o2hb_callback_sem);
 	spin_lock(&o2hb_live_lock);
-	memcpy(map, &o2hb_live_node_bitmap, bytes);
+	o2hb_fill_node_map_from_callback(map, bytes);
 	spin_unlock(&o2hb_live_lock);
 	up_read(&o2hb_callback_sem);
 }
@@ -877,6 +883,11 @@
 
 	if (reg->hr_slots)
 		kfree(reg->hr_slots);
+
+	spin_lock(&o2hb_live_lock);
+	list_del(&reg->hr_all_item);
+	spin_unlock(&o2hb_live_lock);
+
 	kfree(reg);
 }
 
@@ -903,7 +914,8 @@
 	if (!tmp)
 		return -ERANGE;
 
-	reg->hr_dead_iter = (unsigned int)tmp;
+	/* XXX */
+	reg->hr_dead_iter = O2HB_DEAD_THRESHOLD;
 
 	return count;
 }
@@ -928,12 +940,9 @@
 	if (!p || (*p && (*p != '\n')))
 		return -EINVAL;
 
-	if (tmp > O2HB_REGION_MAX_TIMEOUT_MS ||
-	    tmp < O2HB_REGION_MIN_TIMEOUT_MS)
-		return -ERANGE;
+	/* XXX */
+	reg->hr_timeout_ms = O2HB_REGION_TIMEOUT_MS;
 
-	reg->hr_timeout_ms = (unsigned int)tmp;
-
 	return count;
 }
 
@@ -1412,6 +1421,10 @@
 	config_item_init_type_name(&reg->hr_item, name, &o2hb_region_type);
 
 	ret = &reg->hr_item;
+
+	spin_lock(&o2hb_live_lock);
+	list_add_tail(&reg->hr_all_item, &o2hb_all_regions);
+	spin_unlock(&o2hb_live_lock);
 out:
 	if (ret == NULL)
 		kfree(reg);
@@ -1568,6 +1581,22 @@
 }
 EXPORT_SYMBOL_GPL(o2hb_check_node_heartbeating);
 
+int o2hb_check_node_heartbeating_from_callback(u8 node_num)
+{
+	unsigned long testing_map[BITS_TO_LONGS(O2NM_MAX_NODES)];
+
+	o2hb_fill_node_map_from_callback(testing_map, sizeof(testing_map));
+	if (!test_bit(node_num, testing_map)) {
+		mlog(ML_HEARTBEAT,
+		     "node (%u) does not have heartbeating enabled.\n",
+		     node_num);
+		return 0;
+	}
+
+	return 1;
+}
+EXPORT_SYMBOL_GPL(o2hb_check_node_heartbeating_from_callback);
+
 /* Makes sure our local node is configured with a node number, and is
  * heartbeating. */
 int o2hb_check_local_node_heartbeating(void)
@@ -1584,3 +1613,39 @@
 	return o2hb_check_node_heartbeating(node_num);
 }
 EXPORT_SYMBOL_GPL(o2hb_check_local_node_heartbeating);
+
+/* Makes sure our local node is configured with a node number, and is
+ * heartbeating. */
+int o2hb_check_local_node_heartbeating_from_callback(void)
+{
+	u8 node_num;
+
+	/* if this node was set then we have networking */
+	node_num = o2nm_this_node();
+	if (node_num == O2NM_MAX_NODES) {
+		mlog(ML_HEARTBEAT, "this node has not been configured.\n");
+		return 0;
+	}
+
+	return o2hb_check_node_heartbeating_from_callback(node_num);
+}
+EXPORT_SYMBOL_GPL(o2hb_check_local_node_heartbeating_from_callback);
+
+/*
+ * this is just a hack until we get the plumbing which flips file systems
+ * read only and drops the hb ref instead of killing the node dead.
+ */
+void o2hb_stop_all_regions(void)
+{
+	struct o2hb_region *reg;
+
+	mlog(ML_ERROR, "stopping heartbeat on all active regions.\n");
+
+	spin_lock(&o2hb_live_lock);
+
+	list_for_each_entry(reg, &o2hb_all_regions, hr_all_item)
+		reg->hr_unclean_stop = 1;
+
+	spin_unlock(&o2hb_live_lock);
+}	
+EXPORT_SYMBOL_GPL(o2hb_stop_all_regions);

Modified: trunk/fs/ocfs2/cluster/heartbeat.h
===================================================================
--- trunk/fs/ocfs2/cluster/heartbeat.h	2005-06-24 01:00:45 UTC (rev 2421)
+++ trunk/fs/ocfs2/cluster/heartbeat.h	2005-06-24 01:43:58 UTC (rev 2422)
@@ -29,6 +29,13 @@
 
 #include "ocfs2_heartbeat.h"
 
+#define O2HB_REGION_TIMEOUT_MS		2000
+
+/* number of changes to be seen as live */ 
+#define O2HB_LIVE_THRESHOLD	   2
+/* number of equal samples to be seen as dead */ 
+#define O2HB_DEAD_THRESHOLD	   7
+
 #define O2HB_CB_MAGIC		0x51d1e4ec
 
 /* callback stuff */
@@ -62,8 +69,12 @@
 int o2hb_unregister_callback(struct o2hb_callback_func *hc); 
 void o2hb_fill_node_map(unsigned long *map,
 			unsigned bytes);
+void o2hb_fill_node_map_from_callback(unsigned long *map, unsigned bytes);
 void o2hb_init(void);
 int o2hb_check_node_heartbeating(u8 node_num);
+int o2hb_check_node_heartbeating_from_callback(u8 node_num);
 int o2hb_check_local_node_heartbeating(void);
+int o2hb_check_local_node_heartbeating_from_callback(void);
+void o2hb_stop_all_regions(void);
 
 #endif /* O2CLUSTER_HEARTBEAT_H */

Modified: trunk/fs/ocfs2/cluster/masklog.c
===================================================================
--- trunk/fs/ocfs2/cluster/masklog.c	2005-06-24 01:00:45 UTC (rev 2421)
+++ trunk/fs/ocfs2/cluster/masklog.c	2005-06-24 01:43:58 UTC (rev 2422)
@@ -210,6 +210,7 @@
 	set_a_string(VOTE);
 	set_a_string(DCACHE);
 	set_a_string(CONN);
+	set_a_string(QUORUM);
 	set_a_string(ERROR);
 	set_a_string(NOTICE);
 	set_a_string(KTHREAD);

Modified: trunk/fs/ocfs2/cluster/masklog.h
===================================================================
--- trunk/fs/ocfs2/cluster/masklog.h	2005-06-24 01:00:45 UTC (rev 2421)
+++ trunk/fs/ocfs2/cluster/masklog.h	2005-06-24 01:43:58 UTC (rev 2422)
@@ -107,6 +107,7 @@
 #define ML_VOTE		0x0000000001000000ULL /* ocfs2 node messaging  */
 #define ML_DCACHE	0x0000000002000000ULL /* ocfs2 dcache operations */
 #define ML_CONN		0x0000000004000000ULL /* net connection management */
+#define ML_QUORUM	0x0000000008000000ULL /* net connection quorum */
 /* bits that are infrequently given and frequently matched in the high word */
 #define ML_ERROR	0x0000000100000000ULL /* sent to KERN_ERR */
 #define ML_NOTICE	0x0000000200000000ULL /* setn to KERN_NOTICE */

Modified: trunk/fs/ocfs2/cluster/nodemanager.c
===================================================================
--- trunk/fs/ocfs2/cluster/nodemanager.c	2005-06-24 01:00:45 UTC (rev 2421)
+++ trunk/fs/ocfs2/cluster/nodemanager.c	2005-06-24 01:43:58 UTC (rev 2422)
@@ -393,14 +393,14 @@
 
 	/* bring up the rx thread if we're setting the new local node. */
 	if (tmp && !cluster->cl_has_local) {
-		ret = o2net_start_listening(node->nd_ipv4_port);
+		ret = o2net_start_listening(node);
 		if (ret)
 			return ret;
 	}
 
 	if (!tmp && cluster->cl_has_local &&
 	    cluster->cl_local_node == node->nd_num) {
-		o2net_stop_listening();
+		o2net_stop_listening(node);
 		cluster->cl_local_node = 0;
 	}
 
@@ -577,7 +577,7 @@
 	    (cluster->cl_local_node == node->nd_num)) {
 		cluster->cl_has_local = 0;
 		cluster->cl_local_node = O2NM_MAX_NODES;
-		o2net_stop_listening();
+		o2net_stop_listening(node);
 	}
 
 	/* XXX call into net to stop this node from trading messages */
@@ -750,6 +750,8 @@
 	mlog_remove_proc(o2nm_proc);
 	o2net_proc_exit(o2nm_proc);
 	remove_proc_entry(O2NM_PROC_PATH, NULL);
+
+	o2net_exit();
 }
 
 static int o2nm_proc_version(char *page, char **start, off_t off,

Modified: trunk/fs/ocfs2/cluster/tcp.c
===================================================================
--- trunk/fs/ocfs2/cluster/tcp.c	2005-06-24 01:00:45 UTC (rev 2421)
+++ trunk/fs/ocfs2/cluster/tcp.c	2005-06-24 01:43:58 UTC (rev 2422)
@@ -1,4 +1,5 @@
 /* -*- mode: c; c-basic-offset: 8; -*-
+ *
  * vim: noexpandtab sw=8 ts=8 sts=0:
  *
  * Copyright (C) 2004 Oracle.  All rights reserved.
@@ -50,15 +51,6 @@
  * higher node number gets a heartbeat callback which indicates that the lower
  * numbered node has started heartbeating.  The lower numbered node is passive
  * and only accepts the connection if the higher numbered node is heartbeating.
- * The connection attempts continue as long as heartbeat is active until a
- * build-time max number of failed attempts.
- *
- * XXX
- * 	- don't allow node rmdir if it has socket and rx thread is running
- * 	- disable preemt before calling rx handler when debugging
- * 	- find explicit stack call to drain rx queue
- * 	- add trivial version trading message at the start of a conn
- * 	- add kerneldoc for the external interface
  */
 
 #include <linux/kernel.h>
@@ -102,16 +94,32 @@
 static rwlock_t o2net_handler_lock = RW_LOCK_UNLOCKED;
 static struct rb_root o2net_handler_tree = RB_ROOT;
 
-static struct workqueue_struct *o2net_wq;
 static struct o2net_node o2net_nodes[O2NM_MAX_NODES];
 
 /* XXX someday we'll need better accounting */
 static struct socket *o2net_listen_sock = NULL;
-static struct work_struct o2net_listen_work;
 
+/*
+ * listen work is only queued by the listening socket callbacks on the
+ * o2net_wq.  teardown detaches the callbacks before destroying the workqueue.
+ * quorum work is queued as sock containers are shutdown.. stop_listening
+ * tears down all the node's sock containers, preventing future shutdowns
+ * and queued quorum work, before canceling delayed quorum work and
+ * destroying the work queue.
+ */
+static struct workqueue_struct *o2net_wq;
+static struct work_struct o2net_listen_work, o2net_quorum_work;
+
 static struct o2hb_callback_func o2net_hb_up, o2net_hb_down;
 #define O2NET_HB_PRI 0x1
 
+static struct o2net_handshake *o2net_hand;
+
+/* these node totals include our node.  I think the hb and net threads
+ * sufficiently serialize things so that these don't need locking */
+static int o2net_heartbeating_nodes = 0, o2net_connected_nodes = 0;
+static unsigned long o2net_connected_bitmap[BITS_TO_LONGS(O2NM_MAX_NODES)];
+
 static int o2net_sys_err_translations[O2NET_ERR_MAX] =
 		{[O2NET_ERR_NONE]	= 0,
 		 [O2NET_ERR_NO_HNDLR]	= -ENOPROTOOPT,
@@ -119,7 +127,6 @@
 		 [O2NET_ERR_DIED]	= -EHOSTDOWN,};
 
 /* can't quite avoid *all* internal declarations :/ */
-static void o2net_start_connect(void *arg);
 static void o2net_sc_connect_completed(void *arg);
 static void o2net_rx_until_empty(void *arg);
 static void o2net_shutdown_sc(void *arg);
@@ -148,6 +155,28 @@
 	return nn - o2net_nodes;
 }
 
+static void o2net_msg_to_net(o2net_msg *m)
+{
+	m->magic = htons(m->magic);
+	m->data_len = htons(m->data_len);
+	m->msg_type = htons(m->msg_type);
+	m->sys_status = htonl(m->sys_status);
+	m->status = htonl(m->status);
+	m->key = htonl(m->key);
+	m->msg_num = htonl(m->msg_num);
+}
+
+static void o2net_msg_to_host(o2net_msg *m)
+{
+	m->magic = ntohs(m->magic);
+	m->data_len = ntohs(m->data_len);
+	m->msg_type = ntohs(m->msg_type);
+	m->sys_status = ntohl(m->sys_status);
+	m->status = ntohl(m->status);
+	m->key = ntohl(m->key);
+	m->msg_num = ntohl(m->msg_num);
+}
+
 /* ------------------------------------------------------------ */
 
 static int o2net_prep_nsw(struct o2net_node *nn, struct o2net_status_wait *nsw)
@@ -276,6 +305,11 @@
 	sclog(sc, "put\n");
 	kref_put(&sc->sc_kref, sc_kref_release);
 }
+static void sc_get(struct o2net_sock_container *sc)
+{
+	sclog(sc, "get\n");
+	kref_get(&sc->sc_kref);
+}
 static struct o2net_sock_container *sc_alloc(struct o2nm_node *node)
 {
 	struct o2net_sock_container *sc, *ret = NULL;
@@ -312,15 +346,33 @@
 
 /* ------------------------------------------------------------ */
 
-/* make sure the work gets a ref to drop */
 static void o2net_sc_queue_work(struct o2net_sock_container *sc,
 				struct work_struct *work)
 {
-	kref_get(&sc->sc_kref);
+	sc_get(sc);
 	if (!queue_work(o2net_wq, work))
 		sc_put(sc);
 }
 
+static void o2net_mod_connected_nodes(u8 node, int delta)
+{
+	BUG_ON(delta == 0);
+	if (delta < 0) {
+		mlog_bug_on_msg(o2net_connected_nodes == 0,
+				"node %u delta %d\n", node, delta);
+		o2net_connected_nodes--;
+		clear_bit(node, o2net_connected_bitmap);
+	} else {
+		mlog_bug_on_msg(o2net_connected_nodes == O2NM_MAX_NODES,
+				"node %u delta %d\n", node, delta);
+		o2net_connected_nodes++;
+		set_bit(node, o2net_connected_bitmap);
+	}
+
+	mlog(ML_QUORUM, "node %u %sconnected, now %u connected\n", node,
+	     delta < 0 ? "dis" : "", o2net_connected_nodes);
+}
+
 static void o2net_set_nn_state(struct o2net_node *nn,
 			       struct o2net_sock_container *sc,
 			       unsigned valid, int err)
@@ -336,6 +388,11 @@
 	mlog_bug_on_msg(err && valid, "err %d valid %u\n", err, valid);
 	mlog_bug_on_msg(valid && !sc, "valid %u sc %p\n", valid, sc);
 
+	/* we won't reconnect after our valid conn goes away for
+	 * this hb iteration.. here so it shows up in the logs */ 
+	if (was_valid && !valid && err == 0)
+		err = -ENOTCONN;
+
 	mlog(ML_CONN, "node %u sc: %p -> %p, valid %u -> %u, err %d -> %d\n",
 	     o2net_num_from_nn(nn), nn->nn_sc, sc, nn->nn_sc_valid, valid,
 	     nn->nn_persistent_error, err);
@@ -349,7 +406,12 @@
 		wake_up(&nn->nn_sc_wq);
 
 	if (was_valid && !valid) {
-		mlog(ML_NOTICE, "ocfs2:tcp: no longer connected to "
+		o2net_mod_connected_nodes(old_sc->sc_node->nd_num, -1);
+		if (o2net_wq)
+			queue_delayed_work(o2net_wq, &o2net_quorum_work,
+				      msecs_to_jiffies(O2NET_QUORUM_DELAY_MS));
+
+		mlog(ML_NOTICE, "no longer connected to "
 		       "node %s at %u.%u.%u.%u:%d\n",
 		       old_sc->sc_node->nd_name,
 		       NIPQUAD(old_sc->sc_node->nd_ipv4_address), 
@@ -357,26 +419,42 @@
 		o2net_complete_nodes_nsw(nn);
 	}
 
-	if (!was_valid && valid)
-		mlog(ML_NOTICE, "ocfs2:tcp: connected to node %s at "
-		     "%u.%u.%u.%u:%d\n", sc->sc_node->nd_name,
+	if (!was_valid && valid) {
+		/* this is a bit of a hack.  we only try reconnecting
+		 * when heartbeating starts until we get a connection.
+		 * if that connection then dies we don't try reconnecting.
+		 * the only way to start connecting again is to down
+		 * heartbeat and bring it back up. */
+		cancel_delayed_work(&nn->nn_connect_expired);
+		o2net_mod_connected_nodes(sc->sc_node->nd_num, 1);
+		mlog(ML_NOTICE, "%s node %s num %u at %u.%u.%u.%u:%d\n",
+		     o2nm_this_node() > sc->sc_node->nd_num ?
+		     	"connected to" : "accepted connection from",
+		     sc->sc_node->nd_name, sc->sc_node->nd_num, 
 		     NIPQUAD(sc->sc_node->nd_ipv4_address), 
 		     ntohs(sc->sc_node->nd_ipv4_port));
+	}
 
 	/* trigger the connecting worker func as long as we're not valid,
 	 * it will back off if it shouldn't connect.  This can be called
 	 * from node config teardown and so needs to be careful about
 	 * the work queue actually being up. */
 	if (!valid && o2net_wq) {
-		mlog(ML_CONN, "queueing conn with %u attempts\n",
-		     nn->nn_connect_attempts);
-		queue_delayed_work(o2net_wq, &nn->nn_connect_work,
-				   nn->nn_connect_attempts * 2 * HZ);
+		unsigned long delay;
+		/* delay if we're within a RECONNECT_DELAY of the
+		 * last attempt */
+		delay = (nn->nn_last_connect_attempt + 
+			 msecs_to_jiffies(O2NET_RECONNECT_DELAY_MS))
+			- jiffies;
+		if (delay > msecs_to_jiffies(O2NET_RECONNECT_DELAY_MS))
+			delay = 0;
+		mlog(ML_CONN, "queueing conn attempt in %lu jiffies\n", delay);
+		queue_delayed_work(o2net_wq, &nn->nn_connect_work, delay);
 	}
 
 	/* keep track of the nn's sc ref for the caller */
 	if ((old_sc == NULL) && sc)
-		kref_get(&sc->sc_kref);
+		sc_get(sc);
 	if (old_sc && (old_sc != sc)) {
 		o2net_sc_queue_work(old_sc, &old_sc->sc_shutdown_work);
 		sc_put(old_sc);
@@ -420,6 +498,7 @@
 	state_change = sc->sc_state_change;
 
 	switch(sk->sk_state) {
+		/* ignore connecting sockets as they make progress */
 		case TCP_SYN_SENT: 
 		case TCP_SYN_RECV: 
 			break;
@@ -427,22 +506,29 @@
 			o2net_sc_queue_work(sc, &sc->sc_connect_work);
 			break;
 		default:
+			if (sk->sk_err == ETIMEDOUT)
+				mlog(ML_NOTICE, "connection to node %s num %u "
+				     "at %u.%u.%u.%u:%d has been idle for 10 "
+				     "seconds, shutting it down.\n",
+				     sc->sc_node->nd_name,
+				     sc->sc_node->nd_num,
+				     NIPQUAD(sc->sc_node->nd_ipv4_address), 
+				     ntohs(sc->sc_node->nd_ipv4_port));
 			o2net_sc_queue_work(sc, &sc->sc_shutdown_work);
 			break;
 	}
 out:
 	read_unlock(&sk->sk_callback_lock);
 	state_change(sk);
-}	
+}
 
-
 /*
  * we register callbacks so we can queue work on events before calling
  * the original callbacks.  our callbacks our careful to test user_data
- * to discover when they've reaced with sk_unregister_callbacks().
+ * to discover when they've raced with o2net_unregister_callbacks().
  */
-static void sk_register_callbacks(struct sock *sk,
-			          struct o2net_sock_container *sc)
+static void o2net_register_callbacks(struct sock *sk,
+				     struct o2net_sock_container *sc)
 {
 	write_lock_bh(&sk->sk_callback_lock);
 
@@ -454,7 +540,7 @@
 
 	BUG_ON(sk->sk_user_data != NULL);
 	sk->sk_user_data = sc;
-	kref_get(&sc->sc_kref);
+	sc_get(sc);
 
 	sc->sc_data_ready = sk->sk_data_ready;
 	sc->sc_state_change = sk->sk_state_change;
@@ -464,7 +550,7 @@
 	write_unlock_bh(&sk->sk_callback_lock);
 }
 
-static int sk_unregister_callbacks(struct sock *sk,
+static int o2net_unregister_callbacks(struct sock *sk,
 			           struct o2net_sock_container *sc)
 {
 	int ret = 0;
@@ -484,14 +570,16 @@
 /* 
  * this is a little helper that is called by callers who have seen a problem
  * with an sc and want to detach it from the nn if someone already hasn't beat
- * them to it..
+ * them to it.  if an error is given then the shutdown will be persistent
+ * and pending transmits will be canceled.
  */
 static void o2net_ensure_shutdown(struct o2net_node *nn,
-			           struct o2net_sock_container *sc)
+			           struct o2net_sock_container *sc,
+				   int err)
 {
 	spin_lock(&nn->nn_lock);
 	if (nn->nn_sc == sc)
-		o2net_set_nn_state(nn, NULL, 0, 0);
+		o2net_set_nn_state(nn, NULL, 0, err);
 	spin_unlock(&nn->nn_lock);
 }
 
@@ -511,14 +599,17 @@
 	sclog(sc, "shutting down\n");
 
 	/* drop the callbacks ref and call shutdown only once */
-	if (sk_unregister_callbacks(sc->sc_sock->sk, sc)) {
-		flush_workqueue(o2net_wq);
+	if (o2net_unregister_callbacks(sc->sc_sock->sk, sc)) {
+		/* we shouldn't flush as we're in the thread, the
+		 * races with pending sc work structs are harmless */
 		sc_put(sc);
 		sc->sc_sock->ops->shutdown(sc->sc_sock,
 					   RCV_SHUTDOWN|SEND_SHUTDOWN);
 	}
 
-	o2net_ensure_shutdown(nn, sc);
+	/* not fatal so failed connects before the other guy has our
+	 * heartbeat can be retried */
+	o2net_ensure_shutdown(nn, sc, 0);
 	sc_put(sc);
 }
 
@@ -996,12 +1087,51 @@
 	return ret;
 }
 
+static int o2net_check_handshake(struct o2net_sock_container *sc)
+{
+	struct o2net_handshake *hand = page_address(sc->sc_page);
+	struct o2net_node *nn = o2net_nn_from_num(sc->sc_node->nd_num);
+
+	hand->protocol_version = be64_to_cpu(hand->protocol_version);
+	hand->connector_id = be64_to_cpu(hand->connector_id);
+
+	if (hand->protocol_version != O2NET_PROTOCOL_VERSION) {
+		mlog(ML_NOTICE, "node %s at %u.%u.%u.%u:%u advertised "
+		     "net protocol version %llu but %llu is required,"
+		     "disconnecting\n", sc->sc_node->nd_name,
+		       NIPQUAD(sc->sc_node->nd_ipv4_address), 
+		       ntohs(sc->sc_node->nd_ipv4_port),
+		       (unsigned long long)hand->protocol_version,
+		       O2NET_PROTOCOL_VERSION);
+
+		/* don't bother reconnecting if its the wrong version. */
+		o2net_ensure_shutdown(nn, sc, -ENOTCONN);
+		return -1;
+	}
+
+	sc->sc_handshake_ok = 1;
+
+	spin_lock(&nn->nn_lock);
+	/* set valid if it hasn't been shutdown already.. */
+	if (nn->nn_sc == sc) 
+		o2net_set_nn_state(nn, sc, 1, 0);
+	spin_unlock(&nn->nn_lock);
+
+	/* shift everything up as though it wasn't there */
+	sc->sc_page_off -= sizeof(struct o2net_handshake);
+	if (sc->sc_page_off)
+		memmove(hand, hand + 1, sc->sc_page_off);
+
+	return 0;
+}
+
 /* this demuxes the queued rx bytes into header or payload bits and calls
- * handlers as each full message is read off the socket */
+ * handlers as each full message is read off the socket.  it returns -error, 
+ * == 0 eof, or > 0 for progress made.*/
 static int o2net_advance_rx(struct o2net_sock_container *sc)
 {
 	o2net_msg *hdr;
-	int ret = 0, made_progress = 0;
+	int ret = 0;
 	void *data;
 	size_t datalen;
 
@@ -1013,8 +1143,16 @@
 		datalen = sizeof(o2net_msg) - sc->sc_page_off;
 		ret = o2net_recv_tcp_msg(sc->sc_sock, data, datalen);
 		if (ret > 0) {
-			made_progress = 1;
 			sc->sc_page_off += ret;
+
+			/* this working relies on the handshake being
+			 * smaller than the normal message header */
+			if (sc->sc_page_off >= sizeof(struct o2net_handshake)&&
+			    !sc->sc_handshake_ok && o2net_check_handshake(sc)) {
+				ret = -EPROTO;
+				goto out;
+			}
+
 			/* only swab incoming here.. we can
 			 * only get here once as we cross from
 			 * being under to over */
@@ -1025,7 +1163,7 @@
 					ret = -EOVERFLOW;
 			}
 		}
-		if (ret < 0)
+		if (ret <= 0)
 			goto out;
 	}
 
@@ -1046,32 +1184,30 @@
 		datalen = (sizeof(o2net_msg) + hdr->data_len) -
 			  sc->sc_page_off;
 		ret = o2net_recv_tcp_msg(sc->sc_sock, data, datalen);
-		if (ret > 0) {
-			made_progress = 1;
+		if (ret > 0)
 			sc->sc_page_off += ret;
-		}
-		if (ret < 0)
+		if (ret <= 0)
 			goto out;
 	}
 
 	if (sc->sc_page_off - sizeof(o2net_msg) == hdr->data_len) {
-		/* whooo peee, we have a full message */
-		/* after calling this the message is toast */
+		/* we can only get here once, the first time we read
+		 * the payload.. so set ret to progress if the handler
+		 * works out. after calling this the message is toast */
 		ret = o2net_process_message(sc, hdr);
+		if (ret == 0)
+			ret = 1;
 		sc->sc_page_off = 0;
 	}
 
 out:
-	sclog(sc, "finished receiving: progress %d ret %d\n", made_progress,
-	      ret);
-	if (made_progress)
-		ret = made_progress;
+	sclog(sc, "ret = %d\n", ret);
 	return ret;
 }
 
-/* this work func is triggerd by data ready.  it reads until it can
- * read no more.  if data_ready hits while we're doing our work the work
- * struct will be marked and we'll be called again. */
+/* this work func is triggered by data ready.  it reads until it can read no
+ * more.  it interprets 0, eof, as fatal.  if data_ready hits while we're doing
+ * our work the work struct will be marked and we'll be called again. */
 static void o2net_rx_until_empty(void *arg)
 {
 	struct o2net_sock_container *sc = arg;
@@ -1081,47 +1217,79 @@
 		ret = o2net_advance_rx(sc);
 	} while (ret > 0);
 
-	if (ret && ret != -EAGAIN) {
+	if (ret <= 0 && ret != -EAGAIN) {
 		struct o2net_node *nn = o2net_nn_from_num(sc->sc_node->nd_num);
 		sclog(sc, "saw error %d, closing\n", ret);
-		o2net_ensure_shutdown(nn, sc);
+		/* not permanent so read failed handshake can retry */
+		o2net_ensure_shutdown(nn, sc, 0);
 	}
 
 	sc_put(sc);
 }
 
-int o2net_set_nodelay(struct socket *sock)
+int o2net_set_options(struct socket *sock)
 {
-	int ret, opt = 1;
+	int ret, i;
 	mm_segment_t oldfs;
+	static struct optpairs {
+		int opt, val;
+	} pairs[] = {
+		{TCP_NODELAY, 1}, 
+		{TCP_KEEPCNT, O2NET_KEEPCNT}, 
+		{TCP_KEEPIDLE, O2NET_KEEPIDLE}, 
+		{TCP_KEEPINTVL, O2NET_KEEPINTVL}, 
+	}; 
 
 	oldfs = get_fs();
 	set_fs(KERNEL_DS);
-	ret = sock->ops->setsockopt(sock, SOL_TCP, TCP_NODELAY,
-				    (char __user *)&opt, sizeof(opt));
+
+	i = 1;
+	/* SOL_SOCKET is magical */
+	ret = sock_setsockopt(sock, SOL_SOCKET, SO_KEEPALIVE,
+			      (char __user *)&i, sizeof(i));
+
+	for (i = 0; ret == 0 && i < ARRAY_SIZE(pairs); i++) {
+		ret = sock->ops->setsockopt(sock, SOL_TCP, pairs[i].opt,
+					    (char __user *)&pairs[i].val,
+					    sizeof(pairs[i].val));
+		if (ret)
+			break;
+	}
+
 	set_fs(oldfs);
-
 	return ret;
 }
 
 /* ------------------------------------------------------------ */
 
-/* state_change saw this sc go into _ESTABLISHED. */
-static void o2net_sc_connect_completed(void *arg)
+/* called when a connect completes and after a sock is accepted.  the
+ * rx path will see the response and mark the sc valid */
+static void o2net_sc_send_handshake(struct o2net_sock_container *sc)
 {
-	struct o2net_sock_container *sc = arg;
 	struct o2net_node *nn = o2net_nn_from_num(sc->sc_node->nd_num);
+	ssize_t ret;
 
-	mlog(ML_CONN, "sc %p for node %s connected\n", sc,
-	     sc->sc_node->nd_name);
+	mlog(ML_MSG, "sc sending handshake with ver %llu id %llx\n",
+              (unsigned long long)O2NET_PROTOCOL_VERSION,
+	      (unsigned long long)o2net_hand->connector_id);
 
-	spin_lock(&nn->nn_lock);
-	/* set valid if it hasn't been shutdown already.. */
-	if (nn->nn_sc == sc) {
-		nn->nn_connect_attempts = 0;
-		o2net_set_nn_state(nn, sc, 1, 0);
+	ret = sc->sc_sock->ops->sendpage(sc->sc_sock, 
+					 virt_to_page(o2net_hand),
+					 (long)o2net_hand & ~PAGE_MASK,
+					 sizeof(*o2net_hand), MSG_DONTWAIT);
+	if (ret != sizeof(*o2net_hand)) {
+		if (ret >= 0)
+			ret = -EBADE;
+		mlog(ML_CONN, "sendpage failed with %zu\n", ret);
+		o2net_ensure_shutdown(nn, sc, 0);
 	}
-	spin_unlock(&nn->nn_lock);
+}
+
+static void o2net_sc_connect_completed(void *arg)
+{
+	struct o2net_sock_container *sc = arg;
+
+	o2net_sc_send_handshake(sc);
 	sc_put(sc);
 }
 
@@ -1154,21 +1322,12 @@
 	/* see if we already have one pending or have given up */
 	if (nn->nn_sc || nn->nn_persistent_error)
 		arg = NULL;
-	else if (nn->nn_connect_attempts == O2NET_MAX_CONNECT_ATTEMPTS) {
-		mlog(ML_NOTICE, "%u connections to node %u [%s] have failed "
-		     "since it started heartbeating.  No more connections "
-		     "will be initiated.  Network communication attempts will "
-		     "return errors.\n", nn->nn_connect_attempts,
-		     node->nd_num, node->nd_name);
-
-		o2net_set_nn_state(nn, NULL, 0, -ENOTCONN);
-		arg = NULL;
-	} else
-		nn->nn_connect_attempts++;
 	spin_unlock(&nn->nn_lock);
 	if (arg == NULL) /* *shrug*, needed some indicator */
 		goto out;
 
+	nn->nn_last_connect_attempt = jiffies;
+
 	sc = sc_alloc(node);
 	if (sc == NULL) {
 		mlog(0, "couldn't allocate sc\n");
@@ -1193,18 +1352,18 @@
 		goto out;
 	}
 	
-	o2net_set_nodelay(sc->sc_sock);
-	sk_register_callbacks(sc->sc_sock->sk, sc);
+	o2net_set_options(sc->sc_sock);
+	o2net_register_callbacks(sc->sc_sock->sk, sc);
 
 	spin_lock(&nn->nn_lock);
-	/* connect completion will set nn->nn_sc_valid */
+	/* handshake completion will set nn->nn_sc_valid */
 	o2net_set_nn_state(nn, sc, 0, 0);
 	spin_unlock(&nn->nn_lock);
 
 	remoteaddr.sin_family = AF_INET;
 	remoteaddr.sin_addr.s_addr = node->nd_ipv4_address;
 	remoteaddr.sin_port = node->nd_ipv4_port;
-
+	
 	ret = sc->sc_sock->ops->connect(sc->sc_sock,
 					(struct sockaddr *)&remoteaddr, 
 					sizeof(remoteaddr),
@@ -1213,11 +1372,16 @@
 		ret = 0;
 
 out:
-	mlog(ML_CONN, "finished with %d\n", ret);
 	if (ret) {
-		/* XXX log? */
+		mlog(ML_NOTICE, "connect attempt to node %s num %u at "
+		     "%u.%u.%u.%u:%d failed with errno %d\n",
+		     sc->sc_node->nd_name, sc->sc_node->nd_num, 
+		     NIPQUAD(sc->sc_node->nd_ipv4_address), 
+		     ntohs(sc->sc_node->nd_ipv4_port), ret);
+		/* 0 err so that another will be queued and attempted
+		 * from set_nn_state */
 		if (sc)
-			o2net_ensure_shutdown(nn, sc);
+			o2net_ensure_shutdown(nn, sc, 0);
 	}
 	if (sc)
 		sc_put(sc);
@@ -1227,27 +1391,130 @@
 	return;
 }
 
+static void o2net_connect_expired(void *arg)
+{
+	struct o2net_node *nn = arg;
+
+	spin_lock(&nn->nn_lock);
+	if (!nn->nn_sc_valid) {
+		mlog(ML_ERROR, "no connection established with node %u after "
+		     "%u seconds, giving up and returning errors.\n",
+		     o2net_num_from_nn(nn), O2NET_CONN_IDLE_SECS);
+
+		o2net_set_nn_state(nn, NULL, 0, -ENOTCONN);
+	}
+	spin_unlock(&nn->nn_lock);
+}
+
 /* ------------------------------------------------------------ */
 
+/* this work func is queued whenever a disconnection makes it such that
+ * we have half or fewer nodes connected than we do heartbeating.  if by
+ * the time we get here we still have that imbalance we decide that we
+ * cannot reach the majority of the cluster and cut ourselves off.  this
+ * work is delayed so that heartbeat has a chance to notice dead nodes
+ * and account for them in the heartbeating count before getting here */
+static void o2net_check_quorum(void *arg)
+{
+	int quorum;
+	unsigned long heartbeating[BITS_TO_LONGS(O2NM_MAX_NODES)];
+	int lowest_hb, lowest_reachable = 0, fence = 0;
+
+	/* usually _callback would be used from a hb callback to avoid
+	 * deadlocking on the hb_callback_sem.  we might as well be
+	 * in a callback sem.  our hb callback func will flush the work
+	 * queue until we're finished and it holds the callback sem.. */
+	o2hb_fill_node_map_from_callback(heartbeating, sizeof(heartbeating));
+	lowest_hb = find_first_bit(heartbeating, O2NM_MAX_NODES);
+	if (lowest_hb != O2NM_MAX_NODES)
+		lowest_reachable = test_bit(lowest_hb, o2net_connected_bitmap);
+
+	mlog(ML_QUORUM, "heartbeating: %u, connected: %u, "
+	     "lowest: %u (%sreachable)\n", o2net_heartbeating_nodes,
+	     o2net_connected_nodes, lowest_hb,
+	     lowest_reachable ? "" : "un");
+
+	if (!o2hb_check_local_node_heartbeating_from_callback() ||
+	    o2net_heartbeating_nodes == 1)
+		goto out;
+
+	if (o2net_heartbeating_nodes & 1) {
+		/* the odd numbered cluster case is straightforward --
+		 * if we can't talk to the majority we're hosed */
+		quorum = (o2net_heartbeating_nodes + 1)/2;
+		if (o2net_connected_nodes < quorum) {
+			mlog(ML_ERROR, "fencing this node because it is "
+			     "only connected to %u nodes and %u is needed "
+			     "to make a quorum out of %u heartbeating nodes\n",
+			     o2net_connected_nodes, quorum, 
+			     o2net_heartbeating_nodes);
+			fence = 1;
+		}
+	} else {
+		/* the even numbered cluster adds the possibility of each half
+		 * of the cluster being able to talk amongst themselves.. in
+		 * that case we're hosed if we can't talk to the group that has
+		 * the lowest numbered node */
+		quorum = o2net_heartbeating_nodes / 2;
+		if (o2net_connected_nodes < quorum) {
+			mlog(ML_ERROR, "fencing this node because it is "
+			     "only connected to %u nodes and %u is needed "
+			     "to make a quorum out of %u heartbeating nodes\n",
+			     o2net_connected_nodes, quorum, 
+			     o2net_heartbeating_nodes);
+		}
+		else if ((o2net_connected_nodes == quorum) && 
+			 !lowest_reachable) {
+			mlog(ML_ERROR, "fencing this node because it is "
+			     "connected to a half-quorum of %u out of %u "
+			     "nodes which doesn't include the lowest active "
+			     "node %u\n", quorum, o2net_heartbeating_nodes,
+			     lowest_hb);
+			fence = 1;
+		}
+	}
+	
+out:
+	/* this is horribly heavy-handed.  It should instead flip the file
+	 * system RO and call some userspace script */
+	if (fence) {
+		/* panic spins with interrupts enabled.  with preempt
+		 * threads can still schedule, etc, etc */
+		o2hb_stop_all_regions();
+		panic("ocfs2 is very sorry to be fencing this system by "
+		      "panicing\n");
+	}
+}
+
 void o2net_disconnect_node(struct o2nm_node *node)
 {
 	struct o2net_node *nn = o2net_nn_from_num(node->nd_num);
 
-	/* don't bother trying to connect until it's heartbeating again */ 
+	/* don't reconnect until it's heartbeating again */ 
 	spin_lock(&nn->nn_lock);
-	nn->nn_connect_attempts = O2NET_MAX_CONNECT_ATTEMPTS;
-	o2net_set_nn_state(nn, NULL, 0, -EINVAL);
-	cancel_delayed_work(&nn->nn_connect_work);
+	o2net_set_nn_state(nn, NULL, 0, -ENOTCONN);
 	spin_unlock(&nn->nn_lock);
 
-	if (o2net_wq)
+	if (o2net_wq) {
+		cancel_delayed_work(&nn->nn_connect_work);
 		flush_workqueue(o2net_wq);
+	}
 }
 
 static void o2net_hb_node_down_cb(struct o2nm_node *node, int node_num,
 				  void *data)
 {
-	o2net_disconnect_node(node);
+	struct o2net_node *nn = o2net_nn_from_num(node_num);
+
+	o2net_heartbeating_nodes--;
+	mlog(ML_QUORUM, "node %u, now %u heartbeating\n", node_num,
+	     o2net_heartbeating_nodes);
+
+	if (node_num != o2nm_this_node()) {
+		cancel_delayed_work(&nn->nn_connect_expired);
+		o2net_disconnect_node(node);
+	} else 
+		cancel_delayed_work(&o2net_quorum_work);
 }
 
 static void o2net_hb_node_up_cb(struct o2nm_node *node, int node_num,
@@ -1255,11 +1522,30 @@
 {
 	struct o2net_node *nn = o2net_nn_from_num(node_num);
 
-	/* ok, we can try to connect again */
-	spin_lock(&nn->nn_lock);
-	nn->nn_connect_attempts = 0;
-	o2net_set_nn_state(nn, NULL, 0, 0);
-	spin_unlock(&nn->nn_lock);
+	o2net_heartbeating_nodes++;
+	mlog(ML_QUORUM, "node %u, now %u heartbeating\n", node_num,
+	     o2net_heartbeating_nodes);
+
+	/* ensure an immediate connect attempt */
+	nn->nn_last_connect_attempt = jiffies - 
+		(msecs_to_jiffies(O2NET_RECONNECT_DELAY_MS) + 1);
+
+	if (node_num != o2nm_this_node()) {
+		/* heartbeat doesn't work unless a local node number is
+		 * configured and doing so brings up the o2net_wq, so we can
+		 * use it.. */
+		queue_delayed_work(o2net_wq, &nn->nn_connect_expired,
+				   O2NET_CONN_IDLE_SECS * HZ);
+
+		/* believe it or not, accept and node heartbeating testing
+		 * can succeed for this node before we got here.. so
+		 * only use set_nn_state to clear the persistent error
+		 * if that hasn't already happened */
+		spin_lock(&nn->nn_lock);
+		if (nn->nn_persistent_error)
+			o2net_set_nn_state(nn, NULL, 0, 0);
+		spin_unlock(&nn->nn_lock);
+	}
 }
 
 void o2net_unregister_hb_callbacks(void)
@@ -1335,7 +1621,7 @@
 	if (ret < 0)
 		goto out;
 
-	o2net_set_nodelay(new_sock);
+	o2net_set_options(new_sock);
 
 	slen = sizeof(sin);
 	ret = new_sock->ops->getname(new_sock, (struct sockaddr *) &sin,
@@ -1361,8 +1647,10 @@
 		goto out;
 	}
 
-	if (!o2hb_check_node_heartbeating(node->nd_num)) {
-		mlog(ML_NOTICE, "attempt to connect from node '%s' at "
+	/* this happens all the time when the other node sees our heartbeat
+	 * and tries to connect before we see their heartbeat */
+	if (!o2hb_check_node_heartbeating_from_callback(node->nd_num)) {
+		mlog(ML_CONN, "attempt to connect from node '%s' at "
 		     "%u.%u.%u.%u:%d but it isn't heartbeating\n",
 		     node->nd_name, NIPQUAD(sin.sin_addr.s_addr), 
 		     ntohs(sin.sin_port));
@@ -1396,12 +1684,14 @@
 	new_sock = NULL;
 
 	spin_lock(&nn->nn_lock);
-	o2net_set_nn_state(nn, sc, 1, 0);
+	o2net_set_nn_state(nn, sc, 0, 0);
 	spin_unlock(&nn->nn_lock);
 
-	sk_register_callbacks(sc->sc_sock->sk, sc);
+	o2net_register_callbacks(sc->sc_sock->sk, sc);
 	o2net_sc_queue_work(sc, &sc->sc_rx_work);
 
+	o2net_sc_send_handshake(sc);
+
 out:
 	if (new_sock)
 		sock_release(new_sock);
@@ -1436,7 +1726,7 @@
 	 * socket */
 	if (sk->sk_state == TCP_LISTEN) {
 		mlog(ML_TCP, "bytes: %d\n", bytes);
-		schedule_work(&o2net_listen_work);
+		queue_work(o2net_wq, &o2net_listen_work);
 	}
 
 out:
@@ -1494,9 +1784,11 @@
 /*
  * called from node manager when we should bring up our network listening
  * socket.  node manager handles all the serialization to only call this
- * once and to match it with o2net_stop_listening().
+ * once and to match it with o2net_stop_listening().  note,
+ * o2nm_this_node() doesn't work yet as we're being called while it
+ * is being set up.
  */
-int o2net_start_listening(u16 port)
+int o2net_start_listening(struct o2nm_node *node)
 {
 	int ret = 0;
 
@@ -1510,15 +1802,19 @@
 		return -ENOMEM; /* ? */
 	}
 
-	ret = o2net_open_listening_sock(port);
+	ret = o2net_open_listening_sock(node->nd_ipv4_port);
 	if (ret) {
 		destroy_workqueue(o2net_wq);
 		o2net_wq = NULL;
-	}
+	} else 
+		o2net_mod_connected_nodes(node->nd_num, 1);
+
 	return ret;
 }
 
-void o2net_stop_listening(void)
+/* again, o2nm_this_node() doesn't work here as we're involved in
+ * tearing it down */
+void o2net_stop_listening(struct o2nm_node *node)
 {
 	struct socket *sock = o2net_listen_sock;
 	size_t i;
@@ -1542,26 +1838,39 @@
 	
 	/* finish all work and tear down the work queue */  
 	mlog(ML_KTHREAD, "waiting for o2net thread to exit....\n");
+	cancel_delayed_work(&o2net_quorum_work);
 	destroy_workqueue(o2net_wq);
 	o2net_wq = NULL;
 
 	sock_release(o2net_listen_sock);
 	o2net_listen_sock = NULL;
+
+	o2net_mod_connected_nodes(node->nd_num, -1);
 }
 
 /* ------------------------------------------------------------ */
 
-int __init o2net_init(void)
+int o2net_init(void)
 {
 	unsigned long i;
 
+	o2net_hand = kcalloc(1, sizeof(struct o2net_handshake), GFP_KERNEL);
+	if (o2net_hand == NULL)
+		return -ENOMEM;
+
+	o2net_hand->protocol_version = cpu_to_be64(O2NET_PROTOCOL_VERSION);
+	o2net_hand->connector_id = cpu_to_be64(1);
+
+	INIT_WORK(&o2net_quorum_work, o2net_check_quorum, NULL);
+
 	for (i = 0; i < ARRAY_SIZE(o2net_nodes); i++) {
 		struct o2net_node *nn = o2net_nn_from_num(i);
 
 		spin_lock_init(&nn->nn_lock);
 		INIT_WORK(&nn->nn_connect_work, o2net_start_connect, nn);
+		INIT_WORK(&nn->nn_connect_expired, o2net_connect_expired, nn);
 		/* until we see hb from a node we'll return einval */
-		nn->nn_persistent_error = -EINVAL;
+		nn->nn_persistent_error = -ENOTCONN;
 		init_waitqueue_head(&nn->nn_sc_wq);
 		idr_init(&nn->nn_status_idr);
 		INIT_LIST_HEAD(&nn->nn_status_list);
@@ -1569,3 +1878,8 @@
 
 	return 0;
 }
+
+void o2net_exit(void)
+{
+	kfree(o2net_hand);
+}

Modified: trunk/fs/ocfs2/cluster/tcp.h
===================================================================
--- trunk/fs/ocfs2/cluster/tcp.h	2005-06-24 01:00:45 UTC (rev 2421)
+++ trunk/fs/ocfs2/cluster/tcp.h	2005-06-24 01:43:58 UTC (rev 2422)
@@ -37,14 +37,6 @@
 #include <linux/inet.h>
 #include <linux/in.h>
 
-/*
- * FIXME: no need for nodemanager.h except for
- * NM_MAX_NAME_LEN...shouldn't that be something or somewhere else?
- */
-#include "nodemanager.h"
-
-#define O2NET_MSG_MAGIC           ((u16)0xfa55)
-#define O2NET_MSG_STATUS_MAGIC    ((u16)0xfa56)
 typedef struct _o2net_msg
 {
 	__u16 magic;
@@ -58,36 +50,10 @@
 	__u8  buf[0];
 } o2net_msg;
 
-static inline void o2net_msg_to_net(o2net_msg *m)
-{
-	m->magic = htons(m->magic);
-	m->data_len = htons(m->data_len);
-	m->msg_type = htons(m->msg_type);
-	m->sys_status = htonl(m->sys_status);
-	m->status = htonl(m->status);
-	m->key = htonl(m->key);
-	m->msg_num = htonl(m->msg_num);
-}
-
-static inline void o2net_msg_to_host(o2net_msg *m)
-{
-	m->magic = ntohs(m->magic);
-	m->data_len = ntohs(m->data_len);
-	m->msg_type = ntohs(m->msg_type);
-	m->sys_status = ntohl(m->sys_status);
-	m->status = ntohl(m->status);
-	m->key = ntohl(m->key);
-	m->msg_num = ntohl(m->msg_num);
-}
-
 typedef int (o2net_msg_handler_func)(o2net_msg *msg, u32 len, void *data);
 
 #define O2NET_MAX_PAYLOAD_BYTES  (4096 - sizeof(o2net_msg))
 
-/* RESERVED */
-#define O2NET_ALREADY_CONNECTED   (0xfff0)
-#define O2NET_UNKNOWN_HOST        (0xfff1)
-
 /* TODO: figure this out.... */
 static inline int o2net_link_down(int err, struct socket *sock)
 {
@@ -132,13 +98,15 @@
 			   struct list_head *unreg_list);
 void o2net_unregister_handler_list(struct list_head *list);
 
+struct o2nm_node;
 int o2net_register_hb_callbacks(void);
 void o2net_unregister_hb_callbacks(void);
-int o2net_start_listening(u16 port);
-void o2net_stop_listening(void);
+int o2net_start_listening(struct o2nm_node *node);
+void o2net_stop_listening(struct o2nm_node *node);
 void o2net_disconnect_node(struct o2nm_node *node);
 
-int __init o2net_init(void);
+int o2net_init(void);
+void o2net_exit(void);
 int o2net_proc_init(struct proc_dir_entry *parent);
 void o2net_proc_exit(struct proc_dir_entry *parent);
 

Modified: trunk/fs/ocfs2/cluster/tcp_internal.h
===================================================================
--- trunk/fs/ocfs2/cluster/tcp_internal.h	2005-06-24 01:00:45 UTC (rev 2421)
+++ trunk/fs/ocfs2/cluster/tcp_internal.h	2005-06-24 01:43:58 UTC (rev 2422)
@@ -24,6 +24,31 @@
 
 #define O2NET_MAX_CONNECT_ATTEMPTS	5
 
+#define O2NET_MSG_MAGIC           ((u16)0xfa55)
+#define O2NET_MSG_STATUS_MAGIC    ((u16)0xfa56)
+
+/* same as hb delay, we're waiting for another node to recognize our hb */
+#define O2NET_RECONNECT_DELAY_MS	O2HB_REGION_TIMEOUT_MS
+
+/* we're delaying our quorum decision so that heartbeat will have timed
+ * out truly dead nodes by the time we come around to making decisions
+ * on their number */
+#define O2NET_QUORUM_DELAY_MS	((O2HB_DEAD_THRESHOLD + 2) * O2HB_REGION_TIMEOUT_MS)
+/* send 5 keepalives every 1 second after 5 seconds of idle.  this 
+ * is *so short* because we have to wait for quorum then wait for hb
+ * timeouts again and have recovery complete all within 45 seconds */
+#define O2NET_KEEPCNT		(5)
+#define O2NET_KEEPIDLE		(5)
+#define O2NET_KEEPINTVL		(1)
+#define O2NET_CONN_IDLE_SECS	(O2NET_KEEPIDLE + \
+					(O2NET_KEEPCNT * O2NET_KEEPINTVL))
+
+#define O2NET_PROTOCOL_VERSION 1ULL
+struct o2net_handshake {
+	u64	protocol_version;
+	u64	connector_id;
+};
+
 struct o2net_node {
 	/* this is never called from int/bh */
 	spinlock_t			nn_lock;
@@ -35,7 +60,6 @@
 	/* if this is set tx just returns it */
 	int				nn_persistent_error;
 
-
 	/* threads waiting for an sc to arrive wait on the wq for generation
 	 * to increase.  it is increased when a connecting socket succeeds
 	 * or fails or when an accepted socket is attached. */
@@ -44,8 +68,20 @@
 	struct idr			nn_status_idr;
 	struct list_head		nn_status_list;
 
+	/* connects are attempted from when heartbeat comes up until either hb
+	 * goes down, the node is unconfigured, no connect attempts succeed
+	 * before O2NET_CONN_IDLE_SECS, or a connect succeeds.  connect_work
+	 * is queued from set_nn_state both from hb up and from itself if a
+	 * connect attempt fails and so can be self-arming.  shutdown is
+	 * careful to first mark the nn such that no connects will be attempted
+	 * before canceling delayed connect work and flushing the queue. */
 	struct work_struct		nn_connect_work;
-	unsigned			nn_connect_attempts;
+	unsigned long			nn_last_connect_attempt;
+
+	/* this is queued as nodes come up and is canceled when a connection is
+	 * established.  this expiring gives up on the node and errors out
+	 * transmits */
+	struct work_struct		nn_connect_expired;
 };
 
 struct o2net_sock_container {
@@ -54,10 +90,30 @@
 	struct socket		*sc_sock;
 	struct o2nm_node	*sc_node;
 
+	/* all of these sc work structs hold refs on the sc while they are
+	 * queued.  they should not be able to ref a freed sc.  the teardown
+	 * race is with o2net_wq destruction in o2net_stop_listening() */
+
+	/* rx and connect work are generated from socket callbacks.  sc
+	 * shutdown removes the callbacks and then flushes the work queue */
 	struct work_struct	sc_rx_work;
+	struct work_struct	sc_connect_work;
+	/* shutdown work is triggered in two ways.  the simple way is
+	 * for a code path to call ensure_shutdown which gets a lock, removes
+	 * the sc from the nn, and queues the work.  in this case the
+	 * work is single-shot.  the work is also queued from a sock
+	 * callback, though, and in this case the work will find the sc
+	 * still on the nn and will call ensure_shutdown itself.. this
+	 * ends up triggering the shutdown work again, though nothing
+	 * will be done in that second iteration.  so work queue teardown
+	 * has to be careful to remove the sc from the nn before waiting
+	 * on the work queue so that the shutdown work doesn't remove the
+	 * sc and rearm itself.
+	 */
 	struct work_struct	sc_shutdown_work;
-	struct work_struct	sc_connect_work;
 
+	unsigned		sc_handshake_ok:1;
+
 	struct page 		*sc_page;
 	size_t			sc_page_off;
 

Modified: trunk/fs/ocfs2/dlm/dlmdomain.c
===================================================================
--- trunk/fs/ocfs2/dlm/dlmdomain.c	2005-06-24 01:00:45 UTC (rev 2421)
+++ trunk/fs/ocfs2/dlm/dlmdomain.c	2005-06-24 01:43:58 UTC (rev 2422)
@@ -752,7 +752,7 @@
 
 	status = o2net_send_message(DLM_QUERY_JOIN_MSG, DLM_MOD_KEY, &join_msg,
 				    sizeof(join_msg), node, &retval);
-	if (status < 0 && status != -ENOPROTOOPT && status != -ENOTCONN) {
+	if (status < 0 && status != -ENOPROTOOPT) {
 		mlog_errno(status);
 		goto bail;
 	}
@@ -760,12 +760,8 @@
 	/* -ENOPROTOOPT from the net code means the other side isn't
 	    listening for our message type -- that's fine, it means
 	    his dlm isn't up, so we can consider him a 'yes' but not
-	    joined into the domain. 
-	   -ENOTCONN is treated similarly -- it's returned from the
-	    core kernel net code however and indicates that they don't
-	    even have their cluster networking module loaded (bad
-	    user!) */
-	if (status == -ENOPROTOOPT || status == -ENOTCONN) {
+	    joined into the domain.  */
+	if (status == -ENOPROTOOPT) {
 		status = 0;
 		*response = JOIN_OK_NO_MAP;
 	} else if (retval == JOIN_DISALLOW ||



More information about the Ocfs2-commits mailing list