[Ocfs2-commits] zab commits r2404 - trunk/fs/ocfs2/cluster

svn-commits at oss.oracle.com svn-commits at oss.oracle.com
Fri Jun 17 14:11:40 CDT 2005


Author: zab
Signed-off-by: mfasheh
Date: 2005-06-17 14:11:38 -0500 (Fri, 17 Jun 2005)
New Revision: 2404

Modified:
   trunk/fs/ocfs2/cluster/heartbeat.c
   trunk/fs/ocfs2/cluster/heartbeat.h
   trunk/fs/ocfs2/cluster/masklog.c
   trunk/fs/ocfs2/cluster/masklog.h
   trunk/fs/ocfs2/cluster/net_proc.c
   trunk/fs/ocfs2/cluster/nodemanager.c
   trunk/fs/ocfs2/cluster/nodemanager.h
   trunk/fs/ocfs2/cluster/tcp.c
   trunk/fs/ocfs2/cluster/tcp.h
   trunk/fs/ocfs2/cluster/tcp_internal.h
Log:
rework connection maintenance to eliminate racing connects and allow for
keepalives and timeouts in the immediate future.

o in a pair of nodes the higher-numbered node connects; no more concurrent connects
o connections are triggered by hb, not by message sending
o connection attempts are retried a few times before giving up
o drive accept from data_ready instead of waiting on sock->sk->sk_sleep
o replace sketchy thread/lock/lists with a work queue and work structs
o move net state from o2nm_node to a static o2net_node array
o have rx loop until -EAGAIN instead of rearming
o remove o2net_sock_drain; sock_release() drains the rx queue
o disconnect and error out transmits if listening is stopped in configfs

Signed-off-by: mfasheh



Modified: trunk/fs/ocfs2/cluster/heartbeat.c
===================================================================
--- trunk/fs/ocfs2/cluster/heartbeat.c	2005-06-17 05:57:53 UTC (rev 2403)
+++ trunk/fs/ocfs2/cluster/heartbeat.c	2005-06-17 19:11:38 UTC (rev 2404)
@@ -1416,11 +1416,26 @@
 }
 EXPORT_SYMBOL_GPL(o2hb_unregister_callback);
 
+int o2hb_check_node_heartbeating(u8 node_num)
+{
+	unsigned long testing_map[BITS_TO_LONGS(O2NM_MAX_NODES)];
+
+	o2hb_fill_node_map(testing_map, sizeof(testing_map));
+	if (!test_bit(node_num, testing_map)) {
+		mlog(ML_HEARTBEAT,
+		     "node (%u) does not have heartbeating enabled.\n",
+		     node_num);
+		return 0;
+	}
+
+	return 1;
+}
+EXPORT_SYMBOL_GPL(o2hb_check_node_heartbeating);
+
 /* Makes sure our local node is configured with a node number, and is
  * heartbeating. */
 int o2hb_check_local_node_heartbeating(void)
 {
-	unsigned long testing_map[BITS_TO_LONGS(O2NM_MAX_NODES)];
 	u8 node_num;
 
 	/* if this node was set then we have networking */
@@ -1430,15 +1445,6 @@
 		return 0;
 	}
 
-	o2hb_fill_node_map(testing_map, sizeof(testing_map));
-
-	if (!test_bit(node_num, testing_map)) {
-		mlog(ML_HEARTBEAT,
-		     "this node (%u) does not have heartbeating enabled.\n",
-		     node_num);
-		return 0;
-	}
-
-	return 1;
+	return o2hb_check_node_heartbeating(node_num);
 }
 EXPORT_SYMBOL_GPL(o2hb_check_local_node_heartbeating);

Modified: trunk/fs/ocfs2/cluster/heartbeat.h
===================================================================
--- trunk/fs/ocfs2/cluster/heartbeat.h	2005-06-17 05:57:53 UTC (rev 2403)
+++ trunk/fs/ocfs2/cluster/heartbeat.h	2005-06-17 19:11:38 UTC (rev 2404)
@@ -63,6 +63,7 @@
 void o2hb_fill_node_map(unsigned long *map,
 			unsigned bytes);
 void o2hb_init(void);
+int o2hb_check_node_heartbeating(u8 node_num);
 int o2hb_check_local_node_heartbeating(void);
 
 #endif /* O2CLUSTER_HEARTBEAT_H */

Modified: trunk/fs/ocfs2/cluster/masklog.c
===================================================================
--- trunk/fs/ocfs2/cluster/masklog.c	2005-06-17 05:57:53 UTC (rev 2403)
+++ trunk/fs/ocfs2/cluster/masklog.c	2005-06-17 19:11:38 UTC (rev 2404)
@@ -209,6 +209,7 @@
 	set_a_string(INODE);
 	set_a_string(VOTE);
 	set_a_string(DCACHE);
+	set_a_string(CONN);
 	set_a_string(ERROR);
 	set_a_string(NOTICE);
 	set_a_string(KTHREAD);

Modified: trunk/fs/ocfs2/cluster/masklog.h
===================================================================
--- trunk/fs/ocfs2/cluster/masklog.h	2005-06-17 05:57:53 UTC (rev 2403)
+++ trunk/fs/ocfs2/cluster/masklog.h	2005-06-17 19:11:38 UTC (rev 2404)
@@ -82,12 +82,12 @@
 /* NOTE: If you add a flag, you need to also update mlog.c! */
 #define ML_ENTRY	0x0000000000000001ULL /* func call entry */
 #define ML_EXIT		0x0000000000000002ULL /* func call exit */
-#define ML_TCP		0x0000000000000004ULL /* cluster/tcp.c */
-#define ML_MSG		0x0000000000000008ULL /* network messages */
-#define ML_SOCKET	0x0000000000000010ULL /* socket lifetime */
-#define ML_HEARTBEAT	0x0000000000000020ULL /* cluster heartbeat */
-#define ML_HB_BIO	0x0000000000000040ULL /* heartbaet io tracing */
-#define ML_DLMFS	0x0000000000000080ULL /* ocfs2_dlmfs */
+#define ML_TCP		0x0000000000000004ULL /* net cluster/tcp.c */
+#define ML_MSG		0x0000000000000008ULL /* net network messages */
+#define ML_SOCKET	0x0000000000000010ULL /* net socket lifetime */
+#define ML_HEARTBEAT	0x0000000000000020ULL /* hb all heartbeat tracking */
+#define ML_HB_BIO	0x0000000000000040ULL /* hb io tracing */
+#define ML_DLMFS	0x0000000000000080ULL /* dlm user dlmfs */
 #define ML_DLM		0x0000000000000100ULL /* dlm general debugging */
 #define ML_DLM_DOMAIN	0x0000000000000200ULL /* dlm domain debugging */
 #define ML_DLM_THREAD	0x0000000000000400ULL /* dlm domain thread */
@@ -106,6 +106,7 @@
 #define ML_INODE	0x0000000000800000ULL /* ocfs2 inode manipulation */
 #define ML_VOTE		0x0000000001000000ULL /* ocfs2 node messaging  */
 #define ML_DCACHE	0x0000000002000000ULL /* ocfs2 dcache operations */
+#define ML_CONN		0x0000000004000000ULL /* net connection management */
 /* bits that are infrequently given and frequently matched in the high word */
 #define ML_ERROR	0x0000000100000000ULL /* sent to KERN_ERR */
 #define ML_NOTICE	0x0000000200000000ULL /* setn to KERN_NOTICE */

Modified: trunk/fs/ocfs2/cluster/net_proc.c
===================================================================
--- trunk/fs/ocfs2/cluster/net_proc.c	2005-06-17 05:57:53 UTC (rev 2403)
+++ trunk/fs/ocfs2/cluster/net_proc.c	2005-06-17 19:11:38 UTC (rev 2404)
@@ -114,7 +114,7 @@
 			   "  pid:          %lu\n"
 			   "  tgid:         %lu\n"
 			   "  process name: %s\n"
-			   "  node:         %s\n"
+			   "  node:         %u\n"
 			   "  sc:           %p\n"
 			   "  message type: %u\n"
 			   "  message key:  0x%08x\n"
@@ -123,7 +123,7 @@
 			   "  wait start:   %lu.%lu\n",
 			   nst, (unsigned long)nst->st_task->pid,
 			   (unsigned long)nst->st_task->tgid,
-			   nst->st_task->comm, nst->st_node->nd_name,
+			   nst->st_task->comm, nst->st_node,
 			   nst->st_sc, nst->st_msg_type, nst->st_msg_key,
 			   nst->st_sock_time.tv_sec, nst->st_sock_time.tv_usec,
 			   nst->st_send_time.tv_sec, nst->st_send_time.tv_usec,
@@ -277,16 +277,11 @@
 			   "  krefs:           %d\n"
 			   "  sock:            %u.%u.%u.%u:%u -> %u.%u.%u.%u:%u\n"
 			   "  remote node:     %s\n"
-			   "  from connect:    %u\n"
-			   "  pending connect: %u\n"
-			   "  _item on list:   %u\n"
 			   "  page off:        %zu\n",
 			   sc, atomic_read(&sc->sc_kref.refcount),
 			   NIPQUAD(saddr), inet ? ntohs(inet->sport) : 0,
 			   NIPQUAD(daddr), inet ? ntohs(inet->dport) : 0,
-			   sc->sc_node ? sc->sc_node->nd_name : "[detached]",
-			   sc->sc_from_connect, sc->sc_pending_connect,
-			   !list_empty(&sc->sc_item), sc->sc_page_off);
+			   sc->sc_node->nd_name, sc->sc_page_off);
 	}
 
 

Modified: trunk/fs/ocfs2/cluster/nodemanager.c
===================================================================
--- trunk/fs/ocfs2/cluster/nodemanager.c	2005-06-17 05:57:53 UTC (rev 2403)
+++ trunk/fs/ocfs2/cluster/nodemanager.c	2005-06-17 19:11:38 UTC (rev 2404)
@@ -391,17 +391,16 @@
 	    cluster->cl_local_node != node->nd_num)
 		return -EBUSY;
 
-	/* bring up the rx thread if we're setting the new local
-	 * node.  XXX make sure port/addr are set */
+	/* start listening for connections if we're setting the new local node. */
 	if (tmp && !cluster->cl_has_local) {
-		ret = o2net_start_rx_thread(node);
+		ret = o2net_start_listening(node->nd_ipv4_port);
 		if (ret)
 			return ret;
 	}
 
 	if (!tmp && cluster->cl_has_local &&
 	    cluster->cl_local_node == node->nd_num) {
-		o2net_stop_rx_thread(node);
+		o2net_stop_listening();
 		cluster->cl_local_node = 0;
 	}
 
@@ -556,11 +555,6 @@
 	strcpy(node->nd_name, name); /* use item.ci_namebuf instead? */
 	config_item_init_type_name(&node->nd_item, name, &o2nm_node_type);
 	spin_lock_init(&node->nd_lock);
-	atomic_set(&node->nd_pending_connects, 0);
-	atomic_set(&node->nd_sc_generation, 0);
-	init_waitqueue_head(&node->nd_sc_wq);
-	idr_init(&node->nd_status_idr);
-	INIT_LIST_HEAD(&node->nd_status_list);
 
 	ret = &node->nd_item;
 
@@ -577,13 +571,13 @@
 	struct o2nm_node *node = to_o2nm_node(item);
 	struct o2nm_cluster *cluster = to_o2nm_cluster(group->cg_item.ci_parent);
 
-	o2net_detach_sc(NULL, node);
+	o2net_disconnect_node(node);
 
 	if (cluster->cl_has_local &&
 	    (cluster->cl_local_node == node->nd_num)) {
 		cluster->cl_has_local = 0;
 		cluster->cl_local_node = O2NM_MAX_NODES;
-		o2net_stop_rx_thread(node);
+		o2net_stop_listening();
 	}
 
 	/* XXX call into net to stop this node from trading messages */
@@ -804,6 +798,7 @@
 	cluster_print_version();
 
 	o2hb_init();
