[rds-devel] iWARP RDMA patch

Jon Mason jon at opengridcomputing.com
Thu Nov 6 08:46:07 PST 2008


Hey Andy,
I migrated my current patch set to your new code location, and I wanted
to do a quick code dump before I start working on the higher-priority
code, iWARP loopback.

It is mostly changes to refer to the correct values, plus some code
cleanup.  The most significant change is a migration from a qp
reference to an rdma_cm_id reference in the rds_ib_mr struct.  This
enables the fastreg and invalidate work requests to be placed on the
ic->i_send_ring (thus removing the potential adapter send queue
overflow).  Unfortunately, the code that migrates the fastreg and
invalidate work requests onto the i_send_ring is not working yet (so I
have excluded it).
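
For reference, the shape of the qp -> cm_id change is roughly the
following (a minimal sketch, not lifted verbatim from the patch; the
helper name rds_ib_post_inv is made up, and the struct is abridged):

	struct rds_ib_mr {
		struct rds_ib_device	*device;
		struct rds_ib_mr_pool	*pool;
		struct rdma_cm_id	*cm_id;	/* was: struct ib_qp *qp */
		/* ... */
	};

	/* Reach the QP through the cm_id at post time, the way
	 * rds_ib_rdma_fastreg_inv() does in the patch below. */
	static int rds_ib_post_inv(struct rds_ib_mr *ibmr)
	{
		struct ib_send_wr s_wr, *failed_wr;

		if (!ibmr->cm_id->qp || !ibmr->fr_mr)
			return 0;

		memset(&s_wr, 0, sizeof(s_wr));
		s_wr.opcode = IB_WR_LOCAL_INV;
		s_wr.ex.invalidate_rkey = ibmr->fr_mr->rkey;
		s_wr.send_flags = IB_SEND_SIGNALED;

		failed_wr = &s_wr;
		return ib_post_send(ibmr->cm_id->qp, &s_wr, &failed_wr);
	}

Holding the cm_id rather than the bare QP means the QP is always
fetched at post time (ibmr->cm_id->qp) instead of being cached in the
MR.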

Thanks,
Jon

Signed-off-by: Jon Mason <jon at opengridcomputing.com>

diff --git a/drivers/infiniband/ulp/rds/iw.c b/drivers/infiniband/ulp/rds/iw.c
index 1b2946f..87f8918 100644
--- a/drivers/infiniband/ulp/rds/iw.c
+++ b/drivers/infiniband/ulp/rds/iw.c
@@ -91,13 +91,14 @@ void rds_ib_add_one(struct ib_device *device)
 	spin_lock_init(&rds_ibdev->spinlock);
 
 	rds_ibdev->dma_local_lkey = !!(dev_attr->device_cap_flags & IB_DEVICE_LOCAL_DMA_LKEY);
+	rds_ibdev->use_fastreg = !!(dev_attr->device_cap_flags & IB_DEVICE_MEM_MGT_EXTENSIONS);
 	rds_ibdev->max_wrs = dev_attr->max_qp_wr;
 	rds_ibdev->max_sge = min(dev_attr->max_sge, RDS_IB_MAX_SGE);
 
 	rds_ibdev->fmr_page_shift = max(9, ffs(dev_attr->page_size_cap) - 1);
 	rds_ibdev->fmr_page_size  = 1 << rds_ibdev->fmr_page_shift;
 	rds_ibdev->fmr_page_mask  = ~((u64) rds_ibdev->fmr_page_size - 1);
-	rds_ibdev->fmr_max_remaps = dev_attr->max_map_per_fmr?: 32;
+	rds_ibdev->fmr_max_remaps = dev_attr->max_map_per_fmr ? 0 : 32;
 	rds_ibdev->max_fmrs = dev_attr->max_fmr?
 			min_t(unsigned int, dev_attr->max_fmr, fmr_pool_size) :
 			fmr_pool_size;
@@ -112,7 +113,6 @@ void rds_ib_add_one(struct ib_device *device)
 			rds_ibdev->mr = ib_get_dma_mr(rds_ibdev->pd,
 						IB_ACCESS_LOCAL_WRITE);
 		} else {
-			/* Why does it have to have these permissions? */
 			rds_ibdev->mr = ib_get_dma_mr(rds_ibdev->pd,
 						IB_ACCESS_REMOTE_READ |
 						IB_ACCESS_REMOTE_WRITE |
@@ -123,10 +123,6 @@ void rds_ib_add_one(struct ib_device *device)
 	} else
 		rds_ibdev->mr = NULL;
 
-	/* Tell the RDMA code to use the fastreg API */
-	if (dev_attr->device_cap_flags & IB_DEVICE_MEM_MGT_EXTENSIONS)
-		rds_ibdev->use_fastreg = 1;
-
 	rds_ibdev->mr_pool = rds_ib_create_mr_pool(rds_ibdev);
 	if (IS_ERR(rds_ibdev->mr_pool)) {
 		rds_ibdev->mr_pool = NULL;
diff --git a/drivers/infiniband/ulp/rds/iw_rdma.c b/drivers/infiniband/ulp/rds/iw_rdma.c
index a107728..967ae49 100644
--- a/drivers/infiniband/ulp/rds/iw_rdma.c
+++ b/drivers/infiniband/ulp/rds/iw_rdma.c
@@ -45,8 +45,7 @@ extern struct list_head rds_ib_devices;
 struct rds_ib_mr {
 	struct rds_ib_device	*device;
 	struct rds_ib_mr_pool	*pool;
-
-	struct ib_qp *qp;
+	struct rdma_cm_id	*cm_id;
 
 	union {
 	    struct ib_fmr	*fmr;
@@ -136,13 +135,13 @@ static struct rds_ib_mr_pool_ops rds_ib_fastreg_pool_ops = {
 	.destroy	= rds_ib_destroy_fastreg,
 };
 
-static int rds_ib_get_device(struct rds_sock *rs, struct rds_ib_device **rds_ibdev, struct ib_qp **qp)
+static int rds_ib_get_device(struct rds_sock *rs, struct rds_ib_device **rds_ibdev, struct rdma_cm_id **cm_id)
 {
 	struct rds_ib_device *ibdev;
 	struct rds_ib_cm_id *i_cm_id;
 
 	*rds_ibdev = NULL;
-	*qp = NULL;
+	*cm_id = NULL;
 
 	list_for_each_entry(ibdev, &rds_ib_devices, list) {
 		spin_lock_irq(&ibdev->spinlock);
@@ -163,15 +162,22 @@ static int rds_ib_get_device(struct rds_sock *rs, struct rds_ib_device **rds_ibd
 				rs->rs_bound_port,
 				rs->rs_conn_addr,
 				rs->rs_conn_port);
+#if WORKING_TUPLE_DETECTION
+			if (src_addr->sin_addr.s_addr == rs->rs_bound_addr &&
+			    src_addr->sin_port == rs->rs_bound_port &&
+			    dst_addr->sin_addr.s_addr == rs->rs_conn_addr &&
+			    dst_addr->sin_port == rs->rs_conn_port) {
+#else
 			/* FIXME - needs to compare the local and remote ipaddr/port tuple, but the
 			 * ipaddr is the only available information in the rds_sock (as the rest are
 			 * zeroed).  It doesn't appear to be properly populated during connection
 			 * setup...
 			 */
 			if (src_addr->sin_addr.s_addr == rs->rs_bound_addr) {
+#endif
 				spin_unlock_irq(&ibdev->spinlock);
 				*rds_ibdev = ibdev;
-				*qp = i_cm_id->cm_id->qp;
+				*cm_id = i_cm_id->cm_id;
 				return 0;
 			}
 		}
@@ -219,7 +225,7 @@ int rds_ib_update_cm_id(struct rds_ib_device *rds_ibdev, struct rdma_cm_id *cm_i
 	struct sockaddr_in *src_addr, *dst_addr;
         struct rds_ib_device *rds_ibdev_old;
 	struct rds_sock rs;
-	struct ib_qp *qp;
+	struct rdma_cm_id *pcm_id;
 	int rc;
 
 	src_addr = (struct sockaddr_in *)&cm_id->route.addr.src_addr;
@@ -230,7 +236,7 @@ int rds_ib_update_cm_id(struct rds_ib_device *rds_ibdev, struct rdma_cm_id *cm_i
 	rs.rs_conn_addr = dst_addr->sin_addr.s_addr;
 	rs.rs_conn_port = dst_addr->sin_port;
 
-        rc = rds_ib_get_device(&rs, &rds_ibdev_old, &qp);
+	rc = rds_ib_get_device(&rs, &rds_ibdev_old, &pcm_id);
 	if (rc)
 		rds_ib_remove_cm_id(rds_ibdev, cm_id);
 
@@ -364,7 +370,7 @@ static u64 *rds_ib_map_scatterlist(struct rds_ib_device *rds_ibdev,
 	}
 
 	/* Now gather the dma addrs into one list */
-	if (sg->dma_npages > fmr_message_size)
+	if ((rds_ibdev->use_fastreg && sg->dma_npages > fastreg_message_size) || sg->dma_npages > fmr_message_size)
 		goto out_unmap;
 
 	dma_pages = kmalloc(sizeof(u64) * sg->dma_npages, GFP_ATOMIC);
@@ -382,6 +388,7 @@ static u64 *rds_ib_map_scatterlist(struct rds_ib_device *rds_ibdev,
 		dma_addr &= ~dma_mask;
 		for (; dma_addr < end_addr; dma_addr += dma_page_size)
 			dma_pages[j++] = dma_addr;
+		BUG_ON(j > sg->dma_npages);
 	}
 
 	return dma_pages;
@@ -417,7 +424,8 @@ static struct rds_ib_mr_pool *__rds_ib_create_mr_pool(struct rds_ib_device *rds_
 	pool->max_items = pool_size;
 	pool->max_free_pinned = pool->max_items * pool->max_message_size / 4;
 
-	pool->fmr_attr.max_pages = fmr_message_size;
+	pool->fmr_attr.max_pages = rds_ibdev->use_fastreg ? fastreg_message_size :
+				   fmr_message_size;
 	pool->fmr_attr.max_maps = rds_ibdev->fmr_max_remaps;
 	pool->fmr_attr.page_shift = rds_ibdev->fmr_page_shift;
 	pool->max_free_pinned = rds_ibdev->max_fmrs * fmr_message_size / 4;
@@ -435,9 +443,10 @@ static struct rds_ib_mr_pool *__rds_ib_create_mr_pool(struct rds_ib_device *rds_
 struct rds_ib_mr_pool *rds_ib_create_mr_pool(struct rds_ib_device *rds_ibdev)
 {
 	struct rds_ib_mr_pool *pool;
-	unsigned int pool_size;
 
 	if (!rds_ibdev->use_fastreg) {
+		unsigned int pool_size;
+
 		/* Use FMRs to implement memory registrations */
 		pool_size = fmr_pool_size;
 
@@ -447,23 +456,17 @@ struct rds_ib_mr_pool *rds_ib_create_mr_pool(struct rds_ib_device *rds_ibdev)
 		pool = __rds_ib_create_mr_pool(rds_ibdev, fmr_message_size, pool_size,
 					&rds_ib_fmr_pool_ops);
 
-		if (!IS_ERR(pool)) {
-			pool->fmr_attr.max_pages = pool->max_message_size;
-			pool->fmr_attr.max_maps = rds_ibdev->fmr_max_remaps;
-			pool->fmr_attr.page_shift = rds_ibdev->fmr_page_shift;
-		}
+		if (IS_ERR(pool))
+			printk("__rds_ib_create_mr_pool error\n");
 	} else {
 		/* Use fastregs to implement memory registrations */
-		pool_size = fastreg_pool_size;
-
 		pool = __rds_ib_create_mr_pool(rds_ibdev,
 					fastreg_message_size,
-					pool_size,
+					fastreg_pool_size,
 					&rds_ib_fastreg_pool_ops);
 
-		if (IS_ERR(pool)) {
+		if (IS_ERR(pool))
 			printk("__rds_ib_create_mr_pool error\n");
-		}
 	}
 
 	return pool;
@@ -726,11 +729,11 @@ void *rds_ib_get_mr(struct scatterlist *sg, unsigned long nents,
 	struct rds_ib_device *rds_ibdev;
 	struct rds_ib_mr_pool *pool;
 	struct rds_ib_mr *ibmr = NULL;
-	struct ib_qp *qp;
+	struct rdma_cm_id *cm_id;
 	int ret;
 
-	ret = rds_ib_get_device(rs, &rds_ibdev, &qp);
-	if (ret || !qp) {
+	ret = rds_ib_get_device(rs, &rds_ibdev, &cm_id);
+	if (ret || !cm_id) {
 		ret = -ENODEV;
 		goto out;
 	}
@@ -744,7 +747,7 @@ void *rds_ib_get_mr(struct scatterlist *sg, unsigned long nents,
 	if (IS_ERR(ibmr))
 		return ibmr;
 
-	ibmr->qp = qp;
+	ibmr->cm_id = cm_id;
 	ibmr->device = rds_ibdev;
 
 	ret = pool->op->map(pool, ibmr, sg, nents);
@@ -804,6 +807,13 @@ static int rds_ib_map_fmr(struct rds_ib_mr_pool *pool, struct rds_ib_mr *ibmr,
 
 	ret = ib_map_phys_fmr(ibmr->u.fmr, dma_pages, ibsg.dma_npages, 0);
 	if (ret) {
+		/* FIXME - On line 263 of rdma.c, rs->rs_transport->get_mr is
+		 * called (note the sg pointer being passed in).  If this get_mr
+		 * returns an error, among other things it will kfree that sg. 
+		 * rds_ib_drop_scatterlist will kfree that same sg list.  So in
+		 * the error case below, we have a double free.  Is the cleanup
+		 * below really necessary?
+		 */
 		rds_ib_drop_scatterlist(rds_ibdev, &ibsg);
 		goto out;
 	}
@@ -910,7 +920,6 @@ static int rds_ib_init_fastreg(struct rds_ib_mr_pool *pool,
 				struct rds_ib_mr *ibmr)
 {
 	struct rds_ib_device *rds_ibdev = pool->device;
-	struct rds_ib_mapping *mapping = &ibmr->mapping;
 	struct ib_fast_reg_page_list *page_list = NULL;
 	struct ib_mr *mr;
 	int err;
@@ -923,7 +932,10 @@ static int rds_ib_init_fastreg(struct rds_ib_mr_pool *pool,
 		return err;
 	}
 
-	page_list = ib_alloc_fast_reg_page_list(rds_ibdev->dev, mapping->m_sg.dma_npages);
+	/* FIXME - this is overkill, but mapping->m_sg.dma_len/mapping->m_sg.dma_npages
+	 * is not filled in.
+	 */
+	page_list = ib_alloc_fast_reg_page_list(rds_ibdev->dev, pool->max_message_size);
 	if (IS_ERR(page_list)) {
 		err = PTR_ERR(page_list);
 
@@ -937,7 +949,7 @@ static int rds_ib_init_fastreg(struct rds_ib_mr_pool *pool,
 	return 0;
 }
 
-static int rds_ib_rdma_build_fastreg(struct ib_qp *qp, struct rds_ib_mapping *mapping)
+static int rds_ib_rdma_build_fastreg(struct rds_ib_mapping *mapping)
 {
 	struct rds_ib_mr *ibmr = mapping->m_mr;
 	struct ib_send_wr f_wr, *failed_wr;
@@ -966,7 +978,7 @@ static int rds_ib_rdma_build_fastreg(struct ib_qp *qp, struct rds_ib_mapping *ma
 	f_wr.send_flags = IB_SEND_SIGNALED;
 
 	failed_wr = &f_wr;
-	ret = ib_post_send(qp, &f_wr, &failed_wr);
+	ret = ib_post_send(ibmr->cm_id->qp, &f_wr, &failed_wr);
 	BUG_ON(failed_wr != &f_wr);
 	if (ret) {
 		printk(KERN_WARNING "RDS/IB: %s %d ib_post_send returned %d\n",
@@ -983,7 +995,7 @@ static int rds_ib_rdma_fastreg_inv(struct rds_ib_mr *ibmr)
 	struct ib_send_wr s_wr, *failed_wr;
 	int ret = 0;
 
-	if (!ibmr->qp || !ibmr->fr_mr)
+	if (!ibmr->cm_id->qp || !ibmr->fr_mr)
 		goto out;
 
 	memset(&s_wr, 0, sizeof(s_wr));
@@ -992,7 +1004,7 @@ static int rds_ib_rdma_fastreg_inv(struct rds_ib_mr *ibmr)
 	s_wr.send_flags = IB_SEND_SIGNALED;
 
 	failed_wr = &s_wr;
-	ret = ib_post_send(ibmr->qp, &s_wr, &failed_wr);
+	ret = ib_post_send(ibmr->cm_id->qp, &s_wr, &failed_wr);
 	if (ret) {
 		printk(KERN_WARNING "RDS/IB: %s %d ib_post_send returned %d\n",
 			__func__, __LINE__, ret);
@@ -1010,7 +1022,7 @@ static int rds_ib_map_fastreg(struct rds_ib_mr_pool *pool,
 	struct rds_ib_device *rds_ibdev = pool->device;
 	struct rds_ib_mapping *mapping = &ibmr->mapping;
 	u64 *dma_pages;
-	int i, ret;
+	int i, ret = 0;
 
 	rds_ib_set_scatterlist(&mapping->m_sg, sg, sg_len);
 
@@ -1024,7 +1036,6 @@ static int rds_ib_map_fastreg(struct rds_ib_mr_pool *pool,
 	}
 
 	if (mapping->m_sg.dma_len > pool->max_message_size) {
-		printk("mapping->m_sg.dma_len > pool->max_message_size\n");
 		ret = -EMSGSIZE;
 		goto out;
 	}
@@ -1032,10 +1043,11 @@ static int rds_ib_map_fastreg(struct rds_ib_mr_pool *pool,
 	for (i = 0; i < mapping->m_sg.dma_npages; ++i)
 		ibmr->fr_page_list->page_list[i] = dma_pages[i];
 
-	rds_ib_rdma_build_fastreg(ibmr->qp, mapping);
+	ret = rds_ib_rdma_build_fastreg(mapping);
+	if (ret)
+		goto out;
 
 	rds_ib_stats_inc(s_ib_rdma_mr_used);
-	ret = 0;
 
 out:
 	kfree(dma_pages);
@@ -1050,11 +1062,14 @@ static void rds_ib_free_fastreg(struct rds_ib_mr_pool *pool,
 		struct rds_ib_mr *ibmr)
 {
 	unsigned long flags;
+	int ret;
 
 	if (!ibmr->mapping.m_sg.dma_len)
 		return;
 
-	rds_ib_rdma_fastreg_inv(ibmr);
+	ret = rds_ib_rdma_fastreg_inv(ibmr);
+	if (ret)
+		return;
 
 	/* Try to post the LOCAL_INV WR to the queue. */
 	spin_lock_irqsave(&pool->list_lock, flags);
diff --git a/drivers/infiniband/ulp/rds/iw_send.c b/drivers/infiniband/ulp/rds/iw_send.c
index 2f0c1fb..c67a7be 100644
--- a/drivers/infiniband/ulp/rds/iw_send.c
+++ b/drivers/infiniband/ulp/rds/iw_send.c
@@ -154,13 +154,13 @@ void rds_ib_send_init_ring(struct rds_ib_connection *ic)
 		sge->lkey = 0;
 
 		if (ic->i_iwarp) {
-			send->s_mr = ib_alloc_fast_reg_mr(ic->i_pd, fmr_message_size);
+			send->s_mr = ib_alloc_fast_reg_mr(ic->i_pd, fastreg_message_size);
 			if (IS_ERR(send->s_mr)) {
 				printk(KERN_WARNING "RDS/IB: ib_alloc_fast_reg_mr failed\n");
 				break;
 			}
 
-			send->s_page_list = ib_alloc_fast_reg_page_list(ic->i_cm_id->device, RDS_IB_MAX_SGE);
+			send->s_page_list = ib_alloc_fast_reg_page_list(ic->i_cm_id->device, fastreg_message_size);
 			if (IS_ERR(send->s_page_list)) {
 				printk(KERN_WARNING "RDS/IB: ib_alloc_fast_reg_page_list failed\n");
 				break;
@@ -206,7 +206,7 @@ void rds_ib_send_cq_comp_handler(struct ib_cq *cq, void *context)
 	struct rds_ib_send_work *send;
 	u32 completed;
 	u32 oldest;
-	u32 i = 0;
+	u32 i;
 	int ret;
 
 	rdsdebug("cq %p conn %p\n", cq, conn);
@@ -897,17 +897,18 @@ int rds_ib_xmit_rdma(struct rds_connection *conn, struct rds_rdma_op *op)
 
 		for (j = 0; j < send->s_wr.num_sge && scat != &op->r_sg[op->r_count]; j++) {
 			len = sg_dma_len(scat);
-			send->s_sge[j].addr = sg_dma_address(scat);
-			send->s_sge[j].length = len;
 
 			if (send->s_wr.opcode == IB_WR_RDMA_READ_WITH_INV)
 				send->s_page_list->page_list[j] = sg_dma_address(scat);
-			else
+			else {
+				send->s_sge[j].addr = sg_dma_address(scat);
+				send->s_sge[j].length = len;
 				send->s_sge[j].lkey = rds_ib_local_dma_lkey(ic);
+			}
 
 			sent += len;
 			rdsdebug("ic %p sent %d remote_addr %llu\n", ic, sent, remote_addr);
-			remote_addr += sg_dma_len(scat);
+			remote_addr += len;
 
 			scat++;
 		}
@@ -926,23 +927,24 @@ int rds_ib_xmit_rdma(struct rds_connection *conn, struct rds_rdma_op *op)
 			send = ic->i_sends;
 	}
 
+	/* if we finished the message then send completion owns it */
+	if (scat == &op->r_sg[op->r_count])
+		prev->s_wr.send_flags = IB_SEND_SIGNALED;
+
+	if (i < work_alloc) {
+		rds_ib_ring_unalloc(&ic->i_send_ring, work_alloc - i);
+		work_alloc = i;
+	}
+
 	/* On iWARP, local memory access by a remote system (ie, RDMA Read) is not
 	 * recommended.  Putting the lkey on the wire is a security hole, as it can
 	 * allow for memory access to all of memory on the remote system.  Some
 	 * adapters do not allow using the lkey for this at all.  To bypass this use a
 	 * fastreg_mr (or possibly a dma_mr)
 	 */
-	if (!op->r_write && ic->i_iwarp)
+	if (!op->r_write && ic->i_iwarp) {
 		rds_ib_build_send_fastreg(rds_ibdev, ic, &ic->i_sends[fr_pos], op->r_count, sent, conn->c_xmit_rm->m_rs->rs_user_addr);
-
-	/* if we finished the message then send completion owns it */
-	if (scat == &op->r_sg[op->r_count]) {
-		prev->s_wr.send_flags = IB_SEND_SIGNALED;
-	}
-
-	if (i < work_alloc) {
-		rds_ib_ring_unalloc(&ic->i_send_ring, work_alloc - i);
-		work_alloc = i;
+		work_alloc++;
 	}
 
 	failed_wr = &first->s_wr;
diff --git a/drivers/infiniband/ulp/rds/send.c b/drivers/infiniband/ulp/rds/send.c
index 3a9247e..ae81698 100644
--- a/drivers/infiniband/ulp/rds/send.c
+++ b/drivers/infiniband/ulp/rds/send.c
@@ -127,6 +127,7 @@ int rds_send_xmit(struct rds_connection *conn)
 	 */
 	if (down_trylock(&conn->c_send_sem)) {
 		rds_stats_inc(s_send_sem_contention);
+		ret = -ENOMEM;
 		goto out;
 	}
 
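A note on the send queue overflow mentioned above: fastreg and
invalidate WRs posted straight to the QP bypass the send ring's
accounting, so nothing throttles them against the QP's send queue
depth.  The (currently excluded) i_send_ring migration needs to
reserve a ring slot before posting, roughly like this (a sketch only;
rds_ib_ring_alloc/rds_ib_ring_unalloc are the existing ring helpers,
everything else is illustrative):

	u32 pos;
	int ret;

	/* Count the WR against the send ring before posting it. */
	if (rds_ib_ring_alloc(&ic->i_send_ring, 1, &pos) == 0)
		return -ENOMEM;	/* ring full - back off instead of overflowing */

	ret = ib_post_send(ic->i_cm_id->qp, &f_wr, &failed_wr);
	if (ret)
		rds_ib_ring_unalloc(&ic->i_send_ring, 1);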
