[Rds-commits] zab commits r76 - in trunk: . linux/net/rds
svn-commits@oss.oracle.com
svn-commits at oss.oracle.com
Thu May 25 17:44:17 CDT 2006
Author: zab
Date: 2006-05-25 17:44:12 -0500 (Thu, 25 May 2006)
New Revision: 76
Added:
trunk/linux/net/rds/loop.c
trunk/linux/net/rds/loop.h
Modified:
trunk/TODO
trunk/linux/net/rds/Makefile
trunk/linux/net/rds/ack.c
trunk/linux/net/rds/connection.c
trunk/linux/net/rds/flow.c
trunk/linux/net/rds/message.c
trunk/linux/net/rds/rds.h
trunk/linux/net/rds/recv.c
trunk/linux/net/rds/send.c
trunk/linux/net/rds/tcp.c
trunk/linux/net/rds/tcp_connect.c
trunk/linux/net/rds/tcp_recv.c
trunk/linux/net/rds/transport.c
Log:
Add the loopback transport for local connections.
This lets us move messages directly from the send to the receive queue and call
directly into ack processing. This required a minimal send API extension to
let loopback get references to the next message to be sent as a whole rather
than one vec at a time.
The conn alloc API is changed a little to make it easier to return success
without doing anything. Also, check out that wild bug in rds_flow_create():
it tested IS_ERR() on a stale dst pointer instead of the connection returned
by rds_conn_create(), so conn creation failures went unnoticed.
The recv path is refactored a little so that loopback sending can call it to
queue messages in the receive queue.
With the introduction of loopback a connection's transport can differ from
the transport of the bound socket. This leads us to *only* use the socket's
transport to build up remote connections.
Loopback embeds an rds_incoming in rds_message and leans on the message
layer for a helper to copy the message to userspace.
Modified: trunk/TODO
===================================================================
--- trunk/TODO 2006-05-24 23:42:58 UTC (rev 75)
+++ trunk/TODO 2006-05-25 22:44:12 UTC (rev 76)
@@ -1,4 +1,6 @@
+clean up sending forward progress
+
oh, right, poll(). POLLIN does things, POLLOUT always returns true :/
connect() should be implemented, too. (don't forget getpeername)
Modified: trunk/linux/net/rds/Makefile
===================================================================
--- trunk/linux/net/rds/Makefile 2006-05-24 23:42:58 UTC (rev 75)
+++ trunk/linux/net/rds/Makefile 2006-05-25 22:44:12 UTC (rev 76)
@@ -2,4 +2,5 @@
rds-y := af_rds.o ack.o bind.o connection.o flow.o message.o \
recv.o send.o sysctl.o transport.o \
- tcp.o tcp_connect.o tcp_listen.o tcp_send.o tcp_recv.o
+ tcp.o tcp_connect.o tcp_listen.o tcp_send.o tcp_recv.o \
+ loop.o
Modified: trunk/linux/net/rds/ack.c
===================================================================
--- trunk/linux/net/rds/ack.c 2006-05-24 23:42:58 UTC (rev 75)
+++ trunk/linux/net/rds/ack.c 2006-05-25 22:44:12 UTC (rev 76)
@@ -161,7 +161,7 @@
rds_ack_send(conn);
list_del_init(&inc->i_item);
- rds_inc_put(conn->c_trans, inc);
+ rds_inc_put(inc);
}
Modified: trunk/linux/net/rds/connection.c
===================================================================
--- trunk/linux/net/rds/connection.c 2006-05-24 23:42:58 UTC (rev 75)
+++ trunk/linux/net/rds/connection.c 2006-05-25 22:44:12 UTC (rev 76)
@@ -24,6 +24,7 @@
#endif /* KERNEL_HAS_INET_HASHTABLES */
#include "rds.h"
+#include "loop.h"
/* XXX export some stats for this */
#define RDS_CONNECTION_HASH_BITS 12
@@ -73,6 +74,7 @@
struct rds_connection *conn, *tmp;
struct hlist_head *head = rds_conn_bucket(laddr, faddr);
unsigned long flags;
+ int ret;
spin_lock_irqsave(&rds_conn_lock, flags);
conn = rds_conn_lookup(head, laddr, faddr);
@@ -107,11 +109,20 @@
conn->c_ack_rm = NULL;
setup_timer(&conn->c_ack_timer, rds_ack_timer, (unsigned long)conn);
+ /*
+ * This is where a connection becomes loopback. If *any* RDS sockets
+ * can bind to the destination address then we'd rather have the messages
+ * flow through loopback than through either transport.
+ */
+ if (rds_trans_get_preferred(faddr))
+ trans = &rds_loop_transport;
+
conn->c_trans = trans;
- conn->c_transport_data = trans->conn_alloc(gfp);
- if (conn->c_transport_data == NULL) {
+
+ ret = trans->conn_alloc(conn, gfp);
+ if (ret) {
kmem_cache_free(rds_conn_slab, conn);
- conn = ERR_PTR(-ENOMEM);
+ conn = ERR_PTR(ret);
goto out;
}
@@ -179,7 +190,7 @@
&conn->c_incs_for_acks,
i_item) {
list_del_init(&inc->i_item);
- rds_inc_put(conn->c_trans, inc);
+ rds_inc_put(inc);
}
if (conn->c_ack_rm)
rds_message_put(conn->c_ack_rm);
Modified: trunk/linux/net/rds/flow.c
===================================================================
--- trunk/linux/net/rds/flow.c 2006-05-24 23:42:58 UTC (rev 75)
+++ trunk/linux/net/rds/flow.c 2006-05-25 22:44:12 UTC (rev 76)
@@ -91,7 +91,6 @@
struct rds_flow *rds_flow_create(__le32 laddr, __le16 lport, __le32 faddr,
__le16 fport, gfp_t gfp)
{
- struct dst_entry *dst = NULL;
struct rds_flow *flow, *tmp;
struct rds_sock *rs = NULL;
struct rds_connection *conn = NULL;
@@ -110,8 +109,8 @@
}
conn = rds_conn_create(laddr, faddr, rs->rs_transport, gfp);
- if (IS_ERR(dst)) {
- ret = PTR_ERR(dst);
+ if (IS_ERR(conn)) {
+ ret = PTR_ERR(conn);
goto out;
}
@@ -164,13 +163,17 @@
return flow;
}
+/*
+ * Since this calls ack_queue_inc it needs to not be called under locks.
+ */
void rds_flow_put(struct rds_flow *flow)
{
pr_debug("put flow %p ref %d\n", flow, atomic_read(&flow->f_refcount));
if (atomic_dec_and_test(&flow->f_refcount)) {
if (flow->f_inc) {
- rds_ack_queue_inc(flow->f_inc, GFP_ATOMIC);
- rds_inc_put(flow->f_conn->c_trans, flow->f_inc);
+ flow->f_conn->c_trans->ack_queue_inc(flow->f_inc,
+ GFP_ATOMIC);
+ rds_inc_put(flow->f_inc);
}
if (flow->f_rs)
rds_sock_put(flow->f_rs);
Added: trunk/linux/net/rds/loop.c
===================================================================
--- trunk/linux/net/rds/loop.c 2006-05-24 23:42:58 UTC (rev 75)
+++ trunk/linux/net/rds/loop.c 2006-05-25 22:44:12 UTC (rev 76)
@@ -0,0 +1,148 @@
+/*
+ * Copyright (C) 2006 Oracle. All rights reserved.
+ *
+ * This program is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU General Public
+ * License as published by the Free Software Foundation; either
+ * version 2 of the License, or (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public
+ * License along with this program; if not, write to the
+ * Free Software Foundation, Inc., 59 Temple Place - Suite 330,
+ * Boston, MA 021110-1307, USA.
+ */
+#include <linux/config.h>
+#include <linux/kernel.h>
+#include <linux/in.h>
+
+#include "rds.h"
+
+/*
+ * This 'loopback' transport is a special case for flows that originate
+ * and terminate on the same machine.
+ *
+ * Connection build-up notices if the destination address is thought of
+ * as a local address by a transport. At that time it decides to use the
+ * loopback transport instead of the bound transport of the sending socket.
+ *
+ * The loopback transport's sending path just hands the sent rds_message
+ * straight to the receiving path via an embedded rds_incoming. Once it
+ * is consumed the rds_message ref is dropped and the ack processing paths
+ * are called directly.
+ */
+
+/*
+ * we don't have to marshal the ack through messages, we can call the
+ * ack processing path directly.
+ */
+static void rds_loop_ack_queue_inc(struct rds_incoming *inc, gfp_t gfp)
+{
+ struct rds_ack_entry ent = {
+ .e_sport = inc->i_hdr.h_sport,
+ .e_dport = inc->i_hdr.h_dport,
+ .e_seq = inc->i_hdr.h_sequence,
+ };
+ rds_ack_process(inc->i_conn, &ent);
+}
+
+/*
+ * rds_message_populate_headers() has initialized the inc's i_hdr to match
+ * the first header of the message which rds_recv_enqueue, etc, will use
+ * to generate acks.
+ *
+ * Usually a message transits both the sender and receiver's conns as it
+ * flows to the receiver. In the loopback case, though, the receive path
+ * is handed the sending conn so the sense of the addresses is reversed.
+ * rds_recv_incoming() uses conn->c_faddr as the sending address but
+ * we use conn->c_laddr as the sending address.
+ */
+static int rds_loop_send(struct rds_connection *conn)
+{
+ struct rds_message *rm;
+ int ret;
+
+ while ((ret = rds_send_get_next_message(conn, &rm)) > 0) {
+
+ rds_inc_init(&rm->m_inc, conn);
+ rds_message_addref(rm); /* for the inc */
+
+ rds_recv_enqueue(NULL, conn->c_laddr, conn->c_faddr,
+ &rm->m_inc, GFP_KERNEL);
+
+ rds_inc_put(&rm->m_inc);
+ rds_send_put_next_message(conn, rm, 1);
+ }
+
+ return 0;
+}
+
+/*
+ * We don't actually do anything to maintain connections so these are all
+ * nops.
+ *
+ * XXX worry about missing sends due to the send sem, then worry about
+ * not needing sending serialization at all.
+ */
+static void rds_loop_send_trigger(struct rds_connection *conn, long delay)
+{
+}
+static void rds_loop_conn_notice(struct rds_connection *conn)
+{
+}
+static int rds_loop_conn_alloc(struct rds_connection *conn, gfp_t gfp)
+{
+ return 0;
+}
+static void rds_loop_conn_free(void *data)
+{
+}
+static void rds_loop_conn_shutdown(struct rds_connection *conn)
+{
+}
+/*
+ * transport.c doesn't iterate over us so these shouldn't be called.
+ */
+static void rds_loop_listen_stop(void)
+{
+ BUG();
+}
+static int rds_loop_laddr_check(__le32 addr)
+{
+ BUG();
+ return 0;
+}
+
+/*
+ * We don't go through rds_recv_incoming() so none of these should be called.
+ */
+static void rds_loop_inc_merge(struct rds_incoming *dest,
+ struct rds_incoming *src)
+{
+ BUG();
+}
+static void rds_loop_inc_process_acks(struct rds_connection *conn,
+ struct rds_incoming *inc, u16 nr)
+{
+ BUG();
+}
+
+struct rds_transport rds_loop_transport = {
+ .laddr_check = rds_loop_laddr_check,
+ .send = rds_loop_send,
+ .send_trigger = rds_loop_send_trigger,
+ .conn_notice = rds_loop_conn_notice,
+ .conn_alloc = rds_loop_conn_alloc,
+ .conn_free = rds_loop_conn_free,
+ .conn_shutdown = rds_loop_conn_shutdown,
+ .inc_copy_to_user = rds_message_inc_copy_to_user,
+ .inc_free = rds_message_inc_free,
+ .inc_merge = rds_loop_inc_merge,
+ .inc_process_acks = rds_loop_inc_process_acks,
+ .listen_stop = rds_loop_listen_stop,
+ .ack_queue_inc = rds_loop_ack_queue_inc,
+};
Added: trunk/linux/net/rds/loop.h
===================================================================
--- trunk/linux/net/rds/loop.h 2006-05-24 23:42:58 UTC (rev 75)
+++ trunk/linux/net/rds/loop.h 2006-05-25 22:44:12 UTC (rev 76)
@@ -0,0 +1,7 @@
+#ifndef _RDS_LOOP_H
+#define _RDS_LOOP_H
+
+/* loop.c */
+extern struct rds_transport rds_loop_transport;
+
+#endif
Modified: trunk/linux/net/rds/message.c
===================================================================
--- trunk/linux/net/rds/message.c 2006-05-24 23:42:58 UTC (rev 75)
+++ trunk/linux/net/rds/message.c 2006-05-25 22:44:12 UTC (rev 76)
@@ -55,8 +55,14 @@
}
}
+void rds_message_inc_free(struct rds_incoming *inc)
+{
+ struct rds_message *rm = container_of(inc, struct rds_message, m_inc);
+ rds_message_put(rm);
+}
+
void rds_message_populate_headers(struct rds_message *rm, __be16 sport,
- __be16 dport, u64 seq)
+ __be16 dport, u64 seq)
{
struct rds_header *addr, *hdr;
struct page *page;
@@ -84,6 +90,14 @@
hdr->h_sequence = cpu_to_be64(seq);
seq++;
+ /*
+ * Help loopback by copying the first header into
+ * the inc's copy of the header while we have it
+ * mapped and hot.
+ */
+ if (left == rm->m_len)
+ rm->m_inc.i_hdr = *hdr;
+
left -= len;
if (left == 0)
break;
@@ -222,6 +236,54 @@
return rm;
}
+int rds_message_inc_copy_to_user(struct rds_incoming *inc,
+ struct iovec *first_iov, size_t size)
+{
+ struct rds_message *rm = container_of(inc, struct rds_message, m_inc);
+ struct iovec *iov = first_iov;
+ struct rds_vec *vec = rm->m_vecs;
+ unsigned long to_copy;
+ unsigned long iov_off = 0;
+ unsigned long vec_off = 0;
+ int ret = 0;
+
+ rm = container_of(inc, struct rds_message, m_inc);
+
+ while (ret < size && ret < rm->m_len) {
+ while (iov_off == iov->iov_len) {
+ iov_off = 0;
+ iov++;
+ }
+
+ to_copy = min(iov->iov_len - iov_off, vec->v_len - vec_off);
+ to_copy = min_t(size_t, to_copy, size - ret);
+ to_copy = min_t(unsigned long, to_copy, rm->m_len - ret);
+
+ /* XXX could look more like filemap_copy_from_user() */
+ ret = copy_to_user(iov->iov_base + iov_off,
+ kmap(vec->v_page) + vec->v_off + vec_off,
+ to_copy);
+ kunmap(vec->v_page);
+ if (ret) {
+ ret = -EFAULT;
+ break;
+ }
+
+ iov_off += to_copy;
+ vec_off += to_copy;
+ ret += to_copy;
+ size -= to_copy;
+ if (size == 0)
+ break;
+ if (vec_off == vec->v_len) {
+ vec_off = 0;
+ vec++;
+ }
+ }
+
+ return ret;
+}
+
/*
* XXX Right now we send acks with a normal header and then full frag
* payload full of header entries. This lets the normal frag fx path
Modified: trunk/linux/net/rds/rds.h
===================================================================
--- trunk/linux/net/rds/rds.h 2006-05-24 23:42:58 UTC (rev 75)
+++ trunk/linux/net/rds/rds.h 2006-05-25 22:44:12 UTC (rev 76)
@@ -26,18 +26,6 @@
unsigned int v_len;
};
-/* XXX rename to something send specific? */
-struct rds_message {
- atomic_t m_refcount;
- struct list_head m_header_pages;
- /* both _flow_item and _conn_item are on lists under conn->c_lock */
- struct list_head m_flow_item;
- struct list_head m_conn_item;
- u32 m_len;
- u64 m_seq;
- unsigned long m_nr_vecs;
- struct rds_vec m_vecs[0];
-};
struct rds_connection {
struct hlist_node c_hash_node;
@@ -113,9 +101,23 @@
struct rds_header i_hdr;
};
+struct rds_message {
+ atomic_t m_refcount;
+ struct list_head m_header_pages;
+ /* both _flow_item and _conn_item are on lists under conn->c_lock */
+ struct list_head m_flow_item;
+ struct list_head m_conn_item;
+ u32 m_len;
+ u64 m_seq;
+ /* XXX m_inc is only used for loopback. */
+ struct rds_incoming m_inc;
+ unsigned long m_nr_vecs;
+ struct rds_vec m_vecs[0];
+};
+
struct rds_transport {
int (*laddr_check)(__be32 addr);
- void *(*conn_alloc)(gfp_t gfp);
+ int (*conn_alloc)(struct rds_connection *conn, gfp_t gfp);
void (*conn_free)(void *data);
void (*conn_notice)(struct rds_connection *conn);
void (*conn_shutdown)(struct rds_connection *conn);
@@ -128,6 +130,7 @@
void (*inc_free)(struct rds_incoming *inc);
void (*inc_merge)(struct rds_incoming *dest, struct rds_incoming *src);
void (*listen_stop)(void);
+ void (*ack_queue_inc)(struct rds_incoming *inc, gfp_t gfp);
};
struct rds_sock {
@@ -150,6 +153,11 @@
__le16 rs_bound_port;
struct list_head rs_flow_list;
+ /*
+ * This is only used to communicate the transport between bind and
+ * initiating connections. All other trans use is referenced through
+ * the connection.
+ */
struct rds_transport *rs_transport;
/*
@@ -218,7 +226,10 @@
unsigned long nr_frags,
size_t total_len);
void rds_message_populate_headers(struct rds_message *rm, __be16 sport,
- __be16 dport, __be64 sequence);
+ __be16 dport, u64 seq);
+int rds_message_inc_copy_to_user(struct rds_incoming *inc,
+ struct iovec *first_iov, size_t size);
+void rds_message_inc_free(struct rds_incoming *inc);
struct rds_message *rds_message_alloc_ack(gfp_t gfp);
void rds_message_addref(struct rds_message *rm);
void rds_message_put(struct rds_message *rm);
@@ -226,9 +237,11 @@
/* recv.c */
void rds_inc_init(struct rds_incoming *inc, struct rds_connection *conn);
void rds_inc_addref(struct rds_incoming *inc);
-void rds_inc_put(struct rds_transport *trans, struct rds_incoming *inc);
+void rds_inc_put(struct rds_incoming *inc);
int rds_recv_incoming(struct rds_connection *conn, struct rds_incoming *inc,
gfp_t gfp);
+void rds_recv_enqueue(struct rds_flow *caller_flow, __be32 saddr, __be32 daddr,
+ struct rds_incoming *inc, gfp_t gfp);
int rds_recvmsg(struct kiocb *iocb, struct socket *sock, struct msghdr *msg,
size_t size, int msg_flags);
void rds_clear_recv_queue(struct rds_sock *rs);
@@ -238,6 +251,10 @@
size_t payload_len);
int rds_send_get_next_vec(struct rds_connection *conn, struct rds_vec *vec);
void rds_send_vec_done(struct rds_connection *conn, size_t bytes);
+int rds_send_get_next_message(struct rds_connection *conn,
+ struct rds_message **rm);
+void rds_send_put_next_message(struct rds_connection *conn,
+ struct rds_message *rm, int complete);
/* sysctl.c */
int __init rds_sysctl_init(void);
Modified: trunk/linux/net/rds/recv.c
===================================================================
--- trunk/linux/net/rds/recv.c 2006-05-24 23:42:58 UTC (rev 75)
+++ trunk/linux/net/rds/recv.c 2006-05-25 22:44:12 UTC (rev 76)
@@ -36,16 +36,64 @@
atomic_inc(&inc->i_refcount);
}
-void rds_inc_put(struct rds_transport *trans, struct rds_incoming *inc)
+void rds_inc_put(struct rds_incoming *inc)
{
pr_debug("put inc %p ref %d\n", inc, atomic_read(&inc->i_refcount));
if (atomic_dec_and_test(&inc->i_refcount)) {
BUG_ON(!list_empty(&inc->i_item));
- trans->inc_free(inc);
+ inc->i_conn->c_trans->inc_free(inc);
}
}
+/*
+ * This is called when a transport has a full message accumulated in
+ * 'inc'. This can happen on behalf of the transport in rds_recv_incoming()
+ * as it reassembles. Transports, like loopback, call this directly when
+ * they don't use rds_recv_incoming().
+ */
+void rds_recv_enqueue(struct rds_flow *caller_flow, __be32 saddr, __be32 daddr,
+ struct rds_incoming *inc, gfp_t gfp)
+{
+ struct rds_flow *flow = caller_flow;
+ struct rds_sock *rs;
+ struct sock *sk;
+ unsigned long flags;
+
+ if (flow == NULL) {
+ flow = rds_flow_create(daddr, inc->i_hdr.h_dport, saddr,
+ inc->i_hdr.h_sport, gfp);
+ if (IS_ERR(flow)) {
+ flow = NULL;
+ goto out;
+ }
+ }
+
+ rs = flow->f_rs;
+ sk = rds_rs_to_sk(rs);
+
+ /* serialize with rds_release -> sock_orphan */
+ write_lock_irqsave(&sk->sk_callback_lock, flags);
+ if (!sock_flag(sk, SOCK_DEAD)) {
+ pr_debug("adding inc %p to rs %p's recv queue\n", inc, rs);
+ list_add_tail(&inc->i_item, &rs->rs_recv_queue);
+ rds_inc_addref(inc);
+ wake_up(sk->sk_sleep);
+ inc = NULL; /* don't ack until it leaves the recv_queue */
+ }
+ write_unlock_irqrestore(&sk->sk_callback_lock, flags);
+
+out:
+ if (flow != caller_flow)
+ rds_flow_put(flow);
+ if (inc)
+ inc->i_conn->c_trans->ack_queue_inc(inc, gfp);
+}
+
+/*
+ * This shouldn't be called by loopback -- it assumes that 'faddr' on the
+ * conn is the sending address and that isn't the case for loopback.
+ */
int rds_recv_incoming(struct rds_connection *conn, struct rds_incoming *inc,
gfp_t gfp)
{
@@ -151,28 +199,14 @@
* flow.
*/
if (head_inc) {
- struct rds_sock *rs = flow->f_rs;
- struct sock *sk = rds_rs_to_sk(rs);
-
- /* serialize with rds_release -> sock_orphan */
- write_lock_irqsave(&sk->sk_callback_lock, flags);
- if (!sock_flag(sk, SOCK_DEAD)) {
- pr_debug("adding inc %p to rs %p's recv queue\n",
- head_inc, rs);
- list_add_tail(&head_inc->i_item, &rs->rs_recv_queue);
- wake_up(sk->sk_sleep);
- head_inc = NULL;
- }
- write_unlock_irqrestore(&sk->sk_callback_lock, flags);
- if (head_inc) {
- rds_ack_queue_inc(head_inc, gfp);
- rds_inc_put(rs->rs_transport, head_inc);
- }
+ rds_recv_enqueue(flow, conn->c_faddr, conn->c_laddr,
+ head_inc, gfp);
+ rds_inc_put(head_inc);
}
out:
if (ack)
- rds_ack_queue_inc(inc, gfp);
+ flow->f_conn->c_trans->ack_queue_inc(inc, gfp);
if (flow)
rds_flow_put(flow);
return ret;
@@ -213,13 +247,13 @@
ret = 1;
if (drop) {
list_del_init(&inc->i_item);
- rds_inc_put(rs->rs_transport, inc);
+ rds_inc_put(inc);
}
}
write_unlock_irqrestore(&sk->sk_callback_lock, flags);
if (ret && drop)
- rds_ack_queue_inc(inc, GFP_KERNEL);
+ inc->i_conn->c_trans->ack_queue_inc(inc, GFP_KERNEL);
pr_debug("inc %p rs %p still %d dropped %d\n", inc, rs, ret, drop);
return ret;
@@ -263,8 +297,8 @@
}
pr_debug("copying inc %p to user\n", inc);
- ret = rs->rs_transport->inc_copy_to_user(inc, msg->msg_iov,
- size);
+ ret = inc->i_conn->c_trans->inc_copy_to_user(inc, msg->msg_iov,
+ size);
if (ret < 0)
break;
@@ -274,7 +308,7 @@
* to get the next message.
*/
if (!rds_still_queued(rs, inc, !(msg_flags & MSG_PEEK))) {
- rds_inc_put(rs->rs_transport, inc);
+ rds_inc_put(inc);
inc = NULL;
/* XXX inc some stat */;
continue;
@@ -292,7 +326,7 @@
out:
if (inc)
- rds_inc_put(rs->rs_transport, inc);
+ rds_inc_put(inc);
return ret;
}
@@ -313,7 +347,7 @@
list_for_each_entry_safe(inc, tmp, &rs->rs_recv_queue, i_item) {
list_del_init(&inc->i_item);
- rds_ack_queue_inc(inc, GFP_KERNEL);
- rds_inc_put(rs->rs_transport, inc);
+ inc->i_conn->c_trans->ack_queue_inc(inc, GFP_KERNEL);
+ rds_inc_put(inc);
}
}
Modified: trunk/linux/net/rds/send.c
===================================================================
--- trunk/linux/net/rds/send.c 2006-05-24 23:42:58 UTC (rev 75)
+++ trunk/linux/net/rds/send.c 2006-05-25 22:44:12 UTC (rev 76)
@@ -24,6 +24,51 @@
#include "rds.h"
+/*
+ * This gets a reference to the first message in the send queue and leaves
+ * it on the send queue. This gives a transport's sending path a chance
+ * to process an entire message.
+ *
+ * rds_send_put_next_message() must be called to drop the reference that
+ * this function returns.
+ */
+int rds_send_get_next_message(struct rds_connection *conn,
+ struct rds_message **rm)
+{
+ unsigned long flags;
+ int ret = 0;
+
+ spin_lock_irqsave(&conn->c_lock, flags);
+ if (!list_empty(&conn->c_send_queue)) {
+ *rm = list_entry(conn->c_send_queue.next, struct rds_message,
+ m_conn_item);
+ rds_message_addref(*rm);
+ ret = 1;
+ }
+ spin_unlock_irqrestore(&conn->c_lock, flags);
+
+ return ret;
+}
+
+/*
+ * Called when a transport is done sending an entire message that was
+ * returned from rds_send_get_next_message(). If 'complete' is non-zero
+ * then the transport is indicating that it has reliably sent the message
+ * which should now be moved to the retransmit queue.
+ */
+void rds_send_put_next_message(struct rds_connection *conn,
+ struct rds_message *rm, int complete)
+{
+ unsigned long flags;
+
+ spin_lock_irqsave(&conn->c_lock, flags);
+ if (complete && !list_empty(&rm->m_conn_item))
+ list_move_tail(&rm->m_conn_item, &conn->c_retrans);
+ spin_unlock_irqrestore(&conn->c_lock, flags);
+
+ rds_message_put(rm);
+}
+
static void rds_send_reset_vec(struct rds_connection *conn)
{
struct rds_message *rm;
@@ -73,6 +118,9 @@
* This is the transport's interface into the message stream. It is
* given the vec for the current region of the current fragment. It
* calls rds_send_vec_done() when it finishes with the vec it was given.
+ *
+ * XXX The sender has the send_sem, we might want to reduce the amount of
+ * lock traffic involved in iterating over the vecs in a message.
*/
int rds_send_get_next_vec(struct rds_connection *conn, struct rds_vec *vec)
{
@@ -278,7 +326,7 @@
goto out;
}
- while (!rds_send_queue_rm(flow, rm, nr_frags, rs->rs_bound_port,
+ while (!rds_send_queue_rm(flow, rm, nr_frags, rs->rs_bound_port,
usin->sin_port, &queued)) {
/* XXX make sure this is reasonable */
if (payload_len > rs->rs_sndbuf) {
@@ -292,7 +340,7 @@
timeo = wait_event_interruptible_timeout(*sk->sk_sleep,
rds_send_queue_rm(flow, rm, nr_frags,
- rs->rs_bound_port,
+ rs->rs_bound_port,
usin->sin_port,
&queued),
timeo);
Modified: trunk/linux/net/rds/tcp.c
===================================================================
--- trunk/linux/net/rds/tcp.c 2006-05-24 23:42:58 UTC (rev 75)
+++ trunk/linux/net/rds/tcp.c 2006-05-25 22:44:12 UTC (rev 76)
@@ -72,11 +72,14 @@
return 0;
}
-static void *rds_tcp_conn_alloc(gfp_t gfp)
+static int rds_tcp_conn_alloc(struct rds_connection *conn, gfp_t gfp)
{
- void *addr = kmem_cache_alloc(rds_tcp_conn_slab, gfp);
- pr_debug("alloced tc %p\n", addr);
- return addr;
+ conn->c_transport_data = kmem_cache_alloc(rds_tcp_conn_slab, gfp);
+ if (conn->c_transport_data == NULL)
+ return -ENOMEM;
+
+ pr_debug("alloced tc %p\n", conn->c_transport_data);
+ return 0;
}
static void rds_tcp_conn_free(void *tc)
{
@@ -97,6 +100,7 @@
.inc_merge = rds_tcp_inc_merge,
.inc_process_acks = rds_tcp_inc_process_acks,
.listen_stop = rds_tcp_listen_stop,
+ .ack_queue_inc = rds_ack_queue_inc,
};
int __init rds_tcp_init(void)
Modified: trunk/linux/net/rds/tcp_connect.c
===================================================================
--- trunk/linux/net/rds/tcp_connect.c 2006-05-24 23:42:58 UTC (rev 75)
+++ trunk/linux/net/rds/tcp_connect.c 2006-05-25 22:44:12 UTC (rev 76)
@@ -166,7 +166,7 @@
up(&tc->t_conn->c_send_sem);
if (tc->t_tinc) {
- rds_inc_put(&rds_tcp_transport, &tc->t_tinc->ti_inc);
+ rds_inc_put(&tc->t_tinc->ti_inc);
tc->t_tinc = NULL;
}
sock_release(sock);
Modified: trunk/linux/net/rds/tcp_recv.c
===================================================================
--- trunk/linux/net/rds/tcp_recv.c 2006-05-24 23:42:58 UTC (rev 75)
+++ trunk/linux/net/rds/tcp_recv.c 2006-05-25 22:44:12 UTC (rev 76)
@@ -228,7 +228,7 @@
tc->t_tinc_hdr_rem = sizeof(struct rds_header);
tc->t_tinc_data_rem = 0;
tc->t_tinc = NULL;
- rds_inc_put(&rds_tcp_transport, &tinc->ti_inc);
+ rds_inc_put(&tinc->ti_inc);
tinc = NULL;
}
}
Modified: trunk/linux/net/rds/transport.c
===================================================================
--- trunk/linux/net/rds/transport.c 2006-05-24 23:42:58 UTC (rev 75)
+++ trunk/linux/net/rds/transport.c 2006-05-25 22:44:12 UTC (rev 76)
@@ -19,6 +19,7 @@
#include <linux/config.h>
#include <linux/kernel.h>
#include <linux/in.h>
+
#include "rds.h"
static struct rds_transport *transports[] = {
More information about the Rds-commits
mailing list