[rds-devel] [PATCH 2/2] RDS/IB: add connection level flow control

Olaf Kirch olaf.kirch at oracle.com
Tue Nov 13 08:25:43 PST 2007


From: Olaf Kirch <olaf.kirch at oracle.com>

[EXPERIMENTAL] RDS/IB: add connection level flow control

Rather than relying on RNR NAKs to fix recv queue overruns for us,
this patch introduces flow control on the RC connection.

This flow control mechanism is based on the count of posted receive
buffers.

We add an IB specific extension to the RDS header, which includes
the following two 32bit quantities:

 -	a sequence number. Each IB work request has a distinct number,
 	even for fragments of the same RDS message.

	This is different from the RDS sequence number space, which
	may not be consecutive, and which uses one sequence number for the
	entirety of an RDS message.

 -	a window sequence number, denoting the end of the receive window.
	Essentially, this is the last sequence number received from the
	peer, plus the number of posted receive buffers.

 	The peer should not transmit packets with a sequence number greater
	than or equal to this value (modulo wrap-around).

These quantities are transmitted with every packet. When the sender
notices that it is running low on send credits, it explicitly requests
an ACK packet (which would carry updated window information).

When we find we're unable to refill the recv queue from the recv CQ
completion handler, and refilling is deferred to the worker thread,
we also send an ACK packet after refilling the queue.

In practice, it would probably be sufficient to make the sequence and window
quantities 16 bits wide, unless we expect to deal with receive queues of 32000 or more entries.

Signed-off-by: Olaf Kirch <olaf.kirch at oracle.com>
---
 net/rds/ib.h       |   13 +++++++++
 net/rds/ib_cm.c    |    7 +++++
 net/rds/ib_recv.c  |   70 +++++++++++++++++++++++++++++++++++++++++++++++++++--
 net/rds/ib_send.c  |   50 +++++++++++++++++++++++++++++++++++++
 net/rds/ib_stats.c |    1 
 net/rds/rds.h      |    1 
 6 files changed, 138 insertions(+), 4 deletions(-)

