[Rds-commits] zab commits r76 - in trunk: . linux/net/rds

svn-commits@oss.oracle.com svn-commits at oss.oracle.com
Thu May 25 17:44:17 CDT 2006


Author: zab
Date: 2006-05-25 17:44:12 -0500 (Thu, 25 May 2006)
New Revision: 76

Added:
   trunk/linux/net/rds/loop.c
   trunk/linux/net/rds/loop.h
Modified:
   trunk/TODO
   trunk/linux/net/rds/Makefile
   trunk/linux/net/rds/ack.c
   trunk/linux/net/rds/connection.c
   trunk/linux/net/rds/flow.c
   trunk/linux/net/rds/message.c
   trunk/linux/net/rds/rds.h
   trunk/linux/net/rds/recv.c
   trunk/linux/net/rds/send.c
   trunk/linux/net/rds/tcp.c
   trunk/linux/net/rds/tcp_connect.c
   trunk/linux/net/rds/tcp_recv.c
   trunk/linux/net/rds/transport.c
Log:
Add the loopback transport for local connections.

This lets us move messages directly from the send to the receive queue and call
directly into ack processing.  This required a minimal send API extension to
let loopback get references to the next message to be sent as a whole rather
than one vec at a time.

The conn alloc API is changed a little to make it easier to return success
without doing anything.  Also, this fixes a wild bug in rds_flow_create():
it "checked" the stale dst pointer instead of the return of rds_conn_create().

The recv path is refactored a little so that loopback sending can call it to
queue messages in the receive queue.

With the introduction of loopback a connection's transport can differ from
the transport of the bound socket.  This leads us to *only* use the socket's
transport to build up remote connections.

Loopback embeds an rds_incoming in rds_message and leans on the message
layer for a helper to copy the message to userspace.


Modified: trunk/TODO
===================================================================
--- trunk/TODO	2006-05-24 23:42:58 UTC (rev 75)
+++ trunk/TODO	2006-05-25 22:44:12 UTC (rev 76)
@@ -1,4 +1,6 @@
 
+clean up sending forward progress
+
 oh, right, poll().  POLLIN does things, POLLOUT always returns true :/
 
 connect() should be implemented, too. (don't forget getpeername)

Modified: trunk/linux/net/rds/Makefile
===================================================================
--- trunk/linux/net/rds/Makefile	2006-05-24 23:42:58 UTC (rev 75)
+++ trunk/linux/net/rds/Makefile	2006-05-25 22:44:12 UTC (rev 76)
@@ -2,4 +2,5 @@
  
 rds-y :=	af_rds.o ack.o bind.o connection.o flow.o message.o	\
 			recv.o send.o sysctl.o transport.o		\
-		tcp.o tcp_connect.o tcp_listen.o tcp_send.o tcp_recv.o
+		tcp.o tcp_connect.o tcp_listen.o tcp_send.o tcp_recv.o	\
+		loop.o

Modified: trunk/linux/net/rds/ack.c
===================================================================
--- trunk/linux/net/rds/ack.c	2006-05-24 23:42:58 UTC (rev 75)
+++ trunk/linux/net/rds/ack.c	2006-05-25 22:44:12 UTC (rev 76)
@@ -161,7 +161,7 @@
 			rds_ack_send(conn);
 
 		list_del_init(&inc->i_item);
-		rds_inc_put(conn->c_trans, inc);
+		rds_inc_put(inc);
 
 	}
 

Modified: trunk/linux/net/rds/connection.c
===================================================================
--- trunk/linux/net/rds/connection.c	2006-05-24 23:42:58 UTC (rev 75)
+++ trunk/linux/net/rds/connection.c	2006-05-25 22:44:12 UTC (rev 76)
@@ -24,6 +24,7 @@
 #endif /* KERNEL_HAS_INET_HASHTABLES */
 
 #include "rds.h"
+#include "loop.h"
 
 /* XXX export some stats for this */
 #define RDS_CONNECTION_HASH_BITS 12
@@ -73,6 +74,7 @@
 	struct rds_connection *conn, *tmp;
 	struct hlist_head *head = rds_conn_bucket(laddr, faddr);
 	unsigned long flags;
