[Ocfs2-commits] zab commits r2138 - trunk/fs/ocfs2/cluster

svn-commits at oss.oracle.com svn-commits at oss.oracle.com
Tue Apr 12 16:08:06 CDT 2005


Author: zab
Signed-off-by: mfasheh
Date: 2005-04-12 16:08:05 -0500 (Tue, 12 Apr 2005)
New Revision: 2138

Modified:
   trunk/fs/ocfs2/cluster/nodemanager.c
   trunk/fs/ocfs2/cluster/nodemanager.h
   trunk/fs/ocfs2/cluster/tcp.c
   trunk/fs/ocfs2/cluster/tcp.h
Log:
Strengthen the lifecycle management of sockets to remote nodes.

o replace 'net_inode_private' with a kref-managed 'sock_container'.  see the
  tcp.c comment for a description of the relationship between nodes and
  sock containers.
o simplify how transmitters wait for connect to finish and have them satisfied
  by an accept that comes in while they're waiting
o deal with racing connects by comparing node numbers to decide whose initiated
  socket gets disconnected
o use the kernel's idr allocator to manage status replies
o wake status waiters when a sock is detached and won't be received from
  again
o factor out sk callback registration and catch errors with state_change,
  not error_report

Signed-off-by: mfasheh



Modified: trunk/fs/ocfs2/cluster/nodemanager.c
===================================================================
--- trunk/fs/ocfs2/cluster/nodemanager.c	2005-04-12 20:45:35 UTC (rev 2137)
+++ trunk/fs/ocfs2/cluster/nodemanager.c	2005-04-12 21:08:05 UTC (rev 2138)
@@ -184,6 +184,12 @@
 }
 EXPORT_SYMBOL(nm_node_put);
 
+void nm_node_get(struct nm_node *node)
+{
+	config_item_get(&node->nd_item);
+}
+EXPORT_SYMBOL(nm_node_get);
+
 u8 nm_this_node(void)
 {
 	u8 node_num = NM_MAX_NODES;
@@ -513,7 +519,6 @@
 {
 	struct nm_node *node = NULL;
 	struct config_item *ret = NULL;
-	net_inode_private *nip;
 
 	if (strlen(name) > NM_MAX_NAME_LEN)
 		goto out; /* ENAMETOOLONG */
@@ -523,21 +528,13 @@
 		goto out; /* ENOMEM */
 
 	strcpy(node->nd_name, name); /* use item.ci_namebuf instead? */
-
-	/* this should be somewhere else */
-	nip = &node->nd_net_inode_private;
-	spin_lock_init(&nip->sock_lock);
-	INIT_LIST_HEAD(&nip->pending_waiters);
-	init_waitqueue_head(&nip->waitq);
-	INIT_LIST_HEAD(&nip->handlers);
-	INIT_LIST_HEAD(&nip->active_item);
-	nip->page = alloc_page(GFP_KERNEL);
-	if (nip->page == NULL) {
-		nmprintk("page allocation failed\n");
-		goto out; /* ENOMEM */
-	}
-
 	config_item_init_type_name(&node->nd_item, name, &nm_node_type);
+	spin_lock_init(&node->nd_lock);
+	atomic_set(&node->nd_pending_connects, 0);
+	atomic_set(&node->nd_sc_generation, 0);
+	init_waitqueue_head(&node->nd_sc_wq);
+	idr_init(&node->nd_status_idr);
+	INIT_LIST_HEAD(&node->nd_status_list);
 
 	ret = &node->nd_item;
 
@@ -548,12 +545,13 @@
 	return ret;
 }
 
-static void nm_node_group_drop_item(struct config_group *group, struct config_item *item)
+static void nm_node_group_drop_item(struct config_group *group,
+				    struct config_item *item)
 {
 	struct nm_node *node = to_nm_node(item);
 	struct nm_cluster *cluster = to_nm_cluster(group->cg_item.ci_parent);
 
-	net_stop_node_sock(node);
+	net_detach_sc(NULL, node);
 
 	if (cluster->cl_has_local &&
 	    (cluster->cl_local_node == node->nd_num)) {

Modified: trunk/fs/ocfs2/cluster/nodemanager.h
===================================================================
--- trunk/fs/ocfs2/cluster/nodemanager.h	2005-04-12 20:45:35 UTC (rev 2137)
+++ trunk/fs/ocfs2/cluster/nodemanager.h	2005-04-12 21:08:05 UTC (rev 2138)
@@ -27,53 +27,14 @@
 #ifndef CLUSTER_NODEMANAGER_H
 #define CLUSTER_NODEMANAGER_H
 
-#define NM_ASSERT(x)       ({  if (!(x)) { printk("nm: assert failed! %s:%d\n", __FILE__, __LINE__); BUG(); } })
-
 #include "ocfs2_nodemanager.h"
 
 /* This totally doesn't belong here. */
 #include "configfs.h"
+#include <linux/idr.h>
 
-
-/* TODO: move this */
-/*
- * this stores the per-socket state for each socket that we associate
- * with a node.  for remote nodes this is a socket that is established
- * on demand and trades messages.  For a local node this is just a listening
- * socket that spawns message sockets from other nodes.
- */
-struct sock;
-/* this is still called net_inode_private for hysterical raisins.  one
- * has to draw the cleanup line somewhere.. */
-typedef struct _net_inode_private
-{
-	/* only used by the local node. */
-	struct task_struct	*rx_thread;
-	/* the rest is for remote nodes */
-
-	/* sockets themselves don't seem to have a nice way to refcount them
-	 * above sock_release.  one could use iget/iput, but that seems
-	 * to interact poory with sock_release() itself calling iput. */
-	spinlock_t		sock_lock;
-	struct socket		*sock;
-	unsigned long		sock_refs;
-	unsigned		sock_pending:1, /* wait before using ->sock */
-				defer_release:1; /* sock busted,release soon */
-	struct list_head	pending_waiters;
-	wait_queue_head_t	waitq;
-
-	struct list_head	handlers;
-	struct list_head	active_item;
-	struct page 		*page;
-	size_t			page_off;
-
-
-	void			(*orig_state_change)(struct sock *sk);
-	void                    (*orig_error_report)(struct sock *sk);
-	void			(*orig_data_ready)(struct sock *sk, int bytes);
-} net_inode_private;
-
 struct nm_node {
+	spinlock_t		nd_lock;
 	struct config_item	nd_item; 
 	char			nd_name[NM_MAX_NAME_LEN+1]; /* replace? */
 	__u8			nd_num;
@@ -87,9 +48,20 @@
 
 	unsigned long		nd_set_attributes;
 
-	/* we're making simple assertions that a node can only have one network
-	 * identity and report at one place in a heartbeat */
-	net_inode_private	nd_net_inode_private;
+	/* only used by the local node. */
+	struct task_struct	*nd_rx_thread;
+
+	/* protected by nd_lock.  It isn't so hot that we have all these
+	 * net-private things out here in nodemanager.h. */
+	struct net_sock_container *nd_sc;
+	/* threads waiting for an sc to arrive wait on the wq for generation
+	 * to increase.  it is increased when a connecting socket succeeds
+	 * or fails or when an accepted socket is attached. */
+	atomic_t		nd_pending_connects;
+	atomic_t		nd_sc_generation;
+	wait_queue_head_t	nd_sc_wq;
+	struct idr		nd_status_idr;
+	struct list_head	nd_status_list;
 };
 
 u8 nm_this_node(void);
@@ -97,6 +69,7 @@
 int nm_configured_node_map(unsigned long *map, unsigned bytes);
 struct nm_node * nm_get_node_by_num(u8 node_num);
 struct nm_node * nm_get_node_by_ip(u32 addr);
+void nm_node_get(struct nm_node *node);
 void nm_node_put(struct nm_node *node);
 
 #endif /* CLUSTER_NODEMANAGER_H */

Modified: trunk/fs/ocfs2/cluster/tcp.c
===================================================================
--- trunk/fs/ocfs2/cluster/tcp.c	2005-04-12 20:45:35 UTC (rev 2137)
+++ trunk/fs/ocfs2/cluster/tcp.c	2005-04-12 21:08:05 UTC (rev 2138)
@@ -37,19 +37,24 @@
  * referenced beyond the call.  Handlers may block but are discouraged from
  * doing so.
  *
- * Any framing errors (bad magic, unknown message types, large payload lengths)
- * closes a connection.
+ * Any framing errors (bad magic, large payload lengths) close a connection.
  *
- * struct socket pointers live in the net_inode_private structure off of the
- * node manager inodes.  Only one socket is active under an inode at a time and
- * callers refcount the socket in members of n_i_p.  While a connect() is
- * pending transmitters will block on the connect state machine until it
- * completes.  tx and rx get references to the socket while they're doing their
- * work.  If they see an error they mark the socket for release at the next
- * final decref.  sk_error_report doesn't get a reference, it just marks the
- * socket for release and kicks the rx thread.  This means that new references
- * can get the known-dead socket and see errors.
+ * Our sock_container holds the state we associate with a socket.  Its current
+ * framing state is held there as well as the refcounting we do around when
+ * it is safe to tear down the socket.
+ * 
+ * A caller who wants to communicate with a node gets a ref to the sock
+ * container by finding it on the nm_node that it wants to communicate with.
+ * The sock container will only be found on the node once there is a valid
+ * socket in the container.  The socket is only finally torn down from
+ * the container when the container loses all of its references -- so as
+ * long as you hold a ref on the container you can trust that the socket
+ * is valid for use with kernel socket APIs.
  *
+ * Usually blocking paths get and drop refs on the container, which is easy.
+ * The exception to this are the socket callbacks.  They push their work
+ * back up to the rx thread who takes it from there. 
+ *
  * One can imagine the direction a more sophisticated API would head in:
  * (there are certainly a half dozen examples in the kernel)
  *   * tx
@@ -63,22 +68,14 @@
  *      - handers must be callable from bh context
  * but it really depends on what the semantics and messages are.
  *
- * asap
- * 	- only have lookup succeed for active nodes (fully configured)
- * 	- only initiate connections if rx thread is running?
- * 	- don't allow node rmdir if it has socket and rx thread is running
+ * XXX
  * 	- tear down all node sockets on rx thread exit
  * 	- have rx thread stop active tx and wait for them
- * 	- make sure ->net.page gets torn down with net_inode_private
- * 	- tear down sockets on exit.. via removing their inodes?
- *
- * XXX
+ * 	- don't allow node rmdir if it has socket and rx thread is running
  * 	- disable preemt before calling rx handler when debugging
  * 	- find explicit stack call to drain rx queue
  * 	- add trivial version trading message at the start of a conn
  * 	- go nuts adding static
- * 	- nsc waiting is buggy, should be on socket.. wake w/err if socket dies
- * 	- compare socks in attach_sock so both size don't close
  */
 
 #include <linux/module.h>
@@ -114,10 +111,12 @@
 #define netprintk0(x)           
 #define msgprintk(hdr, fmt, args...)
 #define msgprintk0(hdr, fmt)
+#define scprintk(sc, fmt, args...)
+#define scprintk0(sc, fmt)
 #else
 #define netprintk(x, arg...)    printk("(tcp:%d)(%s:%d) " x, current->pid, __FUNCTION__, __LINE__, ##arg)
 #define netprintk0(x)           printk("(tcp:%d)(%s:%d) " x, current->pid, __FUNCTION__, __LINE__)
-/* yeah, a little gross, but it gets the job done */
+
 #define __msg_fmt "[mag %u len %u typ %u status %d key %u num %u] "
 #define __msg_args __hdr->magic, __hdr->data_len, __hdr->msg_type, 	\
  	__hdr->status,	__hdr->key, __hdr->msg_num
@@ -129,6 +128,22 @@
 	typeof(hdr) __hdr = (hdr);					\
 	printk(__msg_fmt fmt, __msg_args);				\
 } while (0)
+
+#define __sc_fmt 							\
+	"[sc %p refs %d sock %p node %p from_con %u item_on %d "	\
+	"page %p pg_off %zu] "
+#define __sc_args __sc, atomic_read(&__sc->sc_kref.refcount), 		\
+	__sc->sc_sock,	__sc->sc_node, __sc->sc_from_connect, 		\
+	!list_empty(&__sc->sc_item), __sc->sc_page, __sc->sc_page_off
+#define scprintk(sc, fmt, args...) do {					\
+	typeof(sc) __sc = (sc);						\
+	printk(__sc_fmt fmt, __sc_args, args);				\
+} while (0)
+#define scprintk0(sc, fmt) do {						\
+	typeof(sc) __sc = (sc);						\
+	printk(__sc_fmt fmt, __sc_args);				\
+} while (0)
+
 #endif
 
 /* let's only pollute this unit with these ridiculous definitions */
