[rds-commits] zab commits r81 - in trunk: . linux/net/rds

svn-commits@oss.oracle.com svn-commits at oss.oracle.com
Tue May 30 13:13:01 CDT 2006


Author: zab
Date: 2006-05-30 13:12:57 -0500 (Tue, 30 May 2006)
New Revision: 81

Modified:
   trunk/TODO
   trunk/linux/net/rds/connection.c
   trunk/linux/net/rds/flow.c
   trunk/linux/net/rds/loop.c
   trunk/linux/net/rds/rds.h
   trunk/linux/net/rds/recv.c
   trunk/linux/net/rds/send.c
   trunk/linux/net/rds/tcp_connect.c
   trunk/linux/net/rds/tcp_listen.c
   trunk/linux/net/rds/tcp_recv.c
   trunk/linux/net/rds/tcp_send.c
Log:
Support connection reestablishment.

TCP learns to drop a connection on fatal send failures.  When TCP drops an
errored socket it resets its message and header demuxing state in the
rds_tcp_connection and calls into the core connection reset helper.

The core connection reset path resets the tx and rx demuxing and purges any
acks that were queued on the connection.  The sender will resend messages that
are still relevant and the receiver will regenerate the acks.

The previous method of reassembling messages in the flows would have required
dealing with partially received messages on connection reset.  Instead of
adding that complexity we simplify by having the receiving side match the
sending side in serializing messages in a connection.  We gain that simplicity
at the cost of what is believed to be an acceptable latency hit: small messages
must wait for large ones to transit the connection in their entirety.

Reassembling in the connection rids the rx path of flows entirely.  The rx path
touches less cache and acquires fewer locks and the flows get smaller.


Modified: trunk/TODO
===================================================================
--- trunk/TODO	2006-05-26 20:48:57 UTC (rev 80)
+++ trunk/TODO	2006-05-30 18:12:57 UTC (rev 81)
@@ -13,7 +13,8 @@
 
 If message headers had more info on their position in the message then
 we could detect framing errors on the receiver more aggressively.  and
-we probably should.
+we probably should.  While we're at it, this could clean up the goofy
+disconnect between message and fragment sequence numbers.
 
 throw together a man page
 	- document that recvmsg with a small size truncates

Modified: trunk/linux/net/rds/connection.c
===================================================================
--- trunk/linux/net/rds/connection.c	2006-05-26 20:48:57 UTC (rev 80)
+++ trunk/linux/net/rds/connection.c	2006-05-30 18:12:57 UTC (rev 81)
@@ -60,7 +60,46 @@
 	return ret;
 }
 
+void rds_conn_ack_teardown(struct rds_connection *conn)
+{
+	struct rds_incoming *inc, *itmp;
+
+	list_for_each_entry_safe(inc, itmp, &conn->c_incs_for_acks, i_item) {
+		list_del_init(&inc->i_item);
+		rds_inc_put(inc);
+	}
+	if (conn->c_ack_rm) {
+		rds_message_put(conn->c_ack_rm);
+		conn->c_ack_rm = NULL;
+	}
+}
+
 /*
+ * This is called by transports as they're bringing down a connection.
+ * It clears partial message state so that the transport can start sending
+ * and receiving over this connection again in the future.  It is up to
+ * the transport to have serialized this call with its send and recv.
+ */
+void rds_conn_reset(struct rds_connection *conn)
+{
+	unsigned long flags;
+
+	spin_lock_irqsave(&conn->c_lock, flags);
+
+	rds_send_reset_vec(conn);
+	list_splice_init(&conn->c_retrans, &conn->c_send_queue);
+	rds_conn_ack_teardown(conn);
+
+	spin_unlock_irqrestore(&conn->c_lock, flags);
+
+	if (conn->c_rx_inc) {
+		rds_inc_put(conn->c_rx_inc);
+		conn->c_rx_inc = NULL;
+	}
+	conn->c_next_rx_seq = 0;
+}
+
+/*
  * There is only every one 'conn' for a given pair of addresses in the
  * system at a time.  They contain messages to be retransmitted and so
  * span the lifetime of the actual underlying transport connections.
@@ -105,6 +144,9 @@
 	INIT_LIST_HEAD(&conn->c_send_queue);
 	INIT_LIST_HEAD(&conn->c_retrans);
 
+	conn->c_next_rx_seq = 0;
+	conn->c_rx_inc = NULL;
+
 	INIT_LIST_HEAD(&conn->c_incs_for_acks);
 	conn->c_ack_rm = NULL;
 	setup_timer(&conn->c_ack_timer, rds_ack_timer, (unsigned long)conn);
@@ -160,7 +202,6 @@
 	struct hlist_head *head;
 	struct hlist_node *pos, *tmp;
 	struct rds_connection *conn;
-	struct rds_incoming *inc, *itmp;
 	struct rds_message *rm, *rtmp;
 	size_t i;
 
@@ -185,15 +226,7 @@
 			if (conn->c_cur_msg)
 				rds_message_put(conn->c_cur_msg);
 
-			/* tear down acks being built */
-			list_for_each_entry_safe(inc, itmp,
-						 &conn->c_incs_for_acks,
-						 i_item) {
-				list_del_init(&inc->i_item);
-				rds_inc_put(inc);
-			}
-			if (conn->c_ack_rm)
-				rds_message_put(conn->c_ack_rm);
+			rds_conn_ack_teardown(conn);
 
 			conn->c_trans->conn_free(conn->c_transport_data);
 

