[Ocfs2-commits] mfasheh commits r1746 - trunk/cluster

svn-commits at oss.oracle.com svn-commits at oss.oracle.com
Thu Jan 6 18:57:21 CST 2005


Author: mfasheh
Date: 2005-01-06 18:57:19 -0600 (Thu, 06 Jan 2005)
New Revision: 1746

Modified:
   trunk/cluster/tcp.c
Log:
* make netprintk more informative (it now prints the function name and line
  number), and add some more printks

* add net_discard_data, which discards only a single message from the socket
  instead of a full PAGE_SIZE worth of data. This should fix a bug where
  entire packets were silently lost.

* populate the "data_len" field properly in *all* message headers.

The next two changes are by Zach Brown:
* fix net_receive_thread to use wait_event_interruptible instead of a fixed
  20-second sleep

* verify sent message sizes (BUG on a short send)



Modified: trunk/cluster/tcp.c
===================================================================
--- trunk/cluster/tcp.c	2005-01-06 18:41:39 UTC (rev 1745)
+++ trunk/cluster/tcp.c	2005-01-07 00:57:19 UTC (rev 1746)
@@ -56,8 +56,8 @@
 #include "nodemanager.h"
 
 //#if 0
-#define netprintk(x, arg...)    printk("(tcp:%d) " x, current->pid, ##arg)
-#define netprintk0(x)           printk("(tcp:%d) " x, current->pid)
+#define netprintk(x, arg...)    printk("(tcp:%d)(%s:%d) " x, current->pid, __FUNCTION__, __LINE__, ##arg)
+#define netprintk0(x)           printk("(tcp:%d)(%s:%d) " x, current->pid, __FUNCTION__, __LINE__)
 //#else
 #if 0
 #define netprintk(x, arg...)    
@@ -113,7 +113,8 @@
 static void net_remove_handlers(void);
 static int net_check_message_valid(net_msg *msg, u32 len);
 static void net_dump_and_close_sock(struct socket *sock, struct inode *inode);
-static void net_dump_msg(struct socket *sock, struct inode *inode);
+static void net_drain_sock(struct socket *sock);
+static void net_discard_data(net_msg *hdr, struct socket *sock);
 static int net_recv_message_header(net_msg *hdr, struct socket *sock);
 static int net_init_tcp_recv_sock(void);
 static int net_receive_thread(void *data);
@@ -402,10 +403,20 @@
 	netprintk ("net thread exited\n");
 }
 
+static int net_rx_should_wake(void)
+{
+	int pending;

+	spin_lock(&net_list_lock);
+	pending = !list_empty(&net_recv_list);
+	spin_unlock(&net_list_lock);
+	/* wake for queued receive sockets or a connection awaiting accept */
+	return pending || tcp_sk(recv_sock->sk)->accept_queue;
+}
+
 static int net_receive_thread(void *data)
 {
-	int status = 0;
+	int status;
 	DECLARE_WAITQUEUE(main_wait, current);
 
 	util_daemonize ("netrecv", strlen("netrecv"), 1);
@@ -421,9 +432,8 @@
 			if (!list_empty(&net_recv_list))
 				status = net_receive();
 
-			set_current_state(TASK_INTERRUPTIBLE);
-			schedule_timeout(20*HZ);
-			current->state = TASK_RUNNING;
+			wait_event_interruptible(*recv_sock->sk->sk_sleep,
+						 net_rx_should_wake());
 
 			if (signal_pending(current)) {
 				netprintk0("net recv thread got signal!\n");
@@ -763,7 +773,16 @@
 	return status;
 }				/* net_recv_tcp_msg */
 
+int net_sock_sendmsg(struct socket *sock, struct msghdr *msg, size_t size)
+{
+	int sent = sock_sendmsg(sock, msg, size);

+	/* XXX sanity check until a better transport core is in place:
+	 * a successful send must have pushed out the whole buffer */
+	BUG_ON(sent >= 0 && (size_t)sent != size);
+	return sent;
+}
+
 /*
  * net_send_tcp_msg()
  *
@@ -811,7 +830,7 @@
 		
 		status = 0;	
 		set_fs (get_ds ());
-		error = sock_sendmsg (sock, &msg, packet_len);
+		error = net_sock_sendmsg (sock, &msg, packet_len);
 		set_fs (oldfs);
 	
 		if (error < 0) {
@@ -1132,7 +1151,6 @@
 	int err = 0;
 	int tmperr;
 
-
 start_over:	
 	spin_lock(&net_list_lock);
 	list_for_each_safe(iter, tmpiter, &net_recv_list) {
@@ -1174,7 +1192,7 @@
 			  hdr.magic, hdr.msg_type, hdr.key);
 
 		if (hdr.magic == NET_MSG_STATUS_MAGIC) {
-			net_dump_msg(sock, inode);
+			net_discard_data(&hdr, sock);
 			/* special type for returning message status */
 			net_do_status_return(hdr.msg_num, hdr.status);
 			err = 0;
@@ -1192,7 +1210,7 @@
 			} else if (hdr.msg_type == NET_UNKNOWN_HOST) {
 				netprintk0("error: unknown host\n");
 			}