@@ -143,10 +158,32 @@
 #define sk_callback_lock	callback_lock
 #define sk_user_data		user_data
 #define sk_data_ready		data_ready
-#define sk_error_report		error_report
 #define sk_state_change		state_change
 #endif
 
+struct net_sock_container {
+	/* sockets themselves don't seem to have a nice way to refcount them
+	 * above sock_release.  one could use iget/iput, but that seems
+	 * to interact poorly with sock_release() itself calling iput. */
+
+	struct kref		sc_kref;
+	spinlock_t		sc_lock;
+	struct socket		*sc_sock;
+	struct nm_node		*sc_node;
+	unsigned		sc_from_connect:1,
+				sc_pending_connect:1;
+
+	struct list_head	sc_item;
+
+	struct list_head	sc_handlers;
+	struct page 		*sc_page;
+	size_t			sc_page_off;
+
+	/* original handlers for the sockets */
+	void			(*sc_state_change)(struct sock *sk);
+	void			(*sc_data_ready)(struct sock *sk, int bytes);
+};
+
 struct net_msg_handler {
 	struct rb_node		nh_node;
 	u32			nh_max_len;
@@ -158,59 +195,51 @@
 	struct list_head	nh_unregister_item;
 };
 
-typedef struct _net_status_ctxt {
-	u8 target_node;
-	struct list_head list;
-	enum net_system_error sys_status;
-	s32 status;
-	u64 msg_num;
-	wait_queue_head_t wq;
-	atomic_t woken;
-} net_status_ctxt;
+struct net_status_wait {
+	enum net_system_error	ns_sys_status;
+	s32			ns_status;
+	int			ns_id;
+	wait_queue_head_t	ns_wq;
+	struct list_head	ns_node_item;
+};
 
-
-/* all this state should eventually be brought up by object activation
- * and tied to that object rather than being globally valid at insmod */
-static spinlock_t net_status_lock = SPIN_LOCK_UNLOCKED;
-static LIST_HEAD(net_status_list);
-
 static rwlock_t net_handler_lock = RW_LOCK_UNLOCKED;
 static struct rb_root net_handler_tree = RB_ROOT;
 
 /* this lock is also grabbed from bh context, non-bh use _bh() locking */
 static spinlock_t net_active_lock = SPIN_LOCK_UNLOCKED;
 static LIST_HEAD(net_active_list);
+static LIST_HEAD(net_detach_list);
+static LIST_HEAD(net_attach_list);
 
 /* XXX someday we'll need better accounting */
 static struct task_struct *net_recv_task = NULL;
 
-static inline void net_abort_status_return(net_status_ctxt *nsc)
-{
-	spin_lock(&net_status_lock);
-	if (!list_empty(&nsc->list))
-		list_del_init(&nsc->list);
-	spin_unlock(&net_status_lock);
-}
-
 /////////////////////
 static struct socket *net_init_tcp_recv_sock(u16 port);
 static int net_receive_thread(void *data);
 static int net_receive(void);
 static void net_try_accept(struct socket *sock);
-static int net_process_message(struct socket *sock, net_msg *hdr);
+static int net_process_message(struct nm_node *node, struct socket *sock,
+			       net_msg *hdr);
 
