[rds-commits] zab commits r131 - trunk/linux/net/rds

svn-commits at oss.oracle.com
Tue Jul 11 19:26:11 CDT 2006


Author: zab
Date: 2006-07-11 19:26:09 -0500 (Tue, 11 Jul 2006)
New Revision: 131

Modified:
   trunk/linux/net/rds/ib.c
   trunk/linux/net/rds/ib.h
   trunk/linux/net/rds/ib_cm.c
   trunk/linux/net/rds/ib_recv.c
   trunk/linux/net/rds/ib_send.c
   trunk/linux/net/rds/loop.c
   trunk/linux/net/rds/rds.h
   trunk/linux/net/rds/send.c
   trunk/linux/net/rds/tcp.c
   trunk/linux/net/rds/tcp.h
   trunk/linux/net/rds/tcp_send.c
   trunk/linux/net/rds/threads.c
Log:
Send the RDS header along with the first fragment in the IB work request.

To do this we rework the transport sending interface a little so that
the transport gets one call that indicates both header and data sending
progress.  Transports now have to be careful to handle zero-length messages
in their single transmit routines.
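
For reference, the interface collapses to a single entry point (see the
rds.h hunk below); hdr_off and off record how far into the header and
data payload previous calls got:

    int (*xmit)(struct rds_connection *conn, struct rds_message *rm,
                unsigned int hdr_off, unsigned int sg, unsigned int off);

It returns the total number of bytes sent, header bytes included, and
rds_send_xmit() splits that return value between c_xmit_hdr_off and the
per-sg data offsets.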

The IB work requests in the send and receive rings are now built with 2 sges.
The receiver also now has a ring of receive headers to match the sender.
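
Condensed from the ib.h hunk below, each send ring entry now looks like
this (the recv ring is analogous):

    struct rds_ib_send_work {
            struct rds_message      *s_rm;
            struct ib_send_wr       s_wr;
            struct ib_sge           s_sge[2];  /* [0] data frag, [1] header */
    };

with s_sge[1] of every ring entry pointing at its slot in the long-lived,
DMA-mapped header array (i_send_hdrs / i_recv_hdrs).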

On the sending side we use the second sge of the first fragment to point
to the mapped header region.  Further fragments of the message only have
one sge.
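
Since that second sge is pre-pointed at the header slot, queueing the
header on the send side is just a copy plus bumping num_sge (excerpt from
the ib_send.c hunk below):

    if (hdr_off == 0) {
            memcpy(&ic->i_send_hdrs[pos], &rm->m_inc.i_hdr,
                   sizeof(struct rds_header));
            sent += sizeof(struct rds_header);
            first->s_wr.num_sge = 2;
    }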

The receiver is taught to recognize that the header will be in the first
sge if the first fragment isn't full-sized.  It always posts the receive
work with the second sge pointing to the mapped headers so that it can
receive full fragments and a header.
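
In code that choice looks like this (excerpted from the ib_recv.c hunk
below): a short first fragment means the header rode at the tail of the
data sge, a full-sized one means it landed in the header ring:

    if (byte_len <= RDS_FRAG_SIZE) {
            /* header is at the end of the lone data sge */
            void *addr = kmap_atomic(recv->r_page, KM_SOFTIRQ0);
            memcpy(&ibinc->ii_inc.i_hdr,
                   addr + byte_len - sizeof(struct rds_header),
                   sizeof(struct rds_header));
            kunmap_atomic(addr, KM_SOFTIRQ0);
    } else {
            /* full fragment: header arrived via the second sge */
            int i = recv - ic->i_recvs;
            memcpy(&ibinc->ii_inc.i_hdr, &ic->i_recv_hdrs[i],
                   sizeof(struct rds_header));
    }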

TCP is updated to the new interface.  It still uses a separate sendmsg
to send the header before using sendpage to send each data page. 
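
The reworked TCP path keeps that split visible at the top of
rds_tcp_xmit() (from the tcp_send.c hunk below):

    if (hdr_off < sizeof(struct rds_header)) {
            ret = rds_tcp_xmit_header(conn, rm, hdr_off);
            if (ret < 0)
                    goto out;
            done += ret;
            if (hdr_off + done != sizeof(struct rds_header))
                    goto out;   /* partial header write; caller retries */
    }
    /* ...then sendpage each remaining data page... */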

xmit_{prepare,complete} are now optional so that loopback and IB don't provide
empty functions. 
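
The core simply skips hooks that aren't provided (from the send.c hunk
below):

    if (conn->c_trans->xmit_prepare)
            conn->c_trans->xmit_prepare(conn);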

Now that the main transmit path references entire messages, it can be
used by loopback.

Ack generation from the sending path is disabled temporarily.

Lots of code disappears.  The old IB header transmit path is folded
into the single remaining transmit path.  The IB incoming allocation
path is folded into the receive processing path.  Loopback no longer
needs rds_send_{get,put}_next_message().


Modified: trunk/linux/net/rds/ib.c
===================================================================
--- trunk/linux/net/rds/ib.c	2006-07-10 23:10:34 UTC (rev 130)
+++ trunk/linux/net/rds/ib.c	2006-07-12 00:26:09 UTC (rev 131)
@@ -57,11 +57,7 @@
 
 struct rds_transport rds_ib_transport = {
 	.laddr_check		= rds_ib_laddr_check,
-	.xmit			= rds_send_xmit,
-	.xmit_prepare		= rds_ib_xmit_prepare,
-	.xmit_complete		= rds_ib_xmit_complete,
-	.xmit_header		= rds_ib_xmit_header,
-	.xmit_data		= rds_ib_xmit_data,
+	.xmit			= rds_ib_xmit,
 	.recv			= rds_ib_recv,
 	.conn_alloc		= rds_ib_conn_alloc,
 	.conn_free		= rds_ib_conn_free,

Modified: trunk/linux/net/rds/ib.h
===================================================================
--- trunk/linux/net/rds/ib.h	2006-07-10 23:10:34 UTC (rev 130)
+++ trunk/linux/net/rds/ib.h	2006-07-12 00:26:09 UTC (rev 131)
@@ -26,14 +26,14 @@
 
 struct rds_ib_send_work {
 	struct rds_message	*s_rm;
-	struct ib_sge		s_sge;
 	struct ib_send_wr	s_wr;
+	struct ib_sge		s_sge[2];
 };
 
 struct rds_ib_recv_work {
 	struct page 		*r_page;
-	struct ib_sge		r_sge;
 	struct ib_recv_wr	r_wr;
+	struct ib_sge		r_sge[2];
 };
 
 struct rds_ib_work_ring {
@@ -56,14 +56,16 @@
 	/* tx */
 	struct rds_ib_work_ring	i_send_ring;
 	struct rds_message	*i_rm;
-	struct rds_header	*i_headers;
-	dma_addr_t 		i_headers_dma;
+	struct rds_header	*i_send_hdrs;
+	dma_addr_t 		i_send_hdrs_dma;
 	struct rds_ib_send_work *i_sends;
 
 	/* rx */
 	struct rds_ib_work_ring	i_recv_ring;
 	struct rds_ib_incoming	*i_ibinc;
 	u32			i_recv_data_rem;
+	struct rds_header	*i_recv_hdrs;
+	dma_addr_t 		i_recv_hdrs_dma;
 	struct rds_ib_recv_work *i_recvs;
 };
 
@@ -106,13 +108,8 @@
 u32 rds_ib_ring_oldest(struct rds_ib_work_ring *ring);
 
 /* ib_send.c */
-void rds_ib_xmit_prepare(struct rds_connection *conn);
-void rds_ib_xmit_complete(struct rds_connection *conn);
-int rds_ib_xmit_header(struct rds_connection *conn,
-			struct rds_message *rm, unsigned int off);
-int rds_ib_xmit_data(struct rds_connection *conn,
-		      struct rds_message *rm, unsigned int sg,
-		      unsigned int off);
+int rds_ib_xmit(struct rds_connection *conn, struct rds_message *rm,
+	        unsigned int hdr_off, unsigned int sg, unsigned int off);
 void rds_ib_send_cq_comp_handler(struct ib_cq *cq, void *context);
 void rds_ib_send_init_ring(struct rds_ib_connection *ic);
 void rds_ib_send_clear_ring(struct rds_ib_connection *ic);

Modified: trunk/linux/net/rds/ib_cm.c
===================================================================
--- trunk/linux/net/rds/ib_cm.c	2006-07-10 23:10:34 UTC (rev 130)
+++ trunk/linux/net/rds/ib_cm.c	2006-07-12 00:26:09 UTC (rev 131)
@@ -147,16 +147,26 @@
 		goto out;
 	}
 
-	ic->i_headers = dma_alloc_coherent(ic->i_cm_id->device->dma_device,
+	ic->i_send_hdrs = dma_alloc_coherent(ic->i_cm_id->device->dma_device,
 					   ic->i_send_ring.w_nr *
 					   	sizeof(struct rds_header),
-					   &ic->i_headers_dma, GFP_KERNEL);
-	if (ic->i_headers == NULL) {
+					   &ic->i_send_hdrs_dma, GFP_KERNEL);
+	if (ic->i_send_hdrs == NULL) {
 		ret = -ENOMEM;
-		rdsdebug("dma_alloc_coherent failed\n");
+		rdsdebug("dma_alloc_coherent send failed\n");
 		goto out;
 	}
 
+	ic->i_recv_hdrs = dma_alloc_coherent(ic->i_cm_id->device->dma_device,
+					   ic->i_recv_ring.w_nr *
+					   	sizeof(struct rds_header),
+					   &ic->i_recv_hdrs_dma, GFP_KERNEL);
+	if (ic->i_recv_hdrs == NULL) {
+		ret = -ENOMEM;
+		rdsdebug("dma_alloc_coherent recv failed\n");
+		goto out;
+	}
+
 	ic->i_sends = kmalloc(ic->i_send_ring.w_nr *
 				sizeof(struct rds_ib_send_work), GFP_KERNEL);
 	if (ic->i_sends == NULL) {
@@ -372,12 +382,20 @@
 			   rds_ib_ring_empty(&ic->i_send_ring) &&
 			   rds_ib_ring_empty(&ic->i_recv_ring));
 
-		if (ic->i_headers)
+		if (ic->i_send_hdrs)
 			dma_free_coherent(ic->i_cm_id->device->dma_device,
 					   ic->i_send_ring.w_nr *
 					   	sizeof(struct rds_header),
-					   ic->i_headers, ic->i_headers_dma);
+					   ic->i_send_hdrs,
+					   ic->i_send_hdrs_dma);
 
+		if (ic->i_recv_hdrs)
+			dma_free_coherent(ic->i_cm_id->device->dma_device,
+					   ic->i_recv_ring.w_nr *
+					   	sizeof(struct rds_header),
+					   ic->i_recv_hdrs,
+					   ic->i_recv_hdrs_dma);
+
 		rds_ib_send_clear_ring(ic);
 		rds_ib_recv_clear_ring(ic);
 
@@ -395,7 +413,8 @@
 		ic->i_pd = NULL;
 		ic->i_send_cq = NULL;
 		ic->i_recv_cq = NULL;
-		ic->i_headers = NULL;
+		ic->i_send_hdrs = NULL;
+		ic->i_recv_hdrs = NULL;
 	}
 
 	if (ic->i_ibinc) {

Modified: trunk/linux/net/rds/ib_recv.c
===================================================================
--- trunk/linux/net/rds/ib_recv.c	2006-07-10 23:10:34 UTC (rev 130)
+++ trunk/linux/net/rds/ib_recv.c	2006-07-12 00:26:09 UTC (rev 131)
@@ -62,7 +62,7 @@
 
 static void rds_ib_recv_free_page(struct rds_ib_recv_work *recv)
 {
-	BUG_ON(recv->r_sge.addr != 0);
+	BUG_ON(recv->r_sge[0].addr != 0);
 
 	rdsdebug("recv %p page %p\n", recv, recv->r_page);
 	__free_page(recv->r_page);
@@ -73,10 +73,10 @@
 				   struct rds_ib_recv_work *recv)
 {
 	rdsdebug("recv %p page %p\n", recv, recv->r_page);
-	BUG_ON(recv->r_sge.addr == 0);
-	dma_unmap_page(ic->i_cm_id->device->dma_device, recv->r_sge.addr,
+	BUG_ON(recv->r_sge[0].addr == 0);
+	dma_unmap_page(ic->i_cm_id->device->dma_device, recv->r_sge[0].addr,
 		       PAGE_SIZE, DMA_FROM_DEVICE);
-	recv->r_sge.addr = 0;
+	recv->r_sge[0].addr = 0;
 }
 
 void rds_ib_recv_init_ring(struct rds_ib_connection *ic)
@@ -87,14 +87,18 @@
 	for(i = 0, recv = ic->i_recvs; i < ic->i_recv_ring.w_nr; i++, recv++) {
 		recv->r_page = NULL;
 
-		recv->r_sge.addr = 0;
-		recv->r_sge.length = RDS_FRAG_SIZE;
-		recv->r_sge.lkey = ic->i_mr->lkey;
-	 
 		recv->r_wr.next = NULL;
 		recv->r_wr.wr_id = i;
-		recv->r_wr.sg_list = &recv->r_sge;
-		recv->r_wr.num_sge = 1;
+		recv->r_wr.sg_list = recv->r_sge;
+		recv->r_wr.num_sge = 2;
+
+		recv->r_sge[0].length = RDS_FRAG_SIZE;
+		recv->r_sge[0].lkey = ic->i_mr->lkey;
+
+		recv->r_sge[1].addr = ic->i_recv_hdrs_dma +
+				      (i * sizeof(struct rds_header));
+		recv->r_sge[1].length = sizeof(struct rds_header);
+		recv->r_sge[1].lkey = ic->i_mr->lkey;
 	}
 }
 
@@ -104,7 +108,7 @@
 	u32 i;
 
 	for(i = 0, recv = ic->i_recvs; i < ic->i_recv_ring.w_nr; i++, recv++) {
-		if (recv->r_sge.addr)
+		if (recv->r_sge[0].addr)
 			rds_ib_recv_unmap_page(ic, recv);
 		if (recv->r_page)
 			rds_ib_recv_free_page(recv);
@@ -128,32 +132,6 @@
 	kmem_cache_free(rds_ib_incoming_slab, ibinc);
 }
 
-/*
- * allcates a new rds_ib_incoming and initializes its header from the
- * header at the start of the give page.
- */
-static struct rds_ib_incoming *rds_ib_inc_new(struct rds_connection *conn,
-				  	      struct page *page, gfp_t gfp)
-{
-	struct rds_ib_incoming *ibinc;
-	void *addr;
-
-	ibinc = kmem_cache_alloc(rds_ib_incoming_slab, gfp);
-	/* 
-	 * XXX punt to thread, suggests refactoring core threading
-	 */
-	BUG_ON(ibinc == NULL);
-
-	INIT_LIST_HEAD(&ibinc->ii_pages);
-	rds_inc_init(&ibinc->ii_inc, conn, conn->c_faddr);
-
-	addr = kmap_atomic(page, KM_SOFTIRQ0);
-	memcpy(&ibinc->ii_inc.i_hdr, addr, sizeof(struct rds_header));
-	kunmap_atomic(addr, KM_SOFTIRQ0);
-
-	return ibinc;
-}
-
 void rds_ib_inc_process_acks(struct rds_connection *conn,
 			     struct rds_incoming *inc, u16 nr)
 {
@@ -246,35 +224,55 @@
  * get an orderly stream of message headers and then their data fragments.
  */
 static int rds_ib_process_recv(struct rds_connection *conn,
-			       struct rds_ib_recv_work *recv)
+			       struct rds_ib_recv_work *recv, u32 byte_len)
 {
 	struct rds_ib_connection *ic = conn->c_transport_data;
 	struct rds_ib_incoming *ibinc = ic->i_ibinc;
 	int ret = 0;
 
-	rdsdebug("ic %p recv %p sge len %u\n", ic, recv, recv->r_sge.length);
+	/* XXX shut down the connection if port 0,0 are seen? */
 
+	rdsdebug("ic %p recv %p byte len %u\n", ic, recv, byte_len);
+
 	/*
 	 * If this is just a header allocate an inc to track the upcoming
 	 * message payloads.  We leave the page allocated and mapped so 
 	 * that it can be reposted.
 	 */
 	if (ibinc == NULL) {
-		ibinc = rds_ib_inc_new(conn, recv->r_page, GFP_ATOMIC);
+		ibinc = kmem_cache_alloc(rds_ib_incoming_slab, GFP_ATOMIC);
 		if (ibinc == NULL) {
 			ret = -ENOMEM;
 			goto out;
 		}
+
+		INIT_LIST_HEAD(&ibinc->ii_pages);
+		rds_inc_init(&ibinc->ii_inc, conn, conn->c_faddr);
+
+		if (byte_len <= RDS_FRAG_SIZE) {
+			/* 
+	 * XXX The remainder of the page will need to be zeroed
+			 * if we map it to userspace or flip it into the page
+			 * cache.
+			 */
+			void *addr = kmap_atomic(recv->r_page, KM_SOFTIRQ0);
+			memcpy(&ibinc->ii_inc.i_hdr,
+			       addr + byte_len - sizeof(struct rds_header),
+			       sizeof(struct rds_header));
+			kunmap_atomic(addr, KM_SOFTIRQ0);
+		} else {
+			int i = recv - ic->i_recvs;
+			memcpy(&ibinc->ii_inc.i_hdr, &ic->i_recv_hdrs[i],
+			       sizeof(struct rds_header));
+		}
 		ic->i_ibinc = ibinc;
 		ic->i_recv_data_rem = be32_to_cpu(ibinc->ii_inc.i_hdr.h_len);
-		goto out;
 	}
 
 	rds_ib_recv_unmap_page(ic, recv);
 	list_add_tail(&recv->r_page->lru, &ibinc->ii_pages);
 	recv->r_page = NULL;
 
-	/* XXX use sge len in case sender does weird things */
 	if (ic->i_recv_data_rem > RDS_FRAG_SIZE)
 		ic->i_recv_data_rem -= RDS_FRAG_SIZE;
 	else {
@@ -299,6 +297,7 @@
 	struct ib_wc wc;
 	struct rds_ib_recv_work *recv;
 	u32 avail;
+	int ret;
 
 	rdsdebug("cq %p conn %p\n", cq, conn);
 
@@ -311,7 +310,8 @@
 
 		recv = &ic->i_recvs[rds_ib_ring_oldest(&ic->i_recv_ring)];
 		if (wc.status == IB_WC_SUCCESS) {
-			if (rds_ib_process_recv(conn, recv) == -ENOMEM)
+			ret = rds_ib_process_recv(conn, recv, wc.byte_len);
+			if (ret == -ENOMEM)
 				queue_work(rds_wq, &conn->c_recv_work);
 		}
 
@@ -353,14 +353,14 @@
 			}
 
 			/* XXX this can't fail? */
-			recv->r_sge.addr =
+			recv->r_sge[0].addr =
 				dma_map_page(ic->i_cm_id->device->dma_device,
 					     recv->r_page, 0, PAGE_SIZE,
 					     DMA_FROM_DEVICE);
 		}
 
 		rdsdebug("recv %p page %p addr %llu\n", recv, recv->r_page,
-			 recv->r_sge.addr);
+			 recv->r_sge[0].addr);
 			 
 		ret = ib_post_recv(ic->i_cm_id->qp, &recv->r_wr, &failed_wr);
 		rdsdebug("ret %d\n", ret);

Modified: trunk/linux/net/rds/ib_send.c
===================================================================
--- trunk/linux/net/rds/ib_send.c	2006-07-10 23:10:34 UTC (rev 130)
+++ trunk/linux/net/rds/ib_send.c	2006-07-12 00:26:09 UTC (rev 131)
@@ -28,14 +28,6 @@
 #include "rds.h"
 #include "ib.h"
 
-void rds_ib_xmit_prepare(struct rds_connection *conn)
-{
-}
-
-void rds_ib_xmit_complete(struct rds_connection *conn)
-{
-}
-
 void rds_ib_send_unmap_rm(struct rds_ib_connection *ic,
 		          struct rds_ib_send_work *send)
 {
@@ -55,14 +47,19 @@
 	for(i = 0, send = ic->i_sends; i < ic->i_send_ring.w_nr; i++, send++) {
 		send->s_rm = NULL;
 
-		send->s_sge.lkey = ic->i_mr->lkey;
-	 
 		send->s_wr.wr_id = i;
-		send->s_wr.sg_list = &send->s_sge;
+		send->s_wr.sg_list = send->s_sge;
 		send->s_wr.num_sge = 1;
 		send->s_wr.opcode = IB_WR_SEND;
 		send->s_wr.send_flags = IB_SEND_SIGNALED;
 		send->s_wr.imm_data = 0;
+
+		send->s_sge[0].lkey = ic->i_mr->lkey;
+
+		send->s_sge[1].addr = ic->i_send_hdrs_dma +
+				      (i * sizeof(struct rds_header));
+		send->s_sge[1].length = sizeof(struct rds_header);
+		send->s_sge[1].lkey = ic->i_mr->lkey;
 	}
 }
 
@@ -101,6 +98,7 @@
 
 		if (send->s_rm)
 			rds_ib_send_unmap_rm(ic, send);
+		send->s_wr.num_sge = 1;
 
 		rds_ib_ring_free(&ic->i_send_ring, 1);
 
@@ -116,51 +114,6 @@
 	}
 }
 
-int rds_ib_xmit_header(struct rds_connection *conn,
-			struct rds_message *rm, unsigned int off)
-{
-	struct rds_ib_connection *ic = conn->c_transport_data;
-	struct rds_ib_send_work *send;
-	struct rds_header *hdr;
-	struct ib_send_wr *failed_wr;
-	u32 pos;
-	int ret;
-
-	BUG_ON(off != 0);
-
-	if (!rds_ib_ring_alloc(&ic->i_send_ring, 1, &pos)) {
-		ret = -EAGAIN;
-		goto out;
-	}
-
-	send = &ic->i_sends[pos];
-	hdr = &ic->i_headers[pos];
-
-	send->s_sge.addr = ic->i_headers_dma +
-				((char *)hdr - (char *)ic->i_headers);
-	send->s_sge.length = sizeof(struct rds_header);
-	send->s_wr.next = NULL;
-
-	memcpy(hdr, &rm->m_inc.i_hdr, sizeof(struct rds_header));
-
-	/*
-	 * XXX For now we're sending these as their own wr.  The receiver
-	 * knows from the state of the RC that they'll be a header.  This
-	 * means small messages take two work reqs, which isn't so nice,
-	 */ 
-	ret = ib_post_send(ic->i_cm_id->qp, &send->s_wr, &failed_wr);
-	rdsdebug("ic %p send %p ret %d wr %p\n", ic, send, ret, failed_wr);
-	if (ret >= 0)
-		ret = sizeof(struct rds_header);
-	else {
-		/* XXX disconnect and reconnect on error? */
-		rds_ib_ring_unalloc(&ic->i_send_ring, 1);
-
-	}
-out:
-	return ret;
-}
-
 /*
  * This can be called multiple times for a given message.  The first time
  * we see a message we map its scatterlist into the IB device so that
@@ -170,9 +123,8 @@
  * in order so we pass ownership of the message to the completion handler
  * once we send the final fragment. 
  */
-int rds_ib_xmit_data(struct rds_connection *conn,
-		      struct rds_message *rm, unsigned int sg,
-		      unsigned int off)
+int rds_ib_xmit(struct rds_connection *conn, struct rds_message *rm,
+	        unsigned int hdr_off, unsigned int sg, unsigned int off)
 {
 	struct rds_ib_connection *ic = conn->c_transport_data;
 	struct rds_ib_send_work *send = NULL;
@@ -188,9 +140,10 @@
 	int ret;
 
 	BUG_ON(off % RDS_FRAG_SIZE);
+	BUG_ON(hdr_off != 0 && hdr_off != sizeof(struct rds_header));
 
 	/* map the message the first time we see it */
-	if (ic->i_rm == NULL) {
+	if (ic->i_rm == NULL && rm->m_nents) {
 		ret = dma_map_sg(ic->i_cm_id->device->dma_device, rm->m_sg,
 				 rm->m_nents, DMA_TO_DEVICE);
 		rdsdebug("ic %p mapping rm %p: %d\n", ic, rm, ret);
@@ -206,34 +159,63 @@
 		ic->i_rm = rm;
 	}
 
-	work_alloc = rds_ib_ring_alloc(&ic->i_send_ring, 
-			ceil(be32_to_cpu(rm->m_inc.i_hdr.h_len), RDS_FRAG_SIZE),
-			&pos);
+	if (be32_to_cpu(rm->m_inc.i_hdr.h_len) == 0)
+		i = 1;
+	else
+		i = ceil(be32_to_cpu(rm->m_inc.i_hdr.h_len), RDS_FRAG_SIZE);
+
+	work_alloc = rds_ib_ring_alloc(&ic->i_send_ring, i, &pos);
 	if (work_alloc == 0) {
 		ret = -EAGAIN;
 		goto out;
 	}
 
-	first = &ic->i_sends[pos];
+	send = &ic->i_sends[pos];
+	first = send;
 	prev = NULL;
 	scat = &rm->m_sg[sg];
 	sent = 0;
+	i = 0;
 
-	for(i = 0; i < work_alloc && scat != &rm->m_sg[rm->m_nents]; i++) {
+	/*
+	 * We could be copying the header into the unused tail of the page.
+	 * That would need to be changed in the future when those pages might
+	 * be mapped userspace pages or page cache pages.  So instead we always
+	 * use a second sge and our long-lived ring of mapped headers.  We send
+	 * the header after the data so that the data payload can be aligned on
+	 * the receiver.
+	 */
+	if (hdr_off == 0) {
+		memcpy(&ic->i_send_hdrs[pos], &rm->m_inc.i_hdr,
+		       sizeof(struct rds_header));
+		sent += sizeof(struct rds_header);
+		first->s_wr.num_sge = 2;
+	}
 
-		len = min(RDS_FRAG_SIZE, sg_dma_len(scat) - off);
+	/* handle a 0-len message */
+	if (be32_to_cpu(rm->m_inc.i_hdr.h_len) == 0) {
+		first->s_sge[0].addr = 0;
+		first->s_sge[0].length = 0;
+		first->s_wr.next = NULL;
+	      	/* as though we walked the scatter list */
+		i = 1;
+		prev = first;
+	}
 
-		send = &ic->i_sends[pos];
-		send->s_sge.addr = sg_dma_address(scat) + off;
-		send->s_sge.length = len;
+	/* if there's data reference it with a chain of work reqs */
+	for(; i < work_alloc && scat != &rm->m_sg[rm->m_nents]; i++) {
+
 		send->s_wr.next = NULL;
 		if (prev)
 			prev->s_wr.next = &send->s_wr;
-		send->s_wr.wr_id = send - ic->i_sends;
 
-		rdsdebug("pos %u send %p wr %p next %p\n",
-			 pos, send, &send->s_wr, send->s_wr.next);
+		len = min(RDS_FRAG_SIZE, sg_dma_len(scat) - off);
+		send->s_sge[0].addr = sg_dma_address(scat) + off;
+		send->s_sge[0].length = len;
 
+		rdsdebug("send %p wr %p num_sge %u next %p\n", send,
+			 &send->s_wr, send->s_wr.num_sge, send->s_wr.next);
+
 		sent += len;
 		off += len;
 		if (off == sg_dma_len(scat)) {
@@ -241,14 +223,14 @@
 			off = 0;
 		}
 
-		pos++;
-		if (pos == ic->i_send_ring.w_nr)
-			pos = 0;
+		prev = send;
+		if (++send == &ic->i_sends[ic->i_send_ring.w_nr])
+			send = ic->i_sends;
 	}
 
 	/* if we finished the message then send completion owns it */
 	if (scat == &rm->m_sg[rm->m_nents]) {
-		send->s_rm = ic->i_rm;
+		prev->s_rm = ic->i_rm;
 		ic->i_rm = NULL;
 	}
 
@@ -267,9 +249,9 @@
 		printk(KERN_WARNING "RDS/IB: ib_post_send to %u.%u.%u.%u "
 		       "returned %d\n", NIPQUAD(conn->c_faddr), ret);
 		rds_ib_ring_unalloc(&ic->i_send_ring, work_alloc);
-		if (send->s_rm) {
-			ic->i_rm = send->s_rm;
-			send->s_rm = NULL;
+		if (prev->s_rm) {
+			ic->i_rm = prev->s_rm;
+			prev->s_rm = NULL;
 		}
 		goto out;
 	}

Modified: trunk/linux/net/rds/loop.c
===================================================================
--- trunk/linux/net/rds/loop.c	2006-07-10 23:10:34 UTC (rev 130)
+++ trunk/linux/net/rds/loop.c	2006-07-12 00:26:09 UTC (rev 131)
@@ -55,21 +55,21 @@
  * flows to the receiver.  In the loopback case, though, the receive path
  * is handed the sending conn so the sense of the addresses is reversed.
  */
-static int rds_loop_xmit(struct rds_connection *conn)
+static int rds_loop_xmit(struct rds_connection *conn, struct rds_message *rm,
+			 unsigned int hdr_off, unsigned int sg,
+			 unsigned int off)
 {
-	struct rds_message *rm;
+	BUG_ON(hdr_off || sg || off);
 
-	while (rds_send_get_next_message(conn, &rm) > 0) {
-		rds_inc_init(&rm->m_inc, conn, conn->c_laddr);
-		rds_message_addref(rm); /* for the inc */
+	rds_inc_init(&rm->m_inc, conn, conn->c_laddr);
+	rds_message_addref(rm); /* for the inc */
 
-		rds_recv_incoming(conn, conn->c_laddr, conn->c_faddr,
-				 &rm->m_inc, GFP_KERNEL, KM_USER0);
+	rds_recv_incoming(conn, conn->c_laddr, conn->c_faddr, &rm->m_inc,
+			  GFP_KERNEL, KM_USER0);
 
-		rds_inc_put(&rm->m_inc);
-		rds_send_put_next_message(conn, rm, 1);
-	}
-	return 0;
+	rds_inc_put(&rm->m_inc);
+
+	return sizeof(struct rds_header) + be32_to_cpu(rm->m_inc.i_hdr.h_len);
 }
 
 /* we need to at least give the thread something to succeed */

Modified: trunk/linux/net/rds/rds.h
===================================================================
--- trunk/linux/net/rds/rds.h	2006-07-10 23:10:34 UTC (rev 130)
+++ trunk/linux/net/rds/rds.h	2006-07-12 00:26:09 UTC (rev 131)
@@ -141,11 +141,12 @@
 /**
  * struct rds_transport -  transport specific behavioural hooks
  *
- * @xmit: xmit advances the connection's send queue.  Transports will often
- *        set xmit to rds_send_xmit() which uses xmit_prepare and xmit_complete
- *        to fragment the messages in the send queue and deals with sending
- *        partial messages between xmit calls.  The caller serializes on the
- *        send_sem so this doesn't need to be reentrant for a given conn.
+ * @xmit: .xmit is called by rds_send_xmit() to tell the transport to send
+ *        part of a message.  The caller serializes on the send_sem so this
+ *        doesn't need to be reentrant for a given conn.  The header must be
+ *        sent before the data payload.  .xmit must be prepared to send a
+ *        message with no data payload.  .xmit should return the number of
+ *        bytes that were sent down the connection, including header bytes.
  *        Returning 0 tells the caller that it doesn't need to perform any
  *        additional work now.  This is usually the case when the transport has
  *        filled the sending queue for its connection and will handle
@@ -154,9 +155,6 @@
  *        immediately.  Returning -ENOMEM tells the caller to retry the send at
  *        some point in the future.
  *
- * @xmit_data: This will not be called for a zero length message which does
- * 	       not have any payload.
- *
  * @conn_shutdown: conn_shutdown stops traffic on the given connection.  Once
  *                 it returns the connection can not call rds_recv_incoming().
  *                 This will only be called once after conn_connect returns
@@ -173,14 +171,10 @@
 	void (*conn_free)(void *data);
 	int (*conn_connect)(struct rds_connection *conn);
 	void (*conn_shutdown)(struct rds_connection *conn);
-	int (*xmit)(struct rds_connection *conn);
 	void (*xmit_prepare)(struct rds_connection *conn);
 	void (*xmit_complete)(struct rds_connection *conn);
-	int (*xmit_header)(struct rds_connection *conn,
-			   struct rds_message *rm, unsigned int off);
-	int (*xmit_data)(struct rds_connection *conn,
-			 struct rds_message *rm, unsigned int sg,
-			 unsigned int off);
+	int (*xmit)(struct rds_connection *conn, struct rds_message *rm,
+		    unsigned int hdr_off, unsigned int sg, unsigned int off);
 	int (*recv)(struct rds_connection *conn);
 	int (*inc_copy_to_user)(struct rds_incoming *inc, struct iovec *iov,
 				size_t size);
@@ -332,10 +326,6 @@
 int rds_sendmsg(struct kiocb *iocb, struct socket *sock, struct msghdr *msg,
 		size_t payload_len);
 void rds_send_reset(struct rds_connection *conn);
-int rds_send_get_next_message(struct rds_connection *conn,
-			      struct rds_message **rm);
-void rds_send_put_next_message(struct rds_connection *conn,
-			       struct rds_message *rm, int complete);
 int rds_send_xmit(struct rds_connection *conn);
 
 /* stats.c */

Modified: trunk/linux/net/rds/send.c
===================================================================
--- trunk/linux/net/rds/send.c	2006-07-10 23:10:34 UTC (rev 130)
+++ trunk/linux/net/rds/send.c	2006-07-12 00:26:09 UTC (rev 131)
@@ -23,53 +23,6 @@
 
 #include "rds.h"
 
-/*
- * This gets a reference to the first message in the send queue and leaves
- * it on the send queue.  This gives a transport's sending path a chance
- * to process an entire message.  This is only suitable for transports
- * that will either send the entire message or none at all.  It doesn't
- * allow for registering a partial send to be continued in the future.
- *
- * rds_send_put_next_message() must be called to drop the reference that
- * this function returns. 
- */
-int rds_send_get_next_message(struct rds_connection *conn,
-			      struct rds_message **rm)
-{
-	unsigned long flags;
-	int ret = 0;
-
-	spin_lock_irqsave(&conn->c_lock, flags);
-	if (!list_empty(&conn->c_send_queue)) {
-		*rm = list_entry(conn->c_send_queue.next, struct rds_message,
-				m_conn_item);
-		rds_message_addref(*rm);
-		ret = 1;
-	}
-	spin_unlock_irqrestore(&conn->c_lock, flags);
-
-	return ret;
-}
-
-/*
- * Called when a transport is done sending an entire message that was
- * returned from rds_send_get_next_message().  If 'complete' is non-zero
- * then the transport is indicating that it has reliably sent the message
- * which should now be moved to the retransmit queue.
- */
-void rds_send_put_next_message(struct rds_connection *conn,
-			       struct rds_message *rm, int complete)
-{
-	unsigned long flags;
-
-	spin_lock_irqsave(&conn->c_lock, flags);
-	if (complete && !list_empty(&rm->m_flow_item))
-		list_move_tail(&rm->m_conn_item, &conn->c_retrans);
-	spin_unlock_irqrestore(&conn->c_lock, flags);
-
-	rds_message_put(rm);
-}
-
 /* must be serialized with sending which uses the c_xmit_rm ref */
 void rds_send_reset(struct rds_connection *conn)
 {
@@ -120,10 +73,15 @@
 		goto out;
 	}
 
+#if 0
+	/* XXX this is disabled pending refactoring so that IB can
+	 * post ack messages from its completion handlers */
 	/* this can kick send again if allocation fails */
 	rds_ack_build(conn, GFP_KERNEL, KM_USER0);
+#endif
 
-	conn->c_trans->xmit_prepare(conn);
+	if (conn->c_trans->xmit_prepare)
+		conn->c_trans->xmit_prepare(conn);
 
 	/* 
 	 * spin trying to push headers and data down the connection until
@@ -179,25 +137,23 @@
 			rm = conn->c_xmit_rm;
 		spin_unlock_irqrestore(&conn->c_lock, flags);
 
-		if (conn->c_xmit_hdr_off < sizeof(struct rds_header)) {
-			ret = conn->c_trans->xmit_header(conn, rm,
-					                 conn->c_xmit_hdr_off);
-			if (ret <= 0)
-				break;
-			BUG_ON(ret > sizeof(struct rds_header));
-			conn->c_xmit_hdr_off += ret;
-			BUG_ON(conn->c_xmit_hdr_off >
-			       sizeof(struct rds_header));
-		}
-
-		if (conn->c_xmit_hdr_off == sizeof(struct rds_header) &&
+		if (conn->c_xmit_hdr_off < sizeof(struct rds_header) ||
 		    conn->c_xmit_sg < rm->m_nents) {
-			ret = conn->c_trans->xmit_data(conn, rm, 
-						       conn->c_xmit_sg,
-						       conn->c_xmit_data_off);
+			ret = conn->c_trans->xmit(conn, rm,
+						  conn->c_xmit_hdr_off,
+						  conn->c_xmit_sg,
+						  conn->c_xmit_data_off);
 			if (ret <= 0)
 				break;
 
+			if (conn->c_xmit_hdr_off < sizeof(struct rds_header)) {
+				tmp = min_t(int, ret,
+					    sizeof(struct rds_header) -
+					    conn->c_xmit_hdr_off);
+				conn->c_xmit_hdr_off += tmp;
+				ret -= tmp;
+			}
+
 			sg = &rm->m_sg[conn->c_xmit_sg];
 			while (ret) {
 				tmp = min_t(int, ret, sg->length -
@@ -213,10 +169,10 @@
 				}
 			}
 		}
-
 	}
 
-	conn->c_trans->xmit_complete(conn);
+	if (conn->c_trans->xmit_complete)
+		conn->c_trans->xmit_complete(conn);
 
 	up(&conn->c_send_sem);
 out:

Modified: trunk/linux/net/rds/tcp.c
===================================================================
--- trunk/linux/net/rds/tcp.c	2006-07-10 23:10:34 UTC (rev 130)
+++ trunk/linux/net/rds/tcp.c	2006-07-12 00:26:09 UTC (rev 131)
@@ -136,11 +136,9 @@
 
 struct rds_transport rds_tcp_transport = {
 	.laddr_check		= rds_tcp_laddr_check,
-	.xmit			= rds_send_xmit,
 	.xmit_prepare		= rds_tcp_xmit_prepare,
 	.xmit_complete		= rds_tcp_xmit_complete,
-	.xmit_header		= rds_tcp_xmit_header,
-	.xmit_data		= rds_tcp_xmit_data,
+	.xmit			= rds_tcp_xmit,
 	.recv			= rds_tcp_recv,
 	.conn_alloc		= rds_tcp_conn_alloc,
 	.conn_free		= rds_tcp_conn_free,

Modified: trunk/linux/net/rds/tcp.h
===================================================================
--- trunk/linux/net/rds/tcp.h	2006-07-10 23:10:34 UTC (rev 130)
+++ trunk/linux/net/rds/tcp.h	2006-07-12 00:26:09 UTC (rev 131)
@@ -65,11 +65,8 @@
 /* tcp_send.c */
 void rds_tcp_xmit_prepare(struct rds_connection *conn);
 void rds_tcp_xmit_complete(struct rds_connection *conn);
-int rds_tcp_xmit_header(struct rds_connection *conn,
-			struct rds_message *rm, unsigned int off);
-int rds_tcp_xmit_data(struct rds_connection *conn,
-		      struct rds_message *rm, unsigned int sg,
-		      unsigned int off);
+int rds_tcp_xmit(struct rds_connection *conn, struct rds_message *rm,
+	         unsigned int hdr_off, unsigned int sg, unsigned int off);
 void rds_tcp_write_space(struct sock *sk);
 
 #endif

Modified: trunk/linux/net/rds/tcp_send.c
===================================================================
--- trunk/linux/net/rds/tcp_send.c	2006-07-10 23:10:34 UTC (rev 130)
+++ trunk/linux/net/rds/tcp_send.c	2006-07-12 00:26:09 UTC (rev 131)
@@ -79,14 +79,22 @@
 }
 
 /* the core send_sem serializes this with other xmit and shutdown */
-int rds_tcp_xmit_data(struct rds_connection *conn,
-		      struct rds_message *rm, unsigned int sg,
-		      unsigned int off)
+int rds_tcp_xmit(struct rds_connection *conn, struct rds_message *rm,
+	         unsigned int hdr_off, unsigned int sg, unsigned int off)
 {
 	struct rds_tcp_connection *tc = conn->c_transport_data;
 	int done = 0;
 	int ret = 0;
 
+	if (hdr_off < sizeof(struct rds_header)) {
+		ret = rds_tcp_xmit_header(conn, rm, hdr_off);
+		if (ret < 0)
+			goto out;
+		done += ret;
+		if (hdr_off + done != sizeof(struct rds_header))
+			goto out;
+	}
+
 	while (sg < rm->m_nents) {
 		ret = tc->t_sock->ops->sendpage(tc->t_sock, 
 						rm->m_sg[sg].page,
@@ -116,6 +124,7 @@
 		}
 	}
 
+out:
 	if (done == 0)
 		done = ret;
 	return done;

Modified: trunk/linux/net/rds/threads.c
===================================================================
--- trunk/linux/net/rds/threads.c	2006-07-10 23:10:34 UTC (rev 130)
+++ trunk/linux/net/rds/threads.c	2006-07-12 00:26:09 UTC (rev 131)
@@ -111,7 +111,7 @@
 	int ret;
 
 	if (test_bit(RDS_CONN_CONNECTED, &conn->c_status)) {
-		ret = conn->c_trans->xmit(conn);
+		ret = rds_send_xmit(conn);
 		rdsdebug("sending to conn %p returned %d\n", conn, ret);
 		switch (ret) {
 			case -EAGAIN:



