[rds-devel] iWARP RDMA patch
Richard Frank
richard.frank at oracle.com
Thu Nov 6 08:59:55 PST 2008
The most important (IMO) support to work on - is sorting out the rdma
completion / error processing... for iWARP..
We can get by without rdma loopback support for iWARP... and bcopy
loopback is already in place..
Jon Mason wrote:
> Hey Andy,
> I migrated my current patch set to your new code location, and I wanted
> to do a quick code dump before I start working on the higher priority
> code, iWARP loopback.
>
> It is mostly changes to refer to the correct values, and some code
> cleanup. The most significant code change is a migration from a qp
> reference to an rdma_cm_id reference in the rds_ib_mr struct. This
> enables the fastreg and
> invalidate work requests to be on the ic->i_send_ring (thus removing the
> potential adapter send queue overflow). Unfortunately the code that
> migrated the fastreg and invalidate to the i_send_ring is not working
> (so I have excluded it).
>
> Thanks,
> Jon
>
> Signed-Off-By: Jon Mason <jon at opengridcomputing.com>
>
> diff --git a/drivers/infiniband/ulp/rds/iw.c b/drivers/infiniband/ulp/rds/iw.c
> index 1b2946f..87f8918 100644
> --- a/drivers/infiniband/ulp/rds/iw.c
> +++ b/drivers/infiniband/ulp/rds/iw.c
> @@ -91,13 +91,14 @@ void rds_ib_add_one(struct ib_device *device)
> spin_lock_init(&rds_ibdev->spinlock);
>
> rds_ibdev->dma_local_lkey = !!(dev_attr->device_cap_flags & IB_DEVICE_LOCAL_DMA_LKEY);
> + rds_ibdev->use_fastreg = !!(dev_attr->device_cap_flags & IB_DEVICE_MEM_MGT_EXTENSIONS);
> rds_ibdev->max_wrs = dev_attr->max_qp_wr;
> rds_ibdev->max_sge = min(dev_attr->max_sge, RDS_IB_MAX_SGE);
>
> rds_ibdev->fmr_page_shift = max(9, ffs(dev_attr->page_size_cap) - 1);
> rds_ibdev->fmr_page_size = 1 << rds_ibdev->fmr_page_shift;
> rds_ibdev->fmr_page_mask = ~((u64) rds_ibdev->fmr_page_size - 1);
> - rds_ibdev->fmr_max_remaps = dev_attr->max_map_per_fmr?: 32;
> + rds_ibdev->fmr_max_remaps = dev_attr->max_map_per_fmr ? 0 : 32;
> rds_ibdev->max_fmrs = dev_attr->max_fmr?
> min_t(unsigned int, dev_attr->max_fmr, fmr_pool_size) :
> fmr_pool_size;
> @@ -112,7 +113,6 @@ void rds_ib_add_one(struct ib_device *device)
> rds_ibdev->mr = ib_get_dma_mr(rds_ibdev->pd,
> IB_ACCESS_LOCAL_WRITE);
> } else {
> - /* Why does it have to have these permissions? */
> rds_ibdev->mr = ib_get_dma_mr(rds_ibdev->pd,
> IB_ACCESS_REMOTE_READ |
> IB_ACCESS_REMOTE_WRITE |
> @@ -123,10 +123,6 @@ void rds_ib_add_one(struct ib_device *device)
> } else
> rds_ibdev->mr = NULL;
>
> - /* Tell the RDMA code to use the fastreg API */
> - if (dev_attr->device_cap_flags & IB_DEVICE_MEM_MGT_EXTENSIONS)
> - rds_ibdev->use_fastreg = 1;
> -
> rds_ibdev->mr_pool = rds_ib_create_mr_pool(rds_ibdev);
> if (IS_ERR(rds_ibdev->mr_pool)) {
> rds_ibdev->mr_pool = NULL;
> diff --git a/drivers/infiniband/ulp/rds/iw_rdma.c b/drivers/infiniband/ulp/rds/iw_rdma.c
> index a107728..967ae49 100644
> --- a/drivers/infiniband/ulp/rds/iw_rdma.c
> +++ b/drivers/infiniband/ulp/rds/iw_rdma.c
> @@ -45,8 +45,7 @@ extern struct list_head rds_ib_devices;
> struct rds_ib_mr {
> struct rds_ib_device *device;
> struct rds_ib_mr_pool *pool;
> -
> - struct ib_qp *qp;
> + struct rdma_cm_id *cm_id;
>
> union {
> struct ib_fmr *fmr;
> @@ -136,13 +135,13 @@ static struct rds_ib_mr_pool_ops rds_ib_fastreg_pool_ops = {
> .destroy = rds_ib_destroy_fastreg,
> };
>
> -static int rds_ib_get_device(struct rds_sock *rs, struct rds_ib_device **rds_ibdev, struct ib_qp **qp)
> +static int rds_ib_get_device(struct rds_sock *rs, struct rds_ib_device **rds_ibdev, struct rdma_cm_id **cm_id)
> {
> struct rds_ib_device *ibdev;
> struct rds_ib_cm_id *i_cm_id;
>
> *rds_ibdev = NULL;
> - *qp = NULL;
> + *cm_id = NULL;
>
> list_for_each_entry(ibdev, &rds_ib_devices, list) {
> spin_lock_irq(&ibdev->spinlock);
> @@ -163,15 +162,22 @@ static int rds_ib_get_device(struct rds_sock *rs, struct rds_ib_device **rds_ibd
> rs->rs_bound_port,
> rs->rs_conn_addr,
> rs->rs_conn_port);
> +#if WORKING_TUPLE_DETECTION
> + if (src_addr->sin_addr.s_addr == rs->rs_bound_addr &&
> + src_addr->sin_port == rs->rs_bound_port &&
> + dst_addr->sin_addr.s_addr == rs->rs_conn_addr &&
> + dst_addr->sin_port == rs->rs_conn_port) {
> +#else
> /* FIXME - needs to compare the local and remote ipaddr/port tuple, but the
> * ipaddr is the only available infomation in the rds_sock (as the rest are
> * zero'ed. It doesn't appear to be properly populated during connection
> * setup...
> */
> if (src_addr->sin_addr.s_addr == rs->rs_bound_addr) {
> +#endif
> spin_unlock_irq(&ibdev->spinlock);
> *rds_ibdev = ibdev;
> - *qp = i_cm_id->cm_id->qp;
> + *cm_id = i_cm_id->cm_id;
> return 0;
> }
> }
> @@ -219,7 +225,7 @@ int rds_ib_update_cm_id(struct rds_ib_device *rds_ibdev, struct rdma_cm_id *cm_i
> struct sockaddr_in *src_addr, *dst_addr;
> struct rds_ib_device *rds_ibdev_old;
> struct rds_sock rs;
> - struct ib_qp *qp;
> + struct rdma_cm_id *pcm_id;
> int rc;
>
> src_addr = (struct sockaddr_in *)&cm_id->route.addr.src_addr;
> @@ -230,7 +236,7 @@ int rds_ib_update_cm_id(struct rds_ib_device *rds_ibdev, struct rdma_cm_id *cm_i
> rs.rs_conn_addr = dst_addr->sin_addr.s_addr;
> rs.rs_conn_port = dst_addr->sin_port;
>
> - rc = rds_ib_get_device(&rs, &rds_ibdev_old, &qp);
> + rc = rds_ib_get_device(&rs, &rds_ibdev_old, &pcm_id);
> if (rc)
> rds_ib_remove_cm_id(rds_ibdev, cm_id);
>
> @@ -364,7 +370,7 @@ static u64 *rds_ib_map_scatterlist(struct rds_ib_device *rds_ibdev,
> }
>
> /* Now gather the dma addrs into one list */
> - if (sg->dma_npages > fmr_message_size)
> + if ((rds_ibdev->use_fastreg && sg->dma_npages > fastreg_message_size) || sg->dma_npages > fmr_message_size)
> goto out_unmap;
>
> dma_pages = kmalloc(sizeof(u64) * sg->dma_npages, GFP_ATOMIC);
> @@ -382,6 +388,7 @@ static u64 *rds_ib_map_scatterlist(struct rds_ib_device *rds_ibdev,
> dma_addr &= ~dma_mask;
> for (; dma_addr < end_addr; dma_addr += dma_page_size)
> dma_pages[j++] = dma_addr;
> + BUG_ON(j > sg->dma_npages);
> }
>
> return dma_pages;
> @@ -417,7 +424,8 @@ static struct rds_ib_mr_pool *__rds_ib_create_mr_pool(struct rds_ib_device *rds_
> pool->max_items = pool_size;
> pool->max_free_pinned = pool->max_items * pool->max_message_size / 4;
>
> - pool->fmr_attr.max_pages = fmr_message_size;
> + pool->fmr_attr.max_pages = rds_ibdev->use_fastreg ? fastreg_message_size :
> + fmr_message_size;
> pool->fmr_attr.max_maps = rds_ibdev->fmr_max_remaps;
> pool->fmr_attr.page_shift = rds_ibdev->fmr_page_shift;
> pool->max_free_pinned = rds_ibdev->max_fmrs * fmr_message_size / 4;
> @@ -435,9 +443,10 @@ static struct rds_ib_mr_pool *__rds_ib_create_mr_pool(struct rds_ib_device *rds_
> struct rds_ib_mr_pool *rds_ib_create_mr_pool(struct rds_ib_device *rds_ibdev)
> {
> struct rds_ib_mr_pool *pool;
> - unsigned int pool_size;
>
> if (!rds_ibdev->use_fastreg) {
> + unsigned int pool_size;
> +
> /* Use FMRs to implement memory registrations */
> pool_size = fmr_pool_size;
>
> @@ -447,23 +456,17 @@ struct rds_ib_mr_pool *rds_ib_create_mr_pool(struct rds_ib_device *rds_ibdev)
> pool = __rds_ib_create_mr_pool(rds_ibdev, fmr_message_size, pool_size,
> &rds_ib_fmr_pool_ops);
>
> - if (!IS_ERR(pool)) {
> - pool->fmr_attr.max_pages = pool->max_message_size;
> - pool->fmr_attr.max_maps = rds_ibdev->fmr_max_remaps;
> - pool->fmr_attr.page_shift = rds_ibdev->fmr_page_shift;
> - }
> + if (IS_ERR(pool))
> + printk("__rds_ib_create_mr_pool error\n");
> } else {
> /* Use fastregs to implement memory registrations */
> - pool_size = fastreg_pool_size;
> -
> pool = __rds_ib_create_mr_pool(rds_ibdev,
> fastreg_message_size,
> - pool_size,
> + fastreg_pool_size,
> &rds_ib_fastreg_pool_ops);
>
> - if (IS_ERR(pool)) {
> + if (IS_ERR(pool))
> printk("__rds_ib_create_mr_pool error\n");
> - }
> }
>
> return pool;
> @@ -726,11 +729,11 @@ void *rds_ib_get_mr(struct scatterlist *sg, unsigned long nents,
> struct rds_ib_device *rds_ibdev;
> struct rds_ib_mr_pool *pool;
> struct rds_ib_mr *ibmr = NULL;
> - struct ib_qp *qp;
> + struct rdma_cm_id *cm_id;
> int ret;
>
> - ret = rds_ib_get_device(rs, &rds_ibdev, &qp);
> - if (ret || !qp) {
> + ret = rds_ib_get_device(rs, &rds_ibdev, &cm_id);
> + if (ret || !cm_id) {
> ret = -ENODEV;
> goto out;
> }
> @@ -744,7 +747,7 @@ void *rds_ib_get_mr(struct scatterlist *sg, unsigned long nents,
> if (IS_ERR(ibmr))
> return ibmr;
>
> - ibmr->qp = qp;
> + ibmr->cm_id = cm_id;
> ibmr->device = rds_ibdev;
>
> ret = pool->op->map(pool, ibmr, sg, nents);
> @@ -804,6 +807,13 @@ static int rds_ib_map_fmr(struct rds_ib_mr_pool *pool, struct rds_ib_mr *ibmr,
>
> ret = ib_map_phys_fmr(ibmr->u.fmr, dma_pages, ibsg.dma_npages, 0);
> if (ret) {
> + /* FIXME - On line 263 of rdma.c, rs->rs_transport->get_mr is
> + * called (note the sg pointer being passed in). If this get_mr
> + * returns an error, among other things it will kfree that sg.
> + * rds_ib_drop_scatterlist will kfree that same sg list. So in
> + * the error case below, we have a double free. Is the cleanup
> + * below really necessary?
> + */
> rds_ib_drop_scatterlist(rds_ibdev, &ibsg);
> goto out;
> }
> @@ -910,7 +920,6 @@ static int rds_ib_init_fastreg(struct rds_ib_mr_pool *pool,
> struct rds_ib_mr *ibmr)
> {
> struct rds_ib_device *rds_ibdev = pool->device;
> - struct rds_ib_mapping *mapping = &ibmr->mapping;
> struct ib_fast_reg_page_list *page_list = NULL;
> struct ib_mr *mr;
> int err;
> @@ -923,7 +932,10 @@ static int rds_ib_init_fastreg(struct rds_ib_mr_pool *pool,
> return err;
> }
>
> - page_list = ib_alloc_fast_reg_page_list(rds_ibdev->dev, mapping->m_sg.dma_npages);
> + /* FIXME - this is overkill, but mapping->m_sg.dma_len/mapping->m_sg.dma_npages
> + * is not filled in.
> + */
> + page_list = ib_alloc_fast_reg_page_list(rds_ibdev->dev, pool->max_message_size);
> if (IS_ERR(page_list)) {
> err = PTR_ERR(page_list);
>
> @@ -937,7 +949,7 @@ static int rds_ib_init_fastreg(struct rds_ib_mr_pool *pool,
> return 0;
> }
>
> -static int rds_ib_rdma_build_fastreg(struct ib_qp *qp, struct rds_ib_mapping *mapping)
> +static int rds_ib_rdma_build_fastreg(struct rds_ib_mapping *mapping)
> {
> struct rds_ib_mr *ibmr = mapping->m_mr;
> struct ib_send_wr f_wr, *failed_wr;
> @@ -966,7 +978,7 @@ static int rds_ib_rdma_build_fastreg(struct ib_qp *qp, struct rds_ib_mapping *ma
> f_wr.send_flags = IB_SEND_SIGNALED;
>
> failed_wr = &f_wr;
> - ret = ib_post_send(qp, &f_wr, &failed_wr);
> + ret = ib_post_send(ibmr->cm_id->qp, &f_wr, &failed_wr);
> BUG_ON(failed_wr != &f_wr);
> if (ret) {
> printk(KERN_WARNING "RDS/IB: %s %d ib_post_send returned %d\n",
> @@ -983,7 +995,7 @@ static int rds_ib_rdma_fastreg_inv(struct rds_ib_mr *ibmr)
> struct ib_send_wr s_wr, *failed_wr;
> int ret = 0;
>
> - if (!ibmr->qp || !ibmr->fr_mr)
> + if (!ibmr->cm_id->qp || !ibmr->fr_mr)
> goto out;
>
> memset(&s_wr, 0, sizeof(s_wr));
> @@ -992,7 +1004,7 @@ static int rds_ib_rdma_fastreg_inv(struct rds_ib_mr *ibmr)
> s_wr.send_flags = IB_SEND_SIGNALED;
>
> failed_wr = &s_wr;
> - ret = ib_post_send(ibmr->qp, &s_wr, &failed_wr);
> + ret = ib_post_send(ibmr->cm_id->qp, &s_wr, &failed_wr);
> if (ret) {
> printk(KERN_WARNING "RDS/IB: %s %d ib_post_send returned %d\n",
> __func__, __LINE__, ret);
> @@ -1010,7 +1022,7 @@ static int rds_ib_map_fastreg(struct rds_ib_mr_pool *pool,
> struct rds_ib_device *rds_ibdev = pool->device;
> struct rds_ib_mapping *mapping = &ibmr->mapping;
> u64 *dma_pages;
> - int i, ret;
> + int i, ret = 0;
>
> rds_ib_set_scatterlist(&mapping->m_sg, sg, sg_len);
>
> @@ -1024,7 +1036,6 @@ static int rds_ib_map_fastreg(struct rds_ib_mr_pool *pool,
> }
>
> if (mapping->m_sg.dma_len > pool->max_message_size) {
> - printk("mapping->m_sg.dma_len > pool->max_message_size\n");
> ret = -EMSGSIZE;
> goto out;
> }
> @@ -1032,10 +1043,11 @@ static int rds_ib_map_fastreg(struct rds_ib_mr_pool *pool,
> for (i = 0; i < mapping->m_sg.dma_npages; ++i)
> ibmr->fr_page_list->page_list[i] = dma_pages[i];
>
> - rds_ib_rdma_build_fastreg(ibmr->qp, mapping);
> + ret = rds_ib_rdma_build_fastreg(mapping);
> + if (ret)
> + goto out;
>
> rds_ib_stats_inc(s_ib_rdma_mr_used);
> - ret = 0;
>
> out:
> kfree(dma_pages);
> @@ -1050,11 +1062,14 @@ static void rds_ib_free_fastreg(struct rds_ib_mr_pool *pool,
> struct rds_ib_mr *ibmr)
> {
> unsigned long flags;
> + int ret;
>
> if (!ibmr->mapping.m_sg.dma_len)
> return;
>
> - rds_ib_rdma_fastreg_inv(ibmr);
> + ret = rds_ib_rdma_fastreg_inv(ibmr);
> + if (ret)
> + return;
>
> /* Try to post the LOCAL_INV WR to the queue. */
> spin_lock_irqsave(&pool->list_lock, flags);
> diff --git a/drivers/infiniband/ulp/rds/iw_send.c b/drivers/infiniband/ulp/rds/iw_send.c
> index 2f0c1fb..c67a7be 100644
> --- a/drivers/infiniband/ulp/rds/iw_send.c
> +++ b/drivers/infiniband/ulp/rds/iw_send.c
> @@ -154,13 +154,13 @@ void rds_ib_send_init_ring(struct rds_ib_connection *ic)
> sge->lkey = 0;
>
> if (ic->i_iwarp) {
> - send->s_mr = ib_alloc_fast_reg_mr(ic->i_pd, fmr_message_size);
> + send->s_mr = ib_alloc_fast_reg_mr(ic->i_pd, fastreg_message_size);
> if (IS_ERR(send->s_mr)) {
> printk(KERN_WARNING "RDS/IB: ib_alloc_fast_reg_mr failed\n");
> break;
> }
>
> - send->s_page_list = ib_alloc_fast_reg_page_list(ic->i_cm_id->device, RDS_IB_MAX_SGE);
> + send->s_page_list = ib_alloc_fast_reg_page_list(ic->i_cm_id->device, fastreg_message_size);
> if (IS_ERR(send->s_page_list)) {
> printk(KERN_WARNING "RDS/IB: ib_alloc_fast_reg_page_list failed\n");
> break;
> @@ -206,7 +206,7 @@ void rds_ib_send_cq_comp_handler(struct ib_cq *cq, void *context)
> struct rds_ib_send_work *send;
> u32 completed;
> u32 oldest;
> - u32 i = 0;
> + u32 i;
> int ret;
>
> rdsdebug("cq %p conn %p\n", cq, conn);
> @@ -897,17 +897,18 @@ int rds_ib_xmit_rdma(struct rds_connection *conn, struct rds_rdma_op *op)
>
> for (j = 0; j < send->s_wr.num_sge && scat != &op->r_sg[op->r_count]; j++) {
> len = sg_dma_len(scat);
> - send->s_sge[j].addr = sg_dma_address(scat);
> - send->s_sge[j].length = len;
>
> if (send->s_wr.opcode == IB_WR_RDMA_READ_WITH_INV)
> send->s_page_list->page_list[j] = sg_dma_address(scat);
> - else
> + else {
> + send->s_sge[j].addr = sg_dma_address(scat);
> + send->s_sge[j].length = len;
> send->s_sge[j].lkey = rds_ib_local_dma_lkey(ic);
> + }
>
> sent += len;
> rdsdebug("ic %p sent %d remote_addr %llu\n", ic, sent, remote_addr);
> - remote_addr += sg_dma_len(scat);
> + remote_addr += len;
>
> scat++;
> }
> @@ -926,23 +927,24 @@ int rds_ib_xmit_rdma(struct rds_connection *conn, struct rds_rdma_op *op)
> send = ic->i_sends;
> }
>
> + /* if we finished the message then send completion owns it */
> + if (scat == &op->r_sg[op->r_count])
> + prev->s_wr.send_flags = IB_SEND_SIGNALED;
> +
> + if (i < work_alloc) {
> + rds_ib_ring_unalloc(&ic->i_send_ring, work_alloc - i);
> + work_alloc = i;
> + }
> +
> /* On iWARP, local memory access by a remote system (ie, RDMA Read) is not
> * recommended. Putting the lkey on the wire is a security hole, as it can
> * allow for memory access to all of memory on the remote system. Some
> * adapters do not allow using the lkey for this at all. To bypass this use a
> * fastreg_mr (or possibly a dma_mr)
> */
> - if (!op->r_write && ic->i_iwarp)
> + if (!op->r_write && ic->i_iwarp) {
> rds_ib_build_send_fastreg(rds_ibdev, ic, &ic->i_sends[fr_pos], op->r_count, sent, conn->c_xmit_rm->m_rs->rs_user_addr);
> -
> - /* if we finished the message then send completion owns it */
> - if (scat == &op->r_sg[op->r_count]) {
> - prev->s_wr.send_flags = IB_SEND_SIGNALED;
> - }
> -
> - if (i < work_alloc) {
> - rds_ib_ring_unalloc(&ic->i_send_ring, work_alloc - i);
> - work_alloc = i;
> + work_alloc++;
> }
>
> failed_wr = &first->s_wr;
> diff --git a/drivers/infiniband/ulp/rds/send.c b/drivers/infiniband/ulp/rds/send.c
> index 3a9247e..ae81698 100644
> --- a/drivers/infiniband/ulp/rds/send.c
> +++ b/drivers/infiniband/ulp/rds/send.c
> @@ -127,6 +127,7 @@ int rds_send_xmit(struct rds_connection *conn)
> */
> if (down_trylock(&conn->c_send_sem)) {
> rds_stats_inc(s_send_sem_contention);
> + ret = -ENOMEM;
> goto out;
> }
>
>
> _______________________________________________
> rds-devel mailing list
> rds-devel at oss.oracle.com
> http://oss.oracle.com/mailman/listinfo/rds-devel
>
More information about the rds-devel
mailing list