+	int ret;
 
 	spin_lock_irqsave(&rds_conn_lock, flags);
 	conn = rds_conn_lookup(head, laddr, faddr);
@@ -107,11 +109,20 @@
 	conn->c_ack_rm = NULL;
 	setup_timer(&conn->c_ack_timer, rds_ack_timer, (unsigned long)conn);
 
+	/*
+	 * This is where a connection becomes loopback.  If *any* RDS socket
+	 * can bind to the destination address then we'd rather the messages
+	 * flow through loopback than through either transport.
+	 */
+	if (rds_trans_get_preferred(faddr))
+		trans = &rds_loop_transport;
+
 	conn->c_trans = trans;
-	conn->c_transport_data = trans->conn_alloc(gfp);
-	if (conn->c_transport_data == NULL) {
+
+	ret = trans->conn_alloc(conn, gfp);
+	if (ret) {
 		kmem_cache_free(rds_conn_slab, conn);
-		conn = ERR_PTR(-ENOMEM);
+		conn = ERR_PTR(ret);
 		goto out;
 	}
 
@@ -179,7 +190,7 @@
 						 &conn->c_incs_for_acks,
 						 i_item) {
 				list_del_init(&inc->i_item);
-				rds_inc_put(conn->c_trans, inc);
+				rds_inc_put(inc);
 			}
 			if (conn->c_ack_rm)
 				rds_message_put(conn->c_ack_rm);

Modified: trunk/linux/net/rds/flow.c
===================================================================
--- trunk/linux/net/rds/flow.c	2006-05-24 23:42:58 UTC (rev 75)
+++ trunk/linux/net/rds/flow.c	2006-05-25 22:44:12 UTC (rev 76)
@@ -91,7 +91,6 @@
 struct rds_flow *rds_flow_create(__le32 laddr, __le16 lport, __le32 faddr,
 				 __le16 fport, gfp_t gfp)
 {
-	struct dst_entry *dst = NULL;
 	struct rds_flow *flow, *tmp;
 	struct rds_sock *rs = NULL;
 	struct rds_connection *conn = NULL;
@@ -110,8 +109,8 @@
 	}
 
 	conn = rds_conn_create(laddr, faddr, rs->rs_transport, gfp);
-	if (IS_ERR(dst)) {
-		ret = PTR_ERR(dst);
+	if (IS_ERR(conn)) {
+		ret = PTR_ERR(conn);
 		goto out;
 	}
 
@@ -164,13 +163,17 @@
 	return flow;
 }
 