Index: ofa_kernel-1.2.5.1/net/rds/ib.h
===================================================================
--- ofa_kernel-1.2.5.1.orig/net/rds/ib.h
+++ ofa_kernel-1.2.5.1/net/rds/ib.h
@@ -57,7 +57,12 @@ struct rds_ib_work_ring {
 struct rds_ib_header {
 	struct rds_header	i_base;
 
-	/* IB specific fields to be added by follow-up patch */
+	/* The IB transport has another "sequence number" of its own,
+	 * which is incremented per work request.
+	 * This is used for flow control.
+	 */
+	__be32			i_seq;
+	__be32			i_win;
 };
 
 struct rds_ib_connection {
@@ -74,6 +79,8 @@ struct rds_ib_connection {
 	struct rds_ib_header	*i_send_hdrs;
 	dma_addr_t 		i_send_hdrs_dma;
 	struct rds_ib_send_work *i_sends;
+	atomic_t		i_send_seq;
+	atomic_t		i_send_window;
 
 	/* rx */
 	struct mutex		i_recv_mutex;
@@ -86,6 +93,9 @@ struct rds_ib_connection {
 	struct rds_page_frag	i_frag;
 	dma_addr_t 		i_addr;
 	u64			i_ack_recv;	/* last ACK received */
+	u32			i_recv_seq;
+	u32			i_recv_window_update;
+	atomic_t		i_recv_window;
 
 	/* sending acks */
 	unsigned long		i_ack_flags;
@@ -123,6 +133,7 @@ struct rds_ib_statistics {
 	unsigned long	s_ib_tx_ring_full;
 	unsigned long	s_ib_tx_sg_mapping_failure;
 	unsigned long	s_ib_tx_stalled;
+	unsigned long	s_ib_tx_no_credits;
 	unsigned long	s_ib_rx_cq_call;
 	unsigned long	s_ib_rx_cq_event;
 	unsigned long	s_ib_rx_ring_empty;
Index: ofa_kernel-1.2.5.1/net/rds/ib_send.c
===================================================================
--- ofa_kernel-1.2.5.1.orig/net/rds/ib_send.c
+++ ofa_kernel-1.2.5.1/net/rds/ib_send.c
@@ -150,6 +150,30 @@ void rds_ib_send_cq_comp_handler(struct 
 	}
 }
 
+/*
+ * Stamp the IB-specific flow control fields onto an outgoing header.
+ */
+static void rds_ib_populate_header(struct rds_ib_connection *ic, struct rds_ib_header *ihdr)
+{
+	/* Each work request takes the next send sequence number. */
+	ihdr->i_seq = cpu_to_be32((u32) atomic_inc_return(&ic->i_send_seq));
+
+	/* Tell the peer the receive window we currently advertise. */
+	ihdr->i_win = cpu_to_be32((u32) atomic_read(&ic->i_recv_window));
+}
+
+static unsigned int rds_ib_send_credits(struct rds_ib_connection *ic)
+{
+	u32 win = (u32) atomic_read(&ic->i_send_window);
+	u32 seq = (u32) atomic_read(&ic->i_send_seq);
+	s32 credits;
+
+	/* Subtract as u32 so that sequence wrap-around is well defined.
+	 * One credit is held back to account for at least one ACK packet. */
+	credits = (s32) (win - seq - 1);
+	return credits > 0 ? credits : 0;
+}
+
 int rds_ib_xmit_cong_map(struct rds_connection *conn,
 			 struct rds_cong_map *map, unsigned long offset)
 {
@@ -164,6 +188,7 @@ int rds_ib_xmit_cong_map(struct rds_conn
 	unsigned long i;
 	unsigned int off;
 	u32 pos;
+	u32 credits;
 	u32 work_alloc;
 	int ret;
 
@@ -193,6 +218,13 @@ int rds_ib_xmit_cong_map(struct rds_conn
 	 * be enough work requests to send the entire bitmap.
 	 */
 	i = ceil(RDS_CONG_MAP_BYTES, RDS_FRAG_SIZE),
+	credits = rds_ib_send_credits(ic);
+	if (credits < i) {
+		set_bit(RDS_LL_SEND_THROTTLE, &conn->c_flags);
+		rds_ib_stats_inc(s_ib_tx_no_credits);
+		ret = -ENOMEM;
+		goto out;
+	}
 	work_alloc = rds_ib_ring_alloc(&ic->i_send_ring, i, &pos);
 	if (work_alloc != i) {
 		rds_ib_stats_inc(s_ib_tx_ring_full);
@@ -230,6 +262,7 @@ int rds_ib_xmit_cong_map(struct rds_conn
 
 		/* build the header and include it in the wr */
 		ihdr = &ic->i_send_hdrs[pos];
+		rds_ib_populate_header(ic, ihdr);
 		hdr = &ihdr->i_base;
 		memset(hdr, 0, sizeof(struct rds_header));
 		hdr->h_flags = RDS_FLAG_CONG_BITMAP;
@@ -289,6 +322,7 @@ int rds_ib_xmit(struct rds_connection *c
 	u32 pos;
 	u32 i;
 	u32 work_alloc;
+	u32 credits;
 	int sent;
 	int ret;
 
@@ -300,6 +334,18 @@ int rds_ib_xmit(struct rds_connection *c
 	else
 		i = ceil(be32_to_cpu(rm->m_inc.i_hdr.h_len), RDS_FRAG_SIZE);
 
+	/* Bound the amount we can send by the number of recv buffers
+	 * the remote has posted. */
+	credits = rds_ib_send_credits(ic);
+	if (credits == 0) {
+		set_bit(RDS_LL_SEND_THROTTLE, &conn->c_flags);
+		rds_ib_stats_inc(s_ib_tx_no_credits);
+		ret = -ENOMEM;
+		goto out;
+	}
+	if (i > credits)
+		i = credits;
+
 	work_alloc = rds_ib_ring_alloc(&ic->i_send_ring, i, &pos);
 	if (work_alloc == 0) {
 		set_bit(RDS_LL_SEND_FULL, &conn->c_flags);
@@ -324,7 +370,8 @@ int rds_ib_xmit(struct rds_connection *c
 		ic->i_rm = rm;
 
 		/* Finalize the header */
-		if (test_bit(RDS_MSG_ACK_REQUIRED, &rm->m_flags))
+		if (test_bit(RDS_MSG_ACK_REQUIRED, &rm->m_flags)
+		 || credits < 16)
 			rm->m_inc.i_hdr.h_flags |= RDS_FLAG_ACK_REQUIRED;
 		if (test_bit(RDS_MSG_RETRANSMITTED, &rm->m_flags))
 			rm->m_inc.i_hdr.h_flags |= RDS_FLAG_RETRANSMITTED;
@@ -406,6 +453,7 @@ add_header:
 		/* Tack on the header after the data. send->s_sge[1] should already
 		 * have been set up to point to the right header buffer. */
 		ihdr = &ic->i_send_hdrs[pos];
+		rds_ib_populate_header(ic, ihdr);
 		memcpy(&ihdr->i_base, &rm->m_inc.i_hdr, sizeof(struct rds_header));
 		send->s_wr.num_sge = 2;
 
Index: ofa_kernel-1.2.5.1/net/rds/ib_cm.c
===================================================================
--- ofa_kernel-1.2.5.1.orig/net/rds/ib_cm.c
+++ ofa_kernel-1.2.5.1/net/rds/ib_cm.c
@@ -74,6 +74,13 @@ static void rds_ib_connect_complete(stru
 				qp_attr.min_rnr_timer, -ret);
 	}
 
+	/* Initialize send_window with some arbitrary value - the initial
+	 * cong map update will give us the right value anyway. */
+	atomic_set(&ic->i_recv_window, 1 + ic->i_recv_ring.w_nr);
+	atomic_set(&ic->i_send_window, 16);
+	atomic_set(&ic->i_send_seq, 1);
+	ic->i_recv_seq = 1;
+
 	rds_connect_complete(conn);
 }
 
Index: ofa_kernel-1.2.5.1/net/rds/ib_recv.c
===================================================================
--- ofa_kernel-1.2.5.1.orig/net/rds/ib_recv.c
+++ ofa_kernel-1.2.5.1/net/rds/ib_recv.c
@@ -221,6 +221,7 @@ int rds_ib_recv_refill(struct rds_connec
 	struct ib_recv_wr *failed_wr;
 	int ret = 0;
 	u32 pos;
+	u32 window;
 
 	while (rds_conn_up(conn) && rds_ib_ring_alloc(&ic->i_recv_ring, 1, &pos)) {
 
@@ -246,6 +247,11 @@ int rds_ib_recv_refill(struct rds_connec
 		}
 	}
 
+	/* Update the window we will advertise to our peer - ie the
+	 * maximum sequence number we're prepared to receive. */
+	window = ic->i_recv_seq + ic->i_recv_ring.w_nr - ic->i_recv_ring.w_nr_free;
+	atomic_set(&ic->i_recv_window, window);
+
 	if (ret)
 		rds_ib_ring_unalloc(&ic->i_recv_ring, 1);
 	return ret;
@@ -438,6 +444,16 @@ static void rds_ib_send_ack(struct rds_i
 	rds_message_make_checksum(hdr);
 	ic->i_ack_queued = jiffies;
 
+	/* We do not synchronize with the send path of normal (data)
+	 * packets here. So we do not put a sequence number on the
+	 * ACK packet. We do increment the send sequence, however,
+	 * in order to account for the recv queue slot we consume.
+	 */
+	ihdr->i_win = cpu_to_be32(atomic_read(&ic->i_recv_window));
+	ihdr->i_seq = 0;
+	atomic_inc(&ic->i_send_seq);
+	WARN_ON(atomic_read(&ic->i_send_seq) >= atomic_read(&ic->i_send_window));
+
 	ret = ib_post_send(ic->i_cm_id->qp, &ic->i_ack_wr, &failed_wr);
 	if (unlikely(ret)) {
 		/* Failed to send. Release the WR, and
@@ -632,6 +648,33 @@ static void rds_ib_cong_recv(struct rds_
 }
 
 /*
+ * Process the IB specific part of the header.
+ */
+static int rds_ib_process_header(struct rds_ib_connection *ic, struct rds_ib_header *ihdr)
+{
+	u32 seq = be32_to_cpu(ihdr->i_seq);
+	u32 win = be32_to_cpu(ihdr->i_win);
+	u32 cur = (u32) atomic_read(&ic->i_send_window);
+
+	/* ACKs don't carry a sequence number (i_seq is sent as 0). */
+	if (seq)
+		ic->i_recv_seq = seq;
+
+	/* The window may appear to go backwards: the remote's send path
+	 * can sample the old window while its recv CQ handler refills
+	 * the ring and posts an ACK with the updated one. If that ACK
+	 * is posted first, we see the smaller window after the greater.
+	 * So only move the window forward; compare modulo 2^32 so this
+	 * keeps working once the sequence numbers wrap around.
+	 */
+	if ((s32) (win - cur) > 0) {
+		atomic_set(&ic->i_send_window, win);
+		return 1;
+	}
+	return 0;
+}
+
+/*
  * Rings are posted with all the allocations they'll need to queue the
  * incoming message to the receiving socket so this can't fail.
  * All fragments start with a header, so we can make sure we're not receiving
@@ -670,6 +713,20 @@ static void rds_ib_process_recv(struct r
 
 	if (!rds_ib_copy_header(conn, &hdr_buf, recv, byte_len))
 		return;
+
+	/* If we were previously throttling sends because we ran out of
+	 * credits, kick the xmit thread now to resume */
+	if (rds_ib_process_header(ic, &hdr_buf)
+	 && test_and_clear_bit(RDS_LL_SEND_THROTTLE, &conn->c_flags))
+		queue_delayed_work(rds_wq, &conn->c_send_w, 0);
+
+	/* Periodically send a gratuitous ACK, in order to update the
+	 * peer's send window. */
+	if (ic->i_recv_seq >= ic->i_recv_window_update) {
+		ic->i_recv_window_update = ic->i_recv_seq + ic->i_recv_ring.w_nr / 2;
+		state->ack_required = 1;
+	}
+
 	hdr = &hdr_buf.i_base;
 
 	/* Process the ACK sequence which comes with every packet */
@@ -812,7 +869,6 @@ void rds_ib_recv_cq_comp_handler(struct 
 		rds_send_drop_acked(conn, state.ack_recv, NULL);
 		ic->i_ack_recv = state.ack_recv;
 	}
-	rds_ib_attempt_ack(ic);
 
 	/* 
 	 * XXX atomic is bad as it drains reserve pools, we should really
@@ -847,6 +903,11 @@ void rds_ib_recv_cq_comp_handler(struct 
 
 	if (ret)
 		queue_delayed_work(rds_wq, &conn->c_recv_w, 0);
+
+	/* If we send an ACK, send it after replenishing the
+	 * receive buffers, s.t. we can give the peer up-to-date
+	 * credits */
+	rds_ib_attempt_ack(ic);
 }
 
 int rds_ib_recv(struct rds_connection *conn)
@@ -863,8 +924,13 @@ int rds_ib_recv(struct rds_connection *c
 	mutex_lock(&ic->i_recv_mutex);
 	if (rds_ib_recv_refill(conn, GFP_KERNEL, GFP_HIGHUSER))
 		ret = -ENOMEM;
-	else
+	else {
 		rds_ib_stats_inc(s_ib_rx_refill_from_thread);
+
+		/* Force send an ACK to inform the peer about the refill */
+		set_bit(IB_ACK_REQUESTED, &ic->i_ack_flags);
+		rds_ib_attempt_ack(ic);
+	}
 	mutex_unlock(&ic->i_recv_mutex);
 
 	return ret;
Index: ofa_kernel-1.2.5.1/net/rds/ib_stats.c
===================================================================
--- ofa_kernel-1.2.5.1.orig/net/rds/ib_stats.c
+++ ofa_kernel-1.2.5.1/net/rds/ib_stats.c
@@ -48,6 +48,7 @@ static char *rds_ib_stat_names[] = {
 	"ib_tx_ring_full",
 	"ib_tx_sg_mapping_failure",
 	"ib_tx_stalled",
+	"ib_tx_no_credits",
 	"ib_rx_cq_call",
 	"ib_rx_cq_event",
 	"ib_rx_ring_empty",
Index: ofa_kernel-1.2.5.1/net/rds/rds.h
===================================================================
--- ofa_kernel-1.2.5.1.orig/net/rds/rds.h
+++ ofa_kernel-1.2.5.1/net/rds/rds.h
@@ -110,6 +110,7 @@ enum {
 
 /* Bits for c_flags */
 #define RDS_LL_SEND_FULL	0
+#define RDS_LL_SEND_THROTTLE	1
 
 struct rds_connection {
 	struct hlist_node	c_hash_node;

-- 
Olaf Kirch  |  --- o --- Nous sommes du soleil we love when we play
okir at lst.de |    / | \   sol.dhoop.naytheet.ah kin.ir.samse.qurax



More information about the rds-devel mailing list