-static int net_sock_addref_or_connect(u8 node_num, struct socket **sock_ret);
-static void net_sock_decref(struct socket *sock, int error);
+static void net_data_ready(struct sock *sk, int bytes);
+static int net_sc_from_node(struct nm_node *node,
+			    struct net_sock_container **sc_ret);
+static int net_attach_sc(struct net_sock_container *sc);
+static void net_finish_connect(struct nm_node *node);
+static struct net_sock_container *sc_alloc(struct nm_node *node, int from_conn);
+static void net_state_change(struct sock *sk);
+static void net_complete_nodes_nsw(struct nm_node *node);
 
 //////////////////////
 
 int net_start_rx_thread(struct nm_node *node)
 {
 	struct socket *sock;
-	net_inode_private *net = &node->nd_net_inode_private;
 	int ret = 0;
 
-	BUG_ON(net->rx_thread != NULL);
+	BUG_ON(node->nd_rx_thread != NULL);
 	BUG_ON(net_recv_task != NULL);
 
 	/* if the thread was setting up the rx socket we'd like to have it
@@ -219,16 +248,17 @@
 	sock = net_init_tcp_recv_sock(node->nd_ipv4_port);
 	if (IS_ERR(sock)) {
 		ret = PTR_ERR(sock);
+		sock = NULL;
 		goto out;
 	}
 
 	netprintk0("starting net receive thread...\n");
 
-	net->rx_thread = kthread_run(net_receive_thread, sock,
+	node->nd_rx_thread = kthread_run(net_receive_thread, sock,
 				     "netrecv-%s", node->nd_name);
-	if (IS_ERR(net->rx_thread)) {
-		ret = PTR_ERR(net->rx_thread);
-		net->rx_thread = NULL;
+	if (IS_ERR(node->nd_rx_thread)) {
+		ret = PTR_ERR(node->nd_rx_thread);
+		node->nd_rx_thread = NULL;
 		netprintk("unable to launch net receive thread, error=%ld\n",
 			  (long)ret);
 		goto out;
@@ -236,7 +266,7 @@
 
 	/* once the thread is running it has ownership of the sock */
 	sock = NULL;
-	net_recv_task = net->rx_thread;
+	net_recv_task = node->nd_rx_thread;
 
 out:
 	if (sock)
@@ -246,11 +276,10 @@
 
 void net_stop_rx_thread(struct nm_node *node)
 {
-	net_inode_private *net = &node->nd_net_inode_private;
-	if (net->rx_thread) {
+	if (node->nd_rx_thread) {
 		netprintk("waiting for net thread to exit....\n");
-		kthread_stop(net->rx_thread);
-		net->rx_thread = NULL;
+		kthread_stop(node->nd_rx_thread);
+		node->nd_rx_thread = NULL;
 		net_recv_task = NULL;
 	}
 
@@ -261,36 +290,196 @@
 	 * references.. ugh. */
 }
 
-void net_stop_node_sock(struct nm_node *node)
+/*
+ * we must be detached at this point because the state that detach tears
+ * down holds a ref.. this could well be called from detach.
+ */
+static void sc_kref_release(struct kref *kref)
 {
-	net_inode_private *net = &node->nd_net_inode_private;
-	struct socket *sock = NULL;
+	struct net_sock_container *sc = container_of(kref,
+					struct net_sock_container, sc_kref);
+	scprintk0(sc, "releasing\n");
+	if (sc->sc_sock) {
+		sock_release(sc->sc_sock);
+		sc->sc_sock = NULL;
+	}
+}
 
-	/* make sure this deals with all aspects of the net->sock
-	 * life time */
-	spin_lock_bh(&net->sock_lock);
-	if (net->sock) {
-		sock = net->sock;
-		net->sock_refs++;
+static void sc_put(struct net_sock_container *sc)
+{
+	scprintk0(sc, "put\n");
+	kref_put(&sc->sc_kref, sc_kref_release);
+}
+
+static void sk_register_callbacks(struct sock *sk,
+			          struct net_sock_container *sc)
+{
+	write_lock_bh(&sk->sk_callback_lock);
+	if (sk->sk_user_data != sc) {
+		sk->sk_user_data = sc;
+		kref_get(&sc->sc_kref);
+		sc->sc_data_ready = sk->sk_data_ready;
+		sc->sc_state_change = sk->sk_state_change;
+		sk->sk_data_ready = net_data_ready;
+		sk->sk_state_change = net_state_change;
 	}
-	spin_unlock_bh(&net->sock_lock);
-	if (sock) {
-		printk("shutting down sock %p\n", sock);
-		net_sock_decref(sock, -ESHUTDOWN);
+	write_unlock_bh(&sk->sk_callback_lock);
+}
+
+static int sk_unregister_callbacks(struct sock *sk,
+			           struct net_sock_container *sc)
+{
+	int ret = 0;
+
+	write_lock_bh(&sk->sk_callback_lock);
+	if (sk->sk_user_data == sc) {
+		ret = 1;
+		sk->sk_user_data = NULL;
+		sk->sk_data_ready = sc->sc_data_ready;
+		sk->sk_state_change = sc->sc_state_change;
 	}
-	if (net->page) {
-		__free_page(net->page);
-		net->page = NULL;
+	write_unlock_bh(&sk->sk_callback_lock);
+
+	return ret;
+}
+
+/*
+ * When a sock_container is detached it will no longer see traffic once
+ * its current users are done.  The sc is removed from the node so
+ * transmitting tasks can't find it.  It is removed from the active lists
+ * and its callbacks are unregistered so that the receive thread can't
+ * find it.  After this point the last person to drop their kref will
+ * free the container.
+ *
+ * If an sc is specified the caller must hold a reference.  A node can be
+ * specified instead of an sc such that any sc currently attached to the
+ * node will be detached. 
+ */
+void net_detach_sc(struct net_sock_container *sc, struct nm_node *node)
+{
+	int nr_puts = 0;
+	struct sock *sk;
+
+	/* node is only used to get the sc if it isn't specified */
+	BUG_ON(sc && node);
+	BUG_ON(sc == NULL && node == NULL);
+
+	/* no sc, get it from the node */
+	if (sc == NULL) {
+		spin_lock_bh(&node->nd_lock);
+		if (node->nd_sc) {
+			sc = node->nd_sc;
+			node->nd_sc = NULL;
+			net_complete_nodes_nsw(node);
+			/* we have this ref now */
+			nr_puts++;
+		}
+		spin_unlock_bh(&node->nd_lock);
+		node = NULL;
 	}
+
+	if (sc == NULL)
+		goto out;
+
+	scprintk(sc, "detaching with node %p\n", node);
+
+	spin_lock_bh(&sc->sc_lock);
+	if (sc->sc_node) {
+		/* drop our ref to the node below */
+		node = sc->sc_node;
+		sc->sc_node = NULL;
+		/* resolve our pending connect under sc_lock and while
+		 * we have a node ref.  this is done in the error case
+		 * where state_change puts us on the detach list. */
+		if (sc->sc_pending_connect) {
+			sc->sc_pending_connect = 0;
+			net_finish_connect(node);
+		}
+	}
+	if (!list_empty(&sc->sc_item)) {
+		list_del_init(&sc->sc_item);
+		nr_puts++;
+	}
+	spin_unlock_bh(&sc->sc_lock);
+
+	if (node) {
+		spin_lock_bh(&node->nd_lock);
+		/* if the node is still pointing to us, drop that */
+		if (node->nd_sc == sc) {
+			printk(KERN_NOTICE "ocfs2:tcp: no longer connected to "
+			       "node %s at %u.%u.%u.%u:%d\n", node->nd_name,
+			       NIPQUAD(node->nd_ipv4_address), 
+			       ntohs(node->nd_ipv4_port));
+			node->nd_sc = NULL;
+			net_complete_nodes_nsw(node);
+			nr_puts++;
+		}
+		spin_unlock_bh(&node->nd_lock);
+		nm_node_put(node);
+		node = NULL;
+	}
+
+	/* drop user_data's ref and tear down the callbacks.  the
+	 * callbacks themselves execute under the callback_lock so
+	 * this serializes with them. */
+	sk = sc->sc_sock->sk;
+	if (sk_unregister_callbacks(sk, sc))
+		nr_puts++;
+
+out:
+	if (sc) {
+		scprintk(sc, "detach droping %d refs\n", nr_puts);
+		while(nr_puts--)
+			sc_put(sc);
+	}
 }
+EXPORT_SYMBOL(net_detach_sc);
 
+static void net_check_cb_lists(void)
+{
+	struct net_sock_container *sc;
+	/* when we get the sc off the list we inherit the ref that was
+	 * created when the sc was put on the list */
+	spin_lock_bh(&net_active_lock);
+	while (!list_empty(&net_attach_list)) {
+		sc = list_entry(net_attach_list.next,
+				struct net_sock_container, sc_item);
 
+		list_del_init(&sc->sc_item);
+		spin_unlock_bh(&net_active_lock);
+
+		scprintk0(sc, "found on connect list\n");
+
+		net_attach_sc(sc);
+		sc_put(sc);
+
+		spin_lock_bh(&net_active_lock);
+	}
+
+	while (!list_empty(&net_detach_list)) {
+		sc = list_entry(net_detach_list.next,
+				struct net_sock_container, sc_item);
+
+		list_del_init(&sc->sc_item);
+		spin_unlock_bh(&net_active_lock);
+
+		scprintk0(sc, "found on detach list\n");
+
+		net_detach_sc(sc, NULL);
+		sc_put(sc);
+
+		spin_lock_bh(&net_active_lock);
+	}
+	spin_unlock_bh(&net_active_lock);
+}
+
 static int net_rx_should_wake(struct socket *sock)
 {
 	int empty;
 
 	spin_lock_bh(&net_active_lock);
-	empty = list_empty(&net_active_list);
+	empty = list_empty(&net_active_list) && list_empty(&net_detach_list) &&
+		list_empty(&net_attach_list);
 	spin_unlock_bh(&net_active_lock);
 
 	return !empty || tcp_sk(sock->sk)->accept_queue;
@@ -304,6 +493,7 @@
 
        	while(!kthread_should_stop()) {
 		net_try_accept(sock);
+		net_check_cb_lists();
 		net_receive();
 
 		wait_event_interruptible(*sock->sk->sk_sleep,
@@ -521,35 +711,119 @@
 	return ret;
 }
 
-static u64 net_next_msg_num(void)
+static int net_prep_nsw(struct nm_node *node, struct net_status_wait *nsw)
 {
-	static spinlock_t net_msg_num_lock = SPIN_LOCK_UNLOCKED;
-	static u64 net_msg_num = 1;
-	u64 ret;
+	int ret = 0;
 
-	spin_lock(&net_msg_num_lock);
-	ret = net_msg_num++;
-	spin_unlock(&net_msg_num_lock);
+	do { 
+		if (!idr_pre_get(&node->nd_status_idr, GFP_NOFS)) {
+			ret = -EAGAIN;
+			break;
+		}
+		spin_lock_bh(&node->nd_lock);
+		ret = idr_get_new(&node->nd_status_idr, nsw, &nsw->ns_id);
+		if (ret == 0)
+			list_add_tail(&nsw->ns_node_item,
+				      &node->nd_status_list);
+		spin_unlock_bh(&node->nd_lock);
+	} while (ret == -EAGAIN);
 
+	if (ret == 0)  {
+		init_waitqueue_head(&nsw->ns_wq);
+		nsw->ns_sys_status = NET_ERR_NONE;
+		nsw->ns_status = 0;
+	}
+
 	return ret;
 }
 
+static void net_complete_nsw_locked(struct nm_node *node,
+				    struct net_status_wait *nsw,
+				    enum net_system_error sys_status,
+				    s32 status)
+{
+	assert_spin_locked(&node->nd_lock);
+
+	if (!list_empty(&nsw->ns_node_item)) {
+		list_del_init(&nsw->ns_node_item);
+		nsw->ns_sys_status = sys_status;
+		nsw->ns_status = status;
+		idr_remove(&node->nd_status_idr, nsw->ns_id);
+		wake_up(&nsw->ns_wq);
+	}
+}
+
+static void net_complete_nsw(struct nm_node *node, struct net_status_wait *nsw,
+			     u64 id, enum net_system_error sys_status, 
+			     s32 status)
+{
+	spin_lock_bh(&node->nd_lock);
+	if (nsw == NULL) {
+		if (id > INT_MAX)
+			goto out;
+
+		nsw = idr_find(&node->nd_status_idr, id);
+		if (nsw == NULL)
+			goto out;
+	}
+
+	net_complete_nsw_locked(node, nsw, sys_status, status);
+
+out:
+	spin_unlock_bh(&node->nd_lock);
+	return;
+}
+
+static void net_complete_nodes_nsw(struct nm_node *node)
+{
+	struct list_head *iter, *tmp;
+	unsigned int num_kills = 0;
+	struct net_status_wait *nsw;
+
+	assert_spin_locked(&node->nd_lock);
+
+	list_for_each_safe(iter, tmp, &node->nd_status_list) {
+		nsw = list_entry(iter, struct net_status_wait, ns_node_item);
+		net_complete_nsw_locked(node, nsw, NET_ERR_DIED, 0);
+		num_kills++;
+	}
+	
+	netprintk("node %s (%u) died, killed %d messages\n", node->nd_name,
+		  node->nd_num, num_kills);
+}
+
+static int net_nsw_completed(struct nm_node * node,
+			     struct net_status_wait *nsw)
+{
+	int completed;
+	spin_lock_bh(&node->nd_lock);
+	completed = list_empty(&nsw->ns_node_item);
+	spin_unlock_bh(&node->nd_lock);
+	return completed;
+}
+
 int net_send_message_iov(u32 msg_type, u32 key, struct iovec *caller_iov,
 			 size_t caller_iovlen, u8 target_node,
 			 int *status)
 {
 	int ret;
-	int cleanup_wq = 0;
-	int cleanup_sock = 1;
 	net_msg *msg = NULL;
-	net_status_ctxt nsc;
-	wait_queue_t sleep;
 	size_t i, iovlen, caller_bytes = 0;
 	struct iovec *iov = NULL;
-	struct socket *sock = NULL;
+	struct net_sock_container *sc = NULL;
+	struct nm_node *node = NULL;
+	struct net_status_wait nsw = {
+		.ns_node_item = LIST_HEAD_INIT(nsw.ns_node_item),
+	};
 
 	BUG_ON(net_recv_task && (current == net_recv_task));
 
+	if (net_recv_task == NULL) {
+		netprintk0("attempt to tx without a setup rx thread\n");
+		ret = -ESRCH;
+		goto out;
+	}
+
 	if (caller_iovlen == 0) {
 		netprintk0("bad iovec array length\n");
 		ret = -EINVAL;
@@ -565,12 +839,22 @@
 		goto out;
 	}
 
-	ret = net_sock_addref_or_connect(target_node, &sock);
+	if (target_node == nm_this_node()) {
+		ret = -ELOOP;
+		goto out;
+	}
+
+	node = nm_get_node_by_num(target_node);
+	if (node == NULL) {
+		netprintk("node %u unknown\n", target_node);
+		ret = -EINVAL;
+		goto out;
+	}
+
+	ret = net_sc_from_node(node, &sc);
 	if (ret)
 		goto out;
 
-	netprintk0("returned from net_sock_addref_or_connect, building msg\n");
-
 	/* build up our iovec */
 	iovlen = caller_iovlen + 1;
 	iov = kmalloc(sizeof(struct iovec) * iovlen, GFP_KERNEL);
@@ -593,66 +877,52 @@
 	msg->sys_status = NET_ERR_NONE;
 	msg->status = 0;
 	msg->key = key;
-	msg->msg_num = net_next_msg_num();
 
 	iov[0].iov_len = sizeof(net_msg);
 	iov[0].iov_base = msg;
 	memcpy(&iov[1], caller_iov, caller_iovlen * sizeof(struct iovec));
 
-	/* Setup for status return wait */
-	init_waitqueue_head(&nsc.wq);
-	atomic_set(&nsc.woken, 0);
-	nsc.msg_num = msg->msg_num;
-	nsc.sys_status = NET_ERR_NONE;
-	nsc.status = 0;
-	nsc.target_node = target_node;
+	ret = net_prep_nsw(node, &nsw);
+	if (ret)
+		goto out;
 
-	init_waitqueue_entry(&sleep, current);
-	add_wait_queue(&nsc.wq, &sleep);
-	cleanup_wq = 1;
+	msg->msg_num = nsw.ns_id;
 
-	spin_lock(&net_status_lock);
-	list_add_tail(&nsc.list, &net_status_list);
-	spin_unlock(&net_status_lock);
-
 	/* finally, convert the message header to network byte-order
 	 * and send */
 	net_msg_to_net(msg);
-	ret = net_send_tcp_msg(sock, iov, iovlen,
+	ret = net_send_tcp_msg(sc->sc_sock, iov, iovlen,
 			       sizeof(net_msg) + caller_bytes);
 	net_msg_to_host(msg);  /* just swapping for printk, its unused now */
 	msgprintk(msg, "sending returned %d\n", ret);
 	if (ret < 0) {
-		net_abort_status_return(&nsc);
 		netprintk("error returned from net_send_tcp_msg=%d\n", ret);
 		goto out;
 	}
 
 	/* wait on other node's handler */
-	wait_event(nsc.wq, (atomic_read(&nsc.woken) == 1));
+	wait_event(nsw.ns_wq, net_nsw_completed(node, &nsw));
 
 	/* Note that we avoid overwriting the callers status return
 	 * variable if a system error was reported on the other
 	 * side. Callers beware. */
-	ret = net_sys_err_to_errno(nsc.sys_status);
+	ret = net_sys_err_to_errno(nsw.ns_sys_status);
 	if (status && !ret)
-		*status = nsc.status;
+		*status = nsw.ns_status;
 
 	netprintk("woken, returning system status %d, user status %d",
-		  ret, nsc.status);
-
-	/* At this point only destroy the socket if the other node died. */
-	if (ret != -EHOSTDOWN)
-		cleanup_sock = 0;
+		  ret, nsw.ns_status);
 out:
-	if (cleanup_wq)
-		remove_wait_queue(&nsc.wq, &sleep);
-	if (sock)
-		net_sock_decref(sock, cleanup_sock);
+	if (sc)
+		sc_put(sc);
 	if (iov)
 		kfree(iov);
 	if (msg)
 		kfree(msg);
+	if (node) {
+		net_complete_nsw(node, &nsw, 0, 0, 0);
+		nm_node_put(node);
+	}
 	return ret;
 }
 EXPORT_SYMBOL(net_send_message_iov);
@@ -708,45 +978,21 @@
 	return net_send_tcp_msg(sock, &iov, 1, sizeof(net_msg));
 }
 
-static inline int net_is_valid_error_type(u32 err_type)
+/* a callback would like the rx thread to do some work on its behalf
+ * in process context.. the callback has a ref from sk->user_data, 
+ * we grab a ref as we put the sc on the list that the rx thread will
+ * inherit when it removes from the list */
+static void net_sc_list_add(struct net_sock_container *sc,
+			    struct list_head *list)
 {
-	if (err_type == NET_ALREADY_CONNECTED ||
-	    err_type == NET_UNKNOWN_HOST)
-		return 1;
-	return 0;
-}
+	assert_spin_locked(&net_active_lock);
 
-static void net_send_error(struct socket *sock, u16 err_type)
-{
-	net_msg hdr = {
-		.magic        = NET_MSG_MAGIC,
-		.msg_type     = err_type,
-		.data_len     = 0,
-	};
-	struct iovec iov = {
-		.iov_base = &hdr,
-		.iov_len = sizeof(hdr),
-	};
-
-	if (!net_is_valid_error_type(err_type)) {
-		netprintk("bug! bad error type! %u\n", err_type);
-		goto out;
+	if (list_empty(&sc->sc_item)) {
+		scprintk(sc, "adding to list %p\n", list);
+		kref_get(&sc->sc_kref);
+		list_add_tail(&sc->sc_item, list);
 	}
 
-	msgprintk(&hdr, "about to send error %u\n", err_type);
-	net_msg_to_net(&hdr);
-	net_send_tcp_msg(sock, &iov, 1, sizeof(net_msg));
-out:
-        return;
-}
-
-static void net_make_active(net_inode_private *net)
-{
-	assert_spin_locked(&net_active_lock);
-
-	if (list_empty(&net->active_item))
-		list_add_tail(&net->active_item, &net_active_list);
-
 	if (net_recv_task)
 		wake_up_process(net_recv_task);
 }
@@ -757,114 +1003,83 @@
 static void net_data_ready(struct sock *sk, int bytes)
 {
 	void (*ready)(struct sock *sk, int bytes);
-	net_inode_private *net;
+	struct net_sock_container *sc;
 
 	read_lock(&sk->sk_callback_lock);
-	net = sk->sk_user_data;
-	if (net == NULL) {
+	sc = sk->sk_user_data;
+	if (sc == NULL) {
 		ready = sk->sk_data_ready;
 		goto out;
 	}
 
-	netprintk("data_ready hit for net %p\n", net);
+	scprintk0(sc, "data_ready hit\n");
 
 	spin_lock(&net_active_lock);
-	net_make_active(net);
+	net_sc_list_add(sc, &net_active_list);
 	spin_unlock(&net_active_lock);
 
-	ready = net->orig_data_ready;
+	ready = sc->sc_data_ready;
 out:
 	read_unlock(&sk->sk_callback_lock);
 	ready(sk, bytes);
 
 }
-static void net_error_report(struct sock *sk)
-{
-	void (*report)(struct sock *sk);
-	net_inode_private *net;
 
-	read_lock(&sk->sk_callback_lock);
-	net = sk->sk_user_data;
-	if (net == NULL) {
-		report = sk->sk_error_report;
-		goto out;
-	}
-
-	netprintk("error_report hit for net %p\n", net);
-
-	spin_lock(&net_active_lock);
-	net_make_active(net);
-	spin_unlock(&net_active_lock);
-
-	report = net->orig_error_report;
-out:
-	read_unlock(&sk->sk_callback_lock);
-	report(sk);
-}
-
+/* 
+ * sk callbacks put sockets with activity on a list and we pluck them off
+ * the list and call into the stack.  we get the ref from their entry on
+ * the list.
+ */
 static int net_receive(void)
 {
-	LIST_HEAD(snapshot_list);
-	net_inode_private *net;
-	struct socket *sock;
+	struct net_sock_container *sc;
 	net_msg *hdr;
 	int err = 0, read_eagain, read_some;
 	void *data;
 	size_t datalen;
+	struct nm_node *node = NULL;
 
-	/* process in batches so that the receive thread gets
-	 * a chance to accept new sockets now and again */
 	spin_lock_bh(&net_active_lock);
-	list_splice_init(&net_active_list, &snapshot_list);
-	spin_unlock_bh(&net_active_lock);
+	while (!list_empty(&net_active_list)) {
+		sc = list_entry(net_active_list.next,
+				struct net_sock_container, sc_item);
 
-	/* we don't need locks to test our list because we're the
-	 * only people who remove active_items from lists */
-	while (!list_empty(&snapshot_list)) {
-		net = list_entry(snapshot_list.next, net_inode_private,
-				 active_item);
-
-		/* remove the net from the active list so that data_ready
-		 * can put it back on if it hits just after we read */
-		spin_lock_bh(&net_active_lock);
-		list_del_init(&net->active_item);
+		/* we now have the ref that adding created */
+		list_del_init(&sc->sc_item);
 		spin_unlock_bh(&net_active_lock);
 
-		sock = NULL;
+		scprintk0(sc, "found on active list\n");
 
 		err = 0;
 		read_eagain = 0;
 		read_some = 0;
 
-		/* basically a manual addref that doesn't connect :/ */
-		spin_lock_bh(&net->sock_lock);
-		if (net->sock && !net->sock_pending) {
-			sock = net->sock;
-			net->sock_refs++;
-			if (net->defer_release)
-				err = -ENOTCONN;
+		/* catch a race with detach or get a ref on the node
+		 * so we can process status messages */
+		spin_lock_bh(&sc->sc_lock);
+		if (sc->sc_node == NULL)
+			err = -ENOTCONN;
+		else {
+			nm_node_get(sc->sc_node);
+			node = sc->sc_node;
 		}
-		spin_unlock_bh(&net->sock_lock);
-
-		if (sock == NULL)
-			continue;
-
+		spin_unlock_bh(&sc->sc_lock);
 		if (err)
 			goto done;
 
 		/* do we need more header? */
-		if (net->page_off < sizeof(net_msg)) {
-			data = page_address(net->page) + net->page_off;
-			datalen = sizeof(net_msg) - net->page_off;
-			err = net_recv_tcp_msg(sock, data, datalen);
+		if (sc->sc_page_off < sizeof(net_msg)) {
+			data = page_address(sc->sc_page) + sc->sc_page_off;
+			datalen = sizeof(net_msg) - sc->sc_page_off;
+			err = net_recv_tcp_msg(sc->sc_sock, data, datalen);
 			if (err > 0) {
-				net->page_off += err;
+				sc->sc_page_off += err;
 				read_some = 1;
 				/* only swab incoming here.. we can
 				 * only get here once as we cross from
 				 * being under to over */
-				if (net->page_off == sizeof(net_msg)) {
-					hdr = page_address(net->page);
+				if (sc->sc_page_off == sizeof(net_msg)) {
+					hdr = page_address(sc->sc_page);
 					net_msg_to_host(hdr);
 					if (hdr->data_len > NET_MAX_PAYLOAD_BYTES)
 						err = -EOVERFLOW;
@@ -877,26 +1092,26 @@
 			}
 		}
 
-		if (net->page_off < sizeof(net_msg)) {
+		if (sc->sc_page_off < sizeof(net_msg)) {
 			/* oof, still don't have a header */
 			goto done;
 		}
 
 		/* this was swabbed above when we first read it */
-		hdr = page_address(net->page);
+		hdr = page_address(sc->sc_page);
 
-		msgprintk(hdr, "at page_off %zu\n", net->page_off);
+		msgprintk(hdr, "at page_off %zu\n", sc->sc_page_off);
 
 		/* do we need more payload? */
-		if (net->page_off - sizeof(net_msg) < hdr->data_len) {
+		if (sc->sc_page_off - sizeof(net_msg) < hdr->data_len) {
 			/* need more payload */
-			data = page_address(net->page) + net->page_off;
+			data = page_address(sc->sc_page) + sc->sc_page_off;
 			datalen = (sizeof(net_msg) + hdr->data_len) -
-				  net->page_off;
-			err = net_recv_tcp_msg(sock, data, datalen);
+				  sc->sc_page_off;
+			err = net_recv_tcp_msg(sc->sc_sock, data, datalen);
 			if (err > 0) {
 				read_some = 1;
-				net->page_off += err;
+				sc->sc_page_off += err;
 			}
 			if (err < 0) {
 				if (err == -EAGAIN)
@@ -905,88 +1120,56 @@
 			}
 		}
 
-		if (net->page_off - sizeof(net_msg) == hdr->data_len) {
+		if (sc->sc_page_off - sizeof(net_msg) == hdr->data_len) {
 			/* whooo peee, we have a full message */
 			/* after calling this the message is toast */
-			err = net_process_message(sock, hdr);
-			net->page_off = 0;
+			err = net_process_message(node, sc->sc_sock, hdr);
+			sc->sc_page_off = 0;
 		}
 	
 done:
+
+		if (node) {
+			nm_node_put(node);
+			node = NULL;
+		}
+
+		scprintk(sc, "finished reading with %d\n", err);
+		if (err == -EAGAIN)
+			err = 0;
+
 		/* we might not have consumed all the data that has been
 		 * announced to us through data_ready.. keep the net active
 		 * as long as there may still be remaining data.  
 		 * data_ready might have been called after we saw eagain */
-		spin_lock_bh(&net_active_lock);
-		if (read_some && !read_eagain)
-			net_make_active(net);
-		spin_unlock_bh(&net_active_lock);
-
-		netprintk("net %p finished reading with %d\n", net, err);
-		if (sock && err < 0 && err != -EAGAIN) {
-			netprintk("socket saw err %d, closing\n", err);
-			net_sock_decref(sock, err);
+		if (!err && read_some && !read_eagain) {
+			spin_lock_bh(&net_active_lock);
+			net_sc_list_add(sc, &net_active_list);
+			spin_unlock_bh(&net_active_lock);
 		}
-	}
 
-	return 0;
-}
-
-static void net_do_status_return(net_msg *hdr)
-{
-	net_status_ctxt *nsc = NULL;
-	struct list_head *iter;
-
-	spin_lock(&net_status_lock);
-	list_for_each(iter, &net_status_list) {
-		nsc = list_entry(iter, net_status_ctxt, list);
-		if (nsc->msg_num == hdr->msg_num) {
-			nsc->sys_status = hdr->sys_status;
-			nsc->status = hdr->status;
-			atomic_set(&nsc->woken, 1);
-			list_del_init(&nsc->list);
-			wake_up(&nsc->wq);
-			break;
+		/* this exists to catch the framing errors, all other
+		 * errors should come through state_change, really. */
+		if (err) {
+			scprintk(sc, "saw err %d, closing\n", err);
+			net_detach_sc(sc, NULL);
 		}
-		nsc = NULL;
-	}
-	spin_unlock(&net_status_lock);
 
-	msgprintk(hdr, "sent to nsc %p\n", nsc);
-}
-
-static void net_kill_node_messages(u8 node)
-{
-	unsigned int num_kills = 0;
-	net_status_ctxt *nsc = NULL;
-	struct list_head *iter, *tmp;
-
-	spin_lock(&net_status_lock);
-	list_for_each_safe(iter, tmp, &net_status_list) {
-		nsc = list_entry(iter, net_status_ctxt, list);
-		if (nsc->target_node == node) {
-			nsc->sys_status = NET_ERR_DIED;
-			atomic_set(&nsc->woken, 1);
-			list_del_init(&nsc->list);
-			wake_up(&nsc->wq);
-
-			num_kills++;
-		}
+		sc_put(sc);
+		spin_lock_bh(&net_active_lock);
 	}
-	spin_unlock(&net_status_lock);
-
-	netprintk("node %u died, killed %d messages\n", node, num_kills);
+	spin_unlock_bh(&net_active_lock);
+	return 0;
 }
 
-/* this callback is registered on insmod and torn down on rmmod.  
- * the list and locks that it uses to kill messages are statically
- * defined so it should be ok.. it just has to carefully be called
- * after hb is ready and before hb is torn down */
+/* if we get a message that the node has gone down then we detach its
+ * active socket.  This will send error codes to any transmitters that
+ * were waiting for status replies on that node. */
 static void net_hb_node_down_cb(struct nm_node *node,
 				int node_num,
 				void *data)
 {
-	net_kill_node_messages(node_num);
+	net_detach_sc(NULL, node);
 }
 
 static struct hb_callback_func	*net_hb_down = NULL;
@@ -1020,7 +1203,8 @@
 
 /* this returns -errno if the header was unknown or too large, etc.
  * after this is called the buffer us reused for the next message */
-static int net_process_message(struct socket *sock, net_msg *hdr)
+static int net_process_message(struct nm_node *node, struct socket *sock,
+			       net_msg *hdr)
 {
 	int ret, handler_status;
 	enum net_system_error syserr;
@@ -1031,7 +1215,8 @@
 
 	if (hdr->magic == NET_MSG_STATUS_MAGIC) {
 		/* special type for returning message status */
-		net_do_status_return(hdr);
+		net_complete_nsw(node, NULL, hdr->msg_num, hdr->sys_status,
+				 hdr->status);
 		ret = 0;
 		goto out;
 	} else if (hdr->magic != NET_MSG_MAGIC) {
@@ -1040,17 +1225,6 @@
 		goto out;
 	}
 
-	if (net_is_valid_error_type(hdr->msg_type)) {
-		if (hdr->msg_type == NET_ALREADY_CONNECTED) {
-			msgprintk0(hdr, "error: there is already a socket "
-				   "for this connection\n");
-		} else if (hdr->msg_type == NET_UNKNOWN_HOST) {
-			msgprintk0(hdr, "error: unknown host\n");
-		}
-		ret = 0;
-		goto out;
-	}
-
 	/* find a handler for it */
 	ret = 0;
 	handler_status = 0;
@@ -1084,145 +1258,187 @@
 	return ret;
 }
 
-/*
- * The rest of the file is all about managing sockets.
+/* 
+ * This is called once a socket has come through accept() or connect() and
+ * is ready for business.  Once it is attached to the node
+ * then transmitting paths can get ahold of it and send messages.
  */
-
-static int net_attach_sock(net_inode_private *net, struct socket *sock)
+static int net_attach_sc(struct net_sock_container *sc)
 {
 	struct sock *sk;
 	int ret = 0;
+	u8 this_node = nm_this_node(); /* :( */
+	struct net_sock_container *detach = NULL;
+	struct nm_node *node = sc->sc_node;
 
-	netprintk("attaching sock %p to net %p\n", sock, net);
+	scprintk(sc, "attaching with node %p\n", node);
 
-	/* this could be racing with an active connect, it needs to
-	 * compare the socks consistently so both sides agree to close
-	 * the same socket */
-	spin_lock_bh(&net->sock_lock); 
-	if (net->sock != NULL && net->sock != sock)
-		ret = -EEXIST;
-	else 
-		net->sock = sock;
-	spin_unlock_bh(&net->sock_lock); 
+	BUG_ON(node == NULL);
+
+	spin_lock_bh(&node->nd_lock); 
+	if (node->nd_sc) {
+		/* this is a little exciting.  If we have racing sockets we
+		 * need to agree which to prefer with the remote node.  We only
+		 * get here if both sockets are successfully connected.  The
+		 * node with the lower number loses the socket it initiated
+		 * with connect(). */
+		if (this_node < node->nd_num) {
+			/* lose the one we started with connect() */ 
+			if (sc->sc_from_connect)
+				ret = -EEXIST;
+		} else {
+			/* lose the one we accepted */
+			if (!sc->sc_from_connect)
+				ret = -EEXIST;
+		}
+		/* we're keeping the new sc and the old one has to go */
+		if (ret == 0) {
+			detach = node->nd_sc;
+			node->nd_sc = NULL;
+			/* we get its ref and drop it below */
+		}
+	}
+	if (node->nd_sc == NULL) {
+		node->nd_sc = sc;
+		kref_get(&sc->sc_kref);
+		/* resolve our pending connect before we drop the node lock */
+		if (sc->sc_pending_connect) {
+			sc->sc_pending_connect = 0;
+			net_finish_connect(node);
+		} else {
+			/* accepts arriving should wake sleepers, too */
+			atomic_inc(&node->nd_sc_generation);
+			wake_up_all(&node->nd_sc_wq);
+		}
+		printk(KERN_NOTICE "ocfs2:tcp: %s connection to node %s at "
+		       "%u.%u.%u.%u:%d\n", 
+		       sc->sc_from_connect ? "initiated" : "accepted",
+		       node->nd_name, NIPQUAD(node->nd_ipv4_address), 
+		       ntohs(node->nd_ipv4_port));
+	}
+	spin_unlock_bh(&node->nd_lock); 
 	if (ret)
 		goto out;
+	if (detach) {
+		net_detach_sc(detach, NULL);
+		sc_put(detach);
+		detach = NULL;
+	}
 
-	sk = net->sock->sk;
+	sk = sc->sc_sock->sk;
 	tcp_sk(sk)->nonagle = 1;
+	sk_register_callbacks(sk, sc);
 
-	write_lock_bh(&sk->sk_callback_lock);
-	net->orig_data_ready = sk->sk_data_ready;
-	net->orig_error_report = sk->sk_error_report;
-
-	sk->sk_user_data = net;
-	sk->sk_data_ready = net_data_ready;
-	sk->sk_error_report = net_error_report;
-	write_unlock_bh(&sk->sk_callback_lock);
-
 	/* record it as active initially to make sure we didn't miss
 	 * any incoming data while we were setting it up */
 	spin_lock_bh(&net_active_lock);
-	net_make_active(net);
+	net_sc_list_add(sc, &net_active_list);
 	spin_unlock_bh(&net_active_lock);
 out:
-	netprintk("attaching sock %p to net %p returned: %d\n", sock, net,
-		  ret);
+	scprintk(sc, "attaching now node %p returned %d\n", node, ret);
 	return ret;
 }
 
-struct waiting_for_sock {
-	int			rc;
-	struct list_head	waiting_item;
-	wait_queue_t 		entry;
-};
-
-static void net_wake_sock_waiters(net_inode_private *net, int rc)
-{
-	struct list_head *pos, *tmp;
-	struct waiting_for_sock *wfs;
-
-	assert_spin_locked(&net->sock_lock);
-
-	netprintk("net %p waking waiters with rc %d\n", net, rc);
-
-	list_for_each_safe(pos, tmp, &net->pending_waiters) {
-		wfs = list_entry(pos, struct waiting_for_sock, waiting_item);
-		list_del_init(&wfs->waiting_item);
-
-		wfs->rc = rc;
-	}
-
-	wake_up(&net->waitq);
-}
-
-/* we register this callback when we start a connect.  once we're done
- * with the connect state machine we unregister ourselves.  teardown of
- * active connected sockets only happens in the rx thread.  this socket
- * can only make it to the rx thread through here and we reset _state_change.
- * this can't race with teardown */
 static void net_state_change(struct sock *sk)
 {
-	net_inode_private *net;
 	void (*state_change)(struct sock *sk);
-	int rc = 0, should_wake = 1;
+	struct net_sock_container *sc;
+	struct list_head *list = NULL;
 
 	write_lock(&sk->sk_callback_lock);
 	
-	net = sk->sk_user_data;
-	BUG_ON(net == NULL);
-	BUG_ON(net->sock == NULL);
+	/* we might have raced with the node being torn down and this
+	 * sc being detached from the node */
+	sc = sk->sk_user_data;
+	if (sc == NULL) {
+		state_change = sk->sk_state_change;
+		goto out;
+	}
 
-	state_change = net->orig_state_change;
+	scprintk(sc, "state_change to %d\n", sk->sk_state);
 
+	state_change = sc->sc_state_change;
+
 	switch(sk->sk_state) {
 		case TCP_SYN_SENT: 
 		case TCP_SYN_RECV: 
-			should_wake = 0;
 			break;
 		case TCP_ESTABLISHED: 
-			rc = 0;
+			list = &net_attach_list;
 			break;
 		default:
-			rc = -ENOTCONN;
+			list = &net_detach_list;
 			break;
 	}
 
-	netprintk("net %p sock %p went to state %d; should_wake %d rc %d\n",
-		  net, net->sock, sk->sk_state, should_wake, rc);
+	/* if we've finished a connect or have seen an error we let
+	 * the rx thread know so it can act accordingly on our behalf */
+	if (list) {
+		spin_lock(&net_active_lock);
+		net_sc_list_add(sc, list);
+		spin_unlock(&net_active_lock);
+	}
+out:
+	write_unlock(&sk->sk_callback_lock);
+	state_change(sk);
+}	
 
-	if (should_wake) {
-		spin_lock(&net->sock_lock);
-		if (net->sock_pending) {
-			net->sock_pending = 0;
-			/* let the rx thread do the final _release when
-			 * they go to grab this guy */ 
-			if (rc)
-				net->defer_release = 1;
+static struct net_sock_container *sc_alloc(struct nm_node *node, int from_conn)
+{
+	struct net_sock_container *sc, *ret = NULL;
+	struct page *page = NULL;
 
-			sk->sk_state_change = net->orig_state_change;
-			net_wake_sock_waiters(net, rc);
-		} else
-			should_wake = 0;
-		spin_unlock(&net->sock_lock);
-	}
+	page = alloc_page(GFP_NOFS);
+	sc = kcalloc(1, sizeof(*sc), GFP_NOFS);
+	if (sc == NULL || page == NULL)
+		goto out;
 
-	write_unlock(&sk->sk_callback_lock);
+	kref_init(&sc->sc_kref, sc_kref_release);
+	spin_lock_init(&sc->sc_lock);
+	nm_node_get(node);
+	sc->sc_node = node;
+	sc->sc_from_connect = from_conn;
+	sc->sc_pending_connect = from_conn;
 
-	/* net_attach grabs every lock in the known universe so we do it
-	 * out here */
-	if (should_wake)
-		net_attach_sock(net, net->sock);
+	INIT_LIST_HEAD(&sc->sc_item);
+	INIT_LIST_HEAD(&sc->sc_handlers);
 
-	state_change(sk);
-}	
+	scprintk0(sc, "alloced\n");
 
-static int net_start_connect(net_inode_private *net, u32 addr, u16 port)
+	ret = sc;
+	sc->sc_page = page;
+	sc = NULL;
+	page = NULL;
+
+out:
+	if (page)
+		__free_page(page);
+	kfree(sc);
+
+	return ret;
+}
+
+static void net_finish_connect(struct nm_node *node)
 {
+	atomic_dec(&node->nd_pending_connects);
+	atomic_inc(&node->nd_sc_generation);
+	/* don't care about this rare thundering herd */
+	wake_up_all(&node->nd_sc_wq);
+}
+
+static int net_start_connect(struct nm_node *node, u32 addr, u16 port)
+{
 	struct socket *sock = NULL;
-	struct sock *sk;
 	struct sockaddr_in myaddr, remoteaddr;
+	struct net_sock_container *sc = NULL;
 	int ret;
 
+	sc = sc_alloc(node, 1);
+	if (sc == NULL) {
+		ret = -ENOMEM;
+		goto out;
+	}
+
 	ret = sock_create(PF_INET, SOCK_STREAM, IPPROTO_TCP, &sock);
 	if (ret < 0) {
 		netprintk("can't create socket: %d\n", ret);
@@ -1239,36 +1455,41 @@
 		goto out;
 	}
 	
-	memset (&remoteaddr, 0, sizeof (remoteaddr));
+	memset(&remoteaddr, 0, sizeof(remoteaddr));
 	remoteaddr.sin_family = AF_INET;
 	remoteaddr.sin_addr.s_addr = addr;
 	remoteaddr.sin_port = port;
 
-	net->sock = sock;
+	/* from this point on sc_kref_release has the responsibility
+	 * of dropping the socket */
+	sc->sc_sock = sock;
+	sock = NULL;
 
-	sk = sock->sk;
-	write_lock_bh(&sk->sk_callback_lock);
-	sk->sk_user_data = net;
-	net->orig_state_change = sk->sk_state_change;
-	sk->sk_state_change = net_state_change;
-	write_unlock_bh(&sk->sk_callback_lock);
+	sk_register_callbacks(sc->sc_sock->sk, sc);
 
-	ret = sock->ops->connect(sock, (struct sockaddr *)&remoteaddr, 
-				 sizeof(remoteaddr), O_NONBLOCK);
+	ret = sc->sc_sock->ops->connect(sc->sc_sock,
+					(struct sockaddr *)&remoteaddr, 
+					sizeof(remoteaddr),
+					O_NONBLOCK);
 	if (ret == -EINPROGRESS)
 		ret = 0;
-	netprintk("starting connect for net %p sock %p gave %d\n", net, sock,
-		  ret);
+
+	scprintk(sc, "->connect gave %d\n", ret);
 out:
+	/* if ret == 0 then the callback has the responsibility of calling
+	 * net_finish_connect */
 	if (ret) {
-		spin_lock_bh(&net->sock_lock);
-		net->sock_pending = 0;
-		net->sock = NULL;
-		net_wake_sock_waiters(net, ret);
-		spin_unlock_bh(&net->sock_lock);
+		net_finish_connect(node);
+		if (sc) {
+			/* stop detach from trying to finish again */
+			sc->sc_pending_connect = 0;
+			net_detach_sc(sc, NULL);
+		}
 		if (sock)
 			sock_release(sock);
 	}
+	if (sc)
+		sc_put(sc);
 	return ret;
 }
 
@@ -1295,197 +1516,130 @@
 	set_fs(oldfs);
 }
 
-static void net_sock_decref(struct socket *sock, int error)
+
+/*
+ * some tx code path would like to send a message.   For them to do so
+ * we need to follow an nm_node struct to find an active sock container.
+ * If we find a node without a sock container then we try and issue a
+ * connect and wait for its outcome.  We only really block waiting for
+ * the node to have an active socket.  The rx thread might race with
+ * the connect() to accept() a socket that we'll happily use.
+ */
+static int net_sc_from_node(struct nm_node *node,
+			    struct net_sock_container **sc_ret)
 {
-	net_inode_private *net = NULL;
-	int release = 0;
+	struct net_sock_container *sc = NULL;
+	int ret = 0, gen = 0, issue_connect = 0;
 
-	/* we hold a ref, this should be stable */
-	net = sock->sk->sk_user_data;
-	BUG_ON(net == NULL);
-
-	spin_lock_bh(&net->sock_lock); 
-
-	BUG_ON(net->sock_pending);
-	BUG_ON(net->sock_refs == 0);
-	BUG_ON(net->sock == NULL);
-
-	netprintk("decref for net %p ->sock %p err %d: refs %lu defer %u\n",
-		  net, net->sock, error, net->sock_refs, net->defer_release);
-
-	if (error)
-		net->defer_release = 1;
-	if (--net->sock_refs == 0 && net->defer_release) {
-		sock = net->sock;
-		net->sock = NULL;
-		net->defer_release = 0;
-		release = 1;
-	}
-	spin_unlock_bh(&net->sock_lock); 
-
-	if (release) {
-		/* stop any callbacks from hitting before we tear down
-		 * this sock */
-		if (sock->sk && sock->sk->sk_user_data) {
-			struct sock *sk = sock->sk;
-
-			write_lock_bh(&sk->sk_callback_lock);
-			sk->sk_data_ready = net->orig_data_ready;
-			sk->sk_error_report = net->orig_error_report;
-			sk->sk_user_data = NULL;
-			write_unlock_bh(&sk->sk_callback_lock);
-			/* XXX can we sync with bottom halves here? */
+	spin_lock_bh(&node->nd_lock); 
+	if (node->nd_sc) {
+		sc = node->nd_sc;
+		kref_get(&sc->sc_kref);
+	} else {
+		if (atomic_read(&node->nd_pending_connects) == 0) {
+			atomic_inc(&node->nd_pending_connects);
+			issue_connect = 1;
 		}
-		sock_release(sock);
+		gen = atomic_read(&node->nd_sc_generation);
 	}
-}
+	spin_unlock_bh(&node->nd_lock); 
 
-static int wfs_complete(net_inode_private *net, struct waiting_for_sock *wfs)
-{
-	int empty;
-
-	spin_lock_bh(&net->sock_lock); 
-	empty = list_empty(&wfs->waiting_item);
-	spin_unlock_bh(&net->sock_lock); 
-
-	return empty;
-}
-
-static int net_sock_addref_or_connect(u8 target_node, struct socket **sock_ret)
-{
-	struct nm_node *node = NULL;
-	net_inode_private *net = NULL;
-	struct socket *sock = NULL;
-	int ret = 0, wait = 0, set_pending = 0;
-	struct waiting_for_sock wfs;
-
-	/* XXX think about passing refs around.. */
-	node = nm_get_node_by_num(target_node);
-	if (node == NULL) {
-		netprintk("node %u unknown\n", target_node);
-		ret = -EINVAL;
+	if (sc)
 		goto out;
-	}
-	/* XXX verify that node is fully configured, rx thread is going */
-	net = &node->nd_net_inode_private;
 
-	spin_lock_bh(&net->sock_lock); 
-	if (net->sock && !net->sock_pending) {
-		/* just get a ref.  this could be a defer_release socket */
-		sock = net->sock;
-		net->sock_refs++;
-	} else {
-		netprintk("Initiating the connect!\n");
-
-		if (!net->sock_pending) {
-			/* ok, we'll be initiating the connect */
-			net->sock_pending = 1;
-			set_pending = 1;
-		}
-		list_add_tail(&wfs.waiting_item, &net->pending_waiters);
-		init_waitqueue_entry(&wfs.entry, current);
-		add_wait_queue(&net->waitq, &wfs.entry);
-		wait = 1;
-	}
-	spin_unlock_bh(&net->sock_lock); 
-
-	if (set_pending) {
-		ret = net_start_connect(net, node->nd_ipv4_address,
+	if (issue_connect) {
+		/* either net_start_connect or the callbacks it registers
+		 * have the responsibility of calling _finish_connect() */
+		ret = net_start_connect(node, node->nd_ipv4_address,
 					node->nd_ipv4_port);
 		if (ret)
 			goto out;
 	}
 
-	if (wait) {
-		ret = wait_event_interruptible(net->waitq,
-					       wfs_complete(net, &wfs));
-		if (ret == 0)
-			ret = wfs.rc;
-		netprintk("sleeping for net %p gave %d\n", net, ret);
-		if (ret)
-			goto out;
+	/* wait for nd_sc to change, either our connect finishing or
+	 * an accept arriving */
+	ret = wait_event_interruptible(node->nd_sc_wq,
+				      atomic_read(&node->nd_sc_generation) !=
+				      gen);
+	if (ret)
+		goto out;
 
-		/* try again to get a good socket.   if we can't, just
-		 * forget about it. */
-		spin_lock_bh(&net->sock_lock); 
-		if (net->sock && !net->sock_pending) {
-			sock = net->sock;
-			net->sock_refs++;
-		} else
-			ret = -ENOTCONN;
-		spin_unlock_bh(&net->sock_lock); 
-		if (ret)
-			goto out;
+	/* we only wait for one iteration right now */
+	spin_lock_bh(&node->nd_lock); 
+	if (node->nd_sc) {
+		sc = node->nd_sc;
+		kref_get(&sc->sc_kref);
+	} else {
+		ret = -ENOTCONN;
 	}
-	
+	spin_unlock_bh(&node->nd_lock); 
+
 out:
-	if (wait) {
-		spin_lock_bh(&net->sock_lock); 
-		if (!list_empty(&wfs.waiting_item))
-			list_del_init(&wfs.waiting_item);
-		remove_wait_queue(&net->waitq, &wfs.entry);
-		spin_unlock_bh(&net->sock_lock); 
+	if (sc) {
+		scprintk(sc, "returning for node %s (%u)\n", node->nd_name,
+			 node->nd_num);
+		*sc_ret = sc;
 	}
-	if (sock)
-		*sock_ret = sock;
-	if (node)
-		nm_node_put(node);
 
-	BUG_ON(ret == 0 && sock == NULL);
-	netprintk("addref for net %p gave %d\n", net, ret);
+	BUG_ON(ret == 0 && sc == NULL);
+	BUG_ON(ret && sc);
+	netprintk("sc_get for node %s (%u) gave %d, %p\n", node->nd_name,
+		  node->nd_num, ret, sc);
 	return ret;
 }
 
 static void net_try_accept(struct socket *sock)
 {
-	int error, slen;
+	int ret, slen;
 	struct sockaddr_in sin;
 	struct socket *new_sock = NULL;
 	struct nm_node *node = NULL;
+	struct net_sock_container *sc = NULL;
 
 	BUG_ON(sock == NULL);
-	error = sock_create_lite(sock->sk->sk_family,
-				 sock->sk->sk_type,
-				 sock->sk->sk_protocol,
-				 &new_sock);
-	if (error)
+	ret = sock_create_lite(sock->sk->sk_family, sock->sk->sk_type,
+			       sock->sk->sk_protocol, &new_sock);
+	if (ret)
 		goto out;
 
 	new_sock->type = sock->type;
 	new_sock->ops = sock->ops;
-	error = sock->ops->accept(sock, new_sock, O_NONBLOCK);
-	if (error < 0)
+	ret = sock->ops->accept(sock, new_sock, O_NONBLOCK);
+	if (ret < 0)
 		goto out;
 
 	slen = sizeof(sin);
-	error = new_sock->ops->getname(new_sock, (struct sockaddr *) &sin,
+	ret = new_sock->ops->getname(new_sock, (struct sockaddr *) &sin,
 				       &slen, 1);
-	if (error < 0)
+	if (ret < 0)
 		goto out;
 	
-	netprintk("attempt to connect from %u.%u.%u.%u:%04x\n", 
-		NIPQUAD(sin.sin_addr.s_addr), ntohs(sin.sin_port));
-
 	node = nm_get_node_by_ip(sin.sin_addr.s_addr);
 	if (node == NULL) {
-		netprintk0("connect from unknown host...\n");
-		net_send_error(new_sock, NET_UNKNOWN_HOST);
+		printk(KERN_WARNING "attempt to connect from unknown node at "
+			  "%u.%u.%u.%u:%d\n", NIPQUAD(sin.sin_addr.s_addr),
+			  ntohs(sin.sin_port));
 		goto out;
 	}
 
-	netprintk("connect from known host: %s\n", node->nd_name);
-
 	if (ntohs(sin.sin_port) >= 1024)
 		netprintk("warning: connect from unprivileged port: "
 			  "%u.%u.%u.%u:%d\n", NIPQUAD(sin.sin_addr.s_addr),
 			  ntohs(sin.sin_port));
 
-	error = net_attach_sock(&node->nd_net_inode_private, new_sock);
-	if (error == -EEXIST)
-		net_send_error(new_sock, NET_ALREADY_CONNECTED);
+	sc = sc_alloc(node, 0);
+	if (sc == NULL) {
+		ret = -ENOMEM;
+		goto out;
+	}
 
+	sc->sc_sock = new_sock;
+	new_sock = NULL;
+
+	ret = net_attach_sc(sc);
+
 out:
-	if (error) {
+	if (ret) {
 		if (new_sock) {
 			net_sock_drain(new_sock);
 			sock_release(new_sock);
@@ -1493,6 +1647,8 @@
 	}
 	if (node)
 		nm_node_put(node);
+	if (sc)
+		sc_put(sc);
 	return;
 }
 

Modified: trunk/fs/ocfs2/cluster/tcp.h
===================================================================
--- trunk/fs/ocfs2/cluster/tcp.h	2005-04-12 20:45:35 UTC (rev 2137)
+++ trunk/fs/ocfs2/cluster/tcp.h	2005-04-12 21:08:05 UTC (rev 2138)
@@ -167,6 +167,7 @@
 void net_unregister_hb_callbacks(void);
 int net_start_rx_thread(struct nm_node *node);
 void net_stop_rx_thread(struct nm_node *node);
-void net_stop_node_sock(struct nm_node *node);
+struct net_sock_container;
+void net_detach_sc(struct net_sock_container *sc, struct nm_node *node);
 
 #endif /* CLUSTER_TCP_H */



More information about the Ocfs2-commits mailing list