+/*
+ * Since this calls ack_queue_inc it needs to not be called under locks.
+ */
 void rds_flow_put(struct rds_flow *flow)
 {
 	pr_debug("put flow %p ref %d\n", flow, atomic_read(&flow->f_refcount));
 	if (atomic_dec_and_test(&flow->f_refcount)) {
 		if (flow->f_inc) {
-			rds_ack_queue_inc(flow->f_inc, GFP_ATOMIC);
-			rds_inc_put(flow->f_conn->c_trans, flow->f_inc);
+			flow->f_conn->c_trans->ack_queue_inc(flow->f_inc,
+							     GFP_ATOMIC);
+			rds_inc_put(flow->f_inc);
 		}
 		if (flow->f_rs)
 			rds_sock_put(flow->f_rs);

Added: trunk/linux/net/rds/loop.c
===================================================================
--- trunk/linux/net/rds/loop.c	2006-05-24 23:42:58 UTC (rev 75)
+++ trunk/linux/net/rds/loop.c	2006-05-25 22:44:12 UTC (rev 76)
@@ -0,0 +1,148 @@
+/*
+ * Copyright (C) 2006 Oracle.  All rights reserved.
+ *
+ * This program is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU General Public
+ * License as published by the Free Software Foundation; either
+ * version 2 of the License, or (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
+ * General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public
+ * License along with this program; if not, write to the
+ * Free Software Foundation, Inc., 59 Temple Place - Suite 330,
+ * Boston, MA 021110-1307, USA.
+ */
+#include <linux/config.h>
+#include <linux/kernel.h>
+#include <linux/in.h>
+
+#include "rds.h"
+
+/*
+ * This 'loopback' transport is a special case for flows that originate
+ * and terminate on the same machine.
+ *
+ * Connection build-up notices if the destination address is thought of
+ * as a local address by a transport.  At that time it decides to use the
+ * loopback transport instead of the bound transport of the sending socket.
+ *
+ * The loopback transport's sending path just hands the sent rds_message
+ * straight to the receiving path via an embedded rds_incoming.  Once it
+ * is consumed the rds_message ref is dropped and the ack processing paths
+ * are called directly.
+ */
+
+/*
+ * we don't have to marshal the ack through messages, we can call the 
+ * ack processing path directly.
+ */
+static void rds_loop_ack_queue_inc(struct rds_incoming *inc, gfp_t gfp)
+{
+	struct rds_ack_entry ent = {
+		.e_sport = inc->i_hdr.h_sport,
+		.e_dport = inc->i_hdr.h_dport,
+		.e_seq = inc->i_hdr.h_sequence,
+	};
+	rds_ack_process(inc->i_conn, &ent);
+}
+
+/*
+ * rds_message_populate_headers() has initialized the inc's i_hdr to match
+ * the first header of the message which rds_recv_enqueue, etc, will use
+ * to generate acks.
+ *
+ * Usually a message transits both the sender's and receiver's conns as it
+ * flows to the receiver.  In the loopback case, though, the receive path
+ * is handed the sending conn so the sense of the addresses is reversed.
+ * rds_recv_incoming() uses conn->c_faddr as the sending address, but
+ * in the loopback case we use conn->c_laddr instead.
+ */
+static int rds_loop_send(struct rds_connection *conn)
+{
+	struct rds_message *rm;
+	int ret;
+
+	while ((ret = rds_send_get_next_message(conn, &rm)) > 0) {
+
+		rds_inc_init(&rm->m_inc, conn);
+		rds_message_addref(rm); /* for the inc */
+
+		rds_recv_enqueue(NULL, conn->c_laddr, conn->c_faddr,
+				 &rm->m_inc, GFP_KERNEL);
+
+		rds_inc_put(&rm->m_inc);
+		rds_send_put_next_message(conn, rm, 1);
+	}
+
+	return 0;
+}
+
+/*
+ * We don't actually do anything to maintain connections so these are all
+ * nops.
+ *
+ * XXX worry about missing sends due to the send sem, then worry about
+ * not needing sending serialization at all.
+ */
+static void rds_loop_send_trigger(struct rds_connection *conn, long delay)
+{
+}
+static void rds_loop_conn_notice(struct rds_connection *conn)
+{
+}
+static int rds_loop_conn_alloc(struct rds_connection *conn, gfp_t gfp)
+{
+	return 0;
+}
+static void rds_loop_conn_free(void *data)
+{
+}
+static void rds_loop_conn_shutdown(struct rds_connection *conn)
+{
+}
+/*
+ * transport.c doesn't iterate over us so these shouldn't be called.
+ */
+static void rds_loop_listen_stop(void)
+{
+	BUG();
+}
+static int rds_loop_laddr_check(__le32 addr)
+{
+	BUG();
+	return 0;
+}
+
+/*
+ * We don't go through rds_recv_incoming() so none of these should be called.
+ */
+static void rds_loop_inc_merge(struct rds_incoming *dest,
+			       struct rds_incoming *src)
+{
+	BUG();
+}
+static void rds_loop_inc_process_acks(struct rds_connection *conn,
+				      struct rds_incoming *inc, u16 nr)
+{
+	BUG();
+}
+
+struct rds_transport rds_loop_transport = {
+	.laddr_check		= rds_loop_laddr_check,
+	.send			= rds_loop_send,
+	.send_trigger		= rds_loop_send_trigger,
+	.conn_notice		= rds_loop_conn_notice,
+	.conn_alloc		= rds_loop_conn_alloc,
+	.conn_free		= rds_loop_conn_free,
+	.conn_shutdown		= rds_loop_conn_shutdown,
+	.inc_copy_to_user	= rds_message_inc_copy_to_user,
+	.inc_free		= rds_message_inc_free,
+	.inc_merge		= rds_loop_inc_merge,
+	.inc_process_acks	= rds_loop_inc_process_acks,
+	.listen_stop		= rds_loop_listen_stop,
+	.ack_queue_inc		= rds_loop_ack_queue_inc,
+};

Added: trunk/linux/net/rds/loop.h
===================================================================
--- trunk/linux/net/rds/loop.h	2006-05-24 23:42:58 UTC (rev 75)
+++ trunk/linux/net/rds/loop.h	2006-05-25 22:44:12 UTC (rev 76)
@@ -0,0 +1,7 @@
+#ifndef _RDS_LOOP_H
+#define _RDS_LOOP_H 
+
+/* loop.c */
+extern struct rds_transport rds_loop_transport;
+
+#endif

Modified: trunk/linux/net/rds/message.c
===================================================================
--- trunk/linux/net/rds/message.c	2006-05-24 23:42:58 UTC (rev 75)
+++ trunk/linux/net/rds/message.c	2006-05-25 22:44:12 UTC (rev 76)
@@ -55,8 +55,14 @@
 	}
 }
 
