[rds-devel] RDS RDMA implementation

Vladimir Sokolovsky vlad at mellanox.co.il
Tue Sep 4 08:17:05 PDT 2007


Hi,
Here is a drop of the rdma_xmit implementation, together with some bug fixes.
Please review.

    RDS: Added rds_ib_xmit_rdma function
    
    setup_qp: open the QP with the maximum available number of SGEs
    Added r_key to the rds_rdma_args structure
    Set the page dirty flag after an RDMA READ operation
    Use the same PD and MR for all QPs on a device
    
    Signed-off-by: Vladimir Sokolovsky <vlad at mellanox.co.il>

diff --git a/net/rds/ib.c b/net/rds/ib.c
index 7548320..cd741ee 100644
--- a/net/rds/ib.c
+++ b/net/rds/ib.c
@@ -47,16 +47,16 @@ MODULE_PARM_DESC(fmr_pool_size, " Max number of fmr
per HCA");
 
 struct list_head rds_ib_devices;
 
-static void rds_ib_add_one(struct ib_device *device);
-static void rds_ib_remove_one(struct ib_device *device);
+void rds_ib_add_one(struct ib_device *device);
+void rds_ib_remove_one(struct ib_device *device);
 
-static struct ib_client rds_ib_client = {
+struct ib_client rds_ib_client = {
 	.name   = "rds_ib",
 	.add    = rds_ib_add_one,
 	.remove = rds_ib_remove_one
 };
 
-static void rds_ib_add_one(struct ib_device *device)
+void rds_ib_add_one(struct ib_device *device)
 {
 	struct rds_ib_device *rds_ibdev;
 	struct ib_device_attr *dev_attr;
@@ -77,6 +77,8 @@ static void rds_ib_add_one(struct ib_device *device)
 
 	spin_lock_init(&rds_ibdev->spinlock);
 
+	rds_ibdev->max_sge = min(dev_attr->max_sge, RDS_IB_MAX_SGE);
+
 	rds_ibdev->fmr_page_shift = max(9, ffs(dev_attr->page_size_cap)
- 1);
 	rds_ibdev->fmr_page_size  = 1 << rds_ibdev->fmr_page_shift;
 	rds_ibdev->fmr_page_mask  = ~((u64) rds_ibdev->fmr_page_size -
1);
@@ -126,7 +128,7 @@ free_attr:
 	kfree(dev_attr);
 }
 
-static void rds_ib_remove_one(struct ib_device *device)
+void rds_ib_remove_one(struct ib_device *device)
 {
 	struct rds_ib_device *rds_ibdev;
 	struct rds_ib_ipaddr *i_ipaddr;
@@ -191,6 +193,7 @@ struct rds_transport rds_ib_transport = {
 	.laddr_check		= rds_ib_laddr_check,
 	.xmit			= rds_ib_xmit,
 	.xmit_cong_map		= rds_ib_xmit_cong_map,
+	.xmit_rdma		= rds_ib_xmit_rdma,
 	.recv			= rds_ib_recv,
 	.conn_alloc		= rds_ib_conn_alloc,
 	.conn_free		= rds_ib_conn_free,
diff --git a/net/rds/ib.h b/net/rds/ib.h
index 9ccefaa..bb66cf1 100644
--- a/net/rds/ib.h
+++ b/net/rds/ib.h
@@ -13,6 +13,8 @@
 #define RDS_FMR_SIZE			256
 #define RDS_FMR_POOL_SIZE		1024
 
+#define RDS_IB_MAX_SGE			32
+
 /*
  * IB posts RDS_FRAG_SIZE fragments of pages to the receive queues to 
  * try and minimize the amount of memory tied up both the device and
@@ -38,8 +40,9 @@ struct rds_ib_connect_private {
 
 struct rds_ib_send_work {
 	struct rds_message	*s_rm;
+	struct rds_rdma_op	*s_op;
 	struct ib_send_wr	s_wr;
-	struct ib_sge		s_sge[2];
+	struct ib_sge		s_sge[RDS_IB_MAX_SGE];
 };
 
 struct rds_ib_recv_work {
@@ -118,6 +121,7 @@ struct rds_ib_device {
 	int			fmr_page_shift;
 	int			fmr_page_size;
 	u64			fmr_page_mask;
+	int			max_sge;
 	spinlock_t		spinlock;
 };
 
@@ -142,6 +146,9 @@ extern struct workqueue_struct *rds_ib_wq;
 
 /* ib.c */
 extern struct rds_transport rds_ib_transport;
+extern void rds_ib_add_one(struct ib_device *device);
+extern void rds_ib_remove_one(struct ib_device *device);
+extern struct ib_client rds_ib_client;
 
 /* ib_cm.c */
 int rds_ib_conn_alloc(struct rds_connection *conn, gfp_t gfp);
@@ -191,6 +198,7 @@ void rds_ib_send_init_ring(struct rds_ib_connection
*ic);
 void rds_ib_send_clear_ring(struct rds_ib_connection *ic);
 int rds_ib_xmit_cong_map(struct rds_connection *conn,
 			 struct rds_cong_map *map, unsigned long
offset);
+int rds_ib_xmit_rdma(struct rds_connection *conn, struct rds_rdma_op
*op);
 
 /* ib_stats.c */
 RDS_DECLARE_PER_CPU(struct rds_ib_statistics, rds_ib_stats);
diff --git a/net/rds/ib_cm.c b/net/rds/ib_cm.c
index 89e2c48..8e2324a 100644
--- a/net/rds/ib_cm.c
+++ b/net/rds/ib_cm.c
@@ -80,9 +80,12 @@ static int rds_ib_setup_qp(struct rds_connection
*conn)
 {
 	struct rds_ib_connection *ic = conn->c_transport_data;
 	struct ib_qp_init_attr attr;
+	struct rds_ib_device *rds_ibdev;
 	int ret;
 
-	ic->i_pd = ib_alloc_pd(ic->i_cm_id->device);
+	rds_ibdev = ib_get_client_data(ic->i_cm_id->device,
&rds_ib_client);
+
+	ic->i_pd = rds_ibdev->pd;
 	if (IS_ERR(ic->i_pd)) {
 		ret = PTR_ERR(ic->i_pd);
 		ic->i_pd = NULL;
@@ -90,7 +93,7 @@ static int rds_ib_setup_qp(struct rds_connection
*conn)
 		goto out;
 	}
 
-	ic->i_mr = ib_get_dma_mr(ic->i_pd, IB_ACCESS_LOCAL_WRITE);
+	ic->i_mr = rds_ibdev->mr;
 	if (IS_ERR(ic->i_mr)) {
 		ret = PTR_ERR(ic->i_mr);
 		ic->i_mr = NULL;
@@ -139,8 +142,8 @@ static int rds_ib_setup_qp(struct rds_connection
*conn)
 	/* + 1 to allow for the single ack message */
 	attr.cap.max_send_wr = ic->i_send_ring.w_nr + 1;
 	attr.cap.max_recv_wr = ic->i_recv_ring.w_nr + 1;
-	attr.cap.max_send_sge = 2;
-	attr.cap.max_recv_sge = 2;
+	attr.cap.max_send_sge = rds_ibdev->max_sge;
+	attr.cap.max_recv_sge = rds_ibdev->max_sge;
 	attr.sq_sig_type = IB_SIGNAL_REQ_WR;
 	attr.qp_type = IB_QPT_RC;
 	attr.send_cq = ic->i_send_cq;
@@ -221,6 +224,7 @@ static int rds_ib_cm_handle_connect(struct
rdma_cm_id *cm_id,
 	struct rds_connection *conn;
 	struct rds_ib_connection *ic;
 	struct rdma_conn_param conn_param;
+	struct rds_ib_device *rds_ibdev;
 	int ret;
 
 	rdsdebug("saddr %u.%u.%u.%u daddr %u.%u.%u.%u lguid 0x%llx fguid
"
@@ -287,8 +291,8 @@ static int rds_ib_cm_handle_connect(struct
rdma_cm_id *cm_id,
 	cm_id = NULL;
 
 	/* update ib_device with this local ipaddr */
-#warning "should this use ib_get_client_data() to get the
rds_ib_device?"
-	ib_update_ipaddr_for_device(ic->i_cm_id->device, dp->dp_saddr);
+	rds_ibdev = ib_get_client_data(ic->i_cm_id->device,
&rds_ib_client);
+	ib_update_ipaddr_for_device(rds_ibdev, dp->dp_saddr);
 
 out:
 	if (ret && cm_id)
@@ -504,10 +508,6 @@ void rds_ib_conn_shutdown(struct rds_connection
*conn)
 			ib_destroy_cq(ic->i_send_cq);
 		if (ic->i_recv_cq)
 			ib_destroy_cq(ic->i_recv_cq);
-		if (ic->i_mr)
-			ib_dereg_mr(ic->i_mr);
-		if (ic->i_pd)
-			ib_dealloc_pd(ic->i_pd);
 		rdma_destroy_id(ic->i_cm_id);
 
 		ic->i_cm_id = NULL;
diff --git a/net/rds/ib_send.c b/net/rds/ib_send.c
index b0b2ad6..3629212 100644
--- a/net/rds/ib_send.c
+++ b/net/rds/ib_send.c
@@ -36,6 +36,7 @@
 #include <linux/dmapool.h>
 
 #include "rds.h"
+#include "rdma.h"
 #include "ib.h"
 
 void rds_ib_send_unmap_rm(struct rds_ib_connection *ic,
@@ -56,6 +57,7 @@ void rds_ib_send_init_ring(struct rds_ib_connection
*ic)
 
 	for(i = 0, send = ic->i_sends; i < ic->i_send_ring.w_nr; i++,
send++) {
 		send->s_rm = NULL;
+		send->s_op = NULL;
 
 		send->s_wr.wr_id = i;
 		send->s_wr.sg_list = send->s_sge;
@@ -125,8 +127,23 @@ void rds_ib_send_cq_comp_handler(struct ib_cq *cq,
void *context)
 		completed = rds_ib_ring_completed(&ic->i_send_ring,
wc.wr_id, oldest);
 
 		for (i = 0; i < completed; i++) {
-			if (send->s_rm)
-				rds_ib_send_unmap_rm(ic, send);
+			if (wc.opcode == IB_WC_SEND) {
+				if (send->s_rm)
+					rds_ib_send_unmap_rm(ic, send);
+			}
+			else if (wc.opcode == IB_WR_RDMA_WRITE) {
+				if (send->s_op)
+
dma_unmap_sg(ic->i_cm_id->device->dma_device,
+						send->s_op->r_sg,
send->s_op->r_nents,
+						DMA_TO_DEVICE);
+			}
+			else if (wc.opcode == IB_WR_RDMA_READ) {
+				if (send->s_op)
+
dma_unmap_sg(ic->i_cm_id->device->dma_device,
+						send->s_op->r_sg,
send->s_op->r_nents,
+						DMA_FROM_DEVICE);
+			}
+
 			send->s_wr.num_sge = 1;
 			if (++send ==
&ic->i_sends[ic->i_send_ring.w_nr])
 				send = ic->i_sends;
@@ -415,3 +432,138 @@ int rds_ib_xmit(struct rds_connection *conn,
struct rds_message *rm,
 out:
 	return ret;
 }
+
+int rds_ib_xmit_rdma(struct rds_connection *conn, struct rds_rdma_op
*op)
+{
+	struct rds_ib_connection *ic = conn->c_transport_data;
+	struct rds_ib_send_work *send = NULL;
+	struct rds_ib_send_work *first;
+	struct rds_ib_send_work *prev;
+	struct ib_send_wr *failed_wr;
+	struct rds_ib_device *rds_ibdev;
+	struct scatterlist *scat;
+	unsigned long len;
+	static u32 unsignaled_wrs_count = 0;
+	u64 remote_addr = op->remote_addr;
+	u32 pos;
+	u32 work_alloc;
+	u32 i;
+	u32 j;
+	int sent;
+	int ret;
+	int num_sge;
+
+	rds_ibdev = ib_get_client_data(ic->i_cm_id->device,
&rds_ib_client);
+
+	/* map the message the first time we see it */
+	op->r_count = dma_map_sg(ic->i_cm_id->device->dma_device,
+					op->r_sg, op->r_nents,
(op->r_write) ?
+					DMA_TO_DEVICE :
DMA_FROM_DEVICE);
+	rdsdebug("ic %p mapping op %p: %d\n", ic, op, op->r_count);
+	if (op->r_count == 0) {
+		rds_ib_stats_inc(s_ib_tx_sg_mapping_failure);
+		ret = -ENOMEM; /* XXX ? */
+		goto out;
+	}
+
+	/*
+	 * Instead of knowing how to return a partial rdma read/write we
insist that there
+	 * be enough work requests to send the entire message.
+	 */
+	if ( op->r_count < rds_ibdev->max_sge ) {
+		i = 1;
+		num_sge = op->r_count;
+	}
+	else {
+		i = ceil(rds_ibdev->max_sge, op->r_count);
+		num_sge = rds_ibdev->max_sge;
+	}
+
+	work_alloc = rds_ib_ring_alloc(&ic->i_send_ring, i, &pos);
+	if (work_alloc != i) {
+		rds_ib_stats_inc(s_ib_tx_ring_full);
+		ret = -ENOMEM;
+		goto out;
+	}
+
+	send = &ic->i_sends[pos];
+	first = send;
+	prev = NULL;
+	scat = &op->r_sg[0];
+	sent = 0;
+
+	for ( i = 0; i < work_alloc && scat != &op->r_sg[op->r_count];
i++ ) {
+		++unsignaled_wrs_count;
+		/*
+		 * We want to delay signaling completions just enough to
get
+		 * the batching benefits but not so much that we create
dead time on the wire.
+		 */
+		if ( rds_ib_sysctl_max_unsig_wrs > 0 &&
unsignaled_wrs_count >= rds_ib_sysctl_max_unsig_wrs ) {
+			unsignaled_wrs_count = 0;
+			send->s_wr.send_flags = IB_SEND_SIGNALED |
IB_SEND_SOLICITED;
+		}
+		else
+			send->s_wr.send_flags = 0;
+
+		send->s_wr.opcode = op->r_write ? IB_WR_RDMA_WRITE :
IB_WR_RDMA_READ;
+		send->s_wr.wr.rdma.remote_addr = remote_addr;
+		send->s_wr.wr.rdma.rkey = op->r_key;
+
+		if (num_sge > rds_ibdev->max_sge) {
+			send->s_wr.num_sge = rds_ibdev->max_sge;
+			num_sge -= rds_ibdev->max_sge;
+		}
+		else
+			send->s_wr.num_sge = num_sge;
+
+		send->s_wr.next = NULL;
+
+		if (prev)
+			prev->s_wr.next = &send->s_wr;
+
+		for ( j = 0; j < num_sge && scat !=
&op->r_sg[op->r_count]; j++ ) {
+			len = sg_dma_len(scat);
+			send->s_sge[j].addr = sg_dma_address(scat);
+			send->s_sge[j].length = len;
+
+			sent += len;
+			rdsdebug("ic %p sent %d remote_addr %llu\n", ic,
sent, remote_addr);
+
+			remote_addr += sg_dma_len(scat);
+			scat++;
+		}
+
+		rdsdebug("send %p wr %p num_sge %u next %p\n", send,
+			&send->s_wr, send->s_wr.num_sge,
send->s_wr.next);
+
+		prev = send;
+		if (++send == &ic->i_sends[ic->i_send_ring.w_nr])
+			send = ic->i_sends;
+	}
+
+	/* if we finished the message then send completion owns it */
+	if (scat == &op->r_sg[op->r_count]) {
+		prev->s_wr.send_flags = IB_SEND_SIGNALED |
IB_SEND_SOLICITED;
+		prev->s_op = op;
+	}
+
+	if (i < work_alloc) {
+		rds_ib_ring_unalloc(&ic->i_send_ring, work_alloc - i);
+		work_alloc = i;
+	}
+
+	failed_wr = &first->s_wr;
+	ret = ib_post_send(ic->i_cm_id->qp, &first->s_wr, &failed_wr);
+	rdsdebug("ic %p first %p (wr %p) ret %d wr %p\n", ic,
+		 first, &first->s_wr, ret, failed_wr);
+	BUG_ON(failed_wr != &first->s_wr);
+	if (ret) {
+		printk(KERN_WARNING "RDS/IB: rdma ib_post_send to
%u.%u.%u.%u "
+		       "returned %d\n", NIPQUAD(conn->c_faddr), ret);
+		rds_ib_ring_unalloc(&ic->i_send_ring, work_alloc);
+		goto out;
+	}
+
+out:
+	return ret;
+}
diff --git a/net/rds/rdma.c b/net/rds/rdma.c
index ca181b0..f520f8a 100644
--- a/net/rds/rdma.c
+++ b/net/rds/rdma.c
@@ -65,6 +65,7 @@ struct rds_free_mr_args {
 
 struct rds_rdma_args {
 	struct rds_iovec	remote_vec;
+	u64			r_key;
 	u64			local_vec_addr;
 	u64			nr_local;
 	u64			flags;
@@ -256,7 +257,8 @@ int rds_get_mr(struct rds_sock *rs, char __user
*optval, int optlen)
 	ret = 0;
 out:
 	kfree(pages);
-	rds_mr_put(rs, mr);
+	if (mr)
+		rds_mr_put(rs, mr);
 	return ret;
 }
 
@@ -264,8 +266,8 @@ int rds_free_mr(struct rds_sock *rs, char __user
*optval, int optlen)
 {
 	struct rds_mr *mr;
 	struct rds_free_mr_args args;
-	void *trans_private;
-	int ret;
+	void *trans_private = NULL;
+	int ret = 0;
 
 	if (optlen != sizeof(struct rds_free_mr_args)) {
 		ret = -EINVAL;
@@ -301,8 +303,9 @@ int rds_free_mr(struct rds_sock *rs, char __user
*optval, int optlen)
 			args.flags & RDS_FREE_MR_ARGS_INVALIDATE ? 1 :
0,
 			mr->r_sg, mr->r_nents);
 
-	rds_mr_put(rs, mr);
-	ret = 0;
+	if (mr)
+		rds_mr_put(rs, mr);
+
 out:
 	return ret;
 }
@@ -316,7 +319,7 @@ int rds_barrier(struct rds_sock *rs, char __user
*optval, int optlen)
 	u64 next_seq;
 
 	if (optval) {
-		if (optval != sizeof(__be32)) {
+		if (optlen != sizeof(__be32)) {
 			ret = -EINVAL;
 			goto out;
 		}
@@ -356,8 +359,11 @@ void rds_rdma_free_op(struct rds_rdma_op *ro)
 {
 	unsigned int i;
 
-	for (i = 0; i < ro->r_nents; i++)
+	for (i = 0; i < ro->r_nents; i++) {
+		if (!ro->r_write)
+			set_page_dirty_lock(ro->r_sg[i].page);
 		put_page(ro->r_sg[i].page);
+	}
 
 	kfree(ro);
 }
@@ -426,6 +432,8 @@ static struct rds_rdma_op *rds_rdma_prepare(struct
rds_sock *rs,
 	}
 
 	op->r_write = args->flags & RDS_RDMA_ARGS_WRITE ? 1 : 0;
+	op->remote_addr = args->remote_vec.addr;
+	op->r_key = args->r_key;
 
 	nr_bytes = 0;
 
@@ -498,20 +506,27 @@ int rds_rdma_msghdr_parse(struct rds_sock *rs,
struct rds_message *rm,
 			  struct msghdr *msg)
 {
 	struct rds_rdma_op *op;
-        struct cmsghdr *cmsg;
+	struct cmsghdr *cmsg;
 	int ret = -EINVAL;
 
-        for (cmsg = CMSG_FIRSTHDR(msg);
-             cmsg != NULL;
-             cmsg = CMSG_NXTHDR((struct msghdr*)msg, cmsg)) {
-                if (!CMSG_OK(msg, cmsg))
+	cmsg = CMSG_FIRSTHDR(msg);
+
+	/* Not an rdma header */
+	if (cmsg == NULL) {
+	        ret = 0;
+	        goto out;
+	}
+
+	for ( ; cmsg != NULL;
+		cmsg = CMSG_NXTHDR((struct msghdr*)msg, cmsg)) {
+		if (!CMSG_OK(msg, cmsg))
 			break;
 
-                if (cmsg->cmsg_level != SOL_RDS)
-                        continue;
+		if (cmsg->cmsg_level != SOL_RDS)
+			continue;
 
 		if ((cmsg->cmsg_type != RDS_CMSG_RDMA_ARGS) ||
-		    (cmsg->cmsg_len != sizeof(struct rds_rdma_args))) 
+			(cmsg->cmsg_len != sizeof(struct
rds_rdma_args))) 
 			break;
 
 		op = rds_rdma_prepare(rs, CMSG_DATA(cmsg));
@@ -524,6 +539,7 @@ int rds_rdma_msghdr_parse(struct rds_sock *rs,
struct rds_message *rm,
 		break;
 	}
 
+out:
 	return ret;
 }
 
diff --git a/net/rds/rdma.h b/net/rds/rdma.h
index 88b7de3..df55c6e 100644
--- a/net/rds/rdma.h
+++ b/net/rds/rdma.h
@@ -20,6 +20,7 @@ struct rds_mr {
 
 struct rds_rdma_op {
 	u64			r_key;
+	u64			remote_addr;
 	unsigned		r_write:1;
 	unsigned int		r_nents;
 	unsigned int		r_count;



More information about the rds-devel mailing list