[rds-devel] [RFC] rds: iWARP fastreg/RDMA support
Jon Mason
jon at opengridcomputing.com
Wed Aug 20 15:14:26 PDT 2008
iWARP zcopy is working!
I have been able to hack Olaf's future-20080715 git tree to have
rudimentary fastreg/RDMA support. It passes a 10-minute rds-stress test with
RDMA enabled, but is in no way ready for "prime time".
I have it generating an rkey/cookie when get_mr is called, and
registering the fastreg_mr prior to the first send on the qp. I realize
that this needs to be reworked to have the mr registered prior to
completing the get_mr request (per a prior discussion in another thread).
After discussing it with Steve, I have a few ideas on how to implement
this.
I hope to have a working implementation of the revised code soon.
Thanks,
Jon
diff --git a/net/rds/ib.h b/net/rds/ib.h
index 13eb1f2..80ac6f3 100644
--- a/net/rds/ib.h
+++ b/net/rds/ib.h
@@ -74,6 +74,7 @@ struct rds_ib_mapping {
struct list_head m_list;
struct rds_ib_mr * m_mr;
uint32_t m_rkey;
+ uint32_t m_prev_rkey;
struct rds_ib_scatterlist m_sg;
};
diff --git a/net/rds/ib_rdma.c b/net/rds/ib_rdma.c
index ee473ca..69357af 100644
--- a/net/rds/ib_rdma.c
+++ b/net/rds/ib_rdma.c
@@ -95,13 +95,6 @@ struct rds_ib_mr_pool {
unsigned long max_free_pinned;
struct ib_fmr_attr fmr_attr;
- /* Dummy QP used to handle invalidate for fastreg */
- struct {
- struct ib_qp *qp;
- struct rds_ib_inv_wr *send_wrs;
- struct rds_ib_work_ring send_ring, recv_ring;
- } fastreg;
-
struct rds_ib_mr_pool_ops *op;
};
@@ -139,7 +132,6 @@ static unsigned int rds_ib_unmap_fastreg_list(struct rds_ib_mr_pool *pool,
static void rds_ib_destroy_fastreg(struct rds_ib_mr_pool *pool, struct rds_ib_mr *ibmr);
static int rds_ib_local_invalidate(struct rds_ib_mr_pool *pool,
struct rds_ib_mapping *mapping);
-static void rds_ib_inval_cq_handler(struct ib_cq *cq, void *context);
static struct rds_ib_mr_pool_ops rds_ib_fmr_pool_ops = {
.init = rds_ib_init_fmr,
@@ -393,23 +385,8 @@ struct rds_ib_mr_pool *rds_ib_create_mr_pool(struct rds_ib_device *rds_ibdev)
pool_size,
&rds_ib_fastreg_pool_ops);
- if (!IS_ERR(pool)) {
- /* Fill in the blanks:
- * create a dummy QP to which we can post LOCAL_INV
- * requests when invalidating MRs
- */
- rds_ib_ring_init(&pool->fastreg.send_ring, 64);
- rds_ib_ring_init(&pool->fastreg.recv_ring, 64);
- pool->fastreg.qp = rds_ib_create_qp(rds_ibdev,
- &pool->fastreg.send_ring,
- rds_ib_inval_cq_handler,
- &pool->fastreg.recv_ring,
- NULL,
- pool);
-
- if (IS_ERR(pool->fastreg.qp))
- BUG(); /* FIXME handle gracefully */
- /* FIXME allocate pool->fasteg.send_wrs */
+ if (IS_ERR(pool)) {
+ printk("__rds_ib_create_mr_pool error\n");
}
}
@@ -430,10 +407,6 @@ void rds_ib_destroy_mr_pool(struct rds_ib_mr_pool *pool)
rds_ib_flush_mr_pool(pool, 1);
BUG_ON(atomic_read(&pool->item_count));
BUG_ON(atomic_read(&pool->free_pinned));
-
- if (pool->fastreg.qp)
- ib_destroy_qp(pool->fastreg.qp);
-
kfree(pool);
}
@@ -697,7 +670,7 @@ void *rds_ib_get_mr(struct scatterlist *sg, unsigned long nents,
ret = pool->op->map(pool, ibmr, sg, nents);
if (ret == 0)
- *key_ret = ibmr->u.fmr->rkey;
+ *key_ret = rds_ibdev->dev->node_type == RDMA_NODE_RNIC ? ibmr->fr_mr->rkey : ibmr->u.fmr->rkey;
else
printk(KERN_WARNING "RDS/IB: failed to map mr (errno=%d)\n", ret);
@@ -877,7 +850,7 @@ static int rds_ib_init_fastreg(struct rds_ib_mr_pool *pool,
if (IS_ERR(mr)) {
err = PTR_ERR(mr);
- printk(KERN_WARNING "RDS/IWARP: ib_alloc_fast_reg_mr failed (err=%d)\n", err);
+ printk(KERN_WARNING "RDS/IB: ib_alloc_fast_reg_mr failed (err=%d)\n", err);
return err;
}
@@ -885,7 +858,7 @@ static int rds_ib_init_fastreg(struct rds_ib_mr_pool *pool,
if (IS_ERR(page_list)) {
err = PTR_ERR(page_list);
- printk(KERN_WARNING "RDS/iWARP: ib_alloc_fast_reg_page_list failed (err=%d)\n", err);
+ printk(KERN_WARNING "RDS/IB: ib_alloc_fast_reg_page_list failed (err=%d)\n", err);
ib_dereg_mr(mr);
return err;
}
@@ -908,6 +881,8 @@ static int rds_ib_map_fastreg(struct rds_ib_mr_pool *pool,
rds_ib_set_scatterlist(&mapping->m_sg, sg, sg_len);
+ ibmr->fr_page_shift = rds_ibdev->fmr_page_shift; /* XXX really? */
+
dma_pages = rds_ib_map_scatterlist(rds_ibdev,
&mapping->m_sg,
ibmr->fr_page_shift);
@@ -923,11 +898,11 @@ static int rds_ib_map_fastreg(struct rds_ib_mr_pool *pool,
}
ibmr->fr_page_list_len = mapping->m_sg.dma_len;
- ibmr->fr_page_shift = rds_ibdev->fmr_page_shift; /* XXX really? */
for (i = 0; i < mapping->m_sg.dma_npages; ++i)
ibmr->fr_page_list->page_list[i] = dma_pages[i];
+ mapping->m_prev_rkey = ibmr->fr_mr->rkey;
ib_update_fast_reg_key(ibmr->fr_mr, ibmr->remap_count++);
mapping->m_rkey = ibmr->fr_mr->rkey;
@@ -948,95 +923,19 @@ out:
static void rds_ib_free_fastreg(struct rds_ib_mr_pool *pool,
struct rds_ib_mr *ibmr)
{
- struct rds_ib_mapping *mapping = NULL;
unsigned long flags;
- int state;
if (!ibmr->mapping.m_sg.dma_len)
return;
- /* FIXME the locking here is fishy */
-
- state = ibmr->mapping.m_state;
- switch (state) {
- case RDS_IB_MAP_CLEAR:
- printk(KERN_NOTICE "RDS/IB: trying to free clear MR?\n");
- return;
-
- case RDS_IB_MAP_PENDING:
- /* This MR was created but never mapped */
- /* FIXME move to different func */
- rds_ib_local_inv_complete(pool, &ibmr->mapping, IB_WC_SUCCESS);
-
- spin_lock_irqsave(&pool->list_lock, flags);
- list_add(&mapping->m_list, &pool->clean_list);
- spin_unlock_irqrestore(&pool->list_lock, flags);
- return;
-
- case RDS_IB_MAP_MAPPING:
- /* Ouch - this is still on the send queue.
- * We have to wait for the mapping to complete before
- * we can queue the unmap */
- break;
-
- case RDS_IB_MAP_ACTIVE:
- /* We have an active mapping. Try to create a clone of
- * the mapping object, which we can enqueue for invalidation
- * while the MR is ready for immediate reuse.
- * If the kzalloc fails, we have to put the ibmr->mapping
- * on the clean list - which isn't that bad, it just
- * means we can't reuse the MR while the invalidate is in
- * progress. */
- mapping = kzalloc(sizeof(*mapping), GFP_KERNEL);
- if (mapping != NULL) {
- /* Create a clone of the mapping, and clear up
- * the ibmr's mapping state.
- * We set m_mr pointer to NULL to indicate that this
- * is a transient object that should be freed when the
- * WR completes. */
- *mapping = ibmr->mapping;
- mapping->m_mr = NULL;
-
- ibmr->mapping.m_state = RDS_IB_MAP_CLEAR;
- memset(&ibmr->mapping.m_sg, 0, sizeof(struct rds_ib_scatterlist));
- ibmr->mapping.m_rkey = 0;
- }
- break;
-
- case RDS_IB_MAP_UNMAPPING:
- printk(KERN_NOTICE "RDS/IB: trying to free MR while being unmapped?\n");
- BUG();
-
- }
-
- if (mapping == NULL) {
- /* Create a clone of the mapping, and clear up
- * the ibmr's mapping state.
- * We set m_mr pointer to NULL to indicate that this
- * is a transient object that should be freed when the
- * WR completes. */
- *mapping = ibmr->mapping;
- mapping->m_mr = NULL;
-
- ibmr->mapping.m_state = RDS_IB_MAP_CLEAR;
- memset(&ibmr->mapping.m_sg, 0, sizeof(struct rds_ib_scatterlist));
- ibmr->mapping.m_rkey = 0;
- } else {
- /* Go the long way and trash this MR synchronously.
- * Note that this mapping object's mr_mr points back to
- * the container MR.
- */
- mapping = &ibmr->mapping;
- }
-
/* Try to post the LOCAL_INV WR to the queue. */
spin_lock_irqsave(&pool->list_lock, flags);
- if (state == RDS_IB_MAP_ACTIVE
- && rds_ib_local_invalidate(pool, mapping)) {
- list_add(&mapping->m_list, &pool->laundry_list);
- } else {
- list_add(&mapping->m_list, &pool->dirty_list);
- }
+
+ ibmr->mapping.m_state = RDS_IB_MAP_CLEAR;
+ list_add_tail(&ibmr->mapping.m_list, &pool->dirty_list);
+ atomic_add(ibmr->mapping.m_sg.len, &pool->free_pinned);
+ atomic_inc(&pool->dirty_count);
+
spin_unlock_irqrestore(&pool->list_lock, flags);
}
@@ -1053,7 +952,8 @@ struct rds_ib_mapping *rds_ib_rdma_get_mapping(struct rds_mr *mr)
/* Okay, we should register the mapping now.
* Set map_seq so the flush worker knows whether a
* mapping is newer */
- ibmr->map_seq = atomic_read(&ibmr->pool->flush_seq);
+ if (ibmr->pool)
+ ibmr->map_seq = atomic_read(&ibmr->pool->flush_seq);
return mapping;
case RDS_IB_MAP_MAPPING:
@@ -1077,33 +977,18 @@ struct rds_ib_mapping *rds_ib_rdma_get_mapping(struct rds_mr *mr)
*/
int rds_ib_local_invalidate(struct rds_ib_mr_pool *pool, struct rds_ib_mapping *mapping)
{
- struct rds_ib_inv_wr *inval;
- struct ib_send_wr *failed_wr;
unsigned long flags;
- u32 pos;
- int ret;
-
- if (!rds_ib_ring_alloc(&pool->fastreg.send_ring, 1, &pos))
- return 0;
-
- inval = &pool->fastreg.send_wrs[pos];
-
- memset(inval, 0, sizeof(*inval));
- inval->i_wr.wr_id = pos;
- inval->i_wr.opcode = IB_WR_LOCAL_INV;
- inval->i_wr.ex.invalidate_rkey = mapping->m_rkey;
- inval->i_mapping = mapping;
+ /*FIXME - potential problem. We are invalidating the fastreg prior to mapping
+ * it, but it should be done on the qp being used. Unfortunately, we cannot
+ * get to there from here. So, lie to the state machine for now, as the
+ * mapping will be invalidated eventually.
+ */
spin_lock_irqsave(&mapping->m_lock, flags);
- ret = ib_post_send(pool->fastreg.qp, &inval->i_wr, &failed_wr);
- if (ret == 0) {
- mapping->m_state = RDS_IB_MAP_UNMAPPING;
- } else {
- rds_ib_ring_unalloc(&pool->fastreg.send_ring, 1);
- }
+ mapping->m_state = RDS_IB_MAP_UNMAPPING;
spin_unlock_irqrestore(&mapping->m_lock, flags);
- return ret == 0;
+ return 1;
}
void rds_ib_local_inv_complete(struct rds_ib_mr_pool *pool,
@@ -1126,23 +1011,6 @@ void rds_ib_local_inv_complete(struct rds_ib_mr_pool *pool,
}
}
-static void rds_ib_inval_cq_handler(struct ib_cq *cq, void *context)
-{
- struct rds_ib_mr_pool *pool = context;
- struct ib_wc wc;
-
- ib_req_notify_cq(cq, IB_CQ_NEXT_COMP);
- while (ib_poll_cq(cq, 1, &wc) > 0) {
- struct rds_ib_inv_wr *wr;
-
- wr = &pool->fastreg.send_wrs[wc.wr_id];
- rds_ib_local_inv_complete(pool, wr->i_mapping, wc.status);
- }
-
- if (waitqueue_active(&pool->flush_waitq))
- wake_up(&pool->flush_waitq);
-}
-
void rds_ib_rdma_build_fastreg(struct ib_send_wr *wr, struct rds_ib_mapping *mapping)
{
struct rds_ib_mr *ibmr = mapping->m_mr;
@@ -1156,6 +1024,7 @@ void rds_ib_rdma_build_fastreg(struct ib_send_wr *wr, struct rds_ib_mapping *map
wr->wr.fast_reg.access_flags = IB_ACCESS_LOCAL_WRITE |
IB_ACCESS_REMOTE_READ |
IB_ACCESS_REMOTE_WRITE;
+ wr->wr.fast_reg.iova_start = 0;
}
void rds_ib_fast_reg_complete(struct rds_ib_mapping *mapping, int status)
@@ -1206,9 +1075,6 @@ static unsigned int rds_ib_unmap_fastreg_list(struct rds_ib_mr_pool *pool,
while (!list_empty(unmap_list)) {
unsigned long flags;
- wait_event(pool->flush_waitq,
- rds_ib_ring_empty(&pool->fastreg.send_ring));
-
spin_lock_irqsave(&pool->list_lock, flags);
list_for_each_entry_safe(mapping, next, unmap_list, m_list) {
switch (mapping->m_state) {
@@ -1231,8 +1097,8 @@ static unsigned int rds_ib_unmap_fastreg_list(struct rds_ib_mr_pool *pool,
case RDS_IB_MAP_UNMAPPING:
/* We already submitted a local_inv request
* for this. */
- continue;
-
+ rds_ib_local_inv_complete(pool, mapping, IB_WC_SUCCESS);
+ break;
}
}
diff --git a/net/rds/ib_send.c b/net/rds/ib_send.c
index 6d4e99d..a5b63b1 100644
--- a/net/rds/ib_send.c
+++ b/net/rds/ib_send.c
@@ -137,6 +137,7 @@ void rds_ib_send_init_ring(struct rds_ib_connection *ic)
send->s_op = NULL;
send->s_mapping = NULL;
+ send->s_wr.next = NULL;
send->s_wr.wr_id = i;
send->s_wr.sg_list = send->s_sge;
send->s_wr.num_sge = 1;
@@ -195,7 +196,7 @@ void rds_ib_send_cq_comp_handler(struct ib_cq *cq, void *context)
rdsdebug("ib_req_notify_cq send failed: %d\n", ret);
}
- while (ib_poll_cq(cq, 1, &wc) > 0 ) {
+ while (ib_poll_cq(cq, 1, &wc) > 0) {
rdsdebug("wc wr_id 0x%llx status %u byte_len %u imm_data %u\n",
(unsigned long long)wc.wr_id, wc.status, wc.byte_len,
be32_to_cpu(wc.ex.imm_data));
@@ -221,6 +222,7 @@ void rds_ib_send_cq_comp_handler(struct ib_cq *cq, void *context)
if (send->s_rm)
rds_ib_send_unmap_rm(ic, send, wc.status);
break;
+ case IB_WR_LOCAL_INV:
case IB_WR_RDMA_WRITE:
case IB_WR_RDMA_READ:
/* Nothing to be done - the SG list will be unmapped
@@ -233,8 +235,8 @@ void rds_ib_send_cq_comp_handler(struct ib_cq *cq, void *context)
default:
if (printk_ratelimit())
printk(KERN_NOTICE
- "RDS/IB: %s: unexpected opcode 0x%x in WR!\n",
- __FUNCTION__, send->s_wr.opcode);
+ "RDS/IB: %s: unexpected opcode 0x%x in WR! %d\n",
+ __FUNCTION__, send->s_wr.opcode, wc.opcode);
break;
}
@@ -253,7 +255,6 @@ void rds_ib_send_cq_comp_handler(struct ib_cq *cq, void *context)
if ((rm = rds_send_get_message(conn, send->s_op)) != NULL)
rds_ib_send_rdma_complete(rm, wc.status);
}
-
oldest = (oldest + 1) % ic->i_send_ring.w_nr;
}
@@ -866,7 +867,7 @@ int rds_ib_xmit_fastreg(struct rds_connection *conn, struct rds_mr *mr)
{
struct rds_ib_mapping *mapping;
struct rds_ib_connection *ic = conn->c_transport_data;
- struct rds_ib_send_work *send = NULL;
+ struct rds_ib_send_work *send1 = NULL, *send2 = NULL;
struct ib_send_wr *failed_wr;
u32 pos;
u32 work_alloc = 0;
@@ -885,31 +886,55 @@ int rds_ib_xmit_fastreg(struct rds_connection *conn, struct rds_mr *mr)
* inside the fast_reg_mr WR. The key used is a rolling 8bit
* counter, which should guarantee uniqueness.
*/
- work_alloc = rds_ib_ring_alloc(&ic->i_send_ring, 1, &pos);
- if (work_alloc == 0) {
+ work_alloc = rds_ib_ring_alloc(&ic->i_send_ring, 2, &pos);
+ if (work_alloc < 2) {
rds_ib_ring_unalloc(&ic->i_send_ring, work_alloc);
rds_ib_stats_inc(s_ib_tx_ring_full);
ret = -ENOMEM;
goto out;
}
+ send1 = &ic->i_sends[pos];
+ send2 = &ic->i_sends[(pos+1) % ic->i_send_ring.w_nr];
+
+ send1->s_wr.opcode = IB_WR_LOCAL_INV;
+ send1->s_wr.ex.invalidate_rkey = mapping->m_prev_rkey;
+ send1->s_wr.send_flags = IB_SEND_SIGNALED;
+ send1->s_wr.next = NULL;
+ send1->s_wr.num_sge = 1;
+ send1->s_wr.sg_list = send1->s_sge;
+ send1->s_queued = jiffies;
+
+ rds_ib_rdma_build_fastreg(&send2->s_wr, mapping);
+ send2->s_wr.send_flags = IB_SEND_SIGNALED;
+ send2->s_wr.next = NULL;
+ send2->s_wr.num_sge = 1;
+ send2->s_wr.sg_list = send2->s_sge;
+ send2->s_mapping = mapping;
+ send2->s_queued = jiffies;
+
+ failed_wr = &send1->s_wr;
+ ret = ib_post_send(ic->i_cm_id->qp, &send1->s_wr, &failed_wr);
- send = &ic->i_sends[pos];
-
- memset(send, 0, sizeof(*send));
- rds_ib_rdma_build_fastreg(&send->s_wr, mapping);
- send->s_mapping = mapping;
- send->s_queued = jiffies;
+ rdsdebug("ic %p send %p (wr %p) ret %d wr %p\n", ic,
+ send1, &send1->s_wr, ret, failed_wr);
+ BUG_ON(failed_wr != &send1->s_wr);
+ if (ret) {
+ printk(KERN_WARNING "RDS/IB: %s %d ib_post_send to %u.%u.%u.%u "
+ "returned %d\n", __func__, __LINE__, NIPQUAD(conn->c_faddr), ret);
+ rds_ib_ring_unalloc(&ic->i_send_ring, 2);
+ return ret;
+ }
- failed_wr = &send->s_wr;
- ret = ib_post_send(ic->i_cm_id->qp, &send->s_wr, &failed_wr);
+ failed_wr = &send2->s_wr;
+ ret = ib_post_send(ic->i_cm_id->qp, &send2->s_wr, &failed_wr);
rdsdebug("ic %p send %p (wr %p) ret %d wr %p\n", ic,
- send, &send->s_wr, ret, failed_wr);
- BUG_ON(failed_wr != &send->s_wr);
+ send2, &send2->s_wr, ret, failed_wr);
+ BUG_ON(failed_wr != &send2->s_wr);
if (ret) {
- printk(KERN_WARNING "RDS/IB: fastreg ib_post_send to %u.%u.%u.%u "
- "returned %d\n", NIPQUAD(conn->c_faddr), ret);
- rds_ib_ring_unalloc(&ic->i_send_ring, 1);
+ printk(KERN_WARNING "RDS/IB: %s %d ib_post_send to %u.%u.%u.%u "
+ "returned %d\n", __func__, __LINE__, NIPQUAD(conn->c_faddr), ret);
+ rds_ib_ring_unalloc(&ic->i_send_ring, 2);
return ret;
}
More information about the rds-devel
mailing list