+void rds_message_inc_free(struct rds_incoming *inc)
+{
+	struct rds_message *rm = container_of(inc, struct rds_message, m_inc);
+	rds_message_put(rm);
+}
+
 void rds_message_populate_headers(struct rds_message *rm, __be16 sport,
-				  __be16 dport, u64 seq)
+			          __be16 dport, u64 seq)
 {
 	struct rds_header *addr, *hdr;
 	struct page *page;
@@ -84,6 +90,14 @@
 			hdr->h_sequence = cpu_to_be64(seq);
 			seq++;
 
+			/*
+			 * Help loopback by copying the first header into
+			 * the inc's copy of the header while we have it
+			 * mapped and hot.
+			 */
+			if (left == rm->m_len)
+				rm->m_inc.i_hdr = *hdr;
+
 			left -= len;
 			if (left == 0)
 				break;
@@ -222,6 +236,54 @@
 	return rm;
 }
 
+int rds_message_inc_copy_to_user(struct rds_incoming *inc,
+				 struct iovec *first_iov, size_t size)
+{
+	struct rds_message *rm = container_of(inc, struct rds_message, m_inc);
+	struct iovec *iov = first_iov;
+	struct rds_vec *vec = rm->m_vecs;
+	unsigned long to_copy;
+	unsigned long iov_off = 0;
+	unsigned long vec_off = 0;
+	int ret = 0;
+
+	rm = container_of(inc, struct rds_message, m_inc);
+
+	while (ret < size && ret < rm->m_len) {
+		while (iov_off == iov->iov_len) {
+			iov_off = 0;
+			iov++;
+		}
+
+		to_copy = min(iov->iov_len - iov_off, vec->v_len - vec_off);
+		to_copy = min_t(size_t, to_copy, size - ret);
+		to_copy = min_t(unsigned long, to_copy, rm->m_len - ret);
+
+		/* XXX could look more like filemap_copy_from_user() */
+		ret = copy_to_user(iov->iov_base + iov_off,
+				   kmap(vec->v_page) + vec->v_off + vec_off, 
+				   to_copy);
+		kunmap(vec->v_page);
+		if (ret) {
+			ret = -EFAULT;
+			break;
+		}
+
+		iov_off += to_copy;
+		vec_off += to_copy;
+		ret += to_copy;
+		size -= to_copy;
+		if (size == 0)
+			break;
+		if (vec_off == vec->v_len) {
+			vec_off = 0;
+			vec++;
+		}
+	}
+
+	return ret;
+}
+
 /*
  * XXX Right now we send acks with a normal header and then full frag
  * payload full of header entries.  This lets the normal frag fx path

Modified: trunk/linux/net/rds/rds.h
===================================================================
--- trunk/linux/net/rds/rds.h	2006-05-24 23:42:58 UTC (rev 75)
+++ trunk/linux/net/rds/rds.h	2006-05-25 22:44:12 UTC (rev 76)
@@ -26,18 +26,6 @@
 	unsigned int		v_len;
 };
 
-/* XXX rename to something send specific? */
-struct rds_message {
-	atomic_t		m_refcount;
-	struct list_head	m_header_pages;
-	/* both _flow_item and _conn_item are on lists under conn->c_lock */
-	struct list_head	m_flow_item;
-	struct list_head	m_conn_item;
-	u32			m_len;
-	u64			m_seq;
-	unsigned long		m_nr_vecs;
-	struct rds_vec		m_vecs[0];
-};
 
 struct rds_connection {
 	struct hlist_node	c_hash_node;
@@ -113,9 +101,23 @@
 	struct rds_header	i_hdr;
 };
 
