[Ocfs2-commits] mfasheh commits r1746 - trunk/cluster
svn-commits at oss.oracle.com
svn-commits at oss.oracle.com
Thu Jan 6 18:57:21 CST 2005
Author: mfasheh
Date: 2005-01-06 18:57:19 -0600 (Thu, 06 Jan 2005)
New Revision: 1746
Modified:
trunk/cluster/tcp.c
Log:
* make netprintk a bit more informative, add some more printks
* add net_discard_data, which discards only a single message off the queue
instead of a PAGE_SIZE amount of data. This should fix a bug with completely
missed packets.
* populate the "data_len" field properly in *all* message headers.
The next two by Zach Brown:
* fix the net_receive_thread to use wait_event_interruptible
* verify sent message sizes
Modified: trunk/cluster/tcp.c
===================================================================
--- trunk/cluster/tcp.c 2005-01-06 18:41:39 UTC (rev 1745)
+++ trunk/cluster/tcp.c 2005-01-07 00:57:19 UTC (rev 1746)
@@ -56,8 +56,8 @@
#include "nodemanager.h"
//#if 0
-#define netprintk(x, arg...) printk("(tcp:%d) " x, current->pid, ##arg)
-#define netprintk0(x) printk("(tcp:%d) " x, current->pid)
+#define netprintk(x, arg...) printk("(tcp:%d)(%s:%d) " x, current->pid, __FUNCTION__, __LINE__, ##arg)
+#define netprintk0(x) printk("(tcp:%d)(%s:%d) " x, current->pid, __FUNCTION__, __LINE__)
//#else
#if 0
#define netprintk(x, arg...)
@@ -113,7 +113,8 @@
static void net_remove_handlers(void);
static int net_check_message_valid(net_msg *msg, u32 len);
static void net_dump_and_close_sock(struct socket *sock, struct inode *inode);
-static void net_dump_msg(struct socket *sock, struct inode *inode);
+static void net_drain_sock(struct socket *sock);
+static void net_discard_data(net_msg *hdr, struct socket *sock);
static int net_recv_message_header(net_msg *hdr, struct socket *sock);
static int net_init_tcp_recv_sock(void);
static int net_receive_thread(void *data);
@@ -402,10 +403,20 @@
netprintk ("net thread exited\n");
}
+static int net_rx_should_wake(void)
+{
+ int empty;
+ spin_lock(&net_list_lock);
+ empty = list_empty(&net_recv_list);
+ spin_unlock(&net_list_lock);
+
+ return !empty || tcp_sk(recv_sock->sk)->accept_queue;
+}
+
static int net_receive_thread(void *data)
{
- int status = 0;
+ int status;
DECLARE_WAITQUEUE(main_wait, current);
util_daemonize ("netrecv", strlen("netrecv"), 1);
@@ -421,9 +432,8 @@
if (!list_empty(&net_recv_list))
status = net_receive();
- set_current_state(TASK_INTERRUPTIBLE);
- schedule_timeout(20*HZ);
- current->state = TASK_RUNNING;
+ wait_event_interruptible(*recv_sock->sk->sk_sleep,
+ net_rx_should_wake());
if (signal_pending(current)) {
netprintk0("net recv thread got signal!\n");
@@ -763,7 +773,16 @@
return status;
} /* net_recv_tcp_msg */
+int net_sock_sendmsg(struct socket *sock, struct msghdr *msg, size_t size)
+{
+ int ret = sock_sendmsg(sock, msg, size);
+ /* XXX just so that we know things aren't going horribly wrong
+ * until we get a better transport core in place */
+ BUG_ON(ret >= 0 && ret != size);
+ return ret;
+}
+
/*
* net_send_tcp_msg()
*
@@ -811,7 +830,7 @@
status = 0;
set_fs (get_ds ());
- error = sock_sendmsg (sock, &msg, packet_len);
+ error = net_sock_sendmsg (sock, &msg, packet_len);
set_fs (oldfs);
if (error < 0) {
@@ -1132,7 +1151,6 @@
int err = 0;
int tmperr;
-
start_over:
spin_lock(&net_list_lock);
list_for_each_safe(iter, tmpiter, &net_recv_list) {
@@ -1174,7 +1192,7 @@
hdr.magic, hdr.msg_type, hdr.key);
if (hdr.magic == NET_MSG_STATUS_MAGIC) {
- net_dump_msg(sock, inode);
+ net_discard_data(&hdr, sock);
/* special type for returning message status */
net_do_status_return(hdr.msg_num, hdr.status);
err = 0;
@@ -1192,7 +1210,7 @@
} else if (hdr.msg_type == NET_UNKNOWN_HOST) {
netprintk0("error: unknown host\n");
}
- net_dump_msg(sock, inode);
+ net_discard_data(&hdr, sock);
err = 0;
goto error;
}
@@ -1218,7 +1236,10 @@
#endif
hdr.status = err;
hdr.magic = NET_MSG_STATUS_MAGIC; // twiddle the magic
+ hdr.data_len = 0;
+ netprintk("about to send status %d\n", err);
tmperr = net_send_tcp_msg(inode, sock, &hdr, sizeof(net_msg));
+ netprintk0("yay, sent!\n");
} else if (err < 0) {
netprintk("dispatch (%u/%u) returned %d\n",
hdr.msg_type, hdr.key, err);
@@ -1241,7 +1262,7 @@
net_dump_and_close_sock(sock, inode);
} else {
netprintk("bad message... node=%lu.\n", inode->i_ino);
- net_dump_msg(sock, inode);
+ net_discard_data(&hdr, sock);
// re-add this socket
spin_lock(&net_list_lock);
list_add_tail(&net->list, &net_recv_list);
@@ -1361,6 +1382,8 @@
sock_release(sock);
continue;
}
+
+ tcp_sk(recv_sock->sk)->nonagle = 1;
slen = sizeof(sin);
error = sock->ops->getname(sock, (struct sockaddr *) &sin, &slen, 1);
@@ -1440,6 +1463,7 @@
memset(&err, 0, sizeof(net_msg));
err.magic = NET_MSG_MAGIC;
err.msg_type = err_type;
+ err.data_len = 0;
msg.msg_name = 0;
msg.msg_namelen = 0;
@@ -1452,7 +1476,7 @@
msg.msg_iov->iov_base = (char*) &err;
oldfs = get_fs(); set_fs(KERNEL_DS);
- len = sock_sendmsg(sock, &msg, (size_t)(sizeof(net_msg)));
+ len = net_sock_sendmsg(sock, &msg, (size_t)(sizeof(net_msg)));
set_fs(oldfs);
return len;
@@ -1499,7 +1523,7 @@
{
nm_node_inode_private *priv = NULL;
- net_dump_msg(sock, inode);
+ net_drain_sock(sock);
if (sock->sk) {
if (inode) {
@@ -1515,10 +1539,58 @@
sock_release(sock);
}
-static void net_dump_msg(struct socket *sock, struct inode *inode)
+/* WARNING: This really does discard data. I hope you didn't want to
+ * do anything with it... */
+static void net_discard_data(net_msg *hdr, struct socket *sock)
{
struct msghdr msg;
struct iovec iov;
+ int read;
+ mm_segment_t oldfs;
+ unsigned int size;
+
+ if (!sock->sk)
+ return;
+
+ size = hdr->data_len + sizeof(net_msg);
+ BUG_ON(size > PAGE_SIZE);
+
+ do {
+ msg.msg_name = 0;
+ msg.msg_namelen = 0;
+ msg.msg_iov = &iov;
+ msg.msg_iovlen = 1;
+ msg.msg_control = NULL;
+ msg.msg_controllen = 0;
+ msg.msg_flags = 0;
+ msg.msg_iov->iov_base = net_junk_buf;
+ msg.msg_iov->iov_len = (__kernel_size_t)size;
+
+ oldfs = get_fs();
+ set_fs(KERNEL_DS);
+
+ read = sock_recvmsg(sock, &msg, size, 0);
+
+ set_fs(oldfs);
+
+ if (read < 0) {
+ netprintk("WOW, error code %d\n", read);
+ return;
+ }
+
+ netprintk("read %d of %u remaining\n", read, size);
+
+ if (read > size)
+ BUG();
+
+ size -= read;
+ } while (size);
+}
+
+static void net_drain_sock(struct socket *sock)
+{
+ struct msghdr msg;
+ struct iovec iov;
int len;
mm_segment_t oldfs;
@@ -1535,15 +1607,18 @@
msg.msg_flags = MSG_DONTWAIT;
msg.msg_iov->iov_base = net_junk_buf;
msg.msg_iov->iov_len = (__kernel_size_t)PAGE_SIZE;
- len = 0;
- oldfs = get_fs(); set_fs(KERNEL_DS);
+
+ oldfs = get_fs();
+ set_fs(KERNEL_DS);
len = sock_recvmsg(sock, &msg, PAGE_SIZE, MSG_DONTWAIT);
set_fs(oldfs);
+
+ if (!len)
+ break;
}
}
}
-
int net_init_tcp_sock(struct inode *inode)
{
nm_node_inode_private *priv;
More information about the Ocfs2-commits mailing list