[rds-commits] zab commits r132 - trunk/linux/net/rds

svn-commits@oss.oracle.com svn-commits at oss.oracle.com
Wed Jul 12 20:30:10 CDT 2006


Author: zab
Date: 2006-07-12 20:30:09 -0500 (Wed, 12 Jul 2006)
New Revision: 132

Modified:
   trunk/linux/net/rds/ib.h
   trunk/linux/net/rds/ib_cm.c
   trunk/linux/net/rds/ib_recv.c
   trunk/linux/net/rds/ib_send.c
   trunk/linux/net/rds/threads.c
Log:
Post IB recv work from the work completion handler.

By posting recv work from the completion handler we start to move the RDS
thread out of the fast path.  In the completion handler we try to allocate with
GFP_ATOMIC.  If either that or DMA mapping fails we kick back to the thread who
tries blocking allocations.

We allocate a ibinc per work request along with the page.  This simplifies
error handling because the completion handler now can't fail but it means
we over-allocate a bit more memory per connection.

dma_mapping_error() is used to test if the DMA mapping failed for a lack
of resources.

i_recv_mutex is introduced to synchronize ring refilling.  The completion
handler leaves refilling to the thread if it fails to acquire it.

The RDS thread recv work is taught to retry immediately or after a delay
based on whether recv returned EAGAIN or ENOMEM.


Modified: trunk/linux/net/rds/ib.h
===================================================================
--- trunk/linux/net/rds/ib.h	2006-07-12 00:26:09 UTC (rev 131)
+++ trunk/linux/net/rds/ib.h	2006-07-13 01:30:09 UTC (rev 132)
@@ -31,6 +31,7 @@
 };
 
 struct rds_ib_recv_work {
+	struct rds_ib_incoming 	*r_ibinc;
 	struct page 		*r_page;
 	struct ib_recv_wr	r_wr;
 	struct ib_sge		r_sge[2];
@@ -50,9 +51,10 @@
 	struct ib_mr		*i_mr;
 	struct ib_cq		*i_send_cq;
 	struct ib_cq		*i_recv_cq;
+	
+	/* stops tx/rx posting */
+	int 			i_wc_err;
 
-	unsigned long		i_warned;
-
 	/* tx */
 	struct rds_ib_work_ring	i_send_ring;
 	struct rds_message	*i_rm;
@@ -61,6 +63,7 @@
 	struct rds_ib_send_work *i_sends;
 
 	/* rx */
+	struct mutex		i_recv_mutex;
 	struct rds_ib_work_ring	i_recv_ring;
 	struct rds_ib_incoming	*i_ibinc;
 	u32			i_recv_data_rem;

Modified: trunk/linux/net/rds/ib_cm.c
===================================================================
--- trunk/linux/net/rds/ib_cm.c	2006-07-12 00:26:09 UTC (rev 131)
+++ trunk/linux/net/rds/ib_cm.c	2006-07-13 01:30:09 UTC (rev 132)
@@ -371,8 +371,7 @@
 		 ic->i_pd, ic->i_send_cq, ic->i_recv_cq, 
 		 ic->i_cm_id ? ic->i_cm_id->qp : NULL);
 
-	/* suppress warnings */
-	set_bit(0, &ic->i_warned);
+	ic->i_wc_err = 1;
 
 	if (ic->i_cm_id) {
 		rdsdebug("disconnectiong cm %p\n", ic->i_cm_id);
@@ -427,7 +426,7 @@
 	kfree(ic->i_recvs);
 	ic->i_recvs = NULL;
 
-	clear_bit(0, &ic->i_warned);
+	ic->i_wc_err = 0;
 }
 
 int rds_ib_conn_alloc(struct rds_connection *conn, gfp_t gfp)
@@ -439,6 +438,8 @@
 	if (ic == NULL)
 		return -ENOMEM;
 
+	mutex_init(&ic->i_recv_mutex);
+
 	conn->c_transport_data = ic;
 
 	rdsdebug("conn %p conn ic %p\n", conn, conn->c_transport_data);