+	o2net_init();
 
 	ocfs2_table_header = register_sysctl_table(ocfs2_root_table, 0);
 	if (!ocfs2_table_header) {

Modified: trunk/fs/ocfs2/cluster/nodemanager.h
===================================================================
--- trunk/fs/ocfs2/cluster/nodemanager.h	2005-06-17 05:57:53 UTC (rev 2403)
+++ trunk/fs/ocfs2/cluster/nodemanager.h	2005-06-17 19:11:38 UTC (rev 2404)
@@ -52,21 +52,6 @@
 	int			nd_local;
 
 	unsigned long		nd_set_attributes;
-
-	/* only used by the local node. */
-	struct task_struct	*nd_rx_thread;
-
-	/* protected by nd_lock.  It isn't so hot that we have all these
-	 * net-private things out here in nodemanager.h. */
-	struct o2net_sock_container *nd_sc;
-	/* threads waiting for an sc to arrive wait on the wq for generation
-	 * to increase.  it is increased when a connecting socket succeeds
-	 * or fails or when an accepted socket is attached. */
-	atomic_t		nd_pending_connects;
-	atomic_t		nd_sc_generation;
-	wait_queue_head_t	nd_sc_wq;
-	struct idr		nd_status_idr;
-	struct list_head	nd_status_list;
 };
 
 u8 o2nm_this_node(void);

Modified: trunk/fs/ocfs2/cluster/tcp.c
===================================================================
--- trunk/fs/ocfs2/cluster/tcp.c	2005-06-17 05:57:53 UTC (rev 2403)
+++ trunk/fs/ocfs2/cluster/tcp.c	2005-06-17 19:11:38 UTC (rev 2404)
@@ -40,42 +40,25 @@
  * Any framing errors (bad magic, large payload lengths) close a connection.
  *
  * Our sock_container holds the state we associate with a socket.  It's current
- * framing state is held there as well as the refcounting we do around when
- * it is safe to tear down the socket.
- * 
- * A caller who wants to communicate with a node gets a ref to the sock
- * container by finding it the o2nm_node that it wants to communicate with.
- * The sock container will only be found on the node once there is a valid
- * socket in the container.  The socket is only finally torn down from
- * the container when the container loses all of its references -- so as
- * long as you hold a ref on the container you can trust that the socket
- * is valid for use with kernel socket APIs.
+ * framing state is held there as well as the refcounting we do around when it
+ * is safe to tear down the socket.  The socket is only finally torn down from
+ * the container when the container loses all of its references -- so as long
+ * as you hold a ref on the container you can trust that the socket is valid
+ * for use with kernel socket APIs.
  *
- * Usually blocking paths get and drop refs on the container, which is easy.
- * The exception to this are the socket callbacks.  They push their work
- * back up to the rx thread who takes it from there. 
+ * Connections are initiated between a pair of nodes when the node with the
+ * higher node number gets a heartbeat callback which indicates that the lower
+ * numbered node has started heartbeating.  The lower numbered node is passive
+ * and only accepts the connection if the higher numbered node is heartbeating.
+ * The connection attempts continue as long as heartbeat is active until a
+ * build-time max number of failed attempts.
  *
- * One can imagine the direction a more sophisticated API would head in:
- * (there are certainly a half dozen examples in the kernel)
- *   * tx
- *   	- passes in page/off/len to send, gets put on a queue
- *   	- if response data is needed, passes in preallocated page/off/len
- *   	- tx header includes message id to associate reply with posted rx buf
- *   	- write_space triggers passing the p/o/l tx queue to ->sendpage()
- *   * rx
- *      - data_ready uses tcp_read_sock to parse message header
- *      - header identifies whether to copy into posted rx buf or unsolicited
- *      - handers must be callable from bh context
- * but it really depends on what the semantics and messages are.
- *
  * XXX
- * 	- tear down all node sockets on rx thread exit
- * 	- have rx thread stop active tx and wait for them
  * 	- don't allow node rmdir if it has socket and rx thread is running
  * 	- disable preemt before calling rx handler when debugging
  * 	- find explicit stack call to drain rx queue
  * 	- add trivial version trading message at the start of a conn
- * 	- go nuts adding static
+ * 	- add kerneldoc for the external interface
  */
 
 #include <linux/kernel.h>
@@ -94,48 +77,54 @@
 
 #include "tcp_internal.h"
 
-/* In the following two log macros, the whitespace after the ',' just
+/* 
+ * In the following two log macros, the whitespace after the ',' just
  * before ##args is intentional. Otherwise, gcc 2.95 will eat the
  * previous token if args expands to nothing.
  */
-
-#define __msg_fmt "[mag %u len %u typ %u stat %d sys_stat %d key %08x num %u] "
-#define __msg_args __hdr->magic, __hdr->data_len, __hdr->msg_type, 	\
- 	__hdr->status,	__hdr->sys_status, __hdr->key, __hdr->msg_num
 #define msglog(hdr, fmt, args...) do {					\
 	typeof(hdr) __hdr = (hdr);					\