Modified: trunk/linux/net/rds/flow.c
===================================================================
--- trunk/linux/net/rds/flow.c	2006-05-26 20:48:57 UTC (rev 80)
+++ trunk/linux/net/rds/flow.c	2006-05-30 18:12:57 UTC (rev 81)
@@ -124,15 +124,12 @@
 	flow->f_faddr = faddr;
 	flow->f_lport = lport;
 	flow->f_fport = fport;
-	spin_lock_init(&flow->f_lock);
 	/* f_conn doesn't hold a ref, conns are permanent */
 	flow->f_conn = conn;
 	flow->f_rs = rs;
 	rs = NULL;
 	INIT_LIST_HEAD(&flow->f_message_list);
 	flow->f_message_bytes = 0;
-	flow->f_inc = NULL;
-	flow->f_next_rx_seq = 0;
 
 	pr_debug("allocated flow %p for %u.%u.%u.%u:%u -> %u.%u.%u.%u:%u\n",
 		 flow, NIPQUAD(laddr), ntohs(lport), NIPQUAD(faddr),
@@ -170,10 +167,6 @@
 {
 	pr_debug("put flow %p ref %d\n", flow, atomic_read(&flow->f_refcount));
 	if (atomic_dec_and_test(&flow->f_refcount)) {
-		if (flow->f_inc) {
-			rds_inc_ack(flow->f_inc, GFP_ATOMIC);
-			rds_inc_put(flow->f_inc);
-		}
 		if (flow->f_rs)
 			rds_sock_put(flow->f_rs);
 		BUG_ON(!hlist_unhashed(&flow->f_hash_node));

Modified: trunk/linux/net/rds/loop.c
===================================================================
--- trunk/linux/net/rds/loop.c	2006-05-26 20:48:57 UTC (rev 80)
+++ trunk/linux/net/rds/loop.c	2006-05-30 18:12:57 UTC (rev 81)
@@ -68,10 +68,16 @@
 
 	while ((ret = rds_send_get_next_message(conn, &rm)) > 0) {
 
+		/* 
+		 * XXX we fake out the header flags so that the rx path
+		 * just delivers the message.. this should be fixed up
+		 * as part of the minor header refactoring.
+		 */
+		rm->m_inc.i_hdr.h_flags = RDS_HEAD_FLAG_EOM; /* !ack, !frag */
 		rds_inc_init(&rm->m_inc, conn);
 		rds_message_addref(rm); /* for the inc */
 
-		rds_recv_enqueue(NULL, conn->c_laddr, conn->c_faddr,
+		rds_recv_incoming(conn, conn->c_laddr, conn->c_faddr,
 				 &rm->m_inc, GFP_KERNEL);
 
 		rds_inc_put(&rm->m_inc);

Modified: trunk/linux/net/rds/rds.h
===================================================================
--- trunk/linux/net/rds/rds.h	2006-05-26 20:48:57 UTC (rev 80)
+++ trunk/linux/net/rds/rds.h	2006-05-30 18:12:57 UTC (rev 81)
@@ -44,6 +44,9 @@
 	struct list_head	c_send_queue;
 	struct list_head	c_retrans;
 
+	u64			c_next_rx_seq;
+	struct rds_incoming	*c_rx_inc; /* being reassembled */
+
 	struct list_head	c_incs_for_acks;
 	struct rds_message	*c_ack_rm;
 	unsigned long		c_ack_deadline;
@@ -53,6 +56,10 @@
 	void			*c_transport_data;
 };
 
+/*
+ * This is used by the sending side to throttle senders based on receiver space
+ * and to cancel pending messages to a specific destination.
+ */
 struct rds_flow {
 	struct hlist_node	f_hash_node;
 	__be32			f_laddr;
@@ -60,16 +67,12 @@
 	__be16			f_lport;
 	__be16			f_fport;
 	atomic_t		f_refcount;
-	spinlock_t		f_lock; /* XXX can go away?  rx only.. */
 	struct rds_connection	*f_conn;
 	struct rds_sock		*f_rs;
 	struct list_head	f_sock_item;
 	/* f_message_list is protected by conn->c_lock */
 	struct list_head	f_message_list;
 	u32			f_message_bytes;
-
-	struct rds_incoming	*f_inc; /* being reassembled */
-	u64			f_next_rx_seq;
 };
 
 #define RDS_HEAD_FLAG_ACK	1
@@ -208,6 +211,7 @@
 void __exit rds_conn_exit(void);
 struct rds_connection *rds_conn_create(__le32 laddr, __le32 faddr,
 				       struct rds_transport *trans, gfp_t gfp);
+void rds_conn_reset(struct rds_connection *conn);
 
 /* flow.c */
 int __init rds_flow_init(void);
@@ -238,9 +242,7 @@
 void rds_inc_addref(struct rds_incoming *inc);
 void rds_inc_put(struct rds_incoming *inc);
 void rds_inc_ack(struct rds_incoming *inc, gfp_t gfp);
-int rds_recv_incoming(struct rds_connection *conn, struct rds_incoming *inc,
-		      gfp_t gfp);
-void rds_recv_enqueue(struct rds_flow *caller_flow, __be32 saddr, __be32 daddr,
+int rds_recv_incoming(struct rds_connection *conn, __be32 saddr, __be32 daddr,
 		      struct rds_incoming *inc, gfp_t gfp);
 int rds_recvmsg(struct kiocb *iocb, struct socket *sock, struct msghdr *msg,
 		size_t size, int msg_flags);
@@ -251,6 +253,7 @@
 		size_t payload_len);
 int rds_send_get_next_vec(struct rds_connection *conn, struct rds_vec *vec);
 void rds_send_vec_done(struct rds_connection *conn, size_t bytes);
+void rds_send_reset_vec(struct rds_connection *conn);
 int rds_send_get_next_message(struct rds_connection *conn,
 			      struct rds_message **rm);
 void rds_send_put_next_message(struct rds_connection *conn,

Modified: trunk/linux/net/rds/recv.c
===================================================================
--- trunk/linux/net/rds/recv.c	2006-05-26 20:48:57 UTC (rev 80)
+++ trunk/linux/net/rds/recv.c	2006-05-30 18:12:57 UTC (rev 81)
@@ -56,60 +56,28 @@
 }
 
 /*
- * This is called when a transport has a full message accumulated in
- * 'inc'.  This can happen on behalf of the transport in rds_recv_incoming()
- * as it reassembles.  Transports, like loopback, call this directly when
- * they don't use rds_recv_incoming().
+ * The transport must make sure that this is serialized against other
+ * rx and conn reset on this specific conn.
+ *
+ * We currently assert that only one fragmented message will be sent
+ * down a connection at a time.  This lets us reassemble in the conn
+ * instead of per-flow which means that we don't have to go digging through
+ * flows to tear down partial reassembly progress on conn failure and
+ * we save flow lookup and locking for each frag arrival.  It does mean
+ * that small messages will wait behind large ones.  Fragmenting at all
+ * is only to reduce the memory consumption of pre-posted buffers.
+ *
+ * The caller passes in saddr and daddr instead of us getting it from the
+ * conn.  This lets loopback, who only has one conn for both directions,
+ * tell us which roles the addrs in the conn are playing for this message.
  */
-void rds_recv_enqueue(struct rds_flow *caller_flow, __be32 saddr, __be32 daddr,
+int rds_recv_incoming(struct rds_connection *conn, __be32 saddr, __be32 daddr,
 		      struct rds_incoming *inc, gfp_t gfp)
 {
-	struct rds_flow *flow = caller_flow;
-	struct rds_sock *rs;
-	struct sock *sk;
-	unsigned long flags;
-	
-	if (flow == NULL) {
-		flow = rds_flow_create(daddr, inc->i_hdr.h_dport, saddr,
-				       inc->i_hdr.h_sport, gfp);
-		if (IS_ERR(flow)) {
-			flow = NULL;
-			goto out;
-		}
-	}
-
-	rs = flow->f_rs;
-	sk = rds_rs_to_sk(rs);
-
-	/* serialize with rds_release -> sock_orphan */
-	write_lock_irqsave(&sk->sk_callback_lock, flags);
-	if (!sock_flag(sk, SOCK_DEAD)) {
-		pr_debug("adding inc %p to rs %p's recv queue\n", inc, rs);
-		list_add_tail(&inc->i_item, &rs->rs_recv_queue);
-		rds_inc_addref(inc);
-		wake_up(sk->sk_sleep);
-		inc = NULL;  /* don't ack until it leaves the recv_queue */
-	}
-	write_unlock_irqrestore(&sk->sk_callback_lock, flags);
-
-out:
-	if (flow != caller_flow)
-		rds_flow_put(flow);
-	if (inc)
-		rds_inc_ack(inc, gfp);
-}
-
-/*
- * This shouldn't be called by loopback -- it assumes that 'faddr' on the
- * conn is the sending address and that isn't the case for loopback.
- */
-int rds_recv_incoming(struct rds_connection *conn, struct rds_incoming *inc,
-		      gfp_t gfp)
-{
 	int is_eom = inc->i_hdr.h_flags & RDS_HEAD_FLAG_EOM;
 	int is_frag = inc->i_hdr.h_flags & RDS_HEAD_FLAG_FRAG;
-	struct rds_incoming *head_inc = NULL;
-	struct rds_flow *flow;
+	struct rds_sock *rs = NULL;
+	struct sock *sk;
 	unsigned long flags;
 	int ret = 0;
 	int ack = 0;
@@ -118,24 +86,9 @@
 
 	if (inc->i_hdr.h_flags & RDS_HEAD_FLAG_ACK) {
 		rds_ack_recv(conn, inc);
-		flow = NULL;
 		goto out;
 	}
 
-	flow = rds_flow_create(conn->c_laddr, inc->i_hdr.h_dport,
-			       conn->c_faddr, inc->i_hdr.h_sport, gfp);
-	if (IS_ERR(flow)) {
-		ret = PTR_ERR(flow);
-		if (ret == -ENOTCONN) {
-			ack = 1;
-			ret = 0;
-		}
-		flow = NULL;
-		goto out;
-	}
-
-	spin_lock_irqsave(&flow->f_lock, flags);
-
 	/* 
 	 * Sequence numbers should only increase.  fragments get their
 	 * sequence number as they're queued in a sending conn.  They
@@ -156,14 +109,14 @@
 	 * XXX we could spend more on the wire to get more robust failure
 	 * detection, arguably worth it to avoid data corruption.
 	 */
-	pr_debug("inc %p seq %llu frag %d eom %d flow %p next %llu\n",
+	pr_debug("inc %p seq %llu frag %d eom %d next %llu\n",
 		 inc, be64_to_cpu(inc->i_hdr.h_sequence), is_frag, is_eom,
-		 flow, flow->f_next_rx_seq);
-	if (be64_to_cpu(inc->i_hdr.h_sequence) < flow->f_next_rx_seq) {
+		 conn->c_next_rx_seq);
+	if (be64_to_cpu(inc->i_hdr.h_sequence) < conn->c_next_rx_seq) {
 		ack = 1;
-		goto out_unlock;
+		goto out;
 	}
-	flow->f_next_rx_seq = be64_to_cpu(inc->i_hdr.h_sequence) + 1;
+	conn->c_next_rx_seq = be64_to_cpu(inc->i_hdr.h_sequence) + 1;
 
 	/*
 	 * we can't work with this frag in the message if we don't have
@@ -171,53 +124,64 @@
 	 * after the first few fragments of a messge arrive and are 
 	 * dropped. 
 	 */
-	if (is_frag && !flow->f_inc) {
+	if (is_frag && !conn->c_rx_inc) {
 		ack = 1;
-		goto out_unlock;
+		goto out;
 	}
 
 	/*
 	 * append this inc's payload to the previously existing incoming
 	 * message that we're reassembling in the flow.
 	 */
-	if (flow->f_inc) {
+	if (conn->c_rx_inc) {
 		pr_debug("appending inc %p to existing inc %p\n", inc,
-			 flow->f_inc);
-		conn->c_trans->inc_merge(flow->f_inc, inc);
+			 conn->c_rx_inc);
+		conn->c_trans->inc_merge(conn->c_rx_inc, inc);
 	} else {
+		/*
+		 * only enqueue the first frag if the socket is bound
+		 */
+		rs = rds_find_bound(daddr, inc->i_hdr.h_dport);
+		if (rs == NULL) {
+			ack = 1;
+			ret = 0;
+			goto out;
+		}
 		rds_inc_addref(inc);
-		flow->f_inc = inc;
+		conn->c_rx_inc = inc;
 	}
 
-	/* 
-	 * if this was the last frag then transfer the flow's ref to the
-	 * the sock recv queue.
-	 */
-	if (is_eom) {
-		head_inc = flow->f_inc;
-		flow->f_inc = NULL;
-	}
+	/* leave it on the conn 'till it all arrives */
+	if (!is_eom)
+		goto out;
 
-out_unlock:
-	spin_unlock_irqrestore(&flow->f_lock, flags);
-
 	/*
-	 * We can be racing with rds_release().  It might have marked the
-	 * socket as dead and unhashed our flow after we created it.  If so we
-	 * don't queue our message and our flow ref drop below will free the
-	 * flow.
+	 * This is the last frag so transfer c_rx_inc's ref to the sock
+	 * recv queue.  We can be racing with rds_release() which marks the
+	 * socket dead.  If so we ack and drop the message instead of
+	 * queueing it.
 	 */
-	if (head_inc) {
-		rds_recv_enqueue(flow, conn->c_faddr, conn->c_laddr,
-				 head_inc, gfp);
-		rds_inc_put(head_inc);
+	sk = rds_rs_to_sk(rs);
+
+	/* serialize with rds_release -> sock_orphan */
+	write_lock_irqsave(&sk->sk_callback_lock, flags);
+	if (!sock_flag(sk, SOCK_DEAD)) {
+		pr_debug("adding inc %p to rs %p's recv queue\n", inc, rs);
+		list_add_tail(&inc->i_item, &rs->rs_recv_queue);
+		wake_up(sk->sk_sleep);
+	} else {
+		rds_inc_put(conn->c_rx_inc);
+		ack = 1;
 	}
+	write_unlock_irqrestore(&sk->sk_callback_lock, flags);
 
+	conn->c_rx_inc = NULL;
+
 out:
+	if (rs)
+		rds_sock_put(rs);
 	if (ack)
 		rds_inc_ack(inc, gfp);
-	if (flow)
-		rds_flow_put(flow);
 	return ret;
 }
 

Modified: trunk/linux/net/rds/send.c
===================================================================
--- trunk/linux/net/rds/send.c	2006-05-26 20:48:57 UTC (rev 80)
+++ trunk/linux/net/rds/send.c	2006-05-30 18:12:57 UTC (rev 81)
@@ -69,7 +69,7 @@
 	rds_message_put(rm);
 }
 
-static void rds_send_reset_vec(struct rds_connection *conn)
+void rds_send_reset_vec(struct rds_connection *conn)
 {
 	struct rds_message *rm;
 

Modified: trunk/linux/net/rds/tcp_connect.c
===================================================================
--- trunk/linux/net/rds/tcp_connect.c	2006-05-26 20:48:57 UTC (rev 80)
+++ trunk/linux/net/rds/tcp_connect.c	2006-05-30 18:12:57 UTC (rev 81)
@@ -125,6 +125,19 @@
 				   rds_tcp_reconnect_delay());
 }
 
+static void rds_tcp_conn_reset(struct rds_tcp_connection *tc)
+{
+	if (tc->t_tinc) {
+		rds_inc_put(&tc->t_tinc->ti_inc);
+		tc->t_tinc = NULL;
+	}
+	tc->t_tinc_hdr_rem = sizeof(struct rds_header);
+	tc->t_tinc_data_rem = 0;
+
+	if (tc->t_conn)
+		rds_conn_reset(tc->t_conn);
+}
+
 /*
  * This is executing in the single tcp thread.  Before killing the tcp
  * socket it needs to serialize with callbacks and processes in 
@@ -152,15 +165,12 @@
 		if (tc->t_conn)
 			down(&tc->t_conn->c_send_sem);
 		lock_sock(sock->sk);
+		rds_tcp_conn_reset(tc);
 		rds_tcp_restore_callbacks(sock, tc); /* tc->tc_sock = NULL */
 		release_sock(sock->sk);
 		if (tc->t_conn)
 			up(&tc->t_conn->c_send_sem);
 
-		if (tc->t_tinc) {
-			rds_inc_put(&tc->t_tinc->ti_inc);
-			tc->t_tinc = NULL;
-		}
 		sock_release(sock);
 	};
 

Modified: trunk/linux/net/rds/tcp_listen.c
===================================================================
--- trunk/linux/net/rds/tcp_listen.c	2006-05-26 20:48:57 UTC (rev 80)
+++ trunk/linux/net/rds/tcp_listen.c	2006-05-30 18:12:57 UTC (rev 81)
@@ -82,6 +82,7 @@
 	if (tc->t_sock == NULL && tc->t_conn) {
 		rds_tcp_set_callbacks(new_sock, tc);
 		new_sock = NULL;
+		/* we're trusting that these workers see the new t_sock */
 		queue_work(rds_tcp_wq, &tc->t_send_work);
 		queue_work(rds_tcp_wq, &tc->t_recv_work);
 	}

Modified: trunk/linux/net/rds/tcp_recv.c
===================================================================
--- trunk/linux/net/rds/tcp_recv.c	2006-05-26 20:48:57 UTC (rev 80)
+++ trunk/linux/net/rds/tcp_recv.c	2006-05-30 18:12:57 UTC (rev 81)
@@ -219,7 +219,10 @@
 		}
 
 		if (tc->t_tinc_hdr_rem == 0 && tc->t_tinc_data_rem == 0) {
-			ret = rds_recv_incoming(tc->t_conn, &tinc->ti_inc,
+			ret = rds_recv_incoming(tc->t_conn, 
+						tc->t_conn->c_faddr,
+						tc->t_conn->c_laddr,
+						&tinc->ti_inc,
 						arg->gfp);
 			if (ret) {
 				desc->error = ret;

Modified: trunk/linux/net/rds/tcp_send.c
===================================================================
--- trunk/linux/net/rds/tcp_send.c	2006-05-26 20:48:57 UTC (rev 80)
+++ trunk/linux/net/rds/tcp_send.c	2006-05-30 18:12:57 UTC (rev 81)
@@ -62,15 +62,24 @@
 
 	while ((ret = rds_send_get_next_vec(conn, &vec)) > 0) {
 		ret = sock->ops->sendpage(sock, vec.v_page, vec.v_off,
-					  vec.v_len, MSG_DONTWAIT);
-		pr_debug("tcp sendpage %p:%u:%u ret %zd\n", vec.v_page,
+					  vec.v_len,
+					  MSG_DONTWAIT|MSG_NOSIGNAL);
+		pr_debug("tcp sendpage %p:%u:%u ret %d\n", vec.v_page,
 			 vec.v_off, vec.v_len, ret);
-		if (ret > 0)
+
+		if (ret > 0) {
 			rds_send_vec_done(conn, ret);
-		/* XXX kill the connection on hard errors? */
-		if (ret <= 0)
-			break;
-		cond_resched();
+			continue;
+		}
+
+		/* write_space will hit after EAGAIN, all else is fatal */
+		if (ret != -EAGAIN) {
+			printk(KERN_WARNING "RDS/tcp: sendpage(%u) to "
+			       "%u.%u.%u.%u returned %d, reconnecting\n",
+			       vec.v_len, NIPQUAD(conn->c_faddr), ret);
+			queue_work(rds_tcp_wq, &tc->t_shutdown_work);
+		}
+		break;
 	} 
 
 	rds_tcp_cork(sock, 0);




More information about the rds-commits mailing list