[rds-devel] RDS RDMA implementation
Vladimir Sokolovsky
vlad at mellanox.co.il
Tue Sep 4 08:17:05 PDT 2007
Hi,
Here is a drop of the rdma_xmit implementation, along with several bug fixes.
Please review.
RDS: Added the rds_ib_xmit_rdma function
setup_qp: open the QP with the maximum number of SGEs available on the device
Added an r_key field to the rds_rdma_args structure
Set the page dirty flag after RDMA READ operations complete
Use the same PD and MR for all QPs on each device
Signed-off-by: Vladimir Sokolovsky <vlad at mellanox.co.il>
diff --git a/net/rds/ib.c b/net/rds/ib.c
index 7548320..cd741ee 100644
--- a/net/rds/ib.c
+++ b/net/rds/ib.c
@@ -47,16 +47,16 @@ MODULE_PARM_DESC(fmr_pool_size, " Max number of fmr
per HCA");
struct list_head rds_ib_devices;
-static void rds_ib_add_one(struct ib_device *device);
-static void rds_ib_remove_one(struct ib_device *device);
+void rds_ib_add_one(struct ib_device *device);
+void rds_ib_remove_one(struct ib_device *device);
-static struct ib_client rds_ib_client = {
+struct ib_client rds_ib_client = {
.name = "rds_ib",
.add = rds_ib_add_one,
.remove = rds_ib_remove_one
};
-static void rds_ib_add_one(struct ib_device *device)
+void rds_ib_add_one(struct ib_device *device)
{
struct rds_ib_device *rds_ibdev;
struct ib_device_attr *dev_attr;
@@ -77,6 +77,8 @@ static void rds_ib_add_one(struct ib_device *device)
spin_lock_init(&rds_ibdev->spinlock);
+ rds_ibdev->max_sge = min(dev_attr->max_sge, RDS_IB_MAX_SGE);
+
rds_ibdev->fmr_page_shift = max(9, ffs(dev_attr->page_size_cap)
- 1);
rds_ibdev->fmr_page_size = 1 << rds_ibdev->fmr_page_shift;
rds_ibdev->fmr_page_mask = ~((u64) rds_ibdev->fmr_page_size -
1);
@@ -126,7 +128,7 @@ free_attr:
kfree(dev_attr);
}
-static void rds_ib_remove_one(struct ib_device *device)
+void rds_ib_remove_one(struct ib_device *device)
{
struct rds_ib_device *rds_ibdev;
struct rds_ib_ipaddr *i_ipaddr;
@@ -191,6 +193,7 @@ struct rds_transport rds_ib_transport = {
.laddr_check = rds_ib_laddr_check,
.xmit = rds_ib_xmit,
.xmit_cong_map = rds_ib_xmit_cong_map,
+ .xmit_rdma = rds_ib_xmit_rdma,
.recv = rds_ib_recv,
.conn_alloc = rds_ib_conn_alloc,
.conn_free = rds_ib_conn_free,
diff --git a/net/rds/ib.h b/net/rds/ib.h
index 9ccefaa..bb66cf1 100644
--- a/net/rds/ib.h
+++ b/net/rds/ib.h
@@ -13,6 +13,8 @@
#define RDS_FMR_SIZE 256
#define RDS_FMR_POOL_SIZE 1024
+#define RDS_IB_MAX_SGE 32
+
/*
* IB posts RDS_FRAG_SIZE fragments of pages to the receive queues to
* try and minimize the amount of memory tied up both the device and
@@ -38,8 +40,9 @@ struct rds_ib_connect_private {
struct rds_ib_send_work {
struct rds_message *s_rm;
+ struct rds_rdma_op *s_op;
struct ib_send_wr s_wr;
- struct ib_sge s_sge[2];
+ struct ib_sge s_sge[RDS_IB_MAX_SGE];
};
struct rds_ib_recv_work {
@@ -118,6 +121,7 @@ struct rds_ib_device {
int fmr_page_shift;
int fmr_page_size;
u64 fmr_page_mask;
+ int max_sge;
spinlock_t spinlock;
};
@@ -142,6 +146,9 @@ extern struct workqueue_struct *rds_ib_wq;
/* ib.c */
extern struct rds_transport rds_ib_transport;
+extern void rds_ib_add_one(struct ib_device *device);
+extern void rds_ib_remove_one(struct ib_device *device);
+extern struct ib_client rds_ib_client;
/* ib_cm.c */
int rds_ib_conn_alloc(struct rds_connection *conn, gfp_t gfp);
@@ -191,6 +198,7 @@ void rds_ib_send_init_ring(struct rds_ib_connection
*ic);
void rds_ib_send_clear_ring(struct rds_ib_connection *ic);
int rds_ib_xmit_cong_map(struct rds_connection *conn,
struct rds_cong_map *map, unsigned long
offset);
+int rds_ib_xmit_rdma(struct rds_connection *conn, struct rds_rdma_op
*op);
/* ib_stats.c */
RDS_DECLARE_PER_CPU(struct rds_ib_statistics, rds_ib_stats);
diff --git a/net/rds/ib_cm.c b/net/rds/ib_cm.c
index 89e2c48..8e2324a 100644
--- a/net/rds/ib_cm.c
+++ b/net/rds/ib_cm.c
@@ -80,9 +80,12 @@ static int rds_ib_setup_qp(struct rds_connection
*conn)
{
struct rds_ib_connection *ic = conn->c_transport_data;
struct ib_qp_init_attr attr;
+ struct rds_ib_device *rds_ibdev;
int ret;
- ic->i_pd = ib_alloc_pd(ic->i_cm_id->device);
+ rds_ibdev = ib_get_client_data(ic->i_cm_id->device,
&rds_ib_client);
+
+ ic->i_pd = rds_ibdev->pd;
if (IS_ERR(ic->i_pd)) {
ret = PTR_ERR(ic->i_pd);
ic->i_pd = NULL;
@@ -90,7 +93,7 @@ static int rds_ib_setup_qp(struct rds_connection
*conn)
goto out;
}
- ic->i_mr = ib_get_dma_mr(ic->i_pd, IB_ACCESS_LOCAL_WRITE);
+ ic->i_mr = rds_ibdev->mr;
if (IS_ERR(ic->i_mr)) {
ret = PTR_ERR(ic->i_mr);
ic->i_mr = NULL;
@@ -139,8 +142,8 @@ static int rds_ib_setup_qp(struct rds_connection
*conn)
/* + 1 to allow for the single ack message */
attr.cap.max_send_wr = ic->i_send_ring.w_nr + 1;
attr.cap.max_recv_wr = ic->i_recv_ring.w_nr + 1;
- attr.cap.max_send_sge = 2;
- attr.cap.max_recv_sge = 2;
+ attr.cap.max_send_sge = rds_ibdev->max_sge;
+ attr.cap.max_recv_sge = rds_ibdev->max_sge;
attr.sq_sig_type = IB_SIGNAL_REQ_WR;
attr.qp_type = IB_QPT_RC;
attr.send_cq = ic->i_send_cq;
@@ -221,6 +224,7 @@ static int rds_ib_cm_handle_connect(struct
rdma_cm_id *cm_id,
struct rds_connection *conn;
struct rds_ib_connection *ic;
struct rdma_conn_param conn_param;
+ struct rds_ib_device *rds_ibdev;
int ret;
rdsdebug("saddr %u.%u.%u.%u daddr %u.%u.%u.%u lguid 0x%llx fguid
"
@@ -287,8 +291,8 @@ static int rds_ib_cm_handle_connect(struct
rdma_cm_id *cm_id,
cm_id = NULL;
/* update ib_device with this local ipaddr */
-#warning "should this use ib_get_client_data() to get the
rds_ib_device?"
- ib_update_ipaddr_for_device(ic->i_cm_id->device, dp->dp_saddr);
+ rds_ibdev = ib_get_client_data(ic->i_cm_id->device,
&rds_ib_client);
+ ib_update_ipaddr_for_device(rds_ibdev, dp->dp_saddr);
out:
if (ret && cm_id)
@@ -504,10 +508,6 @@ void rds_ib_conn_shutdown(struct rds_connection
*conn)
ib_destroy_cq(ic->i_send_cq);
if (ic->i_recv_cq)
ib_destroy_cq(ic->i_recv_cq);
- if (ic->i_mr)
- ib_dereg_mr(ic->i_mr);
- if (ic->i_pd)
- ib_dealloc_pd(ic->i_pd);
rdma_destroy_id(ic->i_cm_id);
ic->i_cm_id = NULL;
diff --git a/net/rds/ib_send.c b/net/rds/ib_send.c
index b0b2ad6..3629212 100644
--- a/net/rds/ib_send.c
+++ b/net/rds/ib_send.c
@@ -36,6 +36,7 @@
#include <linux/dmapool.h>
#include "rds.h"
+#include "rdma.h"
#include "ib.h"
void rds_ib_send_unmap_rm(struct rds_ib_connection *ic,
@@ -56,6 +57,7 @@ void rds_ib_send_init_ring(struct rds_ib_connection
*ic)
for(i = 0, send = ic->i_sends; i < ic->i_send_ring.w_nr; i++,
send++) {
send->s_rm = NULL;
+ send->s_op = NULL;
send->s_wr.wr_id = i;
send->s_wr.sg_list = send->s_sge;
@@ -125,8 +127,23 @@ void rds_ib_send_cq_comp_handler(struct ib_cq *cq,
void *context)
completed = rds_ib_ring_completed(&ic->i_send_ring,
wc.wr_id, oldest);
for (i = 0; i < completed; i++) {
- if (send->s_rm)
- rds_ib_send_unmap_rm(ic, send);
+ if (wc.opcode == IB_WC_SEND) {
+ if (send->s_rm)
+ rds_ib_send_unmap_rm(ic, send);
+ }
+ else if (wc.opcode == IB_WR_RDMA_WRITE) {
+ if (send->s_op)
+
dma_unmap_sg(ic->i_cm_id->device->dma_device,
+ send->s_op->r_sg,
send->s_op->r_nents,
+ DMA_TO_DEVICE);
+ }
+ else if (wc.opcode == IB_WR_RDMA_READ) {
+ if (send->s_op)
+
dma_unmap_sg(ic->i_cm_id->device->dma_device,
+ send->s_op->r_sg,
send->s_op->r_nents,
+ DMA_FROM_DEVICE);
+ }
+
send->s_wr.num_sge = 1;
if (++send ==
&ic->i_sends[ic->i_send_ring.w_nr])
send = ic->i_sends;
@@ -415,3 +432,138 @@ int rds_ib_xmit(struct rds_connection *conn,
struct rds_message *rm,
out:
return ret;
}
+
+int rds_ib_xmit_rdma(struct rds_connection *conn, struct rds_rdma_op
*op)
+{
+ struct rds_ib_connection *ic = conn->c_transport_data;
+ struct rds_ib_send_work *send = NULL;
+ struct rds_ib_send_work *first;
+ struct rds_ib_send_work *prev;
+ struct ib_send_wr *failed_wr;
+ struct rds_ib_device *rds_ibdev;
+ struct scatterlist *scat;
+ unsigned long len;
+ static u32 unsignaled_wrs_count = 0;
+ u64 remote_addr = op->remote_addr;
+ u32 pos;
+ u32 work_alloc;
+ u32 i;
+ u32 j;
+ int sent;
+ int ret;
+ int num_sge;
+
+ rds_ibdev = ib_get_client_data(ic->i_cm_id->device,
&rds_ib_client);
+
+ /* map the message the first time we see it */
+ op->r_count = dma_map_sg(ic->i_cm_id->device->dma_device,
+ op->r_sg, op->r_nents,
(op->r_write) ?
+ DMA_TO_DEVICE :
DMA_FROM_DEVICE);
+ rdsdebug("ic %p mapping op %p: %d\n", ic, op, op->r_count);
+ if (op->r_count == 0) {
+ rds_ib_stats_inc(s_ib_tx_sg_mapping_failure);
+ ret = -ENOMEM; /* XXX ? */
+ goto out;
+ }
+
+ /*
+ * Instead of knowing how to return a partial rdma read/write we
insist that there
+ * be enough work requests to send the entire message.
+ */
+ if ( op->r_count < rds_ibdev->max_sge ) {
+ i = 1;
+ num_sge = op->r_count;
+ }
+ else {
+ i = ceil(rds_ibdev->max_sge, op->r_count);
+ num_sge = rds_ibdev->max_sge;
+ }
+
+ work_alloc = rds_ib_ring_alloc(&ic->i_send_ring, i, &pos);
+ if (work_alloc != i) {
+ rds_ib_stats_inc(s_ib_tx_ring_full);
+ ret = -ENOMEM;
+ goto out;
+ }
+
+ send = &ic->i_sends[pos];
+ first = send;
+ prev = NULL;
+ scat = &op->r_sg[0];
+ sent = 0;
+
+ for ( i = 0; i < work_alloc && scat != &op->r_sg[op->r_count];
i++ ) {
+ ++unsignaled_wrs_count;
+ /*
+ * We want to delay signaling completions just enough to
get
+ * the batching benefits but not so much that we create
dead time on the wire.
+ */
+ if ( rds_ib_sysctl_max_unsig_wrs > 0 &&
unsignaled_wrs_count >= rds_ib_sysctl_max_unsig_wrs ) {
+ unsignaled_wrs_count = 0;
+ send->s_wr.send_flags = IB_SEND_SIGNALED |
IB_SEND_SOLICITED;
+ }
+ else
+ send->s_wr.send_flags = 0;
+
+ send->s_wr.opcode = op->r_write ? IB_WR_RDMA_WRITE :
IB_WR_RDMA_READ;
+ send->s_wr.wr.rdma.remote_addr = remote_addr;
+ send->s_wr.wr.rdma.rkey = op->r_key;
+
+ if (num_sge > rds_ibdev->max_sge) {
+ send->s_wr.num_sge = rds_ibdev->max_sge;
+ num_sge -= rds_ibdev->max_sge;
+ }
+ else
+ send->s_wr.num_sge = num_sge;
+
+ send->s_wr.next = NULL;
+
+ if (prev)
+ prev->s_wr.next = &send->s_wr;
+
+ for ( j = 0; j < num_sge && scat !=
&op->r_sg[op->r_count]; j++ ) {
+ len = sg_dma_len(scat);
+ send->s_sge[j].addr = sg_dma_address(scat);
+ send->s_sge[j].length = len;
+
+ sent += len;
+ rdsdebug("ic %p sent %d remote_addr %llu\n", ic,
sent, remote_addr);
+
+ remote_addr += sg_dma_len(scat);
+ scat++;
+ }
+
+ rdsdebug("send %p wr %p num_sge %u next %p\n", send,
+ &send->s_wr, send->s_wr.num_sge,
send->s_wr.next);
+
+ prev = send;
+ if (++send == &ic->i_sends[ic->i_send_ring.w_nr])
+ send = ic->i_sends;
+ }
+
+ /* if we finished the message then send completion owns it */
+ if (scat == &op->r_sg[op->r_count]) {
+ prev->s_wr.send_flags = IB_SEND_SIGNALED |
IB_SEND_SOLICITED;
+ prev->s_op = op;
+ }
+
+ if (i < work_alloc) {
+ rds_ib_ring_unalloc(&ic->i_send_ring, work_alloc - i);
+ work_alloc = i;
+ }
+
+ failed_wr = &first->s_wr;
+ ret = ib_post_send(ic->i_cm_id->qp, &first->s_wr, &failed_wr);
+ rdsdebug("ic %p first %p (wr %p) ret %d wr %p\n", ic,
+ first, &first->s_wr, ret, failed_wr);
+ BUG_ON(failed_wr != &first->s_wr);
+ if (ret) {
+ printk(KERN_WARNING "RDS/IB: rdma ib_post_send to
%u.%u.%u.%u "
+ "returned %d\n", NIPQUAD(conn->c_faddr), ret);
+ rds_ib_ring_unalloc(&ic->i_send_ring, work_alloc);
+ goto out;
+ }
+
+out:
+ return ret;
+}
diff --git a/net/rds/rdma.c b/net/rds/rdma.c
index ca181b0..f520f8a 100644
--- a/net/rds/rdma.c
+++ b/net/rds/rdma.c
@@ -65,6 +65,7 @@ struct rds_free_mr_args {
struct rds_rdma_args {
struct rds_iovec remote_vec;
+ u64 r_key;
u64 local_vec_addr;
u64 nr_local;
u64 flags;
@@ -256,7 +257,8 @@ int rds_get_mr(struct rds_sock *rs, char __user
*optval, int optlen)
ret = 0;
out:
kfree(pages);
- rds_mr_put(rs, mr);
+ if (mr)
+ rds_mr_put(rs, mr);
return ret;
}
@@ -264,8 +266,8 @@ int rds_free_mr(struct rds_sock *rs, char __user
*optval, int optlen)
{
struct rds_mr *mr;
struct rds_free_mr_args args;
- void *trans_private;
- int ret;
+ void *trans_private = NULL;
+ int ret = 0;
if (optlen != sizeof(struct rds_free_mr_args)) {
ret = -EINVAL;
@@ -301,8 +303,9 @@ int rds_free_mr(struct rds_sock *rs, char __user
*optval, int optlen)
args.flags & RDS_FREE_MR_ARGS_INVALIDATE ? 1 :
0,
mr->r_sg, mr->r_nents);
- rds_mr_put(rs, mr);
- ret = 0;
+ if (mr)
+ rds_mr_put(rs, mr);
+
out:
return ret;
}
@@ -316,7 +319,7 @@ int rds_barrier(struct rds_sock *rs, char __user
*optval, int optlen)
u64 next_seq;
if (optval) {
- if (optval != sizeof(__be32)) {
+ if (optlen != sizeof(__be32)) {
ret = -EINVAL;
goto out;
}
@@ -356,8 +359,11 @@ void rds_rdma_free_op(struct rds_rdma_op *ro)
{
unsigned int i;
- for (i = 0; i < ro->r_nents; i++)
+ for (i = 0; i < ro->r_nents; i++) {
+ if (!ro->r_write)
+ set_page_dirty_lock(ro->r_sg[i].page);
put_page(ro->r_sg[i].page);
+ }
kfree(ro);
}
@@ -426,6 +432,8 @@ static struct rds_rdma_op *rds_rdma_prepare(struct
rds_sock *rs,
}
op->r_write = args->flags & RDS_RDMA_ARGS_WRITE ? 1 : 0;
+ op->remote_addr = args->remote_vec.addr;
+ op->r_key = args->r_key;
nr_bytes = 0;
@@ -498,20 +506,27 @@ int rds_rdma_msghdr_parse(struct rds_sock *rs,
struct rds_message *rm,
struct msghdr *msg)
{
struct rds_rdma_op *op;
- struct cmsghdr *cmsg;
+ struct cmsghdr *cmsg;
int ret = -EINVAL;
- for (cmsg = CMSG_FIRSTHDR(msg);
- cmsg != NULL;
- cmsg = CMSG_NXTHDR((struct msghdr*)msg, cmsg)) {
- if (!CMSG_OK(msg, cmsg))
+ cmsg = CMSG_FIRSTHDR(msg);
+
+ /* Not an rdma header */
+ if (cmsg == NULL) {
+ ret = 0;
+ goto out;
+ }
+
+ for ( ; cmsg != NULL;
+ cmsg = CMSG_NXTHDR((struct msghdr*)msg, cmsg)) {
+ if (!CMSG_OK(msg, cmsg))
break;
- if (cmsg->cmsg_level != SOL_RDS)
- continue;
+ if (cmsg->cmsg_level != SOL_RDS)
+ continue;
if ((cmsg->cmsg_type != RDS_CMSG_RDMA_ARGS) ||
- (cmsg->cmsg_len != sizeof(struct rds_rdma_args)))
+ (cmsg->cmsg_len != sizeof(struct
rds_rdma_args)))
break;
op = rds_rdma_prepare(rs, CMSG_DATA(cmsg));
@@ -524,6 +539,7 @@ int rds_rdma_msghdr_parse(struct rds_sock *rs,
struct rds_message *rm,
break;
}
+out:
return ret;
}
diff --git a/net/rds/rdma.h b/net/rds/rdma.h
index 88b7de3..df55c6e 100644
--- a/net/rds/rdma.h
+++ b/net/rds/rdma.h
@@ -20,6 +20,7 @@ struct rds_mr {
struct rds_rdma_op {
u64 r_key;
+ u64 remote_addr;
unsigned r_write:1;
unsigned int r_nents;
unsigned int r_count;
More information about the rds-devel
mailing list