Modified: trunk/linux/net/rds/ib_recv.c
===================================================================
--- trunk/linux/net/rds/ib_recv.c	2006-07-12 00:26:09 UTC (rev 131)
+++ trunk/linux/net/rds/ib_recv.c	2006-07-13 01:30:09 UTC (rev 132)
@@ -19,34 +19,6 @@
  * License along with this program; if not, write to the
  * Free Software Foundation, Inc., 59 Temple Place - Suite 330,
  * Boston, MA 021110-1307, USA.
- *
- * --
- *
- * RDS/IB maintains a ring of recveive work requests that are posted on 
- * each connection.  The size of the ring is fixed as connection allocation
- * time.
- *
- * The sender first sends a header message and then sends the fragments that
- * make up the body.  Since the receive work is completed in the order that the
- * sender posted we are able to infer whether an incoming receive work has a
- * header or data fragment.   The sender is careful to always send as much data
- * as is promised in the header.
- *
- * Each receive work points at an individual page that will receive the header
- * or data DMA.  As the pages are received with headers they are left mapped in
- * the receive work and reposted.  Data pages are removed from the receive work
- * and unmapped and hung off an 'rds_ib_incoming' so that they can be passed up
- * the stack and queued on the receiving socket.  A new page must be allocated
- * and mapped to take its place.
- *
- * For now only the RDS thread fills the receive queues via ib_post_recv()
- * as we're unconditionally using dma_map_page() and believe that it would
- * block in the completion handler.
- *
- * XXX:
- * 	- figure out non-blocking page alloc/mapping for recv completion
- * 	- only raise interrupts on final frags via SOLICIT
- * 	- post recv work with seperate frag+header sges
  */
 
 #include <linux/kernel.h>
@@ -85,6 +57,7 @@
 	u32 i;
 
 	for(i = 0, recv = ic->i_recvs; i < ic->i_recv_ring.w_nr; i++, recv++) {
+		recv->r_ibinc = NULL;
 		recv->r_page = NULL;
 
 		recv->r_wr.next = NULL;
@@ -92,6 +65,7 @@
 		recv->r_wr.sg_list = recv->r_sge;
 		recv->r_wr.num_sge = 2;
 
+		recv->r_sge[0].addr = 0;
 		recv->r_sge[0].length = RDS_FRAG_SIZE;
 		recv->r_sge[0].lkey = ic->i_mr->lkey;
 
@@ -108,6 +82,8 @@
 	u32 i;
 
 	for(i = 0, recv = ic->i_recvs; i < ic->i_recv_ring.w_nr; i++, recv++) {
+		if (recv->r_ibinc)
+			rds_inc_put(&recv->r_ibinc->ii_inc);
 		if (recv->r_sge[0].addr)
 			rds_ib_recv_unmap_page(ic, recv);
 		if (recv->r_page)
@@ -115,6 +91,75 @@
 	}
 }
 