-	mlog(ML_MSG, __msg_fmt fmt, __msg_args , ##args);		\
+	mlog(ML_MSG, "[mag %u len %u typ %u stat %d sys_stat %d "	\
+	     "key %08x num %u] " fmt, __hdr->magic, __hdr->data_len, 	\
+	     __hdr->msg_type, __hdr->status, __hdr->sys_status,		\
+	     __hdr->key, __hdr->msg_num ,  ##args);			\
 } while (0)
 
-#define __sc_fmt 							\
-	"[sc %p refs %d sock %p node %p from_con %u item_on %d "	\
-	"page %p pg_off %zu] "
-#define __sc_args __sc, atomic_read(&__sc->sc_kref.refcount), 		\
-	__sc->sc_sock,	__sc->sc_node, __sc->sc_from_connect, 		\
-	!list_empty(&__sc->sc_item), __sc->sc_page, __sc->sc_page_off
 #define sclog(sc, fmt, args...) do {					\
 	typeof(sc) __sc = (sc);						\
-	mlog(ML_SOCKET, __sc_fmt fmt, __sc_args , ##args);		\
+	mlog(ML_SOCKET, "[sc %p refs %d sock %p node %u page %p "	\
+	     "pg_off %zu] " fmt, __sc,					\
+	     atomic_read(&__sc->sc_kref.refcount), __sc->sc_sock,	\
+	    __sc->sc_node->nd_num, __sc->sc_page, __sc->sc_page_off ,	\
+	    ##args);							\
 } while (0)
 
 static rwlock_t o2net_handler_lock = RW_LOCK_UNLOCKED;
 static struct rb_root o2net_handler_tree = RB_ROOT;
 
-/* this lock is also grabbed from bh context, non-bh use _bh() locking */
-static spinlock_t o2net_active_lock = SPIN_LOCK_UNLOCKED;
-static LIST_HEAD(o2net_active_list);
-static LIST_HEAD(o2net_detach_list);
-static LIST_HEAD(o2net_attach_list);
+static struct workqueue_struct *o2net_wq;
+static struct o2net_node o2net_nodes[O2NM_MAX_NODES];
 
 /* XXX someday we'll need better accounting */
-static struct task_struct *o2net_recv_task = NULL;
+static struct socket *o2net_listen_sock = NULL;
+static struct work_struct o2net_listen_work;
 
+static struct o2hb_callback_func o2net_hb_up, o2net_hb_down;
+#define O2NET_HB_PRI 0x1
+
 static int o2net_sys_err_translations[O2NET_ERR_MAX] =
 		{[O2NET_ERR_NONE]	= 0,
 		 [O2NET_ERR_NO_HNDLR]	= -ENOPROTOOPT,
 		 [O2NET_ERR_OVERFLOW]	= -EOVERFLOW,
 		 [O2NET_ERR_DIED]	= -EHOSTDOWN,};
 
+/* can't quite avoid *all* internal declarations :/ */
+static void o2net_start_connect(void *arg);
+static void o2net_sc_connect_completed(void *arg);
+static void o2net_rx_until_empty(void *arg);
+static void o2net_shutdown_sc(void *arg);
+static void o2net_listen_data_ready(struct sock *sk, int bytes);
+
 static inline int o2net_sys_err_to_errno(enum o2net_system_error err)
 {
 	int trans;
@@ -147,95 +136,137 @@
 	return trans;
 }
 
-/////////////////////
-static struct socket *o2net_init_tcp_recv_sock(u16 port);
-static int o2net_receive_thread(void *data);
-static int o2net_receive(void);
-static void o2net_try_accept(struct socket *sock);
-static int o2net_process_message(struct o2nm_node *node, struct socket *sock,
-				 o2net_msg *hdr);
+static struct o2net_node * o2net_nn_from_num(u8 node_num)
+{
+	BUG_ON(node_num >= ARRAY_SIZE(o2net_nodes));
+	return &o2net_nodes[node_num];
+}
 
-static void o2net_data_ready(struct sock *sk, int bytes);
-static int o2net_sc_from_node(struct o2nm_node *node,
-			      struct o2net_sock_container **sc_ret);
-static int o2net_attach_sc(struct o2net_sock_container *sc);
-static void o2net_finish_connect(struct o2nm_node *node);
-static struct o2net_sock_container *sc_alloc(struct o2nm_node *node,
-					     int from_conn);
-static void o2net_state_change(struct sock *sk);
-static void o2net_complete_nodes_nsw(struct o2nm_node *node);
+static u8 o2net_num_from_nn(struct o2net_node *nn)
+{
+	BUG_ON(nn == NULL);
+	return nn - o2net_nodes;
+}
 
-//////////////////////
+/* ------------------------------------------------------------ */
 
-int o2net_start_rx_thread(struct o2nm_node *node)
+static int o2net_prep_nsw(struct o2net_node *nn, struct o2net_status_wait *nsw)
 {
-	struct socket *sock;
 	int ret = 0;
 
-	BUG_ON(node->nd_rx_thread != NULL);
-	BUG_ON(o2net_recv_task != NULL);
+	do { 
+		if (!idr_pre_get(&nn->nn_status_idr, GFP_NOFS)) {
+			ret = -EAGAIN;
+			break;
+		}
+		spin_lock(&nn->nn_lock);
+#ifndef IDR_GET_NEW_RETURNS_ID
+		ret = idr_get_new(&nn->nn_status_idr, nsw, &nsw->ns_id);
+#else
+		/* old semantics */
+		nsw->ns_id = idr_get_new(&nn->nn_status_idr, nsw);
+		if (nsw->ns_id < 0)
+			ret = -EAGAIN;
+		else
+			ret = 0;
+#endif
+		if (ret == 0)
+			list_add_tail(&nsw->ns_node_item,
+				      &nn->nn_status_list);
+		spin_unlock(&nn->nn_lock);
+	} while (ret == -EAGAIN);
 
-	/* if the thread was setting up the rx socket we'd like to have it
-	 * communicate errors back to us here.  us setting up the socket
-	 * and passing it to the thread is easier */
-	sock = o2net_init_tcp_recv_sock(node->nd_ipv4_port);
-	if (IS_ERR(sock)) {
-		ret = PTR_ERR(sock);
-		sock = NULL;
-		goto out;
+	if (ret == 0)  {
+		init_waitqueue_head(&nsw->ns_wq);
+		nsw->ns_sys_status = O2NET_ERR_NONE;
+		nsw->ns_status = 0;
 	}
 
-	mlog(ML_KTHREAD, "starting net receive thread...\n");
+	return ret;
+}
 
-	node->nd_rx_thread = kthread_run(o2net_receive_thread, sock,
-					 "o2netrecv-%s", node->nd_name);
-	if (IS_ERR(node->nd_rx_thread)) {
-		ret = PTR_ERR(node->nd_rx_thread);
-		node->nd_rx_thread = NULL;
-		mlog(ML_ERROR, "unable to launch net receive thread, "
-		     "error=%ld\n", (long)ret);
-		goto out;
+static void o2net_complete_nsw_locked(struct o2net_node *nn,
+				      struct o2net_status_wait *nsw,
+				      enum o2net_system_error sys_status,
+				      s32 status)
+{
+	assert_spin_locked(&nn->nn_lock);
+
+	if (!list_empty(&nsw->ns_node_item)) {
+		list_del_init(&nsw->ns_node_item);
+		nsw->ns_sys_status = sys_status;
+		nsw->ns_status = status;
+		idr_remove(&nn->nn_status_idr, nsw->ns_id);
+		wake_up(&nsw->ns_wq);
 	}
+}
 
-	/* once the thread is running it has ownership of the sock */
-	sock = NULL;
-	o2net_recv_task = node->nd_rx_thread;
+static void o2net_complete_nsw(struct o2net_node *nn,
+			       struct o2net_status_wait *nsw,
+			       u64 id, enum o2net_system_error sys_status, 
+			       s32 status)
+{
+	spin_lock(&nn->nn_lock);
+	if (nsw == NULL) {
+		if (id > INT_MAX)
+			goto out;
 
+		nsw = idr_find(&nn->nn_status_idr, id);
+		if (nsw == NULL)
+			goto out;
+	}
+
+	o2net_complete_nsw_locked(nn, nsw, sys_status, status);
+
 out:
-	if (sock)
-		sock_release(sock);
-	return 0;
+	spin_unlock(&nn->nn_lock);
+	return;
 }
 
-void o2net_stop_rx_thread(struct o2nm_node *node)
+static void o2net_complete_nodes_nsw(struct o2net_node *nn)
 {
-	if (node->nd_rx_thread) {
-		mlog(ML_KTHREAD, "waiting for net thread to exit....\n");
-		kthread_stop(node->nd_rx_thread);
-		node->nd_rx_thread = NULL;
-		o2net_recv_task = NULL;
+	struct list_head *iter, *tmp;
+	unsigned int num_kills = 0;
+	struct o2net_status_wait *nsw;
+
+	assert_spin_locked(&nn->nn_lock);
+
+	list_for_each_safe(iter, tmp, &nn->nn_status_list) {
+		nsw = list_entry(iter, struct o2net_status_wait, ns_node_item);
+		o2net_complete_nsw_locked(nn, nsw, O2NET_ERR_DIED, 0);
+		num_kills++;
 	}
+	
+	mlog(0, "completed %d messages for node %u\n", num_kills,
+	     o2net_num_from_nn(nn));
+}
 
-	/* XXX if we stop the thread we've cut off the rx path for all the
-	 * nodes.. we should walk their net_inode_privates and tear down their
-	 * sockets.   tx shouldn't bring up a conn if there is no
-	 * rx thread and rmdir should sync with the rx therad and tx 
-	 * references.. ugh. */
+static int o2net_nsw_completed(struct o2net_node *nn,
+			       struct o2net_status_wait *nsw)
+{
+	int completed;
+	spin_lock(&nn->nn_lock);
+	completed = list_empty(&nsw->ns_node_item);
+	spin_unlock(&nn->nn_lock);
+	return completed;
 }
 
-/*
- * we must be detached at this point because the state detach tears down
- * holds a ref.. this could well be called from detach. 
- */
+/* ------------------------------------------------------------ */
+
 static void sc_kref_release(struct kref *kref)
 {
 	struct o2net_sock_container *sc = container_of(kref,
 					struct o2net_sock_container, sc_kref);
 	sclog(sc, "releasing\n");
+
 	if (sc->sc_sock) {
 		sock_release(sc->sc_sock);
 		sc->sc_sock = NULL;
 	}
+
+	o2nm_node_put(sc->sc_node);
+	sc->sc_node = NULL;
+
 	o2net_proc_del_sc(sc);
 	kfree(sc);
 }
@@ -245,203 +276,254 @@
 	sclog(sc, "put\n");
 	kref_put(&sc->sc_kref, sc_kref_release);
 }
-
-static void sk_register_callbacks(struct sock *sk,
-			          struct o2net_sock_container *sc)
+static struct o2net_sock_container *sc_alloc(struct o2nm_node *node)
 {
-	write_lock_bh(&sk->sk_callback_lock);
-	if (sk->sk_user_data != sc) {
-		sk->sk_user_data = sc;
-		kref_get(&sc->sc_kref);
-		sc->sc_data_ready = sk->sk_data_ready;
-		sc->sc_state_change = sk->sk_state_change;
-		sk->sk_data_ready = o2net_data_ready;
-		sk->sk_state_change = o2net_state_change;
-	}
-	write_unlock_bh(&sk->sk_callback_lock);
-}
+	struct o2net_sock_container *sc, *ret = NULL;
+	struct page *page = NULL;
 
-static int sk_unregister_callbacks(struct sock *sk,
-			           struct o2net_sock_container *sc)
-{
-	int ret = 0;
+	page = alloc_page(GFP_NOFS);
+	sc = kcalloc(1, sizeof(*sc), GFP_NOFS);
+	if (sc == NULL || page == NULL)
+		goto out;
 
-	write_lock_bh(&sk->sk_callback_lock);
-	if (sk->sk_user_data == sc) {
-		ret = 1;
-		sk->sk_user_data = NULL;
-		sk->sk_data_ready = sc->sc_data_ready;
-		sk->sk_state_change = sc->sc_state_change;
-	}
-	write_unlock_bh(&sk->sk_callback_lock);
+	kref_init(&sc->sc_kref, sc_kref_release);
+	o2nm_node_get(node);
+	sc->sc_node = node;
 
+	INIT_WORK(&sc->sc_connect_work, o2net_sc_connect_completed, sc);
+	INIT_WORK(&sc->sc_rx_work, o2net_rx_until_empty, sc);
+	INIT_WORK(&sc->sc_shutdown_work, o2net_shutdown_sc, sc);
+
+	sclog(sc, "alloced\n");
+
+	ret = sc;
+	sc->sc_page = page;
+	o2net_proc_add_sc(sc);
+	sc = NULL;
+	page = NULL;
+
+out:
+	if (page)
+		__free_page(page);
+	kfree(sc);
+
 	return ret;
 }
 
-/*
- * When a sock_container is detached it will no longer see traffic once
- * its current users are done.  The sc is removed from the node so
- * transmitting tasks can't find it.  It is removed from the active lists
- * and its callbacks are unregistered so that the receive thread can't
- * find it.  After this point the last person to drop their kref will
- * free the container.
- *
- * If an sc is specified the caller must hold a reference.  A node can be
- * specified instead of an sc such that any sc currently attached to the
- * node will be detached. 
- */
-void o2net_detach_sc(struct o2net_sock_container *sc, struct o2nm_node *node)
+/* ------------------------------------------------------------ */
+
+/* make sure the work gets a ref to drop */
+static void o2net_sc_queue_work(struct o2net_sock_container *sc,
+				struct work_struct *work)
 {
-	int nr_puts = 0;
-	struct sock *sk;
+	kref_get(&sc->sc_kref);
+	if (!queue_work(o2net_wq, work))
+		sc_put(sc);
+}
 
-	/* node is only used to get the sc if it isn't specified */
-	BUG_ON(sc && node);
-	BUG_ON(sc == NULL && node == NULL);
+static void o2net_set_nn_state(struct o2net_node *nn,
+			       struct o2net_sock_container *sc,
+			       unsigned valid, int err)
+{
+	int was_valid = nn->nn_sc_valid;
+	struct o2net_sock_container *old_sc = nn->nn_sc;
 
-	/* no sc, get it from the node */
-	if (sc == NULL) {
-		spin_lock_bh(&node->nd_lock);
-		if (node->nd_sc) {
-			sc = node->nd_sc;
-			node->nd_sc = NULL;
-			o2net_complete_nodes_nsw(node);
-			/* we have this ref now */
-			nr_puts++;
-		}
-		spin_unlock_bh(&node->nd_lock);
-		node = NULL;
-	}
+	assert_spin_locked(&nn->nn_lock);
 
-	if (sc == NULL)
-		goto out;
+	/* the node num comparison and single connect/accept path should stop
+	 * a non-null sc from being overwritten with another */
+	BUG_ON(sc && nn->nn_sc && nn->nn_sc != sc);
+	mlog_bug_on_msg(err && valid, "err %d valid %u\n", err, valid);
+	mlog_bug_on_msg(valid && !sc, "valid %u sc %p\n", valid, sc);
 
-	sclog(sc, "detaching with node %p\n", node);
+	mlog(ML_CONN, "node %u sc: %p -> %p, valid %u -> %u, err %d -> %d\n",
+	     o2net_num_from_nn(nn), nn->nn_sc, sc, nn->nn_sc_valid, valid,
+	     nn->nn_persistent_error, err);
 
-	spin_lock_bh(&sc->sc_lock);
-	if (sc->sc_node) {
-		/* drop our ref to the node below */
-		node = sc->sc_node;
-		sc->sc_node = NULL;
-		/* resolve our pending connect under sc_lock and while
-		 * we have a node ref.  this is done in the error case
-		 * where state_change puts us on the detach list. */
-		if (sc->sc_pending_connect) {
-			sc->sc_pending_connect = 0;
-			o2net_finish_connect(node);
-		}
+	nn->nn_sc = sc;
+	nn->nn_sc_valid = valid ? 1 : 0;
+	nn->nn_persistent_error = err;
+
+	/* mirrors o2net_tx_can_proceed() */
+	if (nn->nn_persistent_error || nn->nn_sc_valid)
+		wake_up(&nn->nn_sc_wq);
+
+	if (was_valid && !valid) {
+		mlog(ML_NOTICE, "ocfs2:tcp: no longer connected to "
+		       "node %s at %u.%u.%u.%u:%d\n",
+		       old_sc->sc_node->nd_name,
+		       NIPQUAD(old_sc->sc_node->nd_ipv4_address), 
+		       ntohs(old_sc->sc_node->nd_ipv4_port));
+		o2net_complete_nodes_nsw(nn);
 	}
-	if (!list_empty(&sc->sc_item)) {
-		list_del_init(&sc->sc_item);
-		nr_puts++;
+
+	if (!was_valid && valid)
+		mlog(ML_NOTICE, "ocfs2:tcp: connected to node %s at "
+		     "%u.%u.%u.%u:%d\n", sc->sc_node->nd_name,
+		     NIPQUAD(sc->sc_node->nd_ipv4_address), 
+		     ntohs(sc->sc_node->nd_ipv4_port));
+
+	/* trigger the connecting worker func as long as we're not valid,
+	 * it will back off if it shouldn't connect.  This can be called
+	 * from node config teardown and so needs to be careful about
+	 * the work queue actually being up. */
+	if (!valid && o2net_wq) {
+		mlog(ML_CONN, "queueing conn with %u attempts\n",
+		     nn->nn_connect_attempts);
+		queue_delayed_work(o2net_wq, &nn->nn_connect_work,
+				   nn->nn_connect_attempts * 2 * HZ);
 	}
-	spin_unlock_bh(&sc->sc_lock);
 
-	if (node) {
-		spin_lock_bh(&node->nd_lock);
-		/* if the node is still pointing to us, drop that */
-		if (node->nd_sc == sc) {
-			mlog(ML_NOTICE, "ocfs2:tcp: no longer connected to "
-			       "node %s at %u.%u.%u.%u:%d\n", node->nd_name,
-			       NIPQUAD(node->nd_ipv4_address), 
-			       ntohs(node->nd_ipv4_port));
-			node->nd_sc = NULL;
-			o2net_complete_nodes_nsw(node);
-			nr_puts++;
-		}
-		spin_unlock_bh(&node->nd_lock);
-		o2nm_node_put(node);
-		node = NULL;
+	/* keep track of the nn's sc ref for the caller */
+	if ((old_sc == NULL) && sc)
+		kref_get(&sc->sc_kref);
+	if (old_sc && (old_sc != sc)) {
+		o2net_sc_queue_work(old_sc, &old_sc->sc_shutdown_work);
+		sc_put(old_sc);
 	}
+}
 
-	/* drop user_data's ref and tear down the callbacks.  the
-	 * callbacks themselves execute under the callback_lock so
-	 * this serializes with them. */
-	sk = sc->sc_sock->sk;
-	if (sk_unregister_callbacks(sk, sc))
-		nr_puts++;
+/* see o2net_register_callbacks() */
+static void o2net_data_ready(struct sock *sk, int bytes)
+{
+	void (*ready)(struct sock *sk, int bytes);
 
-out:
-	if (sc) {
-		sclog(sc, "detach droping %d refs\n", nr_puts);
-		while(nr_puts--)
-			sc_put(sc);
+	read_lock(&sk->sk_callback_lock);
+	if (sk->sk_user_data) {
+		struct o2net_sock_container *sc = sk->sk_user_data;
+		sclog(sc, "data_ready hit\n");
+		o2net_sc_queue_work(sc, &sc->sc_rx_work);
+		ready = sc->sc_data_ready;
+	} else {
+		ready = sk->sk_data_ready;
 	}
+	read_unlock(&sk->sk_callback_lock);
+
+	ready(sk, bytes);
 }
-EXPORT_SYMBOL_GPL(o2net_detach_sc);
 
-static void o2net_check_cb_lists(void)
+/* see o2net_register_callbacks() */
+static void o2net_state_change(struct sock *sk)
 {
+	void (*state_change)(struct sock *sk);
 	struct o2net_sock_container *sc;
-	/* when we get the sc off the list we inherit the ref that was
-	 * created when the sc was put on the list */
-	spin_lock_bh(&o2net_active_lock);
-	while (!list_empty(&o2net_attach_list)) {
-		sc = list_entry(o2net_attach_list.next,
-				struct o2net_sock_container, sc_item);
 
-		list_del_init(&sc->sc_item);
-		spin_unlock_bh(&o2net_active_lock);
+	read_lock(&sk->sk_callback_lock);
+	sc = sk->sk_user_data;
+	if (sc == NULL) {
+		state_change = sk->sk_state_change;
+		goto out;
+	}
 
-		sclog(sc, "found on connect list\n");
+	sclog(sc, "state_change to %d\n", sk->sk_state);
 
-		o2net_attach_sc(sc);
-		sc_put(sc);
+	state_change = sc->sc_state_change;
 
-		spin_lock_bh(&o2net_active_lock);
+	switch(sk->sk_state) {
+		case TCP_SYN_SENT: 
+		case TCP_SYN_RECV: 
+			break;
+		case TCP_ESTABLISHED: 
+			o2net_sc_queue_work(sc, &sc->sc_connect_work);
+			break;
+		default:
+			o2net_sc_queue_work(sc, &sc->sc_shutdown_work);
+			break;
 	}
+out:
+	read_unlock(&sk->sk_callback_lock);
+	state_change(sk);
+}	
 
-	while (!list_empty(&o2net_detach_list)) {
-		sc = list_entry(o2net_detach_list.next,
-				struct o2net_sock_container, sc_item);
 
-		list_del_init(&sc->sc_item);
-		spin_unlock_bh(&o2net_active_lock);
+/*
+ * we register callbacks so we can queue work on events before calling
+ * the original callbacks.  our callbacks are careful to test user_data
+ * to discover when they've raced with sk_unregister_callbacks().
+ */
+static void sk_register_callbacks(struct sock *sk,
+			          struct o2net_sock_container *sc)
+{
+	write_lock_bh(&sk->sk_callback_lock);
 
-		sclog(sc, "found on detach list\n");
+	/* accepted sockets inherit the old listen socket data ready */
+	if (sk->sk_data_ready == o2net_listen_data_ready) {
+		sk->sk_data_ready = sk->sk_user_data;
+		sk->sk_user_data = NULL;
+	}
 
-		o2net_detach_sc(sc, NULL);
-		sc_put(sc);
+	BUG_ON(sk->sk_user_data != NULL);
+	sk->sk_user_data = sc;
+	kref_get(&sc->sc_kref);
 
-		spin_lock_bh(&o2net_active_lock);
-	}
-	spin_unlock_bh(&o2net_active_lock);
+	sc->sc_data_ready = sk->sk_data_ready;
+	sc->sc_state_change = sk->sk_state_change;
+	sk->sk_data_ready = o2net_data_ready;
+	sk->sk_state_change = o2net_state_change;
+
+	write_unlock_bh(&sk->sk_callback_lock);
 }
 
-static int o2net_rx_should_wake(struct socket *sock)
+static int sk_unregister_callbacks(struct sock *sk,
+			           struct o2net_sock_container *sc)
 {
-	int empty;
+	int ret = 0;
 
-	spin_lock_bh(&o2net_active_lock);
-	empty = list_empty(&o2net_active_list) &&
-		list_empty(&o2net_detach_list) &&
-		list_empty(&o2net_attach_list);
-	spin_unlock_bh(&o2net_active_lock);
+	write_lock_bh(&sk->sk_callback_lock);
+	if (sk->sk_user_data == sc) {
+		ret = 1;
+		sk->sk_user_data = NULL;
+		sk->sk_data_ready = sc->sc_data_ready;
+		sk->sk_state_change = sc->sc_state_change;
+	}
+	write_unlock_bh(&sk->sk_callback_lock);
 
-	return !empty || tcp_poll(NULL, sock, NULL);
+	return ret;
 }
 
-static int o2net_receive_thread(void *data)
+/* 
+ * this is a little helper that is called by callers who have seen a problem
+ * with an sc and want to detach it from the nn if someone hasn't already
+ * beaten them to it.
+ */
+static void o2net_ensure_shutdown(struct o2net_node *nn,
+			           struct o2net_sock_container *sc)
 {
-	struct socket *sock = data;
+	spin_lock(&nn->nn_lock);
+	if (nn->nn_sc == sc)
+		o2net_set_nn_state(nn, NULL, 0, 0);
+	spin_unlock(&nn->nn_lock);
+}
 
-	mlog(ML_KTHREAD, "net thread running...\n");
+/*
+ * This work queue function performs the blocking parts of socket shutdown.  A
+ * few paths lead here.  set_nn_state will trigger this callback if it sees an
+ * sc detached from the nn.  state_change will also trigger this callback
+ * directly when it sees errors.  In that case we need to call set_nn_state
+ * ourselves as state_change couldn't get the nn_lock and call set_nn_state
+ * itself. 
+ */
+static void o2net_shutdown_sc(void *arg)
+{
+	struct o2net_sock_container *sc = arg;
+	struct o2net_node *nn = o2net_nn_from_num(sc->sc_node->nd_num);
 
-       	while(!kthread_should_stop()) {
-		o2net_try_accept(sock);
-		o2net_check_cb_lists();
-		o2net_receive();
+	sclog(sc, "shutting down\n");
 
-		wait_event_interruptible(*sock->sk->sk_sleep,
-					 o2net_rx_should_wake(sock) ||
-					 kthread_should_stop());
+	/* drop the callbacks ref and call shutdown only once */
+	if (sk_unregister_callbacks(sc->sc_sock->sk, sc)) {
+		flush_workqueue(o2net_wq);
+		sc_put(sc);
+		sc->sc_sock->ops->shutdown(sc->sc_sock,
+					   RCV_SHUTDOWN|SEND_SHUTDOWN);
 	}
 
-	mlog(ML_KTHREAD, "net thread exiting\n");
-	sock_release(sock);
-	return 0;
+	o2net_ensure_shutdown(nn, sc);
+	sc_put(sc);
 }
 
+/* ------------------------------------------------------------ */
+
 static int o2net_handler_cmp(struct o2net_msg_handler *nmh, u32 msg_type,
 			     u32 key)
 {
@@ -493,6 +575,11 @@
 	kfree(nmh);
 }
 
+static void o2net_handler_put(struct o2net_msg_handler *nmh)
+{
+	kref_put(&nmh->nh_kref, o2net_handler_kref_release); /* may free nmh */
+}
+
 /* max_len is protection for the handler func.  incoming messages won't
  * be given to the handler if their payload is longer than the max. */
 int o2net_register_handler(u32 msg_type, u32 key, u32 max_len,
@@ -599,10 +686,7 @@
 	return nmh;
 }
 
-static void o2net_handler_put(struct o2net_msg_handler *nmh)
-{
-	kref_put(&nmh->nh_kref, o2net_handler_kref_release);
-}
+/* ------------------------------------------------------------ */
 
 static int o2net_recv_tcp_msg(struct socket *sock, void *data, size_t len)
 {
@@ -660,116 +744,49 @@
 	return ret;
 }
 
-static int o2net_prep_nsw(struct o2nm_node *node, struct o2net_status_wait *nsw)
+static void o2net_init_msg(o2net_msg *msg, u16 data_len, u16 msg_type, u32 key)
 {
-	int ret = 0;
-
-	do { 
-		if (!idr_pre_get(&node->nd_status_idr, GFP_NOFS)) {
-			ret = -EAGAIN;
-			break;
-		}
-		spin_lock_bh(&node->nd_lock);
-#ifndef IDR_GET_NEW_RETURNS_ID
-		ret = idr_get_new(&node->nd_status_idr, nsw, &nsw->ns_id);
-#else
-		/* old semantics */
-		nsw->ns_id = idr_get_new(&node->nd_status_idr, nsw);
-		if (nsw->ns_id < 0)
-			ret = -EAGAIN;
-		else
-			ret = 0;
-#endif
-		if (ret == 0)
-			list_add_tail(&nsw->ns_node_item,
-				      &node->nd_status_list);
-		spin_unlock_bh(&node->nd_lock);
-	} while (ret == -EAGAIN);
-
-	if (ret == 0)  {
-		init_waitqueue_head(&nsw->ns_wq);
-		nsw->ns_sys_status = O2NET_ERR_NONE;
-		nsw->ns_status = 0;
-	}
-
-	return ret;
+	memset(msg, 0, sizeof(*msg));	/* zero anything not assigned below */
+	msg->magic = O2NET_MSG_MAGIC;
+	msg->key = key;
+	msg->msg_type = msg_type;
+	msg->data_len = data_len;
+	msg->status = 0;
+	msg->sys_status = O2NET_ERR_NONE;
 }
 
-static void o2net_complete_nsw_locked(struct o2nm_node *node,
-				      struct o2net_status_wait *nsw,
-				      enum o2net_system_error sys_status,
-				      s32 status)
+static int o2net_tx_can_proceed(struct o2net_node *nn,
+			        struct o2net_sock_container **sc_ret,
+				int *error)
 {
-	assert_spin_locked(&node->nd_lock);
+	int ret = 0;	/* nonzero: the caller can stop waiting */
 
-	if (!list_empty(&nsw->ns_node_item)) {
-		list_del_init(&nsw->ns_node_item);
-		nsw->ns_sys_status = sys_status;
-		nsw->ns_status = status;
-		idr_remove(&node->nd_status_idr, nsw->ns_id);
-		wake_up(&nsw->ns_wq);
-	}
-}
+	spin_lock(&nn->nn_lock);
+	if (nn->nn_persistent_error) {	/* sticky error: fail the tx */
+		ret = 1;
+		*sc_ret = NULL;
+		*error = nn->nn_persistent_error;
+	} else if (nn->nn_sc_valid) {
+		kref_get(&nn->nn_sc->sc_kref);	/* ref passed out in *sc_ret */
 
-static void o2net_complete_nsw(struct o2nm_node *node,
-			       struct o2net_status_wait *nsw,
-			       u64 id, enum o2net_system_error sys_status, 
-			       s32 status)
-{
-	spin_lock_bh(&node->nd_lock);
-	if (nsw == NULL) {
-		if (id > INT_MAX)
-			goto out;
-
-		nsw = idr_find(&node->nd_status_idr, id);
-		if (nsw == NULL)
-			goto out;
+		ret = 1;
+		*sc_ret = nn->nn_sc;
+		*error = 0;
 	}
+	spin_unlock(&nn->nn_lock);
 
-	o2net_complete_nsw_locked(node, nsw, sys_status, status);
-
-out:
-	spin_unlock_bh(&node->nd_lock);
-	return;
+	return ret;
 }
 
-static void o2net_complete_nodes_nsw(struct o2nm_node *node)
-{
-	struct list_head *iter, *tmp;
-	unsigned int num_kills = 0;
-	struct o2net_status_wait *nsw;
-
-	assert_spin_locked(&node->nd_lock);
-
-	list_for_each_safe(iter, tmp, &node->nd_status_list) {
-		nsw = list_entry(iter, struct o2net_status_wait, ns_node_item);
-		o2net_complete_nsw_locked(node, nsw, O2NET_ERR_DIED, 0);
-		num_kills++;
-	}
-	
-	mlog(0, "node %s (%u) died, killed %d messages\n", node->nd_name,
-	     node->nd_num, num_kills);
-}
-
-static int o2net_nsw_completed(struct o2nm_node *node,
-			       struct o2net_status_wait *nsw)
-{
-	int completed;
-	spin_lock_bh(&node->nd_lock);
-	completed = list_empty(&nsw->ns_node_item);
-	spin_unlock_bh(&node->nd_lock);
-	return completed;
-}
-
 int o2net_send_message_iov(u32 msg_type, u32 key, struct iovec *caller_iov,
 			   size_t caller_iovlen, u8 target_node, int *status)
 {
-	int ret;
+	int ret, error = 0;
 	o2net_msg *msg = NULL;
-	size_t i, iovlen, caller_bytes = 0;
+	size_t iovlen, caller_bytes = 0;
 	struct iovec *iov = NULL;
 	struct o2net_sock_container *sc = NULL;
-	struct o2nm_node *node = NULL;
+	struct o2net_node *nn = o2net_nn_from_num(target_node);
 	struct o2net_status_wait nsw = {
 		.ns_node_item = LIST_HEAD_INIT(nsw.ns_node_item),
 	};
@@ -778,12 +795,11 @@
 		.st_task = current,
 		.st_msg_type = msg_type,
 		.st_msg_key = key,
+		.st_node = target_node,
 	};
 
-	BUG_ON(o2net_recv_task && (current == o2net_recv_task));
-
-	if (o2net_recv_task == NULL) {
-		mlog(0, "attempt to tx without a setup rx thread\n");
+	if (o2net_wq == NULL) {
+		mlog(0, "attempt to tx without o2netd running\n");
 		ret = -ESRCH;
 		goto out;
 	}
@@ -794,9 +810,7 @@
 		goto out;
 	}
 
-	for(i = 0; i < caller_iovlen; i++)
-		caller_bytes += caller_iov[i].iov_len;
-
+	caller_bytes = iov_length(caller_iov, caller_iovlen);
 	if (caller_bytes > O2NET_MAX_PAYLOAD_BYTES) {
 		mlog(0, "total payload len %zu too large\n", caller_bytes);
 		ret = -EINVAL;
@@ -808,18 +822,13 @@
 		goto out;
 	}
 
-	node = o2nm_get_node_by_num(target_node);
-	if (node == NULL) {
-		mlog(0, "node %u unknown\n", target_node);
-		ret = -EINVAL;
-		goto out;
-	}
-
-	nst.st_node = node;
 	o2net_proc_add_nst(&nst);
 
 	do_gettimeofday(&nst.st_sock_time);
-	ret = o2net_sc_from_node(node, &sc);
+	ret = wait_event_interruptible(nn->nn_sc_wq,
+				       o2net_tx_can_proceed(nn, &sc, &error));
+	if (!ret && error)
+		ret = error;
 	if (ret)
 		goto out;
 
@@ -840,19 +849,14 @@
 		ret = -ENOMEM;
 		goto out;
 	}
-	memset(msg, 0, sizeof(o2net_msg));
-	msg->magic = O2NET_MSG_MAGIC;
-	msg->data_len = caller_bytes;
-	msg->msg_type = msg_type;
-	msg->sys_status = O2NET_ERR_NONE;
-	msg->status = 0;
-	msg->key = key;
 
+	o2net_init_msg(msg, caller_bytes, msg_type, key);
+
 	iov[0].iov_len = sizeof(o2net_msg);
 	iov[0].iov_base = msg;
 	memcpy(&iov[1], caller_iov, caller_iovlen * sizeof(struct iovec));
 
-	ret = o2net_prep_nsw(node, &nsw);
+	ret = o2net_prep_nsw(nn, &nsw);
 	if (ret)
 		goto out;
 
@@ -873,7 +877,7 @@
 
 	/* wait on other node's handler */
 	do_gettimeofday(&nst.st_status_time);
-	wait_event(nsw.ns_wq, o2net_nsw_completed(node, &nsw));
+	wait_event(nsw.ns_wq, o2net_nsw_completed(nn, &nsw));
 
 	/* Note that we avoid overwriting the callers status return
 	 * variable if a system error was reported on the other
@@ -892,30 +896,11 @@
 		kfree(iov);
 	if (msg)
 		kfree(msg);
-	if (node) {
-		o2net_complete_nsw(node, &nsw, 0, 0, 0);
-		o2nm_node_put(node);
-	}
+	o2net_complete_nsw(nn, &nsw, 0, 0, 0);
 	return ret;
 }
 EXPORT_SYMBOL_GPL(o2net_send_message_iov);
 
-/*
- * o2net_send_message
- *
- *   - this is probably the function you are looking for
- *   - it will package up the message for you, verifying that
- *       the message handler is there and the length is ok,
- *       connect to the other node if there is not already a
- *       socket for it, and optionally wait on a status return
- *       from the other node 
- *   - all you need prior to this call is to have inited the
- *       net stuff, to have a valid inode for the node to contact 
- *       in nm, and to have registered the message handler
- *   - if status was requested, it will be returned to the caller
- *       already converted to host byteorder
- *   - status will not be set on return code != 0
- */
 int o2net_send_message(u32 msg_type, u32 key, void *data, u32 len,
 		       u8 target_node, int *status)
 {
@@ -951,233 +936,12 @@
 	return o2net_send_tcp_msg(sock, &iov, 1, sizeof(o2net_msg));
 }
 
-/* a callback would like the rx thread to do some work on its behalf
- * in process context.. the callback has a ref from sk->user_data, 
- * we grab a ref as we put the sc on the list that the rx thread will
- * inherit when it removes from the list */
-static void o2net_sc_list_add(struct o2net_sock_container *sc,
-			      struct list_head *list)
-{
-	assert_spin_locked(&o2net_active_lock);
-
-	if (list_empty(&sc->sc_item)) {
-		sclog(sc, "adding to list %p\n", list);
-		kref_get(&sc->sc_kref);
-		list_add_tail(&sc->sc_item, list);
-	}
-
-	if (o2net_recv_task)
-		wake_up_process(o2net_recv_task);
-}
-
-/* teardown can race with these guys and stop them in their read lock.. 
- * teardown will clear sk_user_data and reset the callbacks so that these
- * guys can know to call them and not lose the event.. */
-static void o2net_data_ready(struct sock *sk, int bytes)
-{
-	void (*ready)(struct sock *sk, int bytes);
-	struct o2net_sock_container *sc;
-
-	read_lock(&sk->sk_callback_lock);
-	sc = sk->sk_user_data;
-	if (sc == NULL) {
-		ready = sk->sk_data_ready;
-		goto out;
-	}
-
-	sclog(sc, "data_ready hit\n");
-
-	spin_lock(&o2net_active_lock);
-	o2net_sc_list_add(sc, &o2net_active_list);
-	spin_unlock(&o2net_active_lock);
-
-	ready = sc->sc_data_ready;
-out:
-	read_unlock(&sk->sk_callback_lock);
-	ready(sk, bytes);
-}
-
-/* 
- * sk callbacks put sockets with activity on a list and we pluck off the 
- * them and call into the stack.  we get the ref from their entry on
- * the list.
- */
-static int o2net_receive(void)
-{
-	struct o2net_sock_container *sc;
-	o2net_msg *hdr;
-	int err = 0, read_eagain, read_some;
-	void *data;
-	size_t datalen;
-	struct o2nm_node *node = NULL;
-
-	spin_lock_bh(&o2net_active_lock);
-	while (!list_empty(&o2net_active_list)) {
-		sc = list_entry(o2net_active_list.next,
-				struct o2net_sock_container, sc_item);
-
-		/* we now have the ref that adding created */
-		list_del_init(&sc->sc_item);
-		spin_unlock_bh(&o2net_active_lock);
-
-		sclog(sc, "found on active list\n");
-
-		err = 0;
-		read_eagain = 0;
-		read_some = 0;
-
-		/* catch a race with detach or get a ref on the node
-		 * so we can process status messages */
-		spin_lock_bh(&sc->sc_lock);
-		if (sc->sc_node == NULL)
-			err = -ENOTCONN;
-		else {
-			o2nm_node_get(sc->sc_node);
-			node = sc->sc_node;
-		}
-		spin_unlock_bh(&sc->sc_lock);
-		if (err)
-			goto done;
-
-		/* do we need more header? */
-		if (sc->sc_page_off < sizeof(o2net_msg)) {
-			data = page_address(sc->sc_page) + sc->sc_page_off;
-			datalen = sizeof(o2net_msg) - sc->sc_page_off;
-			err = o2net_recv_tcp_msg(sc->sc_sock, data, datalen);
-			if (err > 0) {
-				sc->sc_page_off += err;
-				read_some = 1;
-				/* only swab incoming here.. we can
-				 * only get here once as we cross from
-				 * being under to over */
-				if (sc->sc_page_off == sizeof(o2net_msg)) {
-					hdr = page_address(sc->sc_page);
-					o2net_msg_to_host(hdr);
-					if (hdr->data_len > O2NET_MAX_PAYLOAD_BYTES)
-						err = -EOVERFLOW;
-				}
-			}
-			if (err < 0) {
-				if (err == -EAGAIN)
-					read_eagain = 1;
-				goto done;
-			}
-		}
-
-		if (sc->sc_page_off < sizeof(o2net_msg)) {
-			/* oof, still don't have a header */
-			goto done;
-		}
-
-		/* this was swabbed above when we first read it */
-		hdr = page_address(sc->sc_page);
-
-		msglog(hdr, "at page_off %zu\n", sc->sc_page_off);
-
-		/* do we need more payload? */
-		if (sc->sc_page_off - sizeof(o2net_msg) < hdr->data_len) {
-			/* need more payload */
-			data = page_address(sc->sc_page) + sc->sc_page_off;
-			datalen = (sizeof(o2net_msg) + hdr->data_len) -
-				  sc->sc_page_off;
-			err = o2net_recv_tcp_msg(sc->sc_sock, data, datalen);
-			if (err > 0) {
-				read_some = 1;
-				sc->sc_page_off += err;
-			}
-			if (err < 0) {
-				if (err == -EAGAIN)
-					read_eagain = 1;
-				goto done;
-			}
-		}
-
-		if (sc->sc_page_off - sizeof(o2net_msg) == hdr->data_len) {
-			/* whooo peee, we have a full message */
-			/* after calling this the message is toast */
-			err = o2net_process_message(node, sc->sc_sock, hdr);
-			sc->sc_page_off = 0;
-		}
-	
-done:
-
-		if (node) {
-			o2nm_node_put(node);
-			node = NULL;
-		}
-
-		sclog(sc, "finished reading with %d\n", err);
-		if (err == -EAGAIN)
-			err = 0;
-
-		/* we might not have consumed all the data that has been
-		 * announced to us through data_ready.. keep the net active
-		 * as long as there may still be remaining data.  
-		 * data_ready might have been called after we saw eagain */
-		if (!err && read_some && !read_eagain) {
-			spin_lock_bh(&o2net_active_lock);
-			o2net_sc_list_add(sc, &o2net_active_list);
-			spin_unlock_bh(&o2net_active_lock);
-		}
-
-		/* this exists to catch the framing errors, all other
-		 * errors should come through state_change, really. */
-		if (err) {
-			sclog(sc, "saw err %d, closing\n", err);
-			o2net_detach_sc(sc, NULL);
-		}
-
-		sc_put(sc);
-		spin_lock_bh(&o2net_active_lock);
-	}
-	spin_unlock_bh(&o2net_active_lock);
-	return 0;
-}
-
-/* if we get a message that the node has gone down then we detach its
- * active socket.  This will send error codes to any transmitters that
- * were waiting for status replies on that node. */
-static void o2net_hb_node_down_cb(struct o2nm_node *node, int node_num,
-				  void *data)
-{
-	o2net_detach_sc(NULL, node);
-}
-
-static struct o2hb_callback_func *o2net_hb_down = NULL;
-#define O2NET_HB_NODE_DOWN_PRI     (0x1)
-
-int o2net_register_hb_callbacks(void)
-{
-	o2net_hb_down = kmalloc(sizeof(*o2net_hb_down), GFP_KERNEL);
-	if (!o2net_hb_down)
-		return -ENOMEM;
-	memset(o2net_hb_down, 0, sizeof(*o2net_hb_down));
-
-	o2hb_setup_callback(o2net_hb_down, O2HB_NODE_DOWN_CB,
-			    o2net_hb_node_down_cb, NULL,
-			    O2NET_HB_NODE_DOWN_PRI);
-	return o2hb_register_callback(o2net_hb_down);
-}
-
-void o2net_unregister_hb_callbacks(void)
-{
-	int status;
-
-	if (o2net_hb_down) {
-		status = o2hb_unregister_callback(o2net_hb_down);
-		if (status < 0)
-			mlog(ML_ERROR, "Status return %d unregistering "
-			     "heartbeat callback!\n", status);
-		kfree(o2net_hb_down);
-		o2net_hb_down = NULL;
-	}
-}
-
 /* this returns -errno if the header was unknown or too large, etc.
  * after this is called the buffer us reused for the next message */
-static int o2net_process_message(struct o2nm_node *node, struct socket *sock,
+static int o2net_process_message(struct o2net_sock_container *sc,
 				 o2net_msg *hdr)
 {
+	struct o2net_node *nn = o2net_nn_from_num(sc->sc_node->nd_num);
 	int ret, handler_status;
 	enum  o2net_system_error syserr;
 	struct o2net_msg_handler *nmh = NULL;
@@ -1186,7 +950,7 @@
 
 	if (hdr->magic == O2NET_MSG_STATUS_MAGIC) {
 		/* special type for returning message status */
-		o2net_complete_nsw(node, NULL, hdr->msg_num, hdr->sys_status,
+		o2net_complete_nsw(nn, NULL, hdr->msg_num, hdr->sys_status,
 				   hdr->status);
 		ret = 0;
 		goto out;
@@ -1220,7 +984,8 @@
 
 out_respond:
 	/* this destroys the hdr, so don't use it after this */
-	ret = o2net_send_status_magic(sock, hdr, syserr, handler_status);
+	ret = o2net_send_status_magic(sc->sc_sock, hdr, syserr,
+				      handler_status);
 	hdr = NULL;
 	mlog(0, "sending handler status %d, syserr %d returned %d\n",
 	     handler_status, syserr, ret);
@@ -1231,188 +996,182 @@
 	return ret;
 }
 
-/* 
- * This is called once a socket has come through accept() or connect() and
- * is ready for business.  Once it is attached to the node
- * then transmitting paths can get ahold of it and send messages.
- */
-static int o2net_attach_sc(struct o2net_sock_container *sc)
+/* demux queued rx bytes into header/payload and call handlers per full msg.
+ * returns >0 on progress, else 0 or -errno.  errors take precedence. */
+static int o2net_advance_rx(struct o2net_sock_container *sc)
 {
-	int ret = 0, opt = 1;
-	u8 this_node = o2nm_this_node(); /* :( */
-	struct o2net_sock_container *detach = NULL;
-	struct o2nm_node *node = sc->sc_node;
-	mm_segment_t oldfs;
+	o2net_msg *hdr;
+	int ret = 0, made_progress = 0;
+	void *data;
+	size_t datalen;
 
-	sclog(sc, "attaching with node %p\n", node);
+	sclog(sc, "receiving\n");
 
-	BUG_ON(node == NULL);
-
-	spin_lock_bh(&node->nd_lock); 
-	if (node->nd_sc) {
-		/* this is a little exciting.  If we have racing sockets we
-		 * need to agree which to prefer with the remote node.  We only
-		 * get here if both sockets are successfully connected.  The
-		 * node with the lower number loses the socket it initiated
-		 * with connect(). */
-		if (this_node < node->nd_num) {
-			/* lose the one we started with connect() */ 
-			if (sc->sc_from_connect)
-				ret = -EEXIST;
-		} else {
-			/* lose the one we accepted */
-			if (!sc->sc_from_connect)
-				ret = -EEXIST;
+	/* do we need more header? */
+	if (sc->sc_page_off < sizeof(o2net_msg)) {
+		data = page_address(sc->sc_page) + sc->sc_page_off;
+		datalen = sizeof(o2net_msg) - sc->sc_page_off;
+		ret = o2net_recv_tcp_msg(sc->sc_sock, data, datalen);
+		if (ret > 0) {
+			made_progress = 1;
+			sc->sc_page_off += ret;
+			/* only swab incoming here.. we can only get here
+			 * once, as we cross from being under to over */
+			if (sc->sc_page_off == sizeof(o2net_msg)) {
+				hdr = page_address(sc->sc_page);
+				o2net_msg_to_host(hdr);
+			}
 		}
-		/* we're keeping the new sc and the old one has to go */
-		if (ret == 0) {
-			detach = node->nd_sc;
-			node->nd_sc = NULL;
-			/* we get its ref and drop it below */
-		}
+		if (ret < 0)
+			goto out;
 	}
-	if (node->nd_sc == NULL) {
-		node->nd_sc = sc;
-		kref_get(&sc->sc_kref);
-		/* resolve our pending connect before we drop the node lock */
-		if (sc->sc_pending_connect) {
-			sc->sc_pending_connect = 0;
-			o2net_finish_connect(node);
-		} else {
-			/* accepts arriving should wake sleepers, too */
-			atomic_inc(&node->nd_sc_generation);
-			wake_up_all(&node->nd_sc_wq);
-		}
-		mlog(ML_NOTICE, "%s connection to node %s at %u.%u.%u.%u:%d\n", 
-		     sc->sc_from_connect ? "initiated" : "accepted",
-		     node->nd_name, NIPQUAD(node->nd_ipv4_address), 
-		     ntohs(node->nd_ipv4_port));
-	}
-	spin_unlock_bh(&node->nd_lock); 
-	if (ret)
+
+	if (sc->sc_page_off < sizeof(o2net_msg)) {
+		/* oof, still don't have a header */
 		goto out;
-	if (detach) {
-		o2net_detach_sc(detach, NULL);
-		sc_put(detach);
-		detach = NULL;
 	}
 
-	oldfs = get_fs();
-	set_fs(KERNEL_DS);
-	ret = sc->sc_sock->ops->setsockopt(sc->sc_sock, SOL_TCP, TCP_NODELAY,
-					   (char __user *)&opt, sizeof(opt));
-	set_fs(oldfs);
+	/* this was swabbed above when we first read it */
+	hdr = page_address(sc->sc_page);
 
-	sk_register_callbacks(sc->sc_sock->sk, sc);
+	msglog(hdr, "at page_off %zu\n", sc->sc_page_off);
+	if (hdr->data_len > O2NET_MAX_PAYLOAD_BYTES) {
+		ret = -EOVERFLOW;	/* never read past sc_page */
+		goto out;
+	}
 
-	/* record it as active initially to make sure we didn't miss
-	 * any incoming data while we were setting it up */
-	spin_lock_bh(&o2net_active_lock);
-	o2net_sc_list_add(sc, &o2net_active_list);
-	spin_unlock_bh(&o2net_active_lock);
+	/* do we need more payload? */
+	if (sc->sc_page_off - sizeof(o2net_msg) < hdr->data_len) {
+		/* need more payload */
+		data = page_address(sc->sc_page) + sc->sc_page_off;
+		datalen = (sizeof(o2net_msg) + hdr->data_len) -
+			  sc->sc_page_off;
+		ret = o2net_recv_tcp_msg(sc->sc_sock, data, datalen);
+		if (ret > 0) {
+			made_progress = 1;
+			sc->sc_page_off += ret;
+		}
+		if (ret < 0)
+			goto out;
+	}
+
+	if (sc->sc_page_off - sizeof(o2net_msg) == hdr->data_len) {
+		/* whooo peee, a full message; after this call it's toast */
+		ret = o2net_process_message(sc, hdr);
+		sc->sc_page_off = 0;
+	}
+
 out:
-	sclog(sc, "attaching now node %p returned %d\n", node, ret);
+	sclog(sc, "finished receiving: progress %d ret %d\n", made_progress,
+	      ret);
+	if (made_progress && ret >= 0)	/* never let progress mask an error */
+		ret = made_progress;
 	return ret;
 }
 
-static void o2net_state_change(struct sock *sk)
+/* this work func is triggered by data_ready.  it reads until it can read no
+ * more; if data_ready hits while we're working the work struct is marked and
+ * we're called again.  -EAGAIN is expected, other errors shut the sc down. */
+static void o2net_rx_until_empty(void *arg)
 {
-	void (*state_change)(struct sock *sk);
-	struct o2net_sock_container *sc;
-	struct list_head *list = NULL;
+	struct o2net_sock_container *sc = arg;
+	int ret;
 
-	write_lock(&sk->sk_callback_lock);
-	
-	/* we might have raced with the node being torn down and this
-	 * sc being detached from the node */
-	sc = sk->sk_user_data;
-	if (sc == NULL) {
-		state_change = sk->sk_state_change;
-		goto out;
-	}
+	do {
+		ret = o2net_advance_rx(sc);
+	} while (ret > 0);
 
-	sclog(sc, "state_change to %d\n", sk->sk_state);
-
-	state_change = sc->sc_state_change;
-
-	switch(sk->sk_state) {
-		case TCP_SYN_SENT: 
-		case TCP_SYN_RECV: 
-			break;
-		case TCP_ESTABLISHED: 
-			list = &o2net_attach_list;
-			break;
-		default:
-			list = &o2net_detach_list;
-			break;
+	if (ret && ret != -EAGAIN) {
+		struct o2net_node *nn = o2net_nn_from_num(sc->sc_node->nd_num);
+		sclog(sc, "saw error %d, closing\n", ret);
+		o2net_ensure_shutdown(nn, sc);
 	}
 
-	/* if we've finished a connect or have seen an error we let
-	 * the rx thread know so it can act accordingly on our behalf */
-	if (list) {
-		spin_lock(&o2net_active_lock);
-		o2net_sc_list_add(sc, list);
-		spin_unlock(&o2net_active_lock);
-	}
-out:
-	write_unlock(&sk->sk_callback_lock);
-	state_change(sk);
-}	
+	sc_put(sc);
+}
 
-static struct o2net_sock_container *sc_alloc(struct o2nm_node *node,
-					     int from_conn)
+int o2net_set_nodelay(struct socket *sock)
 {
-	struct o2net_sock_container *sc, *ret = NULL;
-	struct page *page = NULL;
+	int ret, opt = 1;
+	mm_segment_t oldfs;
 
-	page = alloc_page(GFP_NOFS);
-	sc = kcalloc(1, sizeof(*sc), GFP_NOFS);
-	if (sc == NULL || page == NULL)
-		goto out;
+	oldfs = get_fs();
+	set_fs(KERNEL_DS);	/* &opt is a kernel, not user, pointer */
+	ret = sock->ops->setsockopt(sock, SOL_TCP, TCP_NODELAY,
+				    (char __user *)&opt, sizeof(opt));
+	set_fs(oldfs);
 
-	kref_init(&sc->sc_kref, sc_kref_release);
-	spin_lock_init(&sc->sc_lock);
-	o2nm_node_get(node);
-	sc->sc_node = node;
-	sc->sc_from_connect = from_conn;
-	sc->sc_pending_connect = from_conn;
+	return ret;	/* ->setsockopt() result */
+}
 
-	INIT_LIST_HEAD(&sc->sc_item);
-	INIT_LIST_HEAD(&sc->sc_handlers);
+/* ------------------------------------------------------------ */
 
-	sclog(sc, "alloced\n");
+/* state_change saw sc reach ESTABLISHED: mark valid if it's still nn_sc */
+static void o2net_sc_connect_completed(void *arg)
+{
+	struct o2net_sock_container *sc = arg;
+	struct o2net_node *nn = o2net_nn_from_num(sc->sc_node->nd_num);
 
-	ret = sc;
-	sc->sc_page = page;
-	o2net_proc_add_sc(sc);
-	sc = NULL;
-	page = NULL;
+	mlog(ML_CONN, "sc %p for node %s connected\n", sc,
+	     sc->sc_node->nd_name);
 
-out:
-	if (page)
-		__free_page(page);
-	kfree(sc);
-
-	return ret;
+	spin_lock(&nn->nn_lock);
+	/* only mark it valid if it hasn't been shut down already */
+	if (nn->nn_sc == sc) {
+		nn->nn_connect_attempts = 0;
+		o2net_set_nn_state(nn, sc, 1, 0);
+	}
+	spin_unlock(&nn->nn_lock);
+	sc_put(sc);
 }
 
-static void o2net_finish_connect(struct o2nm_node *node)
+/* this work func is kicked whenever a path sets the nn state which doesn't
+ * have valid set.  This includes seeing hb come up, losing a connection,
+ * having a connect attempt fail, etc. This centralizes the logic which decides
+ * if a connect attempt should be made or if we should give up and all future
+ * transmit attempts should fail */
+static void o2net_start_connect(void *arg)
 {
-	atomic_dec(&node->nd_pending_connects);
-	atomic_inc(&node->nd_sc_generation);
-	/* don't care about this rare thundering herd */
-	wake_up_all(&node->nd_sc_wq);
-}
-
-static int o2net_start_connect(struct o2nm_node *node, u32 addr, u16 port)
-{
-	struct socket *sock = NULL;
-	struct sockaddr_in myaddr, remoteaddr;
+	struct o2net_node *nn = arg;
 	struct o2net_sock_container *sc = NULL;
-	int ret;
+	struct o2nm_node *node = NULL;
+	struct socket *sock = NULL;
+	struct sockaddr_in myaddr = {0, }, remoteaddr = {0, };
+	int ret = 0;
 
-	sc = sc_alloc(node, 1);
+	/* if we're greater we initiate tx, otherwise we accept */
+	if (o2nm_this_node() <= o2net_num_from_nn(nn))
+		goto out;
+
+	/* watch for racing with tearing a node down */
+	node = o2nm_get_node_by_num(o2net_num_from_nn(nn));
+	if (node == NULL) {
+		ret = 0;
+		goto out;
+	}
+
+	spin_lock(&nn->nn_lock);
+	/* see if we already have one pending or have given up */
+	if (nn->nn_sc || nn->nn_persistent_error)
+		arg = NULL;
+	else if (nn->nn_connect_attempts == O2NET_MAX_CONNECT_ATTEMPTS) {
+		mlog(ML_NOTICE, "%u connections to node %u [%s] have failed "
+		     "since it started heartbeating.  No more connections "
+		     "will be initiated.  Network communication attempts will "
+		     "return errors.\n", nn->nn_connect_attempts,
+		     node->nd_num, node->nd_name);
+
+		o2net_set_nn_state(nn, NULL, 0, -ENOTCONN);
+		arg = NULL;
+	} else
+		nn->nn_connect_attempts++;
+	spin_unlock(&nn->nn_lock);
+	if (arg == NULL) /* *shrug*, needed some indicator */
+		goto out;
+
+	sc = sc_alloc(node);
 	if (sc == NULL) {
+		mlog(0, "couldn't allocate sc\n");
 		ret = -ENOMEM;
 		goto out;
 	}
@@ -1422,10 +1181,11 @@
 		mlog(0, "can't create socket: %d\n", ret);
 		goto out;
 	}
+	sc->sc_sock = sock; /* freed by sc_kref_release */
 
-	memset(&myaddr, 0, sizeof(myaddr));
 	myaddr.sin_family = AF_INET;
-	myaddr.sin_port = htons(0);  // any port
+	myaddr.sin_port = htons(0); /* any port */
+
 	ret = sock->ops->bind(sock, (struct sockaddr *)&myaddr,
 			      sizeof(myaddr));
 	if (ret) {
@@ -1433,17 +1193,17 @@
 		goto out;
 	}
 	
-	memset(&remoteaddr, 0, sizeof(remoteaddr));
-	remoteaddr.sin_family = AF_INET;
-	remoteaddr.sin_addr.s_addr = addr;
-	remoteaddr.sin_port = port;
+	o2net_set_nodelay(sc->sc_sock);
+	sk_register_callbacks(sc->sc_sock->sk, sc);
 
-	/* from this point on sc_kref_release has the responsibility
-	 * of dropping the socket */
-	sc->sc_sock = sock;
-	sock = NULL;
+	spin_lock(&nn->nn_lock);
+	/* connect completion will set nn->nn_sc_valid */
+	o2net_set_nn_state(nn, sc, 0, 0);
+	spin_unlock(&nn->nn_lock);
 
-	sk_register_callbacks(sc->sc_sock->sk, sc);
+	remoteaddr.sin_family = AF_INET;
+	remoteaddr.sin_addr.s_addr = node->nd_ipv4_address;
+	remoteaddr.sin_port = node->nd_ipv4_port;
 
 	ret = sc->sc_sock->ops->connect(sc->sc_sock,
 					(struct sockaddr *)&remoteaddr, 
@@ -1452,120 +1212,92 @@
 	if (ret == -EINPROGRESS)
 		ret = 0;
 
-	sclog(sc, "->connect gave %d\n", ret);
 out:
-	/* if ret == 0 then the callback has the responsibility of calling
-	 * o2net_finish_connect */
+	mlog(ML_CONN, "finished with %d\n", ret);
 	if (ret) {
-		o2net_finish_connect(node);
-		if (sc) {
-			/* stop detach from trying to finish again */
-			sc->sc_pending_connect = 0;
-			o2net_detach_sc(sc, NULL);
-		}
-		if (sock)
-			sock_release(sock);
+		/* XXX log? */
+		if (sc)
+			o2net_ensure_shutdown(nn, sc);
 	}
 	if (sc)
 		sc_put(sc);
-	return ret;
+	if (node)
+		o2nm_node_put(node);
+
+	return;
 }
 
-static void o2net_sock_drain(struct socket *sock)
+/* ------------------------------------------------------------ */
+
+void o2net_disconnect_node(struct o2nm_node *node)
 {
-	int             len;
-	mm_segment_t    oldfs;
-	static char	junk[PAGE_SIZE];
-	struct iovec iov = {
-		.iov_base = junk,
-		.iov_len = sizeof(junk),
-	};
-	struct msghdr msg = {
-		.msg_iov      = &iov,
-		.msg_iovlen   = 1,
-		.msg_flags    = MSG_DONTWAIT,
-	};
+	struct o2net_node *nn = o2net_nn_from_num(node->nd_num);
 
-	oldfs = get_fs();
-	set_fs(KERNEL_DS);
-	for(len = 1; sock->sk && len > 0; )
-		len = sock_recvmsg(sock, &msg, PAGE_SIZE, MSG_DONTWAIT);
+	/* don't bother trying to connect until it's heartbeating again */
+	spin_lock(&nn->nn_lock);
+	nn->nn_connect_attempts = O2NET_MAX_CONNECT_ATTEMPTS;
+	o2net_set_nn_state(nn, NULL, 0, -EINVAL);	/* txs now fail */
+	cancel_delayed_work(&nn->nn_connect_work);
+	spin_unlock(&nn->nn_lock);
 
-	set_fs(oldfs);
+	if (o2net_wq)
+		flush_workqueue(o2net_wq);	/* finish queued work */
 }
 
+static void o2net_hb_node_down_cb(struct o2nm_node *node, int node_num,
+				  void *data)
+{
+	o2net_disconnect_node(node);
+}
 
-/*
- * some tx code path would like to send a message.   For them to do so
- * we need to follow an o2nm_node struct to find an active sock container.
- * If we find a node without a sock container then we try and issue a
- * connect and wait for its outcome.  We only really block waiting for
- * the node to have an active socket.  The rx thread might race with
- * the connect() to accept() a socket that we'll happily use.
- */
-static int o2net_sc_from_node(struct o2nm_node *node,
-			      struct o2net_sock_container **sc_ret)
+static void o2net_hb_node_up_cb(struct o2nm_node *node, int node_num,
+				void *data)
 {
-	struct o2net_sock_container *sc = NULL;
-	int ret = 0, gen = 0, issue_connect = 0;
+	struct o2net_node *nn = o2net_nn_from_num(node_num);
 
-	spin_lock_bh(&node->nd_lock); 
-	if (node->nd_sc) {
-		sc = node->nd_sc;
-		kref_get(&sc->sc_kref);
-	} else {
-		if (atomic_read(&node->nd_pending_connects) == 0) {
-			atomic_inc(&node->nd_pending_connects);
-			issue_connect = 1;
-		}
-		gen = atomic_read(&node->nd_sc_generation);
-	}
-	spin_unlock_bh(&node->nd_lock); 
+	spin_lock(&nn->nn_lock);
+	nn->nn_connect_attempts = 0;	/* ok, we can try to connect again */
+	if (nn->nn_sc == NULL)	/* accept can beat hb up; don't drop its sc */
+		o2net_set_nn_state(nn, NULL, 0, 0);
+	spin_unlock(&nn->nn_lock);
+}
 
-	if (sc)
-		goto out;
+void o2net_unregister_hb_callbacks(void)
+{
+	int ret;
 
-	if (issue_connect) {
-		/* either net_start_connect or the callbacks it registers
-		 * have the responsibility of calling _finish_connect() */
-		ret = o2net_start_connect(node, node->nd_ipv4_address,
-					  node->nd_ipv4_port);
-		if (ret)
-			goto out;
-	}
+	ret = o2hb_unregister_callback(&o2net_hb_up);
+	if (ret < 0)
+		mlog(ML_ERROR, "Status return %d unregistering heartbeat up "
+		     "callback!\n", ret);
 
-	/* wait for nd_sc to change, either our connect finishing or
-	 * an accept arriving */
-	ret = wait_event_interruptible(node->nd_sc_wq,
-				       atomic_read(&node->nd_sc_generation) !=
-				       gen);
-	if (ret)
-		goto out;
+	ret = o2hb_unregister_callback(&o2net_hb_down);
+	if (ret < 0)
+		mlog(ML_ERROR, "Status return %d unregistering heartbeat down "
+		     "callback!\n", ret);
+}
 
-	/* we only wait for one iteration right now */
-	spin_lock_bh(&node->nd_lock); 
-	if (node->nd_sc) {
-		sc = node->nd_sc;
-		kref_get(&sc->sc_kref);
-	} else {
-		ret = -ENOTCONN;
-	}
-	spin_unlock_bh(&node->nd_lock); 
+int o2net_register_hb_callbacks(void)
+{
+	int ret;
 
-out:
-	if (sc) {
-		sclog(sc, "returning for node %s (%u)\n", node->nd_name,
-			 node->nd_num);
-		*sc_ret = sc;
-	}
+	o2hb_setup_callback(&o2net_hb_down, O2HB_NODE_DOWN_CB,
+			    o2net_hb_node_down_cb, NULL, O2NET_HB_PRI);
+	o2hb_setup_callback(&o2net_hb_up, O2HB_NODE_UP_CB,
+			    o2net_hb_node_up_cb, NULL, O2NET_HB_PRI);
 
-	BUG_ON(ret == 0 && sc == NULL);
-	BUG_ON(ret && sc);
-	mlog(0, "sc_get for node %s (%u) gave %d, %p\n", node->nd_name,
-	     node->nd_num, ret, sc);
+	ret = o2hb_register_callback(&o2net_hb_up);
+	if (ret == 0)
+		ret = o2hb_register_callback(&o2net_hb_down);
+
+	if (ret)
+		o2net_unregister_hb_callbacks();
+
 	return ret;
 }
 
+/* ------------------------------------------------------------ */
+
 #ifdef MISSING_SOCK_CREATE_LITE
 static inline int sock_create_lite(int family, int type, int protocol,
 				   struct socket **res)
@@ -1582,13 +1314,14 @@
 }
 #endif /* MISSING_SOCK_CREATE_LITE */
 
-static void o2net_try_accept(struct socket *sock)
+static int o2net_accept_one(struct socket *sock)
 {
 	int ret, slen;
 	struct sockaddr_in sin;
 	struct socket *new_sock = NULL;
 	struct o2nm_node *node = NULL;
 	struct o2net_sock_container *sc = NULL;
+	struct o2net_node *nn;
 
 	BUG_ON(sock == NULL);
 	ret = sock_create_lite(sock->sk->sk_family, sock->sk->sk_type,
@@ -1602,21 +1335,58 @@
 	if (ret < 0)
 		goto out;
 
+	o2net_set_nodelay(new_sock);
+
 	slen = sizeof(sin);
 	ret = new_sock->ops->getname(new_sock, (struct sockaddr *) &sin,
 				       &slen, 1);
 	if (ret < 0)
 		goto out;
-	
+
 	node = o2nm_get_node_by_ip(sin.sin_addr.s_addr);
 	if (node == NULL) {
 		mlog(ML_NOTICE, "attempt to connect from unknown node at "
 		     "%u.%u.%u.%u:%d\n", NIPQUAD(sin.sin_addr.s_addr),
 		     ntohs(sin.sin_port));
+		ret = -EINVAL;
 		goto out;
 	}
 
-	sc = sc_alloc(node, 0);
+	if (o2nm_this_node() > node->nd_num) {
+		mlog(ML_NOTICE, "unexpected connect attempted from a lower "
+		     "numbered node '%s' at " "%u.%u.%u.%u:%d with num %u\n",
+		     node->nd_name, NIPQUAD(sin.sin_addr.s_addr), 
+		     ntohs(sin.sin_port), node->nd_num);
+		ret = -EINVAL;
+		goto out;
+	}
+
+	if (!o2hb_check_node_heartbeating(node->nd_num)) {
+		mlog(ML_NOTICE, "attempt to connect from node '%s' at "
+		     "%u.%u.%u.%u:%d but it isn't heartbeating\n",
+		     node->nd_name, NIPQUAD(sin.sin_addr.s_addr), 
+		     ntohs(sin.sin_port));
+		ret = -EINVAL;
+		goto out;
+	}
+
+	nn = o2net_nn_from_num(node->nd_num);
+
+	spin_lock(&nn->nn_lock);
+	if (nn->nn_sc)
+		ret = -EBUSY;
+	else
+		ret = 0;
+	spin_unlock(&nn->nn_lock);
+	if (ret) {
+		mlog(ML_NOTICE, "attempt to connect from node '%s' at "
+		     "%u.%u.%u.%u:%d but it already has an open connection\n",
+		     node->nd_name, NIPQUAD(sin.sin_addr.s_addr), 
+		     ntohs(sin.sin_port));
+		goto out;
+	}
+
+	sc = sc_alloc(node);
 	if (sc == NULL) {
 		ret = -ENOMEM;
 		goto out;
@@ -1625,57 +1395,177 @@
 	sc->sc_sock = new_sock;
 	new_sock = NULL;
 
-	ret = o2net_attach_sc(sc);
+	spin_lock(&nn->nn_lock);
+	o2net_set_nn_state(nn, sc, 1, 0);
+	spin_unlock(&nn->nn_lock);
 
+	sk_register_callbacks(sc->sc_sock->sk, sc);
+	o2net_sc_queue_work(sc, &sc->sc_rx_work);
+
 out:
-	if (ret) {
-		if (new_sock) {
-			o2net_sock_drain(new_sock);
-			sock_release(new_sock);
-		}
-	}
+	if (new_sock)
+		sock_release(new_sock);
 	if (node)
 		o2nm_node_put(node);
 	if (sc)
 		sc_put(sc);
-	return;
+	return ret;
 }
 
-static struct socket *o2net_init_tcp_recv_sock(u16 port)
+static void o2net_accept_many(void *arg)
 {
-	struct sockaddr_in sin;
-	struct socket *sock;
-	int error;
+	struct socket *sock = arg;
+	while (o2net_accept_one(sock) == 0)
+		cond_resched();
+}
 
-	error = sock_create(PF_INET, SOCK_STREAM, IPPROTO_TCP, &sock);
-	if (error < 0) {
-		mlog(ML_ERROR, "unable to create socket, error=%d\n", error);
-		goto bail;
+static void o2net_listen_data_ready(struct sock *sk, int bytes)
+{
+	void (*ready)(struct sock *sk, int bytes);
+
+	read_lock(&sk->sk_callback_lock);
+	ready = sk->sk_user_data;
+	if (ready == NULL) { /* check for teardown race */
+		ready = sk->sk_data_ready;
+		goto out;
 	}
 
-	memset(&sin, 0, sizeof(sin));
-	sin.sin_family = PF_INET;
-	sin.sin_addr.s_addr = htonl(INADDR_ANY);
-	sin.sin_port = port;
+	/* ->sk_data_ready also fires for a newly established child socket
+	 * before it is accepted, so only queue accept work for the listening
+	 * socket.  queue on o2net_wq, not keventd, so destroy_workqueue() in
+	 * o2net_stop_listening() flushes it before the socket is released */
+	if (sk->sk_state == TCP_LISTEN) {
+		mlog(ML_TCP, "bytes: %d\n", bytes);
+		queue_work(o2net_wq, &o2net_listen_work);
+	}
 
-	error = sock->ops->bind(sock, (struct sockaddr *)&sin, sizeof(sin));
-	if (error < 0) {
-		mlog(ML_ERROR, "unable to bind socket to port %d, error=%d\n", 
-		     ntohs(port), error);
-		goto bail;
+out:
+	read_unlock(&sk->sk_callback_lock);
+	ready(sk, bytes);
+}
+
+static int o2net_open_listening_sock(u16 port)
+{
+	struct socket *sock = NULL;
+	int ret;
+	struct sockaddr_in sin = {
+		.sin_family = PF_INET,
+		.sin_addr.s_addr = htonl(INADDR_ANY),
+		.sin_port = port,
+	};
+
+	ret = sock_create(PF_INET, SOCK_STREAM, IPPROTO_TCP, &sock);
+	if (ret < 0) {
+		mlog(ML_ERROR, "unable to create socket, ret=%d\n", ret);
+		goto out;
 	}
 
-	/* !!! dunno about these... */
+	write_lock_bh(&sock->sk->sk_callback_lock);
+	sock->sk->sk_user_data = sock->sk->sk_data_ready;
+	sock->sk->sk_data_ready = o2net_listen_data_ready;
+	write_unlock_bh(&sock->sk->sk_callback_lock);
+
+	o2net_listen_sock = sock;
+	INIT_WORK(&o2net_listen_work, o2net_accept_many, sock);
+
 	sock->sk->sk_reuse = 1;
-	error = sock->ops->listen(sock, 64);
+	ret = sock->ops->bind(sock, (struct sockaddr *)&sin, sizeof(sin));
+	if (ret < 0) {
+		mlog(ML_ERROR, "unable to bind socket to port %d, ret=%d\n", 
+		     ntohs(port), ret);
+		goto out;
+	}
 
-bail:
-	if (error) {
-	       if (sock)
+	ret = sock->ops->listen(sock, 64);
+	if (ret < 0) {
+		mlog(ML_ERROR, "unable to listen on port %d, ret=%d\n", 
+		     ntohs(port), ret);
+	}
+
+out:
+	if (ret) {
+		o2net_listen_sock = NULL;
+		if (sock)
 			sock_release(sock);
-	       sock = ERR_PTR(error);
 	}
+	return ret;
+}
 
-	BUG_ON(sock == NULL);
-	return sock;
+/*
+ * called from node manager when we should bring up our network listening
+ * socket.  node manager handles all the serialization to only call this
+ * once and to match it with o2net_stop_listening().
+ */
+int o2net_start_listening(u16 port)
+{
+	int ret = 0;
+
+	BUG_ON(o2net_wq != NULL);
+	BUG_ON(o2net_listen_sock != NULL);
+
+	mlog(ML_KTHREAD, "starting o2net thread...\n");
+	o2net_wq = create_singlethread_workqueue("o2net");
+	if (o2net_wq == NULL) {
+		mlog(ML_ERROR, "unable to launch o2net thread\n");
+		return -ENOMEM; /* NULL carries no errno; assume ENOMEM */
+	}
+
+	ret = o2net_open_listening_sock(port);
+	if (ret) {
+		destroy_workqueue(o2net_wq);
+		o2net_wq = NULL;
+	}
+	return ret;
 }
+
+void o2net_stop_listening(void)
+{
+	struct socket *sock = o2net_listen_sock;
+	size_t i;
+
+	BUG_ON(o2net_wq == NULL);
+	BUG_ON(o2net_listen_sock == NULL);
+
+	/* stop the listening socket from generating work */
+	write_lock_bh(&sock->sk->sk_callback_lock);
+	sock->sk->sk_data_ready = sock->sk->sk_user_data;
+	sock->sk->sk_user_data = NULL;
+	write_unlock_bh(&sock->sk->sk_callback_lock);
+
+	for (i = 0; i < ARRAY_SIZE(o2net_nodes); i++) {
+		struct o2nm_node *node = o2nm_get_node_by_num(i);
+		if (node) {
+			o2net_disconnect_node(node);
+			o2nm_node_put(node);
+		}
+	}
+	
+	/* destroy_workqueue() flushes pending o2net_wq work, then frees it */
+	mlog(ML_KTHREAD, "waiting for o2net thread to exit....\n");
+	destroy_workqueue(o2net_wq);
+	o2net_wq = NULL;
+
+	sock_release(o2net_listen_sock);
+	o2net_listen_sock = NULL;
+}
+
+/* ------------------------------------------------------------ */
+
+int __init o2net_init(void)
+{
+	unsigned long i;
+
+	for (i = 0; i < ARRAY_SIZE(o2net_nodes); i++) {
+		struct o2net_node *nn = o2net_nn_from_num(i);
+
+		spin_lock_init(&nn->nn_lock);
+		INIT_WORK(&nn->nn_connect_work, o2net_start_connect, nn);
+		/* tx returns this -EINVAL until we see hb from the node */
+		nn->nn_persistent_error = -EINVAL;
+		init_waitqueue_head(&nn->nn_sc_wq);
+		idr_init(&nn->nn_status_idr);
+		INIT_LIST_HEAD(&nn->nn_status_list);
+	}
+
+	return 0;
+}

Modified: trunk/fs/ocfs2/cluster/tcp.h
===================================================================
--- trunk/fs/ocfs2/cluster/tcp.h	2005-06-17 05:57:53 UTC (rev 2403)
+++ trunk/fs/ocfs2/cluster/tcp.h	2005-06-17 19:11:38 UTC (rev 2404)
@@ -134,11 +134,11 @@
 
 int o2net_register_hb_callbacks(void);
 void o2net_unregister_hb_callbacks(void);
-int o2net_start_rx_thread(struct o2nm_node *node);
-void o2net_stop_rx_thread(struct o2nm_node *node);
-struct o2net_sock_container;
-void o2net_detach_sc(struct o2net_sock_container *sc, struct o2nm_node *node);
+int o2net_start_listening(u16 port);
+void o2net_stop_listening(void);
+void o2net_disconnect_node(struct o2nm_node *node);
 
+int __init o2net_init(void);
 int o2net_proc_init(struct proc_dir_entry *parent);
 void o2net_proc_exit(struct proc_dir_entry *parent);
 

Modified: trunk/fs/ocfs2/cluster/tcp_internal.h
===================================================================
--- trunk/fs/ocfs2/cluster/tcp_internal.h	2005-06-17 05:57:53 UTC (rev 2403)
+++ trunk/fs/ocfs2/cluster/tcp_internal.h	2005-06-17 19:11:38 UTC (rev 2404)
@@ -17,27 +17,47 @@
  * License along with this program; if not, write to the
  * Free Software Foundation, Inc., 59 Temple Place - Suite 330,
  * Boston, MA 021110-1307, USA.
- *
  */
 
 #ifndef O2CLUSTER_TCP_INTERNAL_H
 #define O2CLUSTER_TCP_INTERNAL_H
 
+#define O2NET_MAX_CONNECT_ATTEMPTS	5
+
+struct o2net_node {
+	/* only taken with plain spin_lock(), so never from int/bh context */
+	spinlock_t			nn_lock;
+
+	/* set the moment an sc is allocated and a connect is started */
+	struct o2net_sock_container	*nn_sc;
+	/* _valid is only set after the handshake passes and tx can happen */
+	unsigned			nn_sc_valid:1;
+	/* if this is set tx just returns it */
+	int				nn_persistent_error;
+
+	/* threads waiting for an sc to arrive sleep here.  NOTE(review): the
+	 * generation counter this used to describe is gone; presumably
+	 * waiters now test nn_sc_valid/nn_persistent_error as set by
+	 * o2net_set_nn_state() -- confirm. */
+	wait_queue_head_t		nn_sc_wq;
+
+	struct idr			nn_status_idr;
+	struct list_head		nn_status_list;
+
+	struct work_struct		nn_connect_work;
+	unsigned			nn_connect_attempts;
+};
+
 struct o2net_sock_container {
-	/* sockets themselves don't seem to have a nice way to refcount them
-	 * above sock_release.  one could use iget/iput, but that seems
-	 * to interact poorly with sock_release() itself calling iput. */
-
 	struct kref		sc_kref;
-	spinlock_t		sc_lock;
+	/* the next two are vaild for the life time of the sc */
 	struct socket		*sc_sock;
 	struct o2nm_node	*sc_node;
-	unsigned		sc_from_connect:1,
-				sc_pending_connect:1;
 
-	struct list_head	sc_item;
+	struct work_struct	sc_rx_work;
+	struct work_struct	sc_shutdown_work;
+	struct work_struct	sc_connect_work;
 
-	struct list_head	sc_handlers;
 	struct page 		*sc_page;
 	size_t			sc_page_off;
 
@@ -79,10 +99,10 @@
 struct o2net_send_tracking {
 	struct list_head		st_net_proc_item;
 	struct task_struct		*st_task;
-	struct o2nm_node		*st_node;
 	struct o2net_sock_container	*st_sc;
 	u32				st_msg_type;
 	u32				st_msg_key;
+	u8				st_node;
 	struct timeval			st_sock_time;
 	struct timeval			st_send_time;
 	struct timeval			st_status_time;



More information about the Ocfs2-commits mailing list