-			net_dump_msg(sock, inode);
+			net_discard_data(&hdr, sock);
 			err = 0;
 			goto error;
 		}
@@ -1218,7 +1236,10 @@
 #endif
 			hdr.status = err;
 			hdr.magic = NET_MSG_STATUS_MAGIC;  // twiddle the magic
+			hdr.data_len = 0;
+			netprintk("about to send status %d\n", err);
 			tmperr = net_send_tcp_msg(inode, sock, &hdr, sizeof(net_msg));
+			netprintk0("yay, sent!\n");
 		} else if (err < 0) {
 			netprintk("dispatch (%u/%u) returned %d\n",
 				  hdr.msg_type, hdr.key, err);
@@ -1241,7 +1262,7 @@
 				net_dump_and_close_sock(sock, inode);
 			} else {
 				netprintk("bad message... node=%lu.\n", inode->i_ino);
-				net_dump_msg(sock, inode);
+				net_discard_data(&hdr, sock);
 				// re-add this socket
 				spin_lock(&net_list_lock);
 				list_add_tail(&net->list, &net_recv_list);
@@ -1361,6 +1382,8 @@
 			sock_release(sock);
 			continue;
 		}
+
+		tcp_sk(recv_sock->sk)->nonagle = 1;
 			
 		slen = sizeof(sin);
 		error = sock->ops->getname(sock, (struct sockaddr *) &sin, &slen, 1);
@@ -1440,6 +1463,7 @@
 	memset(&err, 0, sizeof(net_msg));	
 	err.magic        = NET_MSG_MAGIC;
 	err.msg_type     = err_type;
+	err.data_len     = 0;
 
         msg.msg_name     = 0;
         msg.msg_namelen  = 0;
@@ -1452,7 +1476,7 @@
         msg.msg_iov->iov_base = (char*) &err;
 
         oldfs = get_fs(); set_fs(KERNEL_DS);
-        len = sock_sendmsg(sock, &msg, (size_t)(sizeof(net_msg)));
+        len = net_sock_sendmsg(sock, &msg, (size_t)(sizeof(net_msg)));
         set_fs(oldfs);
 
         return len;
@@ -1499,7 +1523,7 @@
 {
 	nm_node_inode_private *priv = NULL;
 
-	net_dump_msg(sock, inode);
+	net_drain_sock(sock);
 
 	if (sock->sk) {
 		if (inode) {
@@ -1515,10 +1539,58 @@
 	sock_release(sock);
 }
 
-static void net_dump_msg(struct socket *sock, struct inode *inode)
+/* Read and throw away one message (header plus hdr->data_len bytes) from
+ * sock. WARNING: this really does discard data. */
+static void net_discard_data(net_msg *hdr, struct socket *sock)
 {
 	struct msghdr           msg;
 	struct iovec            iov;
+	int                     read;
+	mm_segment_t            oldfs;
+	unsigned int            size;
+
+	if (!sock->sk)
+		return;
+
+	size = hdr->data_len + sizeof(net_msg);
+	BUG_ON(size > PAGE_SIZE);
+
+	do {
+		msg.msg_name     = 0;
+		msg.msg_namelen  = 0;
+		msg.msg_iov      = &iov;
+		msg.msg_iovlen   = 1;
+		msg.msg_control  = NULL;
+		msg.msg_controllen = 0;
+		msg.msg_flags    = 0;
+		msg.msg_iov->iov_base = net_junk_buf;
+		msg.msg_iov->iov_len  = (__kernel_size_t)size;
+
+		oldfs = get_fs();
+		set_fs(KERNEL_DS);
+
+		read = sock_recvmsg(sock, &msg, size, 0);
+
+		set_fs(oldfs);
+		/* <0 is an error; 0 means the peer closed, so stop instead of spinning */
+		if (read <= 0) {
+			netprintk("giving up, sock_recvmsg returned %d\n", read);
+			return;
+		}
+
+		netprintk("read %d of %u remaining\n", read, size);
+
+		if (read > size)
+			BUG();
+
+		size -= read;
+	} while (size);
+}
+
+static void net_drain_sock(struct socket *sock)
+{
+	struct msghdr           msg;
+	struct iovec            iov;
 	int                     len;
 	mm_segment_t            oldfs;
 
@@ -1535,15 +1607,18 @@
 			msg.msg_flags    = MSG_DONTWAIT;
 			msg.msg_iov->iov_base = net_junk_buf;
 			msg.msg_iov->iov_len  = (__kernel_size_t)PAGE_SIZE;
-			len = 0;
-			oldfs = get_fs(); set_fs(KERNEL_DS);
+
+			oldfs = get_fs();
+			set_fs(KERNEL_DS);
 			len = sock_recvmsg(sock, &msg, PAGE_SIZE, MSG_DONTWAIT);
 			set_fs(oldfs);
+
+			if (!len)
+				break;
 		}
 	}
 }
 
-
 int net_init_tcp_sock(struct inode *inode)
 {
 	nm_node_inode_private *priv;



More information about the Ocfs2-commits mailing list