+struct rds_message {
+	atomic_t		m_refcount;
+	struct list_head	m_header_pages;
+	/* both _flow_item and _conn_item are on lists under conn->c_lock */
+	struct list_head	m_flow_item;
+	struct list_head	m_conn_item;
+	u32			m_len;
+	u64			m_seq;
+	/* XXX m_inc is only used for loopback. */
+	struct rds_incoming	m_inc;
+	unsigned long		m_nr_vecs;
+	struct rds_vec		m_vecs[0];
+};
+
 struct rds_transport {
 	int (*laddr_check)(__be32 addr);
-	void *(*conn_alloc)(gfp_t gfp);
+	int (*conn_alloc)(struct rds_connection *conn, gfp_t gfp);
 	void (*conn_free)(void *data);
 	void (*conn_notice)(struct rds_connection *conn);
 	void (*conn_shutdown)(struct rds_connection *conn);
@@ -128,6 +130,7 @@
 	void (*inc_free)(struct rds_incoming *inc);
 	void (*inc_merge)(struct rds_incoming *dest, struct rds_incoming *src);
 	void (*listen_stop)(void);
+	void (*ack_queue_inc)(struct rds_incoming *inc, gfp_t gfp);
 };
 
 struct rds_sock {
@@ -150,6 +153,11 @@
 	__le16			rs_bound_port;
 
 	struct list_head	rs_flow_list;
+	/* 
+	 * This is only used to communicate the transport between bind and
+	 * initiating connections.  All other trans use is referenced through
+	 * the connection.
+	 */
 	struct rds_transport    *rs_transport;
 
 	/*
@@ -218,7 +226,10 @@
 					       unsigned long nr_frags,
 					       size_t total_len);
 void rds_message_populate_headers(struct rds_message *rm, __be16 sport,
-				  __be16 dport, __be64 sequence);
+			          __be16 dport, u64 seq);
+int rds_message_inc_copy_to_user(struct rds_incoming *inc,
+				 struct iovec *first_iov, size_t size);
+void rds_message_inc_free(struct rds_incoming *inc);
 struct rds_message *rds_message_alloc_ack(gfp_t gfp);
 void rds_message_addref(struct rds_message *rm);
 void rds_message_put(struct rds_message *rm);
@@ -226,9 +237,11 @@
 /* recv.c */
 void rds_inc_init(struct rds_incoming *inc, struct rds_connection *conn);
 void rds_inc_addref(struct rds_incoming *inc);
-void rds_inc_put(struct rds_transport *trans, struct rds_incoming *inc);
+void rds_inc_put(struct rds_incoming *inc);
 int rds_recv_incoming(struct rds_connection *conn, struct rds_incoming *inc,
 		      gfp_t gfp);
+void rds_recv_enqueue(struct rds_flow *caller_flow, __be32 saddr, __be32 daddr,
+		      struct rds_incoming *inc, gfp_t gfp);
 int rds_recvmsg(struct kiocb *iocb, struct socket *sock, struct msghdr *msg,
 		size_t size, int msg_flags);
 void rds_clear_recv_queue(struct rds_sock *rs);
@@ -238,6 +251,10 @@
 		size_t payload_len);
 int rds_send_get_next_vec(struct rds_connection *conn, struct rds_vec *vec);
 void rds_send_vec_done(struct rds_connection *conn, size_t bytes);
+int rds_send_get_next_message(struct rds_connection *conn,
+			      struct rds_message **rm);
+void rds_send_put_next_message(struct rds_connection *conn,
+			       struct rds_message *rm, int complete);
 
 /* sysctl.c */
 int __init rds_sysctl_init(void);

Modified: trunk/linux/net/rds/recv.c
===================================================================
--- trunk/linux/net/rds/recv.c	2006-05-24 23:42:58 UTC (rev 75)
+++ trunk/linux/net/rds/recv.c	2006-05-25 22:44:12 UTC (rev 76)
@@ -36,16 +36,64 @@
 	atomic_inc(&inc->i_refcount);
 }
 
