[rds-devel] iWARP RDMA patch

Richard Frank richard.frank at oracle.com
Thu Nov 6 08:59:55 PST 2008


The most important (IMO) support to work on is sorting out the RDMA
completion / error processing for iWARP.

We can get by without RDMA loopback support for iWARP, and bcopy
loopback is already in place.

Jon Mason wrote:
> Hey Andy,
> I migrated my current patch set to your new code location, and I wanted
> to do a quick code dump before I start working on the higher priority
> code, iWARP loopback.
>
> It is mostly changes to refer to the correct values, and some code
> cleanup.  The most significant code change is a migration from a qp
> reference in the rds_ib_mr struct.  This enables the fastreg and
> invalidate work requests to be on the ic->i_send_ring (thus removing the
> potential adapter send queue overflow).  Unfortunately the code that
> migrated the fastreg and invalidate to the i_send_ring is not working
> (so I have excluded it).
>
> Thanks,
> Jon
>
> Signed-Off-By: Jon Mason <jon at opengridcomputing.com>
>
> diff --git a/drivers/infiniband/ulp/rds/iw.c b/drivers/infiniband/ulp/rds/iw.c
> index 1b2946f..87f8918 100644
> --- a/drivers/infiniband/ulp/rds/iw.c
> +++ b/drivers/infiniband/ulp/rds/iw.c
> @@ -91,13 +91,14 @@ void rds_ib_add_one(struct ib_device *device)
>  	spin_lock_init(&rds_ibdev->spinlock);
>  
>  	rds_ibdev->dma_local_lkey = !!(dev_attr->device_cap_flags & IB_DEVICE_LOCAL_DMA_LKEY);
> +	rds_ibdev->use_fastreg = !!(dev_attr->device_cap_flags & IB_DEVICE_MEM_MGT_EXTENSIONS);
>  	rds_ibdev->max_wrs = dev_attr->max_qp_wr;
>  	rds_ibdev->max_sge = min(dev_attr->max_sge, RDS_IB_MAX_SGE);
>  
>  	rds_ibdev->fmr_page_shift = max(9, ffs(dev_attr->page_size_cap) - 1);
>  	rds_ibdev->fmr_page_size  = 1 << rds_ibdev->fmr_page_shift;
>  	rds_ibdev->fmr_page_mask  = ~((u64) rds_ibdev->fmr_page_size - 1);
> -	rds_ibdev->fmr_max_remaps = dev_attr->max_map_per_fmr?: 32;
> +	rds_ibdev->fmr_max_remaps = dev_attr->max_map_per_fmr ? 0 : 32;
>  	rds_ibdev->max_fmrs = dev_attr->max_fmr?
>  			min_t(unsigned int, dev_attr->max_fmr, fmr_pool_size) :
>  			fmr_pool_size;
> @@ -112,7 +113,6 @@ void rds_ib_add_one(struct ib_device *device)
>  			rds_ibdev->mr = ib_get_dma_mr(rds_ibdev->pd,
>  						IB_ACCESS_LOCAL_WRITE);
>  		} else {
> -			/* Why does it have to have these permissions? */
>  			rds_ibdev->mr = ib_get_dma_mr(rds_ibdev->pd,
>  						IB_ACCESS_REMOTE_READ |
>  						IB_ACCESS_REMOTE_WRITE |
> @@ -123,10 +123,6 @@ void rds_ib_add_one(struct ib_device *device)
>  	} else
>  		rds_ibdev->mr = NULL;
>  
> -	/* Tell the RDMA code to use the fastreg API */
> -	if (dev_attr->device_cap_flags & IB_DEVICE_MEM_MGT_EXTENSIONS)
> -		rds_ibdev->use_fastreg = 1;
> -
>  	rds_ibdev->mr_pool = rds_ib_create_mr_pool(rds_ibdev);
>  	if (IS_ERR(rds_ibdev->mr_pool)) {
>  		rds_ibdev->mr_pool = NULL;
> diff --git a/drivers/infiniband/ulp/rds/iw_rdma.c b/drivers/infiniband/ulp/rds/iw_rdma.c
> index a107728..967ae49 100644
> --- a/drivers/infiniband/ulp/rds/iw_rdma.c
> +++ b/drivers/infiniband/ulp/rds/iw_rdma.c
> @@ -45,8 +45,7 @@ extern struct list_head rds_ib_devices;
>  struct rds_ib_mr {
>  	struct rds_ib_device	*device;
>  	struct rds_ib_mr_pool	*pool;
> -
> -	struct ib_qp *qp;
> +	struct rdma_cm_id	*cm_id;
>  
>  	union {
>  	    struct ib_fmr	*fmr;
> @@ -136,13 +135,13 @@ static struct rds_ib_mr_pool_ops rds_ib_fastreg_pool_ops = {
>  	.destroy	= rds_ib_destroy_fastreg,
>  };
>  
> -static int rds_ib_get_device(struct rds_sock *rs, struct rds_ib_device **rds_ibdev, struct ib_qp **qp)
> +static int rds_ib_get_device(struct rds_sock *rs, struct rds_ib_device **rds_ibdev, struct rdma_cm_id **cm_id)
>  {
>  	struct rds_ib_device *ibdev;
>  	struct rds_ib_cm_id *i_cm_id;
>  
>  	*rds_ibdev = NULL;
> -	*qp = NULL;
> +	*cm_id = NULL;
>  
>  	list_for_each_entry(ibdev, &rds_ib_devices, list) {
>  		spin_lock_irq(&ibdev->spinlock);
> @@ -163,15 +162,22 @@ static int rds_ib_get_device(struct rds_sock *rs, struct rds_ib_device **rds_ibd
>  				rs->rs_bound_port,
>  				rs->rs_conn_addr,
>  				rs->rs_conn_port);
> +#if WORKING_TUPLE_DETECTION
> +			if (src_addr->sin_addr.s_addr == rs->rs_bound_addr &&
> +			    src_addr->sin_port == rs->rs_bound_port &&
> +			    dst_addr->sin_addr.s_addr == rs->rs_conn_addr &&
> +			    dst_addr->sin_port == rs->rs_conn_port) {
> +#else
>  			/* FIXME - needs to compare the local and remote ipaddr/port tuple, but the
>  			 * ipaddr is the only available infomation in the rds_sock (as the rest are
>  			 * zero'ed.  It doesn't appear to be properly populated during connection
>  			 * setup...
>  			 */
>  			if (src_addr->sin_addr.s_addr == rs->rs_bound_addr) {
> +#endif
>  				spin_unlock_irq(&ibdev->spinlock);
>  				*rds_ibdev = ibdev;
> -				*qp = i_cm_id->cm_id->qp;
> +				*cm_id = i_cm_id->cm_id;
>  				return 0;
>  			}
>  		}
> @@ -219,7 +225,7 @@ int rds_ib_update_cm_id(struct rds_ib_device *rds_ibdev, struct rdma_cm_id *cm_i
>  	struct sockaddr_in *src_addr, *dst_addr;
>          struct rds_ib_device *rds_ibdev_old;
>  	struct rds_sock rs;
> -	struct ib_qp *qp;
> +	struct rdma_cm_id *pcm_id;
>  	int rc;
>  
>  	src_addr = (struct sockaddr_in *)&cm_id->route.addr.src_addr;
> @@ -230,7 +236,7 @@ int rds_ib_update_cm_id(struct rds_ib_device *rds_ibdev, struct rdma_cm_id *cm_i
>  	rs.rs_conn_addr = dst_addr->sin_addr.s_addr;
>  	rs.rs_conn_port = dst_addr->sin_port;
>  
> -        rc = rds_ib_get_device(&rs, &rds_ibdev_old, &qp);
> +	rc = rds_ib_get_device(&rs, &rds_ibdev_old, &pcm_id);
>  	if (rc)
>  		rds_ib_remove_cm_id(rds_ibdev, cm_id);
>  
> @@ -364,7 +370,7 @@ static u64 *rds_ib_map_scatterlist(struct rds_ib_device *rds_ibdev,
>  	}
>  
>  	/* Now gather the dma addrs into one list */
> -	if (sg->dma_npages > fmr_message_size)
> +	if ((rds_ibdev->use_fastreg && sg->dma_npages > fastreg_message_size) || sg->dma_npages > fmr_message_size)
>  		goto out_unmap;
>  
>  	dma_pages = kmalloc(sizeof(u64) * sg->dma_npages, GFP_ATOMIC);
> @@ -382,6 +388,7 @@ static u64 *rds_ib_map_scatterlist(struct rds_ib_device *rds_ibdev,
>  		dma_addr &= ~dma_mask;
>  		for (; dma_addr < end_addr; dma_addr += dma_page_size)
>  			dma_pages[j++] = dma_addr;
> +		BUG_ON(j > sg->dma_npages);
>  	}
>  
>  	return dma_pages;
> @@ -417,7 +424,8 @@ static struct rds_ib_mr_pool *__rds_ib_create_mr_pool(struct rds_ib_device *rds_
>  	pool->max_items = pool_size;
>  	pool->max_free_pinned = pool->max_items * pool->max_message_size / 4;
>  
> -	pool->fmr_attr.max_pages = fmr_message_size;
> +	pool->fmr_attr.max_pages = rds_ibdev->use_fastreg ? fastreg_message_size :
> +				   fmr_message_size;
>  	pool->fmr_attr.max_maps = rds_ibdev->fmr_max_remaps;
>  	pool->fmr_attr.page_shift = rds_ibdev->fmr_page_shift;
>  	pool->max_free_pinned = rds_ibdev->max_fmrs * fmr_message_size / 4;
> @@ -435,9 +443,10 @@ static struct rds_ib_mr_pool *__rds_ib_create_mr_pool(struct rds_ib_device *rds_
>  struct rds_ib_mr_pool *rds_ib_create_mr_pool(struct rds_ib_device *rds_ibdev)
>  {
>  	struct rds_ib_mr_pool *pool;
> -	unsigned int pool_size;
>  
>  	if (!rds_ibdev->use_fastreg) {
> +		unsigned int pool_size;
> +
>  		/* Use FMRs to implement memory registrations */
>  		pool_size = fmr_pool_size;
>  
> @@ -447,23 +456,17 @@ struct rds_ib_mr_pool *rds_ib_create_mr_pool(struct rds_ib_device *rds_ibdev)
>  		pool = __rds_ib_create_mr_pool(rds_ibdev, fmr_message_size, pool_size,
>  					&rds_ib_fmr_pool_ops);
>  
> -		if (!IS_ERR(pool)) {
> -			pool->fmr_attr.max_pages = pool->max_message_size;
> -			pool->fmr_attr.max_maps = rds_ibdev->fmr_max_remaps;
> -			pool->fmr_attr.page_shift = rds_ibdev->fmr_page_shift;
> -		}
> +		if (IS_ERR(pool))
> +			printk("__rds_ib_create_mr_pool error\n");
>  	} else {
>  		/* Use fastregs to implement memory registrations */
> -		pool_size = fastreg_pool_size;
> -
>  		pool = __rds_ib_create_mr_pool(rds_ibdev,
>  					fastreg_message_size,
> -					pool_size,
> +					fastreg_pool_size,
>  					&rds_ib_fastreg_pool_ops);
>  
> -		if (IS_ERR(pool)) {
> +		if (IS_ERR(pool))
>  			printk("__rds_ib_create_mr_pool error\n");
> -		}
>  	}
>  
>  	return pool;
> @@ -726,11 +729,11 @@ void *rds_ib_get_mr(struct scatterlist *sg, unsigned long nents,
>  	struct rds_ib_device *rds_ibdev;
>  	struct rds_ib_mr_pool *pool;
>  	struct rds_ib_mr *ibmr = NULL;
> -	struct ib_qp *qp;
> +	struct rdma_cm_id *cm_id;
>  	int ret;
>  
> -	ret = rds_ib_get_device(rs, &rds_ibdev, &qp);
> -	if (ret || !qp) {
> +	ret = rds_ib_get_device(rs, &rds_ibdev, &cm_id);
> +	if (ret || !cm_id) {
>  		ret = -ENODEV;
>  		goto out;
>  	}
> @@ -744,7 +747,7 @@ void *rds_ib_get_mr(struct scatterlist *sg, unsigned long nents,
>  	if (IS_ERR(ibmr))
>  		return ibmr;
>  
> -	ibmr->qp = qp;
> +	ibmr->cm_id = cm_id;
>  	ibmr->device = rds_ibdev;
>  
>  	ret = pool->op->map(pool, ibmr, sg, nents);
> @@ -804,6 +807,13 @@ static int rds_ib_map_fmr(struct rds_ib_mr_pool *pool, struct rds_ib_mr *ibmr,
>  
>  	ret = ib_map_phys_fmr(ibmr->u.fmr, dma_pages, ibsg.dma_npages, 0);
>  	if (ret) {
> +		/* FIXME - On line 263 of rdma.c, rs->rs_transport->get_mr is
> +		 * called (note the sg pointer being passed in).  If this get_mr
> +		 * returns an error, among other things it will kfree that sg. 
> +		 * rds_ib_drop_scatterlist will kfree that same sg list.  So in
> +		 * the error case below, we have a double free.  Is the cleanup
> +		 * below really necessary?
> +		 */
>  		rds_ib_drop_scatterlist(rds_ibdev, &ibsg);
>  		goto out;
>  	}
> @@ -910,7 +920,6 @@ static int rds_ib_init_fastreg(struct rds_ib_mr_pool *pool,
>  				struct rds_ib_mr *ibmr)
>  {
>  	struct rds_ib_device *rds_ibdev = pool->device;
> -	struct rds_ib_mapping *mapping = &ibmr->mapping;
>  	struct ib_fast_reg_page_list *page_list = NULL;
>  	struct ib_mr *mr;
>  	int err;
> @@ -923,7 +932,10 @@ static int rds_ib_init_fastreg(struct rds_ib_mr_pool *pool,
>  		return err;
>  	}
>  
> -	page_list = ib_alloc_fast_reg_page_list(rds_ibdev->dev, mapping->m_sg.dma_npages);
> +	/* FIXME - this is overkill, but mapping->m_sg.dma_len/mapping->m_sg.dma_npages
> +	 * is not filled in.
> +	 */
> +	page_list = ib_alloc_fast_reg_page_list(rds_ibdev->dev, pool->max_message_size);
>  	if (IS_ERR(page_list)) {
>  		err = PTR_ERR(page_list);
>  
> @@ -937,7 +949,7 @@ static int rds_ib_init_fastreg(struct rds_ib_mr_pool *pool,
>  	return 0;
>  }
>  
> -static int rds_ib_rdma_build_fastreg(struct ib_qp *qp, struct rds_ib_mapping *mapping)
> +static int rds_ib_rdma_build_fastreg(struct rds_ib_mapping *mapping)
>  {
>  	struct rds_ib_mr *ibmr = mapping->m_mr;
>  	struct ib_send_wr f_wr, *failed_wr;
> @@ -966,7 +978,7 @@ static int rds_ib_rdma_build_fastreg(struct ib_qp *qp, struct rds_ib_mapping *ma
>  	f_wr.send_flags = IB_SEND_SIGNALED;
>  
>  	failed_wr = &f_wr;
> -	ret = ib_post_send(qp, &f_wr, &failed_wr);
> +	ret = ib_post_send(ibmr->cm_id->qp, &f_wr, &failed_wr);
>  	BUG_ON(failed_wr != &f_wr);
>  	if (ret) {
>  		printk(KERN_WARNING "RDS/IB: %s %d ib_post_send returned %d\n",
> @@ -983,7 +995,7 @@ static int rds_ib_rdma_fastreg_inv(struct rds_ib_mr *ibmr)
>  	struct ib_send_wr s_wr, *failed_wr;
>  	int ret = 0;
>  
> -	if (!ibmr->qp || !ibmr->fr_mr)
> +	if (!ibmr->cm_id->qp || !ibmr->fr_mr)
>  		goto out;
>  
>  	memset(&s_wr, 0, sizeof(s_wr));
> @@ -992,7 +1004,7 @@ static int rds_ib_rdma_fastreg_inv(struct rds_ib_mr *ibmr)
>  	s_wr.send_flags = IB_SEND_SIGNALED;
>  
>  	failed_wr = &s_wr;
> -	ret = ib_post_send(ibmr->qp, &s_wr, &failed_wr);
> +	ret = ib_post_send(ibmr->cm_id->qp, &s_wr, &failed_wr);
>  	if (ret) {
>  		printk(KERN_WARNING "RDS/IB: %s %d ib_post_send returned %d\n",
>  			__func__, __LINE__, ret);
> @@ -1010,7 +1022,7 @@ static int rds_ib_map_fastreg(struct rds_ib_mr_pool *pool,
>  	struct rds_ib_device *rds_ibdev = pool->device;
>  	struct rds_ib_mapping *mapping = &ibmr->mapping;
>  	u64 *dma_pages;
> -	int i, ret;
> +	int i, ret = 0;
>  
>  	rds_ib_set_scatterlist(&mapping->m_sg, sg, sg_len);
>  
> @@ -1024,7 +1036,6 @@ static int rds_ib_map_fastreg(struct rds_ib_mr_pool *pool,
>  	}
>  
>  	if (mapping->m_sg.dma_len > pool->max_message_size) {
> -		printk("mapping->m_sg.dma_len > pool->max_message_size\n");
>  		ret = -EMSGSIZE;
>  		goto out;
>  	}
> @@ -1032,10 +1043,11 @@ static int rds_ib_map_fastreg(struct rds_ib_mr_pool *pool,
>  	for (i = 0; i < mapping->m_sg.dma_npages; ++i)
>  		ibmr->fr_page_list->page_list[i] = dma_pages[i];
>  
> -	rds_ib_rdma_build_fastreg(ibmr->qp, mapping);
> +	ret = rds_ib_rdma_build_fastreg(mapping);
> +	if (ret)
> +		goto out;
>  
>  	rds_ib_stats_inc(s_ib_rdma_mr_used);
> -	ret = 0;
>  
>  out:
>  	kfree(dma_pages);
> @@ -1050,11 +1062,14 @@ static void rds_ib_free_fastreg(struct rds_ib_mr_pool *pool,
>  		struct rds_ib_mr *ibmr)
>  {
>  	unsigned long flags;
> +	int ret;
>  
>  	if (!ibmr->mapping.m_sg.dma_len)
>  		return;
>  
> -	rds_ib_rdma_fastreg_inv(ibmr);
> +	ret = rds_ib_rdma_fastreg_inv(ibmr);
> +	if (ret)
> +		return;
>  
>  	/* Try to post the LOCAL_INV WR to the queue. */
>  	spin_lock_irqsave(&pool->list_lock, flags);
> diff --git a/drivers/infiniband/ulp/rds/iw_send.c b/drivers/infiniband/ulp/rds/iw_send.c
> index 2f0c1fb..c67a7be 100644
> --- a/drivers/infiniband/ulp/rds/iw_send.c
> +++ b/drivers/infiniband/ulp/rds/iw_send.c
> @@ -154,13 +154,13 @@ void rds_ib_send_init_ring(struct rds_ib_connection *ic)
>  		sge->lkey = 0;
>  
>  		if (ic->i_iwarp) {
> -			send->s_mr = ib_alloc_fast_reg_mr(ic->i_pd, fmr_message_size);
> +			send->s_mr = ib_alloc_fast_reg_mr(ic->i_pd, fastreg_message_size);
>  			if (IS_ERR(send->s_mr)) {
>  				printk(KERN_WARNING "RDS/IB: ib_alloc_fast_reg_mr failed\n");
>  				break;
>  			}
>  
> -			send->s_page_list = ib_alloc_fast_reg_page_list(ic->i_cm_id->device, RDS_IB_MAX_SGE);
> +			send->s_page_list = ib_alloc_fast_reg_page_list(ic->i_cm_id->device, fastreg_message_size);
>  			if (IS_ERR(send->s_page_list)) {
>  				printk(KERN_WARNING "RDS/IB: ib_alloc_fast_reg_page_list failed\n");
>  				break;
> @@ -206,7 +206,7 @@ void rds_ib_send_cq_comp_handler(struct ib_cq *cq, void *context)
>  	struct rds_ib_send_work *send;
>  	u32 completed;
>  	u32 oldest;
> -	u32 i = 0;
> +	u32 i;
>  	int ret;
>  
>  	rdsdebug("cq %p conn %p\n", cq, conn);
> @@ -897,17 +897,18 @@ int rds_ib_xmit_rdma(struct rds_connection *conn, struct rds_rdma_op *op)
>  
>  		for (j = 0; j < send->s_wr.num_sge && scat != &op->r_sg[op->r_count]; j++) {
>  			len = sg_dma_len(scat);
> -			send->s_sge[j].addr = sg_dma_address(scat);
> -			send->s_sge[j].length = len;
>  
>  			if (send->s_wr.opcode == IB_WR_RDMA_READ_WITH_INV)
>  				send->s_page_list->page_list[j] = sg_dma_address(scat);
> -			else
> +			else {
> +				send->s_sge[j].addr = sg_dma_address(scat);
> +				send->s_sge[j].length = len;
>  				send->s_sge[j].lkey = rds_ib_local_dma_lkey(ic);
> +			}
>  
>  			sent += len;
>  			rdsdebug("ic %p sent %d remote_addr %llu\n", ic, sent, remote_addr);
> -			remote_addr += sg_dma_len(scat);
> +			remote_addr += len;
>  
>  			scat++;
>  		}
> @@ -926,23 +927,24 @@ int rds_ib_xmit_rdma(struct rds_connection *conn, struct rds_rdma_op *op)
>  			send = ic->i_sends;
>  	}
>  
> +	/* if we finished the message then send completion owns it */
> +	if (scat == &op->r_sg[op->r_count])
> +		prev->s_wr.send_flags = IB_SEND_SIGNALED;
> +
> +	if (i < work_alloc) {
> +		rds_ib_ring_unalloc(&ic->i_send_ring, work_alloc - i);
> +		work_alloc = i;
> +	}
> +
>  	/* On iWARP, local memory access by a remote system (ie, RDMA Read) is not
>  	 * recommended.  Putting the lkey on the wire is a security hole, as it can
>  	 * allow for memory access to all of memory on the remote system.  Some
>  	 * adapters do not allow using the lkey for this at all.  To bypass this use a
>  	 * fastreg_mr (or possibly a dma_mr)
>  	 */
> -	if (!op->r_write && ic->i_iwarp)
> +	if (!op->r_write && ic->i_iwarp) {
>  		rds_ib_build_send_fastreg(rds_ibdev, ic, &ic->i_sends[fr_pos], op->r_count, sent, conn->c_xmit_rm->m_rs->rs_user_addr);
> -
> -	/* if we finished the message then send completion owns it */
> -	if (scat == &op->r_sg[op->r_count]) {
> -		prev->s_wr.send_flags = IB_SEND_SIGNALED;
> -	}
> -
> -	if (i < work_alloc) {
> -		rds_ib_ring_unalloc(&ic->i_send_ring, work_alloc - i);
> -		work_alloc = i;
> +		work_alloc++;
>  	}
>  
>  	failed_wr = &first->s_wr;
> diff --git a/drivers/infiniband/ulp/rds/send.c b/drivers/infiniband/ulp/rds/send.c
> index 3a9247e..ae81698 100644
> --- a/drivers/infiniband/ulp/rds/send.c
> +++ b/drivers/infiniband/ulp/rds/send.c
> @@ -127,6 +127,7 @@ int rds_send_xmit(struct rds_connection *conn)
>  	 */
>  	if (down_trylock(&conn->c_send_sem)) {
>  		rds_stats_inc(s_send_sem_contention);
> +		ret = -ENOMEM;
>  		goto out;
>  	}
>  
>
> _______________________________________________
> rds-devel mailing list
> rds-devel at oss.oracle.com
> http://oss.oracle.com/mailman/listinfo/rds-devel
>   



More information about the rds-devel mailing list