+/*
+ * This tries to allocate and post unused work requests after making sure that
+ * they have all their data allocated.  The i_recv_mutex is held here so that
+ * ring_alloc and _unalloc pairs don't go unmatched.
+ *
+ * -1 is returned if posting fails due to temporary resource exhaustion.
+ */
+int rds_ib_recv_refill(struct rds_connection *conn, gfp_t ibinc_gfp,
+		       gfp_t page_gfp)
+{
+	struct rds_ib_connection *ic = conn->c_transport_data;
+	struct rds_ib_recv_work *recv;
+	struct ib_recv_wr *failed_wr;
+	int ret = 0;
+	u32 pos;
+
+	while (!ic->i_wc_err && rds_ib_ring_alloc(&ic->i_recv_ring, 1, &pos)) {
+
+		recv = &ic->i_recvs[pos];
+
+		if (recv->r_ibinc == NULL) {
+			recv->r_ibinc = kmem_cache_alloc(rds_ib_incoming_slab,
+						         ibinc_gfp);
+			if (recv->r_ibinc) {
+				INIT_LIST_HEAD(&recv->r_ibinc->ii_pages);
+				rds_inc_init(&recv->r_ibinc->ii_inc, conn,
+					     conn->c_faddr);
+			}
+		}
+
+		if (recv->r_page == NULL)
+			recv->r_page = alloc_page(page_gfp);
+
+		if (!recv->r_sge[0].addr) {
+			recv->r_sge[0].addr =
+				dma_map_page(ic->i_cm_id->device->dma_device,
+					     recv->r_page, 0, PAGE_SIZE,
+					     DMA_FROM_DEVICE);
+
+			if (dma_mapping_error(recv->r_sge[0].addr))
+				recv->r_sge[0].addr = 0;
+		}
+
+		if (!recv->r_ibinc || !recv->r_page || !recv->r_sge[0].addr) {
+			ret = -1;
+			break;
+		}
+
+		/* XXX when can this fail? */
+		ret = ib_post_recv(ic->i_cm_id->qp, &recv->r_wr, &failed_wr);
+		rdsdebug("recv %p ibinc %p page %p addr %llu ret %d\n", recv,
+			 recv->r_ibinc, recv->r_page,
+			 (unsigned long long)recv->r_sge[0].addr, ret);
+		if (ret) {
+			printk(KERN_WARNING "RDS/IB: recv post on "
+			       "%u.%u.%u.%u returned %d, disconnecting and "
+			       "reconnecting\n", NIPQUAD(conn->c_faddr),
+			       ret);
+			queue_work(rds_wq, &conn->c_shutdown_work);
+			ret = -1;
+			break;
+		}
+	}
+
+	if (ret)
+		rds_ib_ring_unalloc(&ic->i_recv_ring, 1);
+	return ret;
+}
+
 void rds_ib_inc_free(struct rds_incoming *inc)
 {
 	struct rds_ib_incoming *ibinc;
@@ -220,35 +265,32 @@
 }
 
 /*
- * We're relying on RC completions arriving in post order so that we always
- * get an orderly stream of message headers and then their data fragments.
+ * Rings are posted with all the allocations they'll need to queue the
+ * incoming message to the receiving socket so this can't fail.  It relies
+ * on being called in the order that the sender sent in to infer which
+ * fragments start messages and so will have a header appended. 
  */