-void rds_inc_put(struct rds_transport *trans, struct rds_incoming *inc)
+void rds_inc_put(struct rds_incoming *inc)
 {
 	pr_debug("put inc %p ref %d\n", inc, atomic_read(&inc->i_refcount));
 	if (atomic_dec_and_test(&inc->i_refcount)) {
 		BUG_ON(!list_empty(&inc->i_item));
 
-		trans->inc_free(inc);
+		inc->i_conn->c_trans->inc_free(inc);
 	}
 }
 
+/*
+ * This is called when a transport has a full message accumulated in
+ * 'inc'.  This can happen on behalf of the transport in rds_recv_incoming()
+ * as it reassembles.  Transports, like loopback, call this directly when
+ * they don't use rds_recv_incoming().
+ */
+void rds_recv_enqueue(struct rds_flow *caller_flow, __be32 saddr, __be32 daddr,
+		      struct rds_incoming *inc, gfp_t gfp)
+{
+	struct rds_flow *flow = caller_flow;
+	struct rds_sock *rs;
+	struct sock *sk;
+	unsigned long flags;
+	
+	if (flow == NULL) {
+		flow = rds_flow_create(daddr, inc->i_hdr.h_dport, saddr,
+				       inc->i_hdr.h_sport, gfp);
+		if (IS_ERR(flow)) {
+			flow = NULL;
+			goto out;
+		}
+	}
+
+	rs = flow->f_rs;
+	sk = rds_rs_to_sk(rs);
+
+	/* serialize with rds_release -> sock_orphan */
+	write_lock_irqsave(&sk->sk_callback_lock, flags);
+	if (!sock_flag(sk, SOCK_DEAD)) {
+		pr_debug("adding inc %p to rs %p's recv queue\n", inc, rs);
+		list_add_tail(&inc->i_item, &rs->rs_recv_queue);
+		rds_inc_addref(inc);
+		wake_up(sk->sk_sleep);
+		inc = NULL;  /* don't ack until it leaves the recv_queue */
+	}
+	write_unlock_irqrestore(&sk->sk_callback_lock, flags);
+
+out:
+	if (flow != caller_flow)
+		rds_flow_put(flow);
+	if (inc)
+		inc->i_conn->c_trans->ack_queue_inc(inc, gfp);
+}
+
+/*
+ * This shouldn't be called by loopback -- it assumes that 'faddr' on the
+ * conn is the sending address and that isn't the case for loopback.
+ */
 int rds_recv_incoming(struct rds_connection *conn, struct rds_incoming *inc,
 		      gfp_t gfp)
 {
@@ -151,28 +199,14 @@
 	 * flow.
 	 */
 	if (head_inc) {
-		struct rds_sock *rs = flow->f_rs;
-		struct sock *sk = rds_rs_to_sk(rs);
-
-		/* serialize with rds_release -> sock_orphan */
-		write_lock_irqsave(&sk->sk_callback_lock, flags);
-		if (!sock_flag(sk, SOCK_DEAD)) {
-			pr_debug("adding inc %p to rs %p's recv queue\n",
-				 head_inc, rs);
-			list_add_tail(&head_inc->i_item, &rs->rs_recv_queue);
-			wake_up(sk->sk_sleep);
-			head_inc = NULL;
-		}
-		write_unlock_irqrestore(&sk->sk_callback_lock, flags);
-		if (head_inc) {
-			rds_ack_queue_inc(head_inc, gfp);
-			rds_inc_put(rs->rs_transport, head_inc);
-		}
+		rds_recv_enqueue(flow, conn->c_faddr, conn->c_laddr,
+				 head_inc, gfp);
+		rds_inc_put(head_inc);
 	}
 
 out:
 	if (ack)
-		rds_ack_queue_inc(inc, gfp);
+		flow->f_conn->c_trans->ack_queue_inc(inc, gfp);
 	if (flow)
 		rds_flow_put(flow);
 	return ret;
@@ -213,13 +247,13 @@
 		ret = 1;
 		if (drop) {
 			list_del_init(&inc->i_item);
-			rds_inc_put(rs->rs_transport, inc);
+			rds_inc_put(inc);
 		}
 	}
 	write_unlock_irqrestore(&sk->sk_callback_lock, flags);
 
 	if (ret && drop)
