[Ocfs2-commits] zab commits r1773 - in trunk: cluster src

svn-commits at oss.oracle.com svn-commits at oss.oracle.com
Fri Jan 14 17:42:00 CST 2005


Author: zab
Date: 2005-01-14 17:41:58 -0600 (Fri, 14 Jan 2005)
New Revision: 1773

Modified:
   trunk/cluster/dlmmod.c
   trunk/cluster/dlmmod.h
   trunk/cluster/nodemanager.c
   trunk/cluster/nodemanager.h
   trunk/cluster/tcp.c
   trunk/cluster/tcp.h
   trunk/src/ocfs.h
   trunk/src/vote.c
Log:
Simplify the rx paths in tcp.c:
                                                                                
o tie into data_ready/error_report to maintain a list of active rx sockets
o the rx thread fills per-socket rx buffers with non-blocking reads
o no discarding data, no peeking, no rx thread on socket's sk_sleep
o get rid of all the callers' per-handler buffers
o add a cute msgprintk() to dump the net_msg header and a message
o get rid of the HND_IN_USE check
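
The heart of the new rx path is the usual socket callback hook: save the stack's
sk_data_ready, point it at our own function, and have that function put the
per-socket state on an active list that the rx thread drains with non-blocking
reads into a per-socket page.  A minimal sketch of that shape follows; the names
here (my_conn, active_list, active_lock, rx_wait, hook_sock) are illustrative
stand-ins, the real fields live in net_inode_private and the real hooks are
net_data_ready() and net_record_new_sock() in the diff below.

/* Sketch only -- not the committed code.  Shows the sk_data_ready hooking
 * pattern with illustrative names. */

#include <linux/list.h>
#include <linux/spinlock.h>
#include <linux/wait.h>
#include <net/sock.h>

struct my_conn {
	struct list_head	active_item;	/* on active_list while data may be pending */
	void			(*orig_data_ready)(struct sock *sk, int bytes);
};

static LIST_HEAD(active_list);
static spinlock_t active_lock = SPIN_LOCK_UNLOCKED;
static DECLARE_WAIT_QUEUE_HEAD(rx_wait);	/* rx thread sleeps here */

/* runs in bh context whenever the stack queues incoming data */
static void my_data_ready(struct sock *sk, int bytes)
{
	struct my_conn *conn;
	void (*ready)(struct sock *sk, int bytes);

	read_lock(&sk->sk_callback_lock);
	conn = sk->sk_user_data;
	ready = conn->orig_data_ready;

	/* plain spin_lock here: we're already in bh context; process
	 * context (the rx thread) takes this lock with spin_lock_bh() */
	spin_lock(&active_lock);
	if (list_empty(&conn->active_item))
		list_add_tail(&conn->active_item, &active_list);
	spin_unlock(&active_lock);
	read_unlock(&sk->sk_callback_lock);

	wake_up(&rx_wait);	/* rx thread splices active_list and does
				 * non-blocking reads into the socket's page */
	ready(sk, bytes);	/* chain to the callback we displaced */
}

/* install: called once per socket from process context; conn->active_item
 * is assumed to have been INIT_LIST_HEAD()ed when conn was set up */
static void hook_sock(struct my_conn *conn, struct sock *sk)
{
	write_lock_bh(&sk->sk_callback_lock);
	if (sk->sk_user_data != conn) {
		conn->orig_data_ready = sk->sk_data_ready;
		sk->sk_user_data = conn;
		sk->sk_data_ready = my_data_ready;
	}
	write_unlock_bh(&sk->sk_callback_lock);
}

The write_lock_bh()/read_lock() pairing on sk_callback_lock is what keeps the
install from racing with callbacks already running in bh context.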


Modified: trunk/cluster/dlmmod.c
===================================================================
--- trunk/cluster/dlmmod.c	2005-01-14 01:29:40 UTC (rev 1772)
+++ trunk/cluster/dlmmod.c	2005-01-14 23:41:58 UTC (rev 1773)
@@ -1049,7 +1049,6 @@
 	dlm_ctxt *tmp = NULL, *dlm = NULL;
 	struct inode *group = NULL;
 	int tmpret, i;
-	char *netbuf;
 
 	if (strlen(domain) > NM_MAX_NAME_LEN) {
 		dlmprintk0("domain name length too long\n");
@@ -1090,19 +1089,10 @@
 		dlmprintk0("could not allocate dlm domain name\n");
 		goto leave;
 	}
-	dlm->net_buf = (char *) __get_free_page(GFP_KERNEL);
-	if (!dlm->net_buf) {
-		kfree(dlm->name);
-		kfree(dlm);
-		dlm = NULL;
-		dlmprintk0("could not allocate dlm network temporary buffer\n");
-		goto leave;
-	}
 	dlm->resources = (struct list_head *) __get_free_page(GFP_KERNEL);
 	if (!dlm->resources) {
 		kfree(dlm->name);
 		kfree(dlm);
-		free_page((unsigned long)dlm->net_buf);
 		dlm = NULL;
 		dlmprintk0("could not allocate dlm hash\n");
 		goto leave;
@@ -1196,68 +1186,57 @@
 		goto error;
 #endif
 
-	netbuf = dlm->net_buf;
 	tmpret = net_register_handler(DLM_MASTER_REQUEST_RESP_MSG, key, 0, 
 				      sizeof(dlm_master_request_resp), 
 				      dlm_master_request_resp_handler,
-				      dlm, netbuf);
+				      dlm);
 	if (tmpret)
 		goto error;
 
-	netbuf += L1_CACHE_ALIGN(sizeof(dlm_master_request_resp));
-
 	tmpret = net_register_handler(DLM_MASTER_REQUEST_MSG, key, 0, 
 				      sizeof(dlm_master_request), 
 				      dlm_master_request_handler,
-				      dlm, netbuf);
+				      dlm);
 
 	if (tmpret)
 		goto error;
-	netbuf += L1_CACHE_ALIGN(sizeof(dlm_master_request));
 
 	tmpret = net_register_handler(DLM_ASSERT_MASTER_MSG, key, 0, 
 				      sizeof(dlm_assert_master), 
 				      dlm_assert_master_handler,
-				      dlm, netbuf);
+				      dlm);
 	if (tmpret)
 		goto error;
-	netbuf += L1_CACHE_ALIGN(sizeof(dlm_assert_master));
 	tmpret = net_register_handler(DLM_CREATE_LOCK_MSG, key, 0, 
 				      sizeof(dlm_create_lock), 
 				      dlm_create_lock_handler,
-				      dlm, netbuf);
+				      dlm);
 	if (tmpret)
 		goto error;
-	netbuf += L1_CACHE_ALIGN(sizeof(dlm_create_lock));
 	tmpret = net_register_handler(DLM_CONVERT_LOCK_MSG, key, 
 				      NET_HND_VAR_LEN, 
 				      DLM_CONVERT_LOCK_MAX_LEN,
 				      dlm_convert_lock_handler,
-				      dlm, netbuf);
+				      dlm);
 	if (tmpret)
 		goto error;
-	netbuf += L1_CACHE_ALIGN(DLM_CONVERT_LOCK_MAX_LEN);
 
 	tmpret = net_register_handler(DLM_UNLOCK_LOCK_MSG, key, 
 				      NET_HND_VAR_LEN,
 				      DLM_UNLOCK_LOCK_MAX_LEN,
 				      dlm_unlock_lock_handler,
-				      dlm, netbuf);
+				      dlm);
 	if (tmpret)
 		goto error;
-	netbuf += L1_CACHE_ALIGN(DLM_UNLOCK_LOCK_MAX_LEN);
 				
 	tmpret = net_register_handler(DLM_PROXY_AST_MSG, key, 
 				      NET_HND_VAR_LEN,
 				      DLM_PROXY_AST_MAX_LEN,
 				      dlm_proxy_ast_handler,
-				      dlm, netbuf);
+				      dlm);
 	if (tmpret)
 		goto error;
-	netbuf += L1_CACHE_ALIGN(DLM_PROXY_AST_MAX_LEN);
 
-dlmprintk("netbuf=%p net_buf=%p diff=%d\n", netbuf, dlm->net_buf, ((char *)netbuf - (char *)dlm->net_buf));   // currently 960
-	
 	tmpret = dlm_launch_thread(dlm);
 	if (tmpret == 0)
 		goto leave;
@@ -1268,7 +1247,6 @@
 	spin_lock(&dlm_domain_lock);
 	list_del(&dlm->list);
 	spin_unlock(&dlm_domain_lock);
-	free_page((unsigned long)dlm->net_buf);
 	free_page((unsigned long)dlm->resources);
 	kfree(dlm->name);
 	kfree(dlm);

Modified: trunk/cluster/dlmmod.h
===================================================================
--- trunk/cluster/dlmmod.h	2005-01-14 01:29:40 UTC (rev 1772)
+++ trunk/cluster/dlmmod.h	2005-01-14 23:41:58 UTC (rev 1773)
@@ -194,7 +194,6 @@
 	spinlock_t spinlock;
 	struct rw_semaphore recovery_sem;
 	char *name;
-	char *net_buf;
 	util_thread_info thread;
 	struct inode *group;
 	u32 key;

Modified: trunk/cluster/nodemanager.c
===================================================================
--- trunk/cluster/nodemanager.c	2005-01-14 01:29:40 UTC (rev 1772)
+++ trunk/cluster/nodemanager.c	2005-01-14 23:41:58 UTC (rev 1773)
@@ -448,6 +448,7 @@
 	int ino, node_num, bucket;
 	int ret = -EINVAL;
 	nm_node_inode_private *n = NULL;
+	struct page *page = NULL;
 
 	nmprintk("add cluster node ...\n");
 
@@ -500,10 +501,19 @@
 	memcpy(&n->node, &data->arg_u.node, sizeof(nm_node_info));
 	INIT_LIST_HEAD(&n->ip_hash);
 	n->net.sock = NULL;
-	INIT_LIST_HEAD(&n->net.list);
+	INIT_LIST_HEAD(&n->net.active_item);
 	spin_lock_init(&n->net.sock_lock);
 	n->net.flags = 0;
+	n->net.page = NULL;
+	n->net.page_off = 0;
 
+	page = alloc_page(GFP_KERNEL);
+	if (page == NULL) {
+		nmprintk("page allocation failed\n");
+		goto leave;
+	}
+	n->net.page = page;
+
 	/* hash on first ip address */
 	spin_lock(&nm_ip_hash_lock);
 	bucket = hash_long(n->node.ifaces[0].addr_u.ip_addr4, NM_HASH_BITS);
@@ -518,6 +528,8 @@
 
 leave:
 	if (ret < 0) {
+		if (page)
+			__free_page(page);
 		if (inode) {
 			if (inode->u.generic_ip)
 				kfree(inode->u.generic_ip);

Modified: trunk/cluster/nodemanager.h
===================================================================
--- trunk/cluster/nodemanager.h	2005-01-14 01:29:40 UTC (rev 1772)
+++ trunk/cluster/nodemanager.h	2005-01-14 23:41:58 UTC (rev 1773)
@@ -76,15 +76,20 @@
 } nm_group_inode_private;
 
 /* TODO: move this */
+struct sock;
 #define NET_FLAG_CREATING_SOCKET   0x00000001
 typedef struct _net_inode_private
 {
-	struct socket *sock;
-	wait_queue_t sleep;
-	spinlock_t sock_lock;
-	struct list_head handlers;
-	struct list_head list;
-	int flags;
+	struct socket		*sock;
+	spinlock_t		sock_lock;
+	struct list_head	handlers;
+	struct list_head	active_item;
+	int			flags;
+	struct page 		*page;
+	size_t			page_off;
+
+	void			(*orig_data_ready)(struct sock *sk, int bytes);
+	void                    (*orig_error_report)(struct sock *sk);
 } net_inode_private;
 
 typedef struct _nm_node_inode_private

Modified: trunk/cluster/tcp.c
===================================================================
--- trunk/cluster/tcp.c	2005-01-14 01:29:40 UTC (rev 1772)
+++ trunk/cluster/tcp.c	2005-01-14 23:41:58 UTC (rev 1773)
@@ -1,10 +1,6 @@
 /* -*- mode: c; c-basic-offset: 8; -*-
  * vim: noexpandtab sw=8 ts=8 sts=0:
  *
- * tcp.c
- *
- * tcp network stuff
- *
  * Copyright (C) 2004 Oracle.  All rights reserved.
  *
  * This program is free software; you can redistribute it and/or
@@ -22,28 +18,56 @@
  * Free Software Foundation, Inc., 59 Temple Place - Suite 330,
  * Boston, MA 021110-1307, USA.
  *
- * Authors: Kurt Hackel
+ * ----
  *
+ * Callers for this were originally written against a very simple
+ * synchronous API.  This implementation reflects those simple callers.  Some
+ * day I'm sure we'll need to move to a more robust posting/callback
+ * mechanism.
+ *
+ * Transmit calls pass in kernel virtual addresses and block copying them into
+ * the socket's tx buffers via a usual blocking sendmsg.  They'll block
+ * waiting for a failed socket to time out.  TX callers can also pass in
+ * a pointer to an 'int' which gets filled with an errno off the wire
+ * in response to the message they send.
+ *
+ * Handlers for unsolicited messages are registered.  Each socket has
+ * a page that incoming data is copied into.  First the header, then
+ * the data.  Handlers are called from only one thread with a reference
+ * to this per-socket page.  This page is destroyed after the handler
+ * call, so it can't be referenced beyond the call.  Handlers may block
+ * but are discouraged from doing so.
+ *
+ * Any framing error (bad magic, unknown message type, too-large payload
+ * length) closes the connection.
+ *
+ * One can imagine the direction a more sophisticated API would head in:
+ * (there are certainly a half dozen examples in the kernel)
+ *   * tx
+ *   	- passes in page/off/len to send, gets put on a queue
+ *   	- if response data is needed, passes in preallocated page/off/len
+ *   	- tx header includes message id to associate reply with posted rx buf
+ *   	- write_space triggers passing the p/o/l tx queue to ->sendpage()
+ *   * rx
+ *      - data_ready uses tcp_read_sock to parse message header
+ *      - header identifies whether to copy into posted rx buf or unsolicited
+ *      - handlers must be callable from bh context
+ * but it really depends on what the semantics and messages are.
+ *
  * XXX we should resolve these before release
- * 	- no recv_list, use data_ready
- * 	- kill all 'lock; list_for_each() { unlock; lock; }' patterns
  * 	- make sure max lens are enforced.. alloc max, don't send > max
  * 	- disable preemt before calling rx handler when debugging
- * 	- pack/swab packets
- * 	- more conservative locking patterns to avoid double unlocks
+ * 	- decide on just one packet size and repack it
  * 	- find explicit stack call to drain rx queue
  * 	- simplify rx thread exit path (completion, etc)
  * 	- goto out style exiting
  * 	- implement net_remove_handlers
- * 	- check stack to be sure that blocking rx waits for all data
- * 	- if spin sending be sure to exclude concurrent racing senders
  * 	- refcounting around sock against tx/teardown/etc
  * 	- get sin/iov/msg off the stack, per sock structures
- * 	- just rely on rx calls to discover busted sockets?
- * 	- make sk_state another define
  * 	- add trivial version trading message at the start of a conn
+ * 	- tear down sockets on exit.. via removing their inodes?
+ * 	- make sure ->net.page gets torn down with net_inode_private
  *
- * 	- document caller block, serialized blocking rx handling
  * 	- move gsd into its own file
  * 	- move to userspace connection management?
  * 	
@@ -79,13 +103,27 @@
 #include "tcp.h"
 #include "nodemanager.h"
 
-//#if 0
+#if 1
 #define netprintk(x, arg...)    printk("(tcp:%d)(%s:%d) " x, current->pid, __FUNCTION__, __LINE__, ##arg)
 #define netprintk0(x)           printk("(tcp:%d)(%s:%d) " x, current->pid, __FUNCTION__, __LINE__)
-//#else
-#if 0
+/* yeah, a little gross, but it gets the job done */
+#define __msg_fmt "[mag %u len %u typ %u status %d key %u num %u] "
+#define __msg_args __hdr->magic, __hdr->data_len, __hdr->msg_type, 	\
+ 	__hdr->status,	__hdr->key, __hdr->msg_num
+#define msgprintk(hdr, fmt, args...) do {				\
+	typeof(hdr) __hdr = (hdr);					\
+	printk(__msg_fmt fmt, __msg_args, args);			\
+} while (0)
+#define msgprintk0(hdr, fmt) do {					\
+	typeof(hdr) __hdr = (hdr);					\
+	printk(__msg_fmt fmt, __msg_args);				\
+} while (0)
+
+#else
 #define netprintk(x, arg...)    
 #define netprintk0(x)           
+#define msgprintk(hdr, fmt, args...)
+#define msgprintk0(hdr, fmt)
 #endif
 
 /* let's only pollute this unit with these ridiculous definitions */
@@ -97,33 +135,34 @@
 #define sk_family		family
 #define sk_type			type
 #define sk_protocol		protocol
+#define sk_callback_lock	callback_lock
+#define sk_user_data		user_data
+#define sk_data_ready		data_ready
+#define sk_error_report		error_report
 #endif
 
 struct socket *recv_sock = NULL;
 static u16 ip_version, ip_port;
-static void *net_junk_buf = NULL;
 static struct inode *net_inode = NULL;
 static u16 net_node_num;
 
-char *gsd_buf = NULL;
 char *gsd_handler_buf = NULL;
 
 
+/* all this state should eventually be brought up by object activation
+ * and tied to that object rather than being globally valid at insmod */
 static spinlock_t net_handler_lock = SPIN_LOCK_UNLOCKED;
-static spinlock_t net_list_lock = SPIN_LOCK_UNLOCKED;
 static spinlock_t net_status_lock = SPIN_LOCK_UNLOCKED;
 static LIST_HEAD(net_handlers);
-static LIST_HEAD(net_recv_list);
-static LIST_HEAD(net_dispatch_list);
 static LIST_HEAD(net_status_list);
+/* this lock is also grabbed from bh context, non-bh use _bh() locking */
+static spinlock_t net_active_lock = SPIN_LOCK_UNLOCKED;
+static LIST_HEAD(net_active_list);
 
-static DECLARE_WAIT_QUEUE_HEAD(net_disp_thread_wait_queue);
-static DECLARE_WAIT_QUEUE_HEAD(net_recv_thread_wait_queue);
 static int net_recv_pid = -1;
 static struct task_struct *net_recv_task = NULL;
 static struct completion net_recv_complete;
 
-
 static inline void net_abort_status_return(net_status_ctxt *nsc)
 {
 	spin_lock(&net_status_lock);
@@ -139,17 +178,14 @@
 static void __exit net_driver_exit (void);
 static int net_add_handler(net_msg_handler *nmh);
 static void net_remove_handlers(void);
-static int net_check_message_valid(net_msg *msg, u32 len);
 static void net_dump_and_close_sock(struct socket *sock, struct inode *inode);
-static void net_drain_sock(struct socket *sock);
-static void net_discard_data(net_msg *hdr, struct socket *sock);
-static int net_recv_message_header(net_msg *hdr, struct socket *sock);
 static int net_init_tcp_recv_sock(void);
 static int net_receive_thread(void *data);
 static int net_receive(void);
 static int net_accept_tcp_connections(void);
 static void net_release_tcp_sock(void);
-static int net_dispatch_message(struct inode *inode, struct socket *sock, int len, net_msg_handler *hnd);
+static int net_process_message(struct inode *inode, struct socket *sock,
+			       net_msg *hdr);
 static int net_ioctl (struct inode *inode, struct file *filp, unsigned int cmd, unsigned long arg);
 
 int gsd_message_action(gsd_message *g);
@@ -168,11 +204,8 @@
 /* called with net_handler_lock held so we can verify the flags :/ */
 static void __net_put_handler(net_msg_handler *nmh)
 {
-	if (atomic_dec_and_test(&nmh->refcnt)) {
-		if (nmh->flags & NET_HND_IN_USE)
-			netprintk0("EEEEK! killing inuse handler! bugbug!\n");
+	if (atomic_dec_and_test(&nmh->refcnt))
 		kfree(nmh);
-	}
 }
 
 static void net_put_handler(net_msg_handler *nmh)
@@ -391,14 +424,11 @@
 	net_recv_task = NULL;
 	init_completion (&net_recv_complete);
 
-	net_junk_buf = (void *) __get_free_page(GFP_KERNEL);
-	if (!net_junk_buf)
-		return -ENOMEM;
-
 	netprintk0("starting net receive thread...\n");
 	net_recv_pid = kernel_thread (net_receive_thread, NULL, CLONE_FS | CLONE_FILES | CLONE_SIGHAND);
 	if (net_recv_pid < 0) {
-		netprintk("unable to launch net receive thread, error=%d", net_recv_pid);
+		netprintk("unable to launch net receive thread, error=%d\n",
+			  net_recv_pid);
 		net_shutdown();
 		return -EINVAL;
 	}
@@ -409,33 +439,20 @@
 
 static void net_shutdown(void)
 {
-	netprintk ("waiting for net thread to exit....");
+	netprintk ("waiting for net thread to exit....\n");
 	send_sig (SIGINT, net_recv_task, 0);
 	wait_for_completion (&net_recv_complete);
-	free_page((unsigned long)net_junk_buf);
 	netprintk ("net thread exited\n");
 }
 
 static int net_rx_should_wake(void)
 {
-	int empty = 1;
-	struct list_head *iter;
-	net_inode_private *net;
-	struct socket *socket;
+	int empty;
 
-	spin_lock(&net_list_lock);
-	list_for_each(iter, &net_recv_list) {
-		net = list_entry(iter, net_inode_private, list);
-		socket = net->sock;
+	spin_lock_bh(&net_active_lock);
+	empty = list_empty(&net_active_list);
+	spin_unlock_bh(&net_active_lock);
 
-		if (socket &&
-		    !skb_queue_empty(&socket->sk->sk_receive_queue)) {
-			empty = 0;
-			break;
-		}
-	}
-	spin_unlock(&net_list_lock);
-
 	return !empty || tcp_sk(recv_sock->sk)->accept_queue;
 }
 
@@ -451,11 +468,8 @@
        	if (status >= 0 && recv_sock) {
 		add_wait_queue_exclusive(recv_sock->sk->sk_sleep, &main_wait);
 		while (1) {
-			status = 0;
-			if (tcp_sk(recv_sock->sk)->accept_queue)
-				status = net_accept_tcp_connections();
-			if (!list_empty(&net_recv_list))
-				status = net_receive();
+			net_accept_tcp_connections();
+			net_receive();
 
 			wait_event_interruptible(*recv_sock->sk->sk_sleep,
 						 net_rx_should_wake());
@@ -478,27 +492,12 @@
 	return 0;
 }
 
-typedef union _my_timing_t
-{
-	__u64 q;
-	__u32 lohi[2];
-} my_timing_t;
-
-
-static int net_check_message_valid(net_msg *msg, u32 len)
-{
-	return 1;
-}
-
 //////////////////////////////////////////////////////////////////////////////
 /* for lack of a better place to do this */
 
 int gsd_setup()
 {
 	int ret;
-	gsd_buf = (char *) __get_free_page(GFP_KERNEL);
-	if (!gsd_buf)
-		return -ENOMEM;
 	/* need this stupidity until I can divorce the actual nm actions
 	 * from the output they send to their user buffer */
 	gsd_handler_buf = (char *) __get_free_page(GFP_KERNEL);
@@ -506,14 +505,13 @@
 		return -ENOMEM;
 
 	ret = net_register_handler(GSD_MESSAGE, 0, 0, sizeof(gsd_message),
-				   gsd_message_handler, NULL, gsd_buf);
+				   gsd_message_handler, NULL);
 
 	return ret;
 }
 
 void gsd_teardown()
 {
-	free_page((unsigned long)gsd_buf);
 	free_page((unsigned long)gsd_handler_buf);
 }
 
@@ -589,25 +587,17 @@
 //////////////////////////////////////////////////////////////////////////////
 
 int net_register_handler(u32 msg_type, u32 key, int flags, u32 max_len, 
-			 net_msg_handler_func *func, void *data, void *buf)
+			 net_msg_handler_func *func, void *data)
 {
 	net_msg_handler *nmh, *found=NULL;
-	u32 packet_len = sizeof(net_msg) + max_len;
 	int ret;
 
-	if (packet_len < NET_MIN_MSG_LEN || packet_len > NET_MAX_MSG_LEN) {
+	if (max_len > NET_MAX_PAYLOAD_BYTES) {
 		netprintk("max_len for message handler out of range: %u\n", 
 			max_len);
 		return -EINVAL;
 	}
 
-	/* if expecting any message payload, must pass a prealloced buffer */
-	if (!buf && max_len) {
-		netprintk("max_len > 0 (%u), but no buffer supplied!\n",
-		       max_len);
-		return -EINVAL;
-	}
-
 	if (!msg_type) {
 		netprintk("no message type provided: %u, %p\n", msg_type, func);
 		return -EINVAL;
@@ -631,11 +621,6 @@
 	nmh->key = key;
 	spin_lock_init(&nmh->lock);
 	atomic_set(&nmh->refcnt, 0);
-	if (max_len == 0) {
-		nmh->buf = &nmh->hdr;
-	} else {
-		nmh->buf = buf;
-	}
 	nmh->flags = flags;
 	INIT_LIST_HEAD(&nmh->list);
 	net_get_handler(nmh);
@@ -711,22 +696,16 @@
 	 * on shutdown... */
 }
 
-
-
-
-/*
- * net_recv_tcp_msg()
- *
- */
-int net_recv_tcp_msg (struct inode *inode, struct socket *sock, void *data, u32 *packet_len)
+static int net_recv_tcp_msg(struct inode *inode, struct socket *sock,
+			    void *data, size_t len)
 {
 	nm_node_inode_private *priv;
 	nm_node_info *node;
-	int status = -EINVAL, error;
+	int ret;
 	mm_segment_t oldfs;
 	struct sockaddr_in sin;
 	struct iovec iov = { 
-		.iov_len = *packet_len, 
+		.iov_len = len,
 		.iov_base = data 
 	};
 	struct msghdr msg = { 
@@ -736,10 +715,9 @@
 		.msg_iov = &iov, 
 		.msg_name = (struct sockaddr *) &sin, 
 		.msg_namelen = sizeof (sin),
-       		.msg_flags = 0 
+       		.msg_flags = MSG_DONTWAIT,
 	};
 
-
 	priv = (nm_node_inode_private *)inode->u.generic_ip;
 	node = &priv->node;
 	if (!sock) {
@@ -747,42 +725,24 @@
 		/* TODO: sock refcounting... i think we can get/put the sk */
 		sock = priv->net.sock;
 		spin_unlock(&priv->net.sock_lock); 
-		if (!sock)
-			return -EINVAL;
+		if (!sock) {
+			ret = -EINVAL;
+			goto out;
+		}
 	}
 	
-	memset (&sin, 0, sizeof (sin));
-	oldfs = get_fs ();
-	set_fs (get_ds ());
-	error = sock_recvmsg (sock, &msg, *packet_len, msg.msg_flags);
-	set_fs (oldfs);
+	memset(&sin, 0, sizeof (sin));
+	oldfs = get_fs();
+	set_fs(get_ds());
+	ret = sock_recvmsg(sock, &msg, len, msg.msg_flags);
+	set_fs(oldfs);
 
-	status = 0;
-	if (error < 0) {
-		if (error == -ERESTARTSYS) {
-			status = -EBADF;
-			netprintk ("Shutting down\n");
-		} else {
-			status = -EINVAL;
-			netprintk ("unable to recvmsg, error=%d\n", error);
-		}
-		goto bail;
-	} else {
-		*packet_len = iov.iov_len;
-		status = 0;
-		netprintk("woot.  recevied len=%d\n", *packet_len);
-		if (!net_check_message_valid(data, *packet_len)) {
-			netprintk0("eeeek bad net message!\n");
-			status = -EINVAL;
-		}
-	}
-
 	//netprintk ("Received packet from: %d.%d.%d.%d\n",
 	//		NIPQUAD (sin.sin_addr.s_addr));
 
-bail:
-	return status;
-}				/* net_recv_tcp_msg */
+out:
+	return ret;
+}
 
 static int net_sock_sendmsg(struct socket *sock, struct msghdr *msg,
 			    size_t size)
@@ -896,7 +856,7 @@
 		spin_unlock(&net->sock_lock);
 		ret = net_init_tcp_sock(inode);
 		if (!(ret == 0 || ret == -EEXIST)) {
-			netprintk0("failed to create socket!");
+			netprintk0("failed to create socket!\n");
 			ret = -EINVAL;
 			goto done;
 		}
@@ -976,6 +936,8 @@
 	net_msg_to_net(msg);
 	ret = net_send_tcp_msg(inode, NULL, iov, iovlen,
 			       sizeof(net_msg) + caller_bytes);
+	net_msg_to_host(msg);  /* just swapping for printk, it's unused now */
+	msgprintk(msg, "sending returned %d\n", ret);
 
 	if (status) {
 		if (ret >= 0) {
@@ -1057,217 +1019,297 @@
 	hdr->status = err;
 	hdr->magic = NET_MSG_STATUS_MAGIC;  // twiddle the magic
 	hdr->data_len = 0;
-	netprintk("about to send status %d\n", err);
 
+	msgprintk(hdr, "about to send status magic %d\n", err);
 	/* hdr has been in host byteorder this whole time */
 	net_msg_to_net(hdr);
 	return net_send_tcp_msg(inode, sock, &iov, 1, sizeof(net_msg));
 }
-/*
- * net_receive: receive from and dispatch all sockets with data pending
- */
+
+static void net_got_sock_callback(net_inode_private *net, struct sock *sk)
+{
+	BUG_ON(net == NULL);
+	BUG_ON(net->sock == NULL);
+	BUG_ON(net->sock->sk != sk);
+
+	spin_lock(&net_active_lock);
+	if (list_empty(&net->active_item))
+		list_add_tail(&net->active_item, &net_active_list);
+	spin_unlock(&net_active_lock);
+
+	if (recv_sock != NULL)
+		wake_up(recv_sock->sk->sk_sleep);
+}
+
+static void net_data_ready(struct sock *sk, int bytes)
+{
+	net_inode_private *net = sk->sk_user_data;
+	void (*ready)(struct sock *sk, int bytes);
+
+	read_lock(&sk->sk_callback_lock);
+	net_got_sock_callback(net, sk);
+	ready = net->orig_data_ready;
+	read_unlock(&sk->sk_callback_lock);
+
+	ready(sk, bytes);
+
+}
+static void net_error_report(struct sock *sk)
+{
+	net_inode_private *net = sk->sk_user_data;
+	void (*report)(struct sock *sk);
+
+	read_lock(&sk->sk_callback_lock);
+	net_got_sock_callback(net, sk);
+	report = net->orig_error_report;
+	read_unlock(&sk->sk_callback_lock);
+
+	report(sk);
+}
+
 static int net_receive(void)
 {
 	struct inode *inode;
-	struct list_head *iter, *tmpiter;
+	LIST_HEAD(snapshot_list);
 	nm_node_inode_private *priv;
 	net_inode_private *net;
 	struct socket *sock;
-	struct sock *sk;
-	net_msg hdr;
-	net_msg_handler *hnd = NULL;
-	int err = 0;
-	int tmperr;
+	net_msg *hdr;
+	int err = 0, read_eagain;
+	void *data;
+	size_t datalen;
 
-start_over:	
-	spin_lock(&net_list_lock);
-	list_for_each_safe(iter, tmpiter, &net_recv_list) {
-		net = list_entry(iter, net_inode_private, list);
+	/* process in batches so that the receive thread gets
+	 * a chance to accept new sockets now and again */
+	spin_lock_bh(&net_active_lock);
+	list_splice_init(&net_active_list, &snapshot_list);
+	spin_unlock_bh(&net_active_lock);
+
+	/* we don't need locks to test our list because we're the
+	 * only people who remove active_items from lists */
+	while (!list_empty(&snapshot_list)) {
+		net = list_entry(snapshot_list.next, net_inode_private,
+				 active_item);
+
+		/* remove the net from the active list so that data_ready
+		 * can put it back on if it hits just after we read */
+		spin_lock_bh(&net_active_lock);
+		list_del_init(&net->active_item);
+		spin_unlock_bh(&net_active_lock);
+
 		priv = container_of(net, nm_node_inode_private, net);
 	       	inode = priv->inode;
 		sock = net->sock;
-		
-		if (!sock) {
-			//netprintk0("no socket yet....\n");
-			continue;
-		}
+		BUG_ON(sock == NULL); /* real refcounting, please! */
 
-		if (sock->sk->sk_state != TCP_ESTABLISHED &&
-		    sock->sk->sk_state != TCP_CLOSE_WAIT) {
-			netprintk0("kill it and continue\n");
-			net_dump_and_close_sock(sock, inode);
-			continue;
+		err = 0;
+		read_eagain = 0;
+
+		/* do we need more header? */
+		if (net->page_off < sizeof(net_msg)) {
+			data = page_address(net->page) + net->page_off;
+			datalen = sizeof(net_msg) - net->page_off;
+			err = net_recv_tcp_msg(inode, sock, data, datalen);
+			if (err > 0) {
+				net->page_off += err;
+				/* only swab incoming here.. we can
+				 * only get here once as we cross from
+				 * being under to over */
+				if (net->page_off == sizeof(net_msg)) {
+					hdr = page_address(net->page);
+					net_msg_to_host(hdr);
+					if (hdr->data_len > NET_MAX_PAYLOAD_BYTES)
+						err = -EOVERFLOW;
+				}
+			}
+			if (err < 0) {
+				if (err == -EAGAIN)
+					read_eagain = 1;
+				goto done;
+			}
 		}
-	
-		sk = sock->sk;
-		if (skb_queue_empty(&sk->sk_receive_queue)) {
-			//netprintk("queue empty for %lu\n", inode->i_ino);
-			continue;
-		}
-	
-			
 
-		list_del(&net->list);
-		spin_unlock(&net_list_lock);
-	
-		memset(&hdr, 0, sizeof(net_msg));
-		err = net_recv_message_header(&hdr, sock);
-		if (err < 0) {
-			netprintk0("failed to receive message!\n");
-			goto error;
+		if (net->page_off < sizeof(net_msg)) {
+			/* oof, still don't have a header */
+			goto done;
 		}
 
-		/* convert the header to host byteorder */
-		net_msg_to_host(&hdr);
-		netprintk("received message header... magic=%u type=%u key=%u\n", 
-			  hdr.magic, hdr.msg_type, hdr.key);
+		/* this was swabbed above when we first read it */
+		hdr = page_address(net->page);
 
-		if (hdr.magic == NET_MSG_STATUS_MAGIC) {
-			net_discard_data(&hdr, sock);
-			/* special type for returning message status */
-			net_do_status_return(hdr.msg_num, hdr.status);
-			err = 0;
-			goto error;
-		} else if (hdr.magic != NET_MSG_MAGIC) {
-			netprintk("bad magic: %u\n", hdr.magic);
-			goto error;
-		}
-		
-		if (net_is_valid_error_type(hdr.msg_type)) {
-			/* do error handling */
-			netprintk("this is a standard error message: type=%d\n", hdr.msg_type);
-			if (hdr.msg_type == NET_ALREADY_CONNECTED) {
-				netprintk0("error: there is already a socket for this connection\n");
-			} else if (hdr.msg_type == NET_UNKNOWN_HOST) {
-				netprintk0("error: unknown host\n");
+		msgprintk(hdr, "at page_off %zu\n", net->page_off);
+
+		/* do we need more payload? */
+		if (net->page_off - sizeof(net_msg) < hdr->data_len) {
+			/* need more payload */
+			data = page_address(net->page) + net->page_off;
+			datalen = (sizeof(net_msg) + hdr->data_len) -
+				  net->page_off;
+			err = net_recv_tcp_msg(inode, sock, data, datalen);
+			if (err > 0)
+				net->page_off += err;
+			if (err < 0) {
+				if (err == -EAGAIN)
+					read_eagain = 1;
+				goto done;
 			}
-			net_discard_data(&hdr, sock);
-			err = 0;
-			goto error;
 		}
 
-		/* find a handler for it */
-		hnd = net_lookup_handler(hdr.msg_type, hdr.key);
-		
-		if (!hnd) {
-			err = -EINVAL;
-			netprintk0("no handler for message.\n");
-			goto error;
+		if (net->page_off - sizeof(net_msg) == hdr->data_len) {
+			/* whooo peee, we have a full message */
+			/* after calling this the message is toast */
+			err = net_process_message(inode, sock, hdr);
+			net->page_off = 0;
 		}
-		err = net_dispatch_message(inode, sock, hdr.data_len, hnd);
+	
+done:
+		/* we might not have consumed all the data that has been
+		 * announced to us through data_ready.. keep the net active
+		 * as long as there may still be remaining data.  
+		 * data_ready might have been called after we saw eagain */
+		spin_lock_bh(&net_active_lock);
+		if (!read_eagain && list_empty(&net->active_item))
+			list_add_tail(&net->active_item, &net_active_list);
+		spin_unlock_bh(&net_active_lock);
 
-		/* if node has requested status return, do it now */
-		if (hdr.status) {
-			tmperr = net_send_status_magic(inode, sock, &hdr, err);
-			netprintk0("yay, sent!\n");
-		} else if (err < 0) {
-			netprintk("dispatch (%u/%u) returned %d\n",
-				  hdr.msg_type, hdr.key, err);
+		netprintk("net %p finished reading with %d\n", net, err);
+		if (err < 0 && err != -EAGAIN) {
+			netprintk("socket saw err %d, closing\n", err);
+			net_dump_and_close_sock(sock, inode);
 		}
-
-
-		net_put_handler(hnd);
-
-		// re-add this socket
-		spin_lock(&net_list_lock);
-		list_add_tail(&net->list, &net_recv_list);
-		spin_unlock(&net_list_lock);
-		goto start_over;
-
-error:
-		if (err < 0) {
-			if (net_link_down(err, sock)) {
-				// do NOT re-add this socket
-				netprintk("link down! err=%d\n", err);
-				net_dump_and_close_sock(sock, inode);
-			} else {
-				netprintk("bad message... node=%lu.\n", inode->i_ino);
-				net_discard_data(&hdr, sock);
-				// re-add this socket
-				spin_lock(&net_list_lock);
-				list_add_tail(&net->list, &net_recv_list);
-				spin_unlock(&net_list_lock);
-			}
-		} else {
-			// re-add this socket
-			spin_lock(&net_list_lock);
-			list_add_tail(&net->list, &net_recv_list);
-			spin_unlock(&net_list_lock);
-		}
-		goto start_over;
 	}
-	spin_unlock(&net_list_lock);
 
 	return 0;
 }
 
 
-
-
-void net_do_status_return(u64 msg_num, s32 status)
+static void net_do_status_return(net_msg *hdr)
 {
-	net_status_ctxt *nsc;
+	net_status_ctxt *nsc = NULL;
 	struct list_head *iter;
 
 	spin_lock(&net_status_lock);
 	list_for_each(iter, &net_status_list) {
 		nsc = list_entry(iter, net_status_ctxt, list);
-		if (nsc->msg_num == msg_num) {
-			nsc->status = status;
+		if (nsc->msg_num == hdr->msg_num) {
+			nsc->status = hdr->status;
 			atomic_set(&nsc->woken, 1);
 			list_del(&nsc->list);
-			spin_unlock(&net_status_lock);
 			wake_up(&nsc->wq);
-			return;
+			break;
 		}
+		nsc = NULL;
 	}
 	spin_unlock(&net_status_lock);
+
+	msgprintk(hdr, "sent to nsc %p\n", nsc);
 }
 
-static int net_dispatch_message(struct inode *inode, struct socket *sock, int len, net_msg_handler *hnd)
+/* this returns -errno if the header was unknown or too large, etc.
+ * after this is called the buffer is reused for the next message */
+static int net_process_message(struct inode *inode, struct socket *sock,
+			       net_msg *hdr)
 {
-	int ret = -EINVAL;
-	int packet_len;
+	int ret;
+	net_msg_handler *hnd = NULL;
 
-	packet_len = len + sizeof(net_msg);
+	netprintk("received message header... magic=%u type=%u key=%u\n", 
+		  hdr->magic, hdr->msg_type, hdr->key);
 
-	spin_lock(&hnd->lock);
-	if (hnd->flags & NET_HND_IN_USE) {
-		netprintk0("EEEEEK!  handler in use! bugbug\n");
-		spin_unlock(&hnd->lock);
-		return -EINVAL;
+	if (hdr->magic == NET_MSG_STATUS_MAGIC) {
+		/* special type for returning message status */
+		net_do_status_return(hdr);
+		ret = 0;
+		goto out;
+	} else if (hdr->magic != NET_MSG_MAGIC) {
+		msgprintk0(hdr, "bad magic\n");
+		ret = -EINVAL;
+		goto out;
 	}
-	if (len > hnd->max_len) {
-		netprintk("eek! advertised message data len is too large %u (max: %u)\n",
-		       len, hnd->max_len);
-		spin_unlock(&hnd->lock);
-		return -EINVAL;
+
+	if (net_is_valid_error_type(hdr->msg_type)) {
+		if (hdr->msg_type == NET_ALREADY_CONNECTED) {
+			msgprintk0(hdr, "error: there is already a socket "
+				   "for this connection\n");
+		} else if (hdr->msg_type == NET_UNKNOWN_HOST) {
+			msgprintk0(hdr, "error: unknown host\n");
+		}
+		ret = 0;
+		goto out;
 	}
-	hnd->flags |= NET_HND_IN_USE;
-	spin_unlock(&hnd->lock);
 
-	memset(hnd->buf, 0, packet_len);
-	ret = net_recv_tcp_msg(inode, sock, hnd->buf, &packet_len);
-	if (ret < 0) {
-		netprintk("net_recv_tcp_msg returned: %d\n", ret);
-	} else {
-		/* convert just the header to host byteorder
-		 * handler func is responsible for the data */
-		net_msg_to_host((net_msg *)hnd->buf);
-
-		net_num_dispatched++;
-		ret = (hnd->func)((net_msg *)hnd->buf, packet_len, hnd->data);
+	/* find a handler for it */
+	hnd = net_lookup_handler(hdr->msg_type, hdr->key);
+	if (!hnd) {
+		ret = -EINVAL;
+		msgprintk0(hdr, "no handler for message.\n");
+		goto out;
 	}
-	
+
+	ret = 0;
 	spin_lock(&hnd->lock);
-	hnd->flags &= ~NET_HND_IN_USE;
+	if (hdr->data_len > hnd->max_len)
+		ret = -EOVERFLOW;
 	spin_unlock(&hnd->lock);
+	if (ret) {
+		msgprintk(hdr, "advertised data_len > handlers max_len (%u)\n",
+			  hnd->max_len);
+		goto out;
+	}
 
+	net_num_dispatched++;
+	ret = (hnd->func)(hdr, sizeof(net_msg) + hdr->data_len, hnd->data);
+	
+	/* if node has requested status return, do it now */
+	if (hdr->status) {
+		int tmpret;
+		/* this destroys the hdr, so don't use it after this */
+		tmpret = net_send_status_magic(inode, sock, hdr, ret);
+		hdr = NULL;
+		netprintk("sending status %d returned %d\n", ret, tmpret);
+		ret = 0;
+	} else if (ret < 0) {
+		msgprintk(hdr, "dispatch returned %d\n", ret);
+	}
+
+out:
+	if (hnd)
+		net_put_handler(hnd);
 	return ret;
 }
 
+static void net_record_new_sock(net_inode_private *net)
+{
+	struct sock *sk;
 
+	BUG_ON(net->sock == NULL);
+	BUG_ON(net->sock->sk == NULL);
 
+	netprintk("added net %p to net_active_list\n", net);
+
+	sk = net->sock->sk;
+	write_lock_bh(&sk->sk_callback_lock);
+	if (sk->sk_user_data != net) {
+		net->orig_data_ready = sk->sk_data_ready;
+		net->orig_error_report = sk->sk_error_report;
+
+		sk->sk_user_data = net;
+		sk->sk_data_ready = net_data_ready;
+		sk->sk_error_report = net_error_report;
+	}
+	write_unlock_bh(&sk->sk_callback_lock);
+
+	/* record it as active initially to make sure we didn't miss
+	 * any incoming data while we were setting it up */
+	spin_lock_bh(&net_active_lock);
+	if (list_empty(&net->active_item))
+		list_add_tail(&net->active_item, &net_active_list);
+	spin_unlock_bh(&net_active_lock);
+
+	if (recv_sock != NULL)
+		wake_up(recv_sock->sk->sk_sleep);
+}
+
 /*
  * net_accept_tcp_connections()
  *
@@ -1334,19 +1376,10 @@
 						NIPQUAD(sin.sin_addr.s_addr), ntohs(sin.sin_port));
 	 			spin_lock(&priv->net.sock_lock); 
 				if (!priv->net.sock) {
-					netprintk("new sock, doesnt exist\n");
+					netprintk("new sock, giving net %p sock %p\n", net, sock); 
 					exists = 0;
 					priv->net.sock = sock;
-					if (current != net_recv_task) {
-						netprintk("net_recv_task=%p... maybe i should add THAT instead\n", net_recv_task);
-						if (net_recv_task == NULL) 
-							BUG();
-						init_waitqueue_entry(&priv->net.sleep, net_recv_task);
-					} else {
-						netprintk("process %p added to waitqueue\n", current);
-						init_waitqueue_entry(&priv->net.sleep, current);
-					}
-					add_wait_queue(sock->sk->sk_sleep, &(priv->net.sleep));
+
 				}
 	 			spin_unlock(&priv->net.sock_lock); 
 
@@ -1355,11 +1388,7 @@
 					net_send_error(sock, NET_ALREADY_CONNECTED);
 					net_dump_and_close_sock(sock, inode);
 				} else {
-					spin_lock(&net_list_lock);
-					netprintk("added inode %lu to net_recv_list\n", inode->i_ino);
-					if (list_empty(&net->list))
-						list_add_tail(&net->list, &net_recv_list);
-					spin_unlock(&net_list_lock);
+					net_record_new_sock(net);
 				}
 			}
 
@@ -1409,115 +1438,11 @@
         return len;
 }
 
-
-static int net_recv_message_header(net_msg *hdr, struct socket *sock)
-{
-	int status;
-	mm_segment_t oldfs;
-	struct iovec iov = {
-		.iov_base = hdr,
-		.iov_len = sizeof(net_msg)
-	};
-	struct msghdr msg = {
-		.msg_iov = &iov,
-		.msg_iovlen = 1,
-		.msg_control = NULL,
-		.msg_controllen = 0,
-		.msg_name = 0,    // (struct sockaddr *) &sin,
-		.msg_namelen = 0, // sizeof (sin),
-		.msg_flags = 0
-	};
-
-	status = 0;
-	oldfs = get_fs(); set_fs(KERNEL_DS);
-	status = sock_recvmsg(sock, &msg, sizeof(net_msg), MSG_PEEK);
-	set_fs(oldfs);
-
-	if (status < 0) {
-		if (status == -ERESTARTSYS) {
-			status = -EBADF;
-			netprintk ("Shutting down\n");
-		} else {
-			status = -EINVAL;
-			netprintk ("unable to recvmsg, error=%d\n", status);
-		}
-	}
-	// error or bytes received
-	return status;
-}
-
 static void net_dump_and_close_sock(struct socket *sock, struct inode *inode)
 {
 	nm_node_inode_private *priv = NULL;
-
-	net_drain_sock(sock);
-
-	if (sock->sk) {
-		if (inode) {
-	       		priv = inode->u.generic_ip;
-			if (priv) {
-	 			spin_lock(&priv->net.sock_lock); 
-				remove_wait_queue(sock->sk->sk_sleep, &(priv->net.sleep));
-				priv->net.sock = NULL;
-	 			spin_unlock(&priv->net.sock_lock); 
-			}
-		}
-	}
-	sock_release(sock);
-}
-
-/* WARNING: This really does discard data. I hope you didn't want to
- * do anything with it... */
-static void net_discard_data(net_msg *hdr, struct socket *sock)
-{
 	struct msghdr           msg;
 	struct iovec            iov;
-	int                     read;
-	mm_segment_t            oldfs;
-	unsigned int            size;
-
-	if (!sock->sk)
-		return;
-
-	size = hdr->data_len + sizeof(net_msg);
-	BUG_ON(size > PAGE_SIZE);
-
-	do {
-		msg.msg_name     = 0;
-		msg.msg_namelen  = 0;
-		msg.msg_iov      = &iov;
-		msg.msg_iovlen   = 1;
-		msg.msg_control  = NULL;
-		msg.msg_controllen = 0;
-		msg.msg_flags    = 0;
-		msg.msg_iov->iov_base = net_junk_buf;
-		msg.msg_iov->iov_len  = (__kernel_size_t)size;
-
-		oldfs = get_fs();
-		set_fs(KERNEL_DS);
-
-		read = sock_recvmsg(sock, &msg, size, 0);
-
-		set_fs(oldfs);
-
-		if (read < 0) {
-			netprintk("WOW, error code %d\n", read);
-			return;
-		}
-
-		netprintk("read %d of %u remaining\n", read, size);
-
-		if (read > size)
-			BUG();
-
-		size -= read;
-	} while (size);
-}
-
-static void net_drain_sock(struct socket *sock)
-{
-	struct msghdr           msg;
-	struct iovec            iov;
 	int                     len;
 	mm_segment_t            oldfs;
 
@@ -1532,7 +1457,7 @@
 			msg.msg_control  = NULL;
 			msg.msg_controllen = 0;
 			msg.msg_flags    = MSG_DONTWAIT;
-			msg.msg_iov->iov_base = net_junk_buf;
+			msg.msg_iov->iov_base = page_address(priv->net.page);
 			msg.msg_iov->iov_len  = (__kernel_size_t)PAGE_SIZE;
 
 			oldfs = get_fs();
@@ -1544,8 +1469,23 @@
 				break;
 		}
 	}
+
+	if (sock->sk) {
+		if (inode) {
+	       		priv = inode->u.generic_ip;
+			if (priv) {
+	 			spin_lock(&priv->net.sock_lock); 
+				priv->net.sock = NULL;
+	 			spin_unlock(&priv->net.sock_lock); 
+			}
+		}
+	}
+	sock_release(sock);
 }
 
+/* this is racy beyond reason; the userspace work will involve some tracking
+ * structures that senders can wait on or time out on for connections
+ * to happen */
 int net_init_tcp_sock(struct inode *inode)
 {
 	nm_node_inode_private *priv;
@@ -1599,19 +1539,6 @@
 		spin_lock(&net->sock_lock);
 		net->sock = sock;
 		net->flags &= ~NET_FLAG_CREATING_SOCKET;
-
-		netprintk0("1) ok this node is actively trying to connect, add to waitqueue\n");
-		if (current != net_recv_task) {
-			netprintk("net_recv_task=%p... maybe i should add THAT instead\n", net_recv_task);
-			if (net_recv_task == NULL) 
-				BUG();
-			init_waitqueue_entry(&net->sleep, net_recv_task);
-		} else {
-			netprintk("process %p added to waitqueue\n", current);
-			init_waitqueue_entry(&net->sleep, current);
-		}
-		add_wait_queue(sock->sk->sk_sleep, &net->sleep);
-
 		spin_unlock(&net->sock_lock);
 		goto out;
 	}
@@ -1632,19 +1559,6 @@
 					spin_lock(&net->sock_lock);
 					net->flags &= ~NET_FLAG_CREATING_SOCKET;
 					net->sock = sock;
-
-					netprintk0("2) ok this node is actively trying to connect, add to waitqueue\n");
-					if (current != net_recv_task) {
-						netprintk("net_recv_task=%p... maybe i should add THAT instead\n", net_recv_task);
-						if (net_recv_task == NULL) 
-							BUG();
-						init_waitqueue_entry(&net->sleep, net_recv_task);
-					} else {
-						netprintk("process %p added to waitqueue\n", current);
-						init_waitqueue_entry(&net->sleep, current);
-					}
-					add_wait_queue(sock->sk->sk_sleep, &net->sleep);
-
 					spin_unlock(&net->sock_lock);
 					break;
 				} else {
@@ -1693,11 +1607,7 @@
 	       	if (sock) 
 			sock_release(sock);
 	} else {
-		/* add this inode to the receive list, if not already */
-		spin_lock(&net_list_lock);
-		if (list_empty(&net->list))
-			list_add_tail(&net->list, &net_recv_list);
-		spin_unlock(&net_list_lock);
+		net_record_new_sock(net);
 	}
 
 	return err;
@@ -1719,7 +1629,7 @@
 			     SOCK_STREAM, IPPROTO_TCP,
 			     &recv_sock);
 	if (status < 0) {
-		netprintk ("unable to create socket, error=%d", status);
+		netprintk ("unable to create socket, error=%d\n", status);
 		goto bail;
 	}
 
@@ -1734,7 +1644,7 @@
 					 (struct sockaddr *)&sin,
 					 sizeof(sin));
 	if (status < 0) {
-		netprintk ("unable to bind socket to port %d, error=%d", 
+		netprintk ("unable to bind socket to port %d, error=%d\n", 
 			ntohs(ip_port), status);
 	}
 

Modified: trunk/cluster/tcp.h
===================================================================
--- trunk/cluster/tcp.h	2005-01-14 01:29:40 UTC (rev 1772)
+++ trunk/cluster/tcp.h	2005-01-14 23:41:58 UTC (rev 1773)
@@ -129,9 +129,7 @@
 	u32 key;
 	net_msg_handler_func *func;
 	void *data;
-	net_msg hdr;
 	u32 max_len;
-	void *buf;
 	spinlock_t lock;
 	atomic_t refcnt;
 	int flags;
@@ -146,17 +144,11 @@
 	atomic_t woken;
 } net_status_ctxt;
 
-void net_do_status_return(u64 msg_num, s32 status);
+#define NET_MAX_PAYLOAD_BYTES  (4096 - sizeof(net_msg))
 
-/* no clue for these yet... */
-#define NET_MIN_MSG_LEN  (0)
-#define NET_MAX_MSG_LEN  (8192)
-	
-
 /* RESERVED */
 #define NET_ALREADY_CONNECTED   (0xfff0)
 #define NET_UNKNOWN_HOST        (0xfff1)
-	
 
 static inline int net_is_valid_error_type(u32 err_type)
 {
@@ -168,7 +160,6 @@
 		       
 
 #define NET_HND_VAR_LEN   0x00000001
-#define NET_HND_IN_USE    0x00000002
 
 static inline int net_handler_msg_len_ok(net_msg_handler *handler, u32 len)
 {
@@ -234,8 +225,7 @@
 };
 
 int net_register_handler(u32 msg_type, u32 key, int flags, 
-			 u32 max_len, net_msg_handler_func *func, void *data, void *buf);
-int net_recv_tcp_msg (struct inode *inode, struct socket *sock, void *data, u32 *packet_len);
+			 u32 max_len, net_msg_handler_func *func, void *data);
 int net_send_error(struct socket *sock, u32 err_type);
 int net_init_tcp_sock(struct inode *inode);
 int net_send_message(u32 msg_type, u32 key, void *data, u32 len, struct inode *inode, int *status);

Modified: trunk/src/ocfs.h
===================================================================
--- trunk/src/ocfs.h	2005-01-14 01:29:40 UTC (rev 1772)
+++ trunk/src/ocfs.h	2005-01-14 23:41:58 UTC (rev 1773)
@@ -384,8 +384,6 @@
 	struct completion vote_event_init;
 
 	u32 net_key;
-	char *net_vote_buf;
-	char *net_response_buf;
 	spinlock_t net_response_lock;
 	unsigned int net_response_ids;
 	struct list_head net_response_list;

Modified: trunk/src/vote.c
===================================================================
--- trunk/src/vote.c	2005-01-14 01:29:40 UTC (rev 1772)
+++ trunk/src/vote.c	2005-01-14 23:41:58 UTC (rev 1773)
@@ -807,34 +807,16 @@
 	int i = MAX_VOL_ID_LENGTH - sizeof(osb->net_key);
 
 	memcpy(&osb->net_key, &osb->uuid[i], sizeof(osb->net_key));
-	osb->net_response_buf = osb->net_vote_buf = NULL;
 	osb->net_response_ids = 0;
 	spin_lock_init(&osb->net_response_lock);
 	INIT_LIST_HEAD(&osb->net_response_list);
 
-	osb->net_response_buf = kmalloc(sizeof(ocfs2_response_msg),
-					GFP_KERNEL);
-	if (!osb->net_response_buf) {
-		status = -ENOMEM;
-		LOG_ERROR_STATUS(status);
-		goto bail;
-	}
-
-	osb->net_vote_buf = kmalloc(sizeof(ocfs2_vote_msg),
-				    GFP_KERNEL);
-	if (!osb->net_vote_buf) {
-		status = -ENOMEM;
-		LOG_ERROR_STATUS(status);
-		goto bail;
-	}
-
 	status = net_register_handler(OCFS2_MESSAGE_TYPE_RESPONSE,
 				      osb->net_key,
 				      0,
 				      sizeof(ocfs2_response_msg),
 				      ocfs2_handle_response_message,
-				      osb,
-				      osb->net_response_buf);
+				      osb);
 	if (status < 0) {
 		LOG_ERROR_STATUS(status);
 		goto bail;
@@ -845,8 +827,7 @@
 				      0,
 				      sizeof(ocfs2_vote_msg),
 				      ocfs2_handle_vote_message,
-				      osb,
-				      osb->net_vote_buf);
+				      osb);
 	if (status < 0) {
 		/* TODO: net_unregister here! */
 		LOG_ERROR_STATUS(status);
@@ -854,15 +835,9 @@
 	}
 
 bail:
-	if (status < 0) {
-		if (osb->net_response_buf)
-			kfree(osb->net_response_buf);
-		if (osb->net_vote_buf)
-			kfree(osb->net_vote_buf);
-		osb->net_response_buf = osb->net_vote_buf = NULL;
-		/* 0 indicates we never registered anything */
+	if (status < 0)
 		osb->net_key = 0;
-	}
+
 	return status;
 }
 
@@ -876,7 +851,4 @@
 
 	if (!list_empty(&osb->net_response_list))
 		LOG_ERROR_STR("net response list not empty!\n");
-
-	kfree(osb->net_response_buf);
-	kfree(osb->net_vote_buf);
 }