-static int rds_ib_process_recv(struct rds_connection *conn,
-			       struct rds_ib_recv_work *recv, u32 byte_len)
+static void rds_ib_process_recv(struct rds_connection *conn,
+				struct rds_ib_recv_work *recv, u32 byte_len)
 {
 	struct rds_ib_connection *ic = conn->c_transport_data;
 	struct rds_ib_incoming *ibinc = ic->i_ibinc;
-	int ret = 0;
 
 	/* XXX shut down the connection if port 0,0 are seen? */
 
-	rdsdebug("ic %p recv %p byte len %u\n", ic, recv, byte_len);
+	rdsdebug("ic %p ibinc %p recv %p byte len %u\n", ic, ibinc, recv,
+		 byte_len);
 
 	/*
-	 * If this is just a header allocate an inc to track the upcoming
-	 * message payloads.  We leave the page allocated and mapped so 
-	 * that it can be reposted.
-	 */
+	 * If we don't already have an inc on the connection then this
+	 * fragment has a header and starts a message.. copy its header
+	 * into the inc and save the inc so we can hang upcoming fragments
+	 * off its list.
+	 */ 
 	if (ibinc == NULL) {
-		ibinc = kmem_cache_alloc(rds_ib_incoming_slab, GFP_ATOMIC);
-		if (ibinc == NULL) {
-			ret = -ENOMEM;
-			goto out;
-		}
+		ibinc = recv->r_ibinc;
+		recv->r_ibinc = NULL;
 
-		INIT_LIST_HEAD(&ibinc->ii_pages);
-		rds_inc_init(&ibinc->ii_inc, conn, conn->c_faddr);
-
 		if (byte_len <= RDS_FRAG_SIZE) {
 			/* 
 			 * XXX The remainter of the page will need to be zeroed
@@ -267,6 +309,8 @@
 		}
 		ic->i_ibinc = ibinc;
 		ic->i_recv_data_rem = be32_to_cpu(ibinc->ii_inc.i_hdr.h_len);
+		rdsdebug("ic %p ibinc %p rem %u\n", ic, ibinc,
+			 ic->i_recv_data_rem);
 	}
 
 	rds_ib_recv_unmap_page(ic, recv);
@@ -282,24 +326,17 @@
 				  &ibinc->ii_inc, GFP_ATOMIC, KM_SOFTIRQ0);
 		rds_inc_put(&ibinc->ii_inc);
 	}
-out:
-	return ret;
 }
 
-/*
- * XXX serialize with the thread somehow?  TCP does this by getting
- * the callback lock while it is working in the thread.
- */
 void rds_ib_recv_cq_comp_handler(struct ib_cq *cq, void *context)
 {
 	struct rds_connection *conn = context;
 	struct rds_ib_connection *ic = conn->c_transport_data;
 	struct ib_wc wc;
 	struct rds_ib_recv_work *recv;
-	u32 avail;
-	int ret;
+	int ret = 0;
 
-	rdsdebug("cq %p conn %p\n", cq, conn);
+	rdsdebug("conn %p cq %p\n", conn, cq);
 
 	ib_req_notify_cq(cq, IB_CQ_NEXT_COMP);
 
@@ -309,21 +346,14 @@
 			 be32_to_cpu(wc.imm_data));
 
 		recv = &ic->i_recvs[rds_ib_ring_oldest(&ic->i_recv_ring)];
-		if (wc.status == IB_WC_SUCCESS) {
-			ret = rds_ib_process_recv(conn, recv, wc.byte_len);
-			if (ret == -ENOMEM)
-				queue_work(rds_wq, &conn->c_recv_work);
-		}
+		if (wc.status == IB_WC_SUCCESS)
+			rds_ib_process_recv(conn, recv, wc.byte_len);
 
-		avail = rds_ib_ring_free(&ic->i_recv_ring, 1);
-		if (avail > ic->i_recv_ring.w_nr / 2)
-			queue_work(rds_wq, &conn->c_recv_work);
-		else if (avail == ic->i_recv_ring.w_nr)
-			wake_up(&rds_ib_empty_wait);
+		rds_ib_ring_free(&ic->i_recv_ring, 1);
 
 		/* We expect errors as the qp is drained during shutdown */
-		if (wc.status != IB_WC_SUCCESS &&
-		    !test_and_set_bit(0, &ic->i_warned)) {
+		if (wc.status != IB_WC_SUCCESS && !ic->i_wc_err) {
+			ic->i_wc_err = 1;
 			printk(KERN_WARNING "RDS/IB: completion on "
 			       "%u.%u.%u.%u had status %u, disconnecting and "
 			       "reconnecting\n", NIPQUAD(conn->c_faddr),
@@ -331,56 +361,46 @@
 			queue_work(rds_wq, &conn->c_shutdown_work);
 		}
 	}
+	
+	/* 
+	 * If we fail to refill we assume it's a allocation failure
+	 * from our use of GFP_ATOMIC and we want the thread to try again
+	 * immediately.  Similarly, if the thread is already trying to
+	 * refill we want it to try again immediately as it may have missed
+	 * the ring entry we just completed before it released the
+	 * i_recv_mutex.
+	 */
+	if (mutex_trylock(&ic->i_recv_mutex)) {
+		if (rds_ib_recv_refill(conn, GFP_ATOMIC,
+					 GFP_ATOMIC | __GFP_HIGHMEM))
+			ret = -EAGAIN;
+		mutex_unlock(&ic->i_recv_mutex);
+	} else 
+		ret = -EAGAIN;
+
+	if (ret)
+		queue_work(rds_wq, &conn->c_recv_work);
 }
 
-int rds_ib_recv_refill(struct rds_connection *conn, gfp_t gfp)
+int rds_ib_recv(struct rds_connection *conn)
 {
 	struct rds_ib_connection *ic = conn->c_transport_data;
-	struct rds_ib_recv_work *recv;
-	struct ib_recv_wr *failed_wr;
 	int ret = 0;
-	u32 pos;
 
-	while (rds_ib_ring_alloc(&ic->i_recv_ring, 1, &pos)) {
+	rdsdebug("conn %p\n", conn);
 
-		recv = &ic->i_recvs[pos];
+	/*
+	 * If we get a temporary posting failure in this context then
+	 * we're really low and we want the caller to back off for a bit.
+	 */
+	mutex_lock(&ic->i_recv_mutex);
+	if (rds_ib_recv_refill(conn, GFP_KERNEL, GFP_HIGHUSER))
+		ret = -ENOMEM;
+	mutex_unlock(&ic->i_recv_mutex);
 
-		if (recv->r_page == NULL) {
-			recv->r_page = alloc_page(gfp);
-			if (recv->r_page == NULL) {
-				ret = -ENOMEM;
-				break;
-			}
-
-			/* XXX this can't fail? */
-			recv->r_sge[0].addr =
-				dma_map_page(ic->i_cm_id->device->dma_device,
-					     recv->r_page, 0, PAGE_SIZE,
-					     DMA_FROM_DEVICE);
-		}
-
-		rdsdebug("recv %p page %p addr %llu\n", recv, recv->r_page,
-			 recv->r_sge[0].addr);
-			 
-		ret = ib_post_recv(ic->i_cm_id->qp, &recv->r_wr, &failed_wr);
-		rdsdebug("ret %d\n", ret);
-		if (ret) {
-			rds_ib_recv_unmap_page(ic, recv);
-			rds_ib_recv_free_page(recv);
-			rds_ib_ring_unalloc(&ic->i_recv_ring, 1);
-			break;
-		}
-	}
-
 	return ret;
 }
 
-/* XXX have this also poll the conn's completion queue */
-int rds_ib_recv(struct rds_connection *conn)
-{
-	return rds_ib_recv_refill(conn, GFP_KERNEL);
-}
-
 int __init rds_ib_recv_init(void)
 {
 	rds_ib_incoming_slab = kmem_cache_create("rds_ib_incoming",

Modified: trunk/linux/net/rds/ib_send.c
===================================================================
--- trunk/linux/net/rds/ib_send.c	2006-07-12 00:26:09 UTC (rev 131)
+++ trunk/linux/net/rds/ib_send.c	2006-07-13 01:30:09 UTC (rev 132)
@@ -103,8 +103,8 @@
 		rds_ib_ring_free(&ic->i_send_ring, 1);
 
 		/* We expect errors as the qp is drained during shutdown */
-		if (wc.status != IB_WC_SUCCESS &&
-		    !test_and_set_bit(0, &ic->i_warned)) {
+		if (wc.status != IB_WC_SUCCESS && !ic->i_wc_err) {
+			ic->i_wc_err = 1;
 			printk(KERN_WARNING "RDS/IB: completion on "
 			       "%u.%u.%u.%u had status %u, disconnecting and "
 			       "reconnecting\n", NIPQUAD(conn->c_faddr),

Modified: trunk/linux/net/rds/threads.c
===================================================================
--- trunk/linux/net/rds/threads.c	2006-07-12 00:26:09 UTC (rev 131)
+++ trunk/linux/net/rds/threads.c	2006-07-13 01:30:09 UTC (rev 132)
@@ -112,7 +112,8 @@
 
 	if (test_bit(RDS_CONN_CONNECTED, &conn->c_status)) {
 		ret = rds_send_xmit(conn);
-		rdsdebug("sending to conn %p returned %d\n", conn, ret);
+		rdsdebug("conn %p ret %d\n", conn, ret);
+		/* XXX up some stat.. */
 		switch (ret) {
 			case -EAGAIN:
 				queue_work(rds_wq, &conn->c_send_work);
@@ -132,10 +133,17 @@
 
 	if (test_bit(RDS_CONN_CONNECTED, &conn->c_status)) {
 		ret = conn->c_trans->recv(conn);
-		rdsdebug("recv thread conn %p returned %d\n", conn, ret);
-		if (ret == -ENOMEM) {
-			/* XXX up some stat.. */
-			queue_delayed_work(rds_wq, &conn->c_recv_work, 2);
+		rdsdebug("conn %p ret %d\n", conn, ret);
+		/* XXX up some stat.. */
+		switch (ret) {
+			case -EAGAIN:
+				queue_work(rds_wq, &conn->c_recv_work);
+				break;
+			case -ENOMEM:
+				queue_delayed_work(rds_wq, &conn->c_recv_work,
+						   2);
+			default:
+				break;
 		}
 	}
 }




More information about the rds-commits mailing list