-		rds_ack_queue_inc(inc, GFP_KERNEL);
+		inc->i_conn->c_trans->ack_queue_inc(inc, GFP_KERNEL);
 
 	pr_debug("inc %p rs %p still %d dropped %d\n", inc, rs, ret, drop);
 	return ret;
@@ -263,8 +297,8 @@
 		}
 
 		pr_debug("copying inc %p to user\n", inc);
-		ret = rs->rs_transport->inc_copy_to_user(inc, msg->msg_iov,
-							 size);
+		ret = inc->i_conn->c_trans->inc_copy_to_user(inc, msg->msg_iov,
+							     size);
 		if (ret < 0)
 			break;
 
@@ -274,7 +308,7 @@
 		 * to get the next message.
 		 */
 		if (!rds_still_queued(rs, inc, !(msg_flags & MSG_PEEK))) {
-			rds_inc_put(rs->rs_transport, inc);
+			rds_inc_put(inc);
 			inc = NULL;
 			/* XXX inc some stat */;
 			continue;
@@ -292,7 +326,7 @@
 
 out:
 	if (inc)
-		rds_inc_put(rs->rs_transport, inc);
+		rds_inc_put(inc);
 	return ret;
 }
 
@@ -313,7 +347,7 @@
 
 	list_for_each_entry_safe(inc, tmp, &rs->rs_recv_queue, i_item) {
 		list_del_init(&inc->i_item);
-		rds_ack_queue_inc(inc, GFP_KERNEL);
-		rds_inc_put(rs->rs_transport, inc);
+		inc->i_conn->c_trans->ack_queue_inc(inc, GFP_KERNEL);
+		rds_inc_put(inc);
 	}
 }

Modified: trunk/linux/net/rds/send.c
===================================================================
--- trunk/linux/net/rds/send.c	2006-05-24 23:42:58 UTC (rev 75)
+++ trunk/linux/net/rds/send.c	2006-05-25 22:44:12 UTC (rev 76)
@@ -24,6 +24,51 @@
 
 #include "rds.h"
 
+/*
+ * This gets a reference to the first message in the send queue and leaves
+ * it on the send queue.  This gives a transport's sending path a chance
+ * to process an entire message.
+ *
+ * rds_send_put_next_message() must be called to drop the reference that
+ * this function returns. 
+ */
+int rds_send_get_next_message(struct rds_connection *conn,
+			      struct rds_message **rm)
+{
+	unsigned long flags;
+	int ret = 0;
+
+	spin_lock_irqsave(&conn->c_lock, flags);
+	if (!list_empty(&conn->c_send_queue)) {
+		*rm = list_entry(conn->c_send_queue.next, struct rds_message,
+				m_conn_item);
+		rds_message_addref(*rm);
+		ret = 1;
+	}
+	spin_unlock_irqrestore(&conn->c_lock, flags);
+
+	return ret;
+}
+
+/*
+ * Called when a transport is done sending an entire message that was
+ * returned from rds_send_get_next_message().  If 'complete' is non-zero
+ * then the transport is indicating that it has reliably sent the message
+ * which should now be moved to the retransmit queue.
+ */
+void rds_send_put_next_message(struct rds_connection *conn,
+			       struct rds_message *rm, int complete)
+{
+	unsigned long flags;
+
+	spin_lock_irqsave(&conn->c_lock, flags);
+	if (complete && !list_empty(&rm->m_conn_item))
+		list_move_tail(&rm->m_conn_item, &conn->c_retrans);
+	spin_unlock_irqrestore(&conn->c_lock, flags);
+
+	rds_message_put(rm);
+}
+
 static void rds_send_reset_vec(struct rds_connection *conn)
 {
 	struct rds_message *rm;
@@ -73,6 +118,9 @@
  * This is the transport's interface into the message stream.  It is
  * given the vec for the current region of the current fragment.  It
  * calls rds_send_vec_done() when it finishes with the vec it was given.
+ *
+ * XXX The sender has the send_sem, we might want to reduce the amount of
+ * lock traffic involved in iterating over the vecs in a message.
  */
 int rds_send_get_next_vec(struct rds_connection *conn, struct rds_vec *vec)
 {
@@ -278,7 +326,7 @@
 		goto out;
 	}
 
-	while (!rds_send_queue_rm(flow, rm, nr_frags, rs->rs_bound_port,
+	while (!rds_send_queue_rm(flow, rm, nr_frags, rs->rs_bound_port, 
 				  usin->sin_port, &queued)) {
 		/* XXX make sure this is reasonable */
 		if (payload_len > rs->rs_sndbuf) {
@@ -292,7 +340,7 @@
 
 		timeo = wait_event_interruptible_timeout(*sk->sk_sleep,
 					rds_send_queue_rm(flow, rm, nr_frags,
-						          rs->rs_bound_port,
+							  rs->rs_bound_port,
 							  usin->sin_port,
 							  &queued),
 					timeo);

Modified: trunk/linux/net/rds/tcp.c
===================================================================
--- trunk/linux/net/rds/tcp.c	2006-05-24 23:42:58 UTC (rev 75)
+++ trunk/linux/net/rds/tcp.c	2006-05-25 22:44:12 UTC (rev 76)
@@ -72,11 +72,14 @@
 	return 0;
 }
 
-static void *rds_tcp_conn_alloc(gfp_t gfp)
+static int rds_tcp_conn_alloc(struct rds_connection *conn, gfp_t gfp)
 {
-	void *addr = kmem_cache_alloc(rds_tcp_conn_slab, gfp);
-	pr_debug("alloced tc %p\n", addr);
-	return addr;
+	conn->c_transport_data = kmem_cache_alloc(rds_tcp_conn_slab, gfp);
+	if (conn->c_transport_data == NULL)
+		return -ENOMEM;
+
+	pr_debug("alloced tc %p\n", conn->c_transport_data);
+	return 0;
 }
 static void rds_tcp_conn_free(void *tc)
 {
@@ -97,6 +100,7 @@
 	.inc_merge		= rds_tcp_inc_merge,
 	.inc_process_acks	= rds_tcp_inc_process_acks,
 	.listen_stop		= rds_tcp_listen_stop,
+	.ack_queue_inc		= rds_ack_queue_inc,
 };
 
 int __init rds_tcp_init(void)

Modified: trunk/linux/net/rds/tcp_connect.c
===================================================================
--- trunk/linux/net/rds/tcp_connect.c	2006-05-24 23:42:58 UTC (rev 75)
+++ trunk/linux/net/rds/tcp_connect.c	2006-05-25 22:44:12 UTC (rev 76)
@@ -166,7 +166,7 @@
 			up(&tc->t_conn->c_send_sem);
 
 		if (tc->t_tinc) {
-			rds_inc_put(&rds_tcp_transport, &tc->t_tinc->ti_inc);
+			rds_inc_put(&tc->t_tinc->ti_inc);
 			tc->t_tinc = NULL;
 		}
 		sock_release(sock);

Modified: trunk/linux/net/rds/tcp_recv.c
===================================================================
--- trunk/linux/net/rds/tcp_recv.c	2006-05-24 23:42:58 UTC (rev 75)
+++ trunk/linux/net/rds/tcp_recv.c	2006-05-25 22:44:12 UTC (rev 76)
@@ -228,7 +228,7 @@
 			tc->t_tinc_hdr_rem = sizeof(struct rds_header);
 			tc->t_tinc_data_rem = 0;
 			tc->t_tinc = NULL;
-			rds_inc_put(&rds_tcp_transport, &tinc->ti_inc);
+			rds_inc_put(&tinc->ti_inc);
 			tinc = NULL;
 		}
 	}

Modified: trunk/linux/net/rds/transport.c
===================================================================
--- trunk/linux/net/rds/transport.c	2006-05-24 23:42:58 UTC (rev 75)
+++ trunk/linux/net/rds/transport.c	2006-05-25 22:44:12 UTC (rev 76)
@@ -19,6 +19,7 @@
 #include <linux/config.h>
 #include <linux/kernel.h>
 #include <linux/in.h>
+
 #include "rds.h"
 
 static struct rds_transport *transports[] = {




More information about the Rds-commits mailing list