[rds-devel] RDS iWARP enablement
Jon Mason
jon at opengridcomputing.com
Tue Sep 16 09:31:04 PDT 2008
Hey Andy,
Per our conversation last week regarding your code migration from
net/rds to drivers/infiniband, I am sending you a quick tree dump of
what I currently have.
This patch contains all of the changes needed to get rds-rdma working on
iWARP (with a few FIXMEs left). The patch applies to a stock
OFED-1.4 kernel and doesn't require any of the experimental patches that
Olaf had me using earlier, so it should be much easier to
read/understand. Some of the changes in here are from Olaf's
experimental patches (giving credit where it is due). If there are any
questions, feel free to ask.
There are two major FIXMEs in the code, which I aim to squash sometime
this week: removing dma_mrs for iWARP devices and replacing them with a
new fastreg_mr infrastructure, and allowing multiple RDS connections
(i.e., QPs) from the same host. A quick sketch of the new MR pool
dispatch follows.
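The MR pool rework is the bulk of the ib_rdma.c changes: the pool now
dispatches through a small ops vtable (rds_ib_mr_pool_ops in the diff),
chosen per device based on IB_DEVICE_MEM_MGT_EXTENSIONS. Here is a
minimal userspace sketch of just that dispatch pattern -- every name in
it is an illustrative stand-in, not the kernel API:

#include <stdio.h>

struct mr_pool;

/* Per-flavor hooks; the real table also carries init/unmap/destroy. */
struct mr_pool_ops {
	int  (*map)(struct mr_pool *pool);
	void (*free)(struct mr_pool *pool);
};

struct mr_pool {
	const char *name;
	const struct mr_pool_ops *op;
};

static int fmr_map(struct mr_pool *pool)
{
	printf("%s: mapping via ib_map_phys_fmr()\n", pool->name);
	return 0;
}

static void fmr_free(struct mr_pool *pool)
{
	printf("%s: queued to the dirty list\n", pool->name);
}

static int fastreg_map(struct mr_pool *pool)
{
	printf("%s: posting IB_WR_FAST_REG_MR\n", pool->name);
	return 0;
}

static void fastreg_free(struct mr_pool *pool)
{
	printf("%s: posting IB_WR_LOCAL_INV\n", pool->name);
}

static const struct mr_pool_ops fmr_ops     = { fmr_map, fmr_free };
static const struct mr_pool_ops fastreg_ops = { fastreg_map, fastreg_free };

int main(void)
{
	int use_fastreg = 1;	/* from IB_DEVICE_MEM_MGT_EXTENSIONS */
	struct mr_pool pool = {
		.name = "pool0",
		.op   = use_fastreg ? &fastreg_ops : &fmr_ops,
	};

	pool.op->map(&pool);	/* callers never branch on the MR flavor */
	pool.op->free(&pool);
	return 0;
}

The point is that rds_ib_get_mr() and the flush worker only ever go
through pool->op, so another registration scheme can be added later
without touching the callers.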
Let me know what you think.
Thanks,
Jon
diff --git a/net/rds/ib.c b/net/rds/ib.c
index c7fdc82..d5b20f9 100644
--- a/net/rds/ib.c
+++ b/net/rds/ib.c
@@ -42,11 +42,17 @@
unsigned int fmr_pool_size = RDS_FMR_POOL_SIZE;
unsigned int fmr_message_size = RDS_FMR_SIZE + 1; /* +1 allows for unaligned MRs */
+unsigned int fastreg_pool_size = RDS_FASTREG_POOL_SIZE;
+unsigned int fastreg_message_size = RDS_FASTREG_SIZE + 1; /* +1 allows for unaligned MRs */
module_param(fmr_pool_size, int, 0444);
MODULE_PARM_DESC(fmr_pool_size, " Max number of fmr per HCA");
module_param(fmr_message_size, int, 0444);
MODULE_PARM_DESC(fmr_message_size, " Max size of a RDMA transfer");
+module_param(fastreg_pool_size, int, 0444);
+MODULE_PARM_DESC(fastreg_pool_size, " Max number of fastreg MRs per device");
+module_param(fastreg_message_size, int, 0444);
+MODULE_PARM_DESC(fastreg_message_size, " Max size of a RDMA transfer (fastreg MRs)");
struct list_head rds_ib_devices;
@@ -79,7 +85,14 @@ void rds_ib_add_one(struct ib_device *device)
spin_lock_init(&rds_ibdev->spinlock);
+#if IWARP_DMA_MR_REPLACEMENT
rds_ibdev->dma_local_lkey = !!(dev_attr->device_cap_flags & IB_DEVICE_LOCAL_DMA_LKEY);
+#else
+ /* FIXME - dma_mr's do not work on iWARP. They need to be wholly replaced with
+ * fastreg_mr's. Until that task has been completed, simply disable dma_mr's
+ */
+ rds_ibdev->dma_local_lkey = 0;
+#endif
rds_ibdev->max_wrs = dev_attr->max_qp_wr;
rds_ibdev->max_sge = min(dev_attr->max_sge, RDS_IB_MAX_SGE);
@@ -87,9 +100,7 @@ void rds_ib_add_one(struct ib_device *device)
rds_ibdev->fmr_page_size = 1 << rds_ibdev->fmr_page_shift;
rds_ibdev->fmr_page_mask = ~((u64) rds_ibdev->fmr_page_size - 1);
rds_ibdev->fmr_max_remaps = dev_attr->max_map_per_fmr?: 32;
- rds_ibdev->max_fmrs = dev_attr->max_fmr?
- min_t(unsigned int, dev_attr->max_fmr, fmr_pool_size) :
- fmr_pool_size;
+ rds_ibdev->max_fmrs = dev_attr->max_fmr;
rds_ibdev->dev = device;
rds_ibdev->pd = ib_alloc_pd(device);
@@ -112,13 +123,17 @@ void rds_ib_add_one(struct ib_device *device)
} else
rds_ibdev->mr = NULL;
+ /* Tell the RDMA code to use the fastreg API */
+ if (dev_attr->device_cap_flags & IB_DEVICE_MEM_MGT_EXTENSIONS)
+ rds_ibdev->use_fastreg = 1;
+
rds_ibdev->mr_pool = rds_ib_create_mr_pool(rds_ibdev);
if (IS_ERR(rds_ibdev->mr_pool)) {
rds_ibdev->mr_pool = NULL;
goto err_mr;
}
- INIT_LIST_HEAD(&rds_ibdev->ipaddr_list);
+ INIT_LIST_HEAD(&rds_ibdev->cm_id_list);
list_add_tail(&rds_ibdev->list, &rds_ib_devices);
ib_set_client_data(device, &rds_ib_client, rds_ibdev);
@@ -139,21 +154,23 @@ free_attr:
void rds_ib_remove_one(struct ib_device *device)
{
struct rds_ib_device *rds_ibdev;
- struct rds_ib_ipaddr *i_ipaddr, *next;
+ struct rds_ib_cm_id *i_cm_id, *next;
rds_ibdev = ib_get_client_data(device, &rds_ib_client);
if (!rds_ibdev)
return;
- list_for_each_entry_safe(i_ipaddr, next, &rds_ibdev->ipaddr_list, list) {
- list_del(&i_ipaddr->list);
- kfree(i_ipaddr);
+ list_for_each_entry_safe(i_cm_id, next, &rds_ibdev->cm_id_list, list) {
+ list_del(&i_cm_id->list);
+ kfree(i_cm_id);
}
if (rds_ibdev->mr_pool)
rds_ib_destroy_mr_pool(rds_ibdev->mr_pool);
- ib_dereg_mr(rds_ibdev->mr);
+ if (rds_ibdev->mr)
+ ib_dereg_mr(rds_ibdev->mr);
+
ib_dealloc_pd(rds_ibdev->pd);
list_del(&rds_ibdev->list);
diff --git a/net/rds/ib.h b/net/rds/ib.h
index 26c23cb..564bf39 100644
--- a/net/rds/ib.h
+++ b/net/rds/ib.h
@@ -8,7 +8,9 @@
#define RDS_IB_RESOLVE_TIMEOUT_MS 5000
#define RDS_FMR_SIZE 256
-#define RDS_FMR_POOL_SIZE 4096
+#define RDS_FMR_POOL_SIZE 2048
+#define RDS_FASTREG_SIZE 20
+#define RDS_FASTREG_POOL_SIZE 2048
#define RDS_IB_MAX_SGE 8
#define RDS_IB_RECV_SGE 2
@@ -49,9 +51,29 @@ struct rds_ib_connect_private {
__be32 dp_credit; /* non-zero enables flow ctl */
};
+struct rds_ib_scatterlist {
+ struct scatterlist *list;
+ unsigned int len;
+ int dma_len;
+ unsigned int dma_npages;
+ unsigned int bytes;
+};
+
+struct rds_ib_mapping {
+ spinlock_t m_lock;
+ struct list_head m_list;
+ struct rds_ib_mr *m_mr;
+ uint32_t m_rkey;
+ struct rds_ib_scatterlist m_sg;
+};
+
struct rds_ib_send_work {
struct rds_message *s_rm;
+
+ /* We should really put these into a union: */
struct rds_rdma_op *s_op;
+ struct rds_ib_mapping *s_mapping;
+
struct ib_send_wr s_wr;
struct ib_sge s_sge[RDS_IB_MAX_SGE];
unsigned long s_queued;
@@ -127,7 +149,7 @@ struct rds_ib_connection {
i_iwarp : 1, /* this is actually iWARP not IB */
i_fastreg : 1, /* device supports fastreg */
i_dma_local_lkey : 1;
-
+ unsigned int fastreg_posted;
/* Batched completions */
unsigned int i_unsignaled_wrs;
long i_unsignaled_bytes;
@@ -139,14 +161,14 @@ struct rds_ib_connection {
#define IB_SET_SEND_CREDITS(v) ((v) & 0xffff)
#define IB_SET_POST_CREDITS(v) ((v) << 16)
-struct rds_ib_ipaddr {
+struct rds_ib_cm_id {
struct list_head list;
- __be32 ipaddr;
+ struct rdma_cm_id *cm_id;
};
struct rds_ib_device {
struct list_head list;
- struct list_head ipaddr_list;
+ struct list_head cm_id_list;
struct ib_device *dev;
struct ib_pd *pd;
struct ib_mr *mr;
@@ -247,6 +269,8 @@ extern struct ib_client rds_ib_client;
extern unsigned int fmr_pool_size;
extern unsigned int fmr_message_size;
+extern unsigned int fastreg_pool_size;
+extern unsigned int fastreg_message_size;
/* ib_cm.c */
int rds_ib_conn_alloc(struct rds_connection *conn, gfp_t gfp);
@@ -262,12 +286,12 @@ void __rds_ib_conn_error(struct rds_connection *conn, const char *, ...);
__rds_ib_conn_error(conn, KERN_WARNING "RDS/IB: " fmt )
/* ib_rdma.c */
-int ib_update_ipaddr_for_device(struct rds_ib_device *rds_ibdev, __be32 ipaddr);
+int ib_update_cm_id_for_device(struct rds_ib_device *rds_ibdev, struct rdma_cm_id *cm_id);
struct rds_ib_mr_pool *rds_ib_create_mr_pool(struct rds_ib_device *);
void rds_ib_get_mr_info(struct rds_ib_device *rds_ibdev, struct rds_info_ib_connection *iinfo);
void rds_ib_destroy_mr_pool(struct rds_ib_mr_pool *);
void *rds_ib_get_mr(struct scatterlist *sg, unsigned long nents,
- __be32 ip_addr, u32 *key_ret);
+ struct rds_sock *rs, u32 *key_ret);
void rds_ib_sync_mr(void *trans_private, int dir);
void rds_ib_free_mr(void *trans_private, int invalidate);
void rds_ib_flush_mrs(void);
diff --git a/net/rds/ib_cm.c b/net/rds/ib_cm.c
index 33d2b08..c08dff8 100644
--- a/net/rds/ib_cm.c
+++ b/net/rds/ib_cm.c
@@ -32,7 +32,6 @@
*/
#include <linux/kernel.h>
#include <linux/in.h>
-#include <linux/vmalloc.h>
#include "rds.h"
#include "ib.h"
@@ -140,7 +139,7 @@ static void rds_ib_connect_complete(struct rds_connection *conn, struct rdma_cm_
/* update ib_device with this local ipaddr */
rds_ibdev = ib_get_client_data(ic->i_cm_id->device, &rds_ib_client);
- ib_update_ipaddr_for_device(rds_ibdev, conn->c_laddr);
+ ib_update_cm_id_for_device(rds_ibdev, ic->i_cm_id);
/* If the peer gave us the last packet it saw, process this as if
* we had received a regular ACK. */
@@ -206,7 +205,7 @@ static void rds_ib_qp_event_handler(struct ib_event *event, void *data)
rdma_notify(ic->i_cm_id, IB_EVENT_COMM_EST);
break;
default:
- printk(KERN_WARNING "RDS/ib: unhandled QP event %u "
+ printk(KERN_WARNING "RDS/IB: unhandled QP event %u "
"on connection to %u.%u.%u.%u\n", event->event,
NIPQUAD(conn->c_faddr));
break;
@@ -214,6 +213,79 @@ static void rds_ib_qp_event_handler(struct ib_event *event, void *data)
}
/*
+ * Create a QP
+ */
+static int rds_ib_init_qp_attrs(struct ib_qp_init_attr *attr,
+ struct rds_ib_device *rds_ibdev,
+ struct rds_ib_work_ring *send_ring,
+ void (*send_cq_handler)(struct ib_cq *, void *),
+ struct rds_ib_work_ring *recv_ring,
+ void (*recv_cq_handler)(struct ib_cq *, void *),
+ void *context)
+{
+ struct ib_device *dev = rds_ibdev->dev;
+ unsigned int send_size, recv_size;
+ int ret;
+
+ /* The offset of 1 is to accommodate the additional ACK WR. */
+ send_size = min_t(unsigned int, rds_ibdev->max_wrs, rds_ib_sysctl_max_send_wr + 1);
+ recv_size = min_t(unsigned int, rds_ibdev->max_wrs, rds_ib_sysctl_max_recv_wr + 1);
+ rds_ib_ring_resize(send_ring, send_size - 1);
+ rds_ib_ring_resize(recv_ring, recv_size - 1);
+
+ memset(attr, 0, sizeof(*attr));
+ attr->event_handler = rds_ib_qp_event_handler;
+ attr->qp_context = context;
+ attr->cap.max_send_wr = send_size;
+ attr->cap.max_recv_wr = recv_size;
+ attr->cap.max_send_sge = rds_ibdev->max_sge;
+ attr->cap.max_recv_sge = RDS_IB_RECV_SGE;
+ attr->sq_sig_type = IB_SIGNAL_REQ_WR;
+ attr->qp_type = IB_QPT_RC;
+
+ attr->send_cq = ib_create_cq(dev, send_cq_handler,
+ rds_ib_cq_event_handler,
+ context, send_size, 0);
+ if (IS_ERR(attr->send_cq)) {
+ ret = PTR_ERR(attr->send_cq);
+ attr->send_cq = NULL;
+ rdsdebug("ib_create_cq send failed: %d\n", ret);
+ goto out;
+ }
+
+ attr->recv_cq = ib_create_cq(dev, recv_cq_handler,
+ rds_ib_cq_event_handler,
+ context, recv_size, 0);
+ if (IS_ERR(attr->recv_cq)) {
+ ret = PTR_ERR(attr->recv_cq);
+ attr->recv_cq = NULL;
+ rdsdebug("ib_create_cq send failed: %d\n", ret);
+ goto out;
+ }
+
+ ret = ib_req_notify_cq(attr->send_cq, IB_CQ_NEXT_COMP);
+ if (ret) {
+ rdsdebug("ib_req_notify_cq send failed: %d\n", ret);
+ goto out;
+ }
+
+ ret = ib_req_notify_cq(attr->recv_cq, IB_CQ_SOLICITED);
+ if (ret) {
+ rdsdebug("ib_req_notify_cq recv failed: %d\n", ret);
+ goto out;
+ }
+
+out:
+ if (ret) {
+ if (attr->send_cq)
+ ib_destroy_cq(attr->send_cq);
+ if (attr->recv_cq)
+ ib_destroy_cq(attr->recv_cq);
+ }
+ return ret;
+}
+
+/*
* This needs to be very careful to not leave IS_ERR pointers around for
* cleanup to trip over.
*/
@@ -238,60 +310,19 @@ static int rds_ib_setup_qp(struct rds_connection *conn)
return -EOPNOTSUPP;
}
- if (rds_ibdev->max_wrs < ic->i_send_ring.w_nr + 1)
- rds_ib_ring_resize(&ic->i_send_ring, rds_ibdev->max_wrs - 1);
- if (rds_ibdev->max_wrs < ic->i_recv_ring.w_nr + 1)
- rds_ib_ring_resize(&ic->i_recv_ring, rds_ibdev->max_wrs - 1);
-
/* Protection domain and memory range */
ic->i_pd = rds_ibdev->pd;
ic->i_mr = rds_ibdev->mr;
- ic->i_send_cq = ib_create_cq(dev, rds_ib_send_cq_comp_handler,
- rds_ib_cq_event_handler, conn,
- ic->i_send_ring.w_nr + 1, 0);
- if (IS_ERR(ic->i_send_cq)) {
- ret = PTR_ERR(ic->i_send_cq);
- ic->i_send_cq = NULL;
- rdsdebug("ib_create_cq send failed: %d\n", ret);
- goto out;
- }
-
- ic->i_recv_cq = ib_create_cq(dev, rds_ib_recv_cq_comp_handler,
- rds_ib_cq_event_handler, conn,
- ic->i_recv_ring.w_nr, 0);
- if (IS_ERR(ic->i_recv_cq)) {
- ret = PTR_ERR(ic->i_recv_cq);
- ic->i_recv_cq = NULL;
- rdsdebug("ib_create_cq recv failed: %d\n", ret);
- goto out;
- }
-
- ret = ib_req_notify_cq(ic->i_send_cq, IB_CQ_NEXT_COMP);
- if (ret) {
- rdsdebug("ib_req_notify_cq send failed: %d\n", ret);
- goto out;
- }
-
- ret = ib_req_notify_cq(ic->i_recv_cq, IB_CQ_SOLICITED);
- if (ret) {
- rdsdebug("ib_req_notify_cq recv failed: %d\n", ret);
+ ret = rds_ib_init_qp_attrs(&attr, rds_ibdev,
+ &ic->i_send_ring, rds_ib_send_cq_comp_handler,
+ &ic->i_recv_ring, rds_ib_recv_cq_comp_handler,
+ conn);
+ if (ret < 0)
goto out;
- }
- /* XXX negotiate max send/recv with remote? */
- memset(&attr, 0, sizeof(attr));
- attr.event_handler = rds_ib_qp_event_handler;
- attr.qp_context = conn;
- /* + 1 to allow for the single ack message */
- attr.cap.max_send_wr = ic->i_send_ring.w_nr + 1;
- attr.cap.max_recv_wr = ic->i_recv_ring.w_nr + 1;
- attr.cap.max_send_sge = rds_ibdev->max_sge;
- attr.cap.max_recv_sge = RDS_IB_RECV_SGE;
- attr.sq_sig_type = IB_SIGNAL_REQ_WR;
- attr.qp_type = IB_QPT_RC;
- attr.send_cq = ic->i_send_cq;
- attr.recv_cq = ic->i_recv_cq;
+ ic->i_send_cq = attr.send_cq;
+ ic->i_recv_cq = attr.recv_cq;
/*
* XXX this can fail if max_*_wr is too large? Are we supposed
@@ -437,8 +468,7 @@ static int rds_ib_cm_handle_connect(struct rdma_cm_id *cm_id,
*/
if (!rds_conn_transition(conn, RDS_CONN_DOWN, RDS_CONN_CONNECTING)) {
if (rds_conn_state(conn) == RDS_CONN_UP) {
- rdsdebug("incoming connect while connecting\n");
- rds_conn_drop(conn);
+ rds_ib_conn_error(conn, "incoming connect while connecting\n");
rds_ib_stats_inc(s_ib_listen_closed_stale);
} else
if (rds_conn_state(conn) == RDS_CONN_CONNECTING) {
@@ -480,8 +510,7 @@ static int rds_ib_cm_handle_connect(struct rdma_cm_id *cm_id,
goto out;
}
- /* update ib_device with this local ipaddr */
- ib_update_ipaddr_for_device(rds_ibdev, dp->dp_saddr);
+ ib_update_cm_id_for_device(rds_ibdev, cm_id);
return 0;
@@ -582,7 +611,6 @@ static int rds_ib_cm_event_handler(struct rdma_cm_id *cm_id,
case RDMA_CM_EVENT_UNREACHABLE:
case RDMA_CM_EVENT_REJECTED:
case RDMA_CM_EVENT_DEVICE_REMOVAL:
- case RDMA_CM_EVENT_ADDR_CHANGE:
if (conn)
rds_conn_drop(conn);
break;
@@ -693,7 +721,7 @@ void rds_ib_conn_shutdown(struct rds_connection *conn)
/* Actually this may happen quite frequently, when
* an outgoing connect raced with an incoming connect.
*/
- rdsdebug("rds_ib_conn_shutdown: failed to disconnect,"
+ printk(KERN_DEBUG "rds_ib_conn_shutdown: failed to disconnect,"
" cm: %p err %d\n", ic->i_cm_id, err);
}
@@ -835,7 +863,7 @@ int __init rds_ib_listen_init(void)
cm_id = rdma_create_id(rds_ib_cm_event_handler, NULL, RDMA_PS_TCP);
if (IS_ERR(cm_id)) {
ret = PTR_ERR(cm_id);
- printk(KERN_ERR "RDS/ib: failed to setup listener, "
+ printk(KERN_ERR "RDS/IB: failed to setup listener, "
"rdma_create_id() returned %d\n", ret);
goto out;
}
@@ -850,14 +878,14 @@ int __init rds_ib_listen_init(void)
*/
ret = rdma_bind_addr(cm_id, (struct sockaddr *)&sin);
if (ret) {
- printk(KERN_ERR "RDS/ib: failed to setup listener, "
+ printk(KERN_ERR "RDS/IB: failed to setup listener, "
"rdma_bind_addr() returned %d\n", ret);
goto out;
}
ret = rdma_listen(cm_id, 128);
if (ret) {
- printk(KERN_ERR "RDS/ib: failed to setup listener, "
+ printk(KERN_ERR "RDS/IB: failed to setup listener, "
"rdma_listen() returned %d\n", ret);
goto out;
}
diff --git a/net/rds/ib_rdma.c b/net/rds/ib_rdma.c
index 5a0fba3..de732b5 100644
--- a/net/rds/ib_rdma.c
+++ b/net/rds/ib_rdma.c
@@ -45,105 +45,318 @@ extern struct list_head rds_ib_devices;
struct rds_ib_mr {
struct rds_ib_device *device;
struct rds_ib_mr_pool *pool;
- struct ib_fmr *fmr;
- struct list_head list;
- unsigned int remap_count;
- struct scatterlist * sg;
- unsigned int sg_len;
- u64 * dma;
- int sg_dma_len;
+ struct ib_qp *qp;
+
+ union {
+ struct ib_fmr *fmr;
+ /* fastreg stuff and maybe others go here */
+ struct {
+ struct ib_mr *mr;
+ struct ib_fast_reg_page_list *page_list;
+ } fastreg;
+ } u;
+ struct rds_ib_mapping mapping;
+ unsigned int remap_count;
};
+#define fr_mr u.fastreg.mr
+#define fr_page_list u.fastreg.page_list
+
/*
* Our own little FMR pool
*/
struct rds_ib_mr_pool {
+ struct rds_ib_device *device; /* back ptr to the device that owns us */
+
struct mutex flush_lock; /* serialize fmr invalidate */
struct work_struct flush_worker; /* flush worker */
spinlock_t list_lock; /* protect variables below */
atomic_t item_count; /* total # of MRs */
atomic_t dirty_count; /* # dirty of MRs */
- struct list_head drop_list; /* MRs that have reached their max_maps limit */
- struct list_head free_list; /* unused MRs */
+ struct list_head dirty_list; /* dirty mappings */
struct list_head clean_list; /* unused & unamapped MRs */
atomic_t free_pinned; /* memory pinned by free MRs */
+ unsigned long max_message_size; /* in pages */
unsigned long max_items;
unsigned long max_items_soft;
unsigned long max_free_pinned;
struct ib_fmr_attr fmr_attr;
+
+ struct rds_ib_mr_pool_ops *op;
};
+struct rds_ib_mr_pool_ops {
+ int (*init)(struct rds_ib_mr_pool *, struct rds_ib_mr *);
+ int (*map)(struct rds_ib_mr_pool *pool, struct rds_ib_mr *ibmr,
+ struct scatterlist *sg, unsigned int sg_len);
+ void (*free)(struct rds_ib_mr_pool *pool, struct rds_ib_mr *ibmr);
+ unsigned int (*unmap)(struct rds_ib_mr_pool *, struct list_head *,
+ struct list_head *);
+ void (*destroy)(struct rds_ib_mr_pool *, struct rds_ib_mr *);
+};
+
+
static int rds_ib_flush_mr_pool(struct rds_ib_mr_pool *pool, int free_all);
static void rds_ib_teardown_mr(struct rds_ib_mr *ibmr);
static void rds_ib_mr_pool_flush_worker(struct work_struct *work);
+static int rds_ib_init_fmr(struct rds_ib_mr_pool *pool, struct rds_ib_mr *ibmr);
+static int rds_ib_map_fmr(struct rds_ib_mr_pool *pool,
+ struct rds_ib_mr *ibmr,
+ struct scatterlist *sg, unsigned int nents);
+static void rds_ib_free_fmr(struct rds_ib_mr_pool *pool, struct rds_ib_mr *ibmr);
+static unsigned int rds_ib_unmap_fmr_list(struct rds_ib_mr_pool *pool,
+ struct list_head *unmap_list,
+ struct list_head *kill_list);
+static void rds_ib_destroy_fmr(struct rds_ib_mr_pool *pool, struct rds_ib_mr *ibmr);
+static int rds_ib_init_fastreg(struct rds_ib_mr_pool *pool, struct rds_ib_mr *ibmr);
+static int rds_ib_map_fastreg(struct rds_ib_mr_pool *pool,
+ struct rds_ib_mr *ibmr,
+ struct scatterlist *sg, unsigned int nents);
+static void rds_ib_free_fastreg(struct rds_ib_mr_pool *pool, struct rds_ib_mr *ibmr);
+static unsigned int rds_ib_unmap_fastreg_list(struct rds_ib_mr_pool *pool,
+ struct list_head *unmap_list,
+ struct list_head *kill_list);
+static void rds_ib_destroy_fastreg(struct rds_ib_mr_pool *pool, struct rds_ib_mr *ibmr);
+
+static struct rds_ib_mr_pool_ops rds_ib_fmr_pool_ops = {
+ .init = rds_ib_init_fmr,
+ .map = rds_ib_map_fmr,
+ .free = rds_ib_free_fmr,
+ .unmap = rds_ib_unmap_fmr_list,
+ .destroy = rds_ib_destroy_fmr,
+};
+
+static struct rds_ib_mr_pool_ops rds_ib_fastreg_pool_ops = {
+ .init = rds_ib_init_fastreg,
+ .map = rds_ib_map_fastreg,
+ .free = rds_ib_free_fastreg,
+ .unmap = rds_ib_unmap_fastreg_list,
+ .destroy = rds_ib_destroy_fastreg,
+};
-int ib_update_ipaddr_for_device(struct rds_ib_device *rds_ibdev, __be32 ipaddr)
+int ib_update_cm_id_for_device(struct rds_ib_device *rds_ibdev, struct rdma_cm_id *cm_id)
{
- struct rds_ib_ipaddr *i_ipaddr;
+ struct rds_ib_cm_id *i_cm_id;
spin_lock_irq(&rds_ibdev->spinlock);
- list_for_each_entry(i_ipaddr, &rds_ibdev->ipaddr_list, list) {
- if (i_ipaddr->ipaddr == ipaddr) {
+ list_for_each_entry(i_cm_id, &rds_ibdev->cm_id_list, list) {
+ if (i_cm_id->cm_id == cm_id) {
spin_unlock_irq(&rds_ibdev->spinlock);
return 0;
}
}
spin_unlock_irq(&rds_ibdev->spinlock);
- i_ipaddr = kmalloc(sizeof *i_ipaddr, GFP_KERNEL);
- if (!i_ipaddr)
+ i_cm_id = kmalloc(sizeof *i_cm_id, GFP_KERNEL);
+ if (!i_cm_id)
return -ENOMEM;
- i_ipaddr->ipaddr = ipaddr;
+ i_cm_id->cm_id = cm_id;
spin_lock_irq(&rds_ibdev->spinlock);
- list_add_tail(&i_ipaddr->list, &rds_ibdev->ipaddr_list);
+ list_add_tail(&i_cm_id->list, &rds_ibdev->cm_id_list);
spin_unlock_irq(&rds_ibdev->spinlock);
return 0;
}
-struct rds_ib_device* ib_get_device(__be32 ipaddr)
+static int ib_get_device(struct rds_sock *rs, struct rds_ib_device **rds_ibdev, struct ib_qp **qp)
{
- struct rds_ib_device *rds_ibdev;
- struct rds_ib_ipaddr *i_ipaddr;
-
- list_for_each_entry(rds_ibdev, &rds_ib_devices, list) {
- spin_lock_irq(&rds_ibdev->spinlock);
- list_for_each_entry(i_ipaddr, &rds_ibdev->ipaddr_list, list) {
- if (i_ipaddr->ipaddr == ipaddr) {
- spin_unlock_irq(&rds_ibdev->spinlock);
- return rds_ibdev;
+ struct rds_ib_device *ibdev;
+ struct rds_ib_cm_id *i_cm_id;
+
+ list_for_each_entry(ibdev, &rds_ib_devices, list) {
+ spin_lock_irq(&ibdev->spinlock);
+ list_for_each_entry(i_cm_id, &ibdev->cm_id_list, list) {
+ struct sockaddr_in *src_addr, *dst_addr;
+
+ src_addr = (struct sockaddr_in *)&i_cm_id->cm_id->route.addr.src_addr;
+ dst_addr = (struct sockaddr_in *)&i_cm_id->cm_id->route.addr.dst_addr;
+
+ rdsdebug("%s: local ipaddr = %x port %d, remote ipaddr = %x port %d"
+ "....looking for %x port %d, remote ipaddr = %x port %d\n",
+ __func__,
+ src_addr->sin_addr.s_addr,
+ src_addr->sin_port,
+ dst_addr->sin_addr.s_addr,
+ dst_addr->sin_port,
+ rs->rs_bound_addr,
+ rs->rs_bound_port,
+ rs->rs_conn_addr,
+ rs->rs_conn_port);
+#if 0
+ if (src_addr->sin_addr.s_addr == rs->rs_bound_addr &&
+ src_addr->sin_port == rs->rs_bound_port &&
+ dst_addr->sin_addr.s_addr == rs->rs_conn_addr &&
+ dst_addr->sin_port == rs->rs_conn_port) {
+#else
+ /* FIXME - needs to compare the local and remote ipaddr/port tuple, but the
+ * ipaddr is the only available information in the rds_sock (the rest are
+ * zeroed). It doesn't appear to be properly populated during connection
+ * setup...
+ */
+ if (src_addr->sin_addr.s_addr == rs->rs_bound_addr) {
+#endif
+ spin_unlock_irq(&ibdev->spinlock);
+ *rds_ibdev = ibdev;
+ *qp = i_cm_id->cm_id->qp;
+ return 0;
}
}
- spin_unlock_irq(&rds_ibdev->spinlock);
+ spin_unlock_irq(&ibdev->spinlock);
}
- return NULL;
+ return 1;
}
-struct rds_ib_mr_pool *rds_ib_create_mr_pool(struct rds_ib_device *rds_ibdev)
+static void rds_ib_set_scatterlist(struct rds_ib_scatterlist *sg,
+ struct scatterlist *list, unsigned int sg_len)
{
- struct rds_ib_mr_pool *pool;
+ sg->list = list;
+ sg->len = sg_len;
+ sg->dma_len = 0;
+ sg->dma_npages = 0;
+ sg->bytes = 0;
+}
+
+static int rds_ib_drop_scatterlist(struct rds_ib_device *rds_ibdev,
+ struct rds_ib_scatterlist *sg)
+{
+ int unpinned = 0;
+
+ if (sg->dma_len) {
+ ib_dma_unmap_sg(rds_ibdev->dev,
+ sg->list, sg->len,
+ DMA_BIDIRECTIONAL);
+ sg->dma_len = 0;
+ }
+
+ /* Release the s/g list */
+ if (sg->len) {
+ unsigned int i;
+
+ for (i = 0; i < sg->len; ++i) {
+ struct page *page = sg_page(&sg->list[i]);
+
+ /* FIXME we need a way to tell a r/w MR
+ * from a r/o MR */
+ set_page_dirty(page);
+ put_page(page);
+ }
+
+ unpinned = sg->len;
+ sg->len = 0;
+
+ kfree(sg->list);
+ sg->list = NULL;
+ }
+
+ return unpinned;
+}
+
+static u64 *rds_ib_map_scatterlist(struct rds_ib_device *rds_ibdev,
+ struct rds_ib_scatterlist *sg,
+ unsigned int dma_page_shift)
+{
+ struct ib_device *dev = rds_ibdev->dev;
+ u64 *dma_pages = NULL;
+ u64 dma_mask;
+ unsigned int dma_page_size;
+ int i, j, ret;
- /* For now, disable all RDMA service on iWARP. This check will
- * go away when we have a working patch. */
- if (rds_ibdev->dev->node_type == RDMA_NODE_RNIC)
- return NULL;
+ dma_page_size = 1 << dma_page_shift;
+ dma_mask = dma_page_size - 1;
+
+ WARN_ON(sg->dma_len);
+
+ sg->dma_len = ib_dma_map_sg(dev, sg->list, sg->len, DMA_BIDIRECTIONAL);
+ if (unlikely(!sg->dma_len)) {
+ printk(KERN_WARNING "RDS/IB: dma_map_sg failed!\n");
+ return ERR_PTR(-EBUSY);
+ }
+
+ sg->bytes = 0;
+ sg->dma_npages = 0;
+
+ ret = -EINVAL;
+ for (i = 0; i < sg->dma_len; ++i) {
+ unsigned int dma_len = ib_sg_dma_len(dev, &sg->list[i]);
+ u64 dma_addr = ib_sg_dma_address(dev, &sg->list[i]);
+ u64 end_addr;
+
+ sg->bytes += dma_len;
+
+ end_addr = dma_addr + dma_len;
+ if (dma_addr & dma_mask) {
+ if (i > 0)
+ goto out_unmap;
+ dma_addr &= ~dma_mask;
+ }
+ if (end_addr & dma_mask) {
+ if (i < sg->dma_len - 1)
+ goto out_unmap;
+ end_addr = (end_addr + dma_mask) & ~dma_mask;
+ }
+
+ sg->dma_npages += (end_addr - dma_addr) >> dma_page_shift;
+ }
+
+ /* Now gather the dma addrs into one list */
+ if (sg->dma_npages > fmr_message_size)
+ goto out_unmap;
+
+ dma_pages = kmalloc(sizeof(u64) * sg->dma_npages, GFP_ATOMIC);
+ if (!dma_pages) {
+ ret = -ENOMEM;
+ goto out_unmap;
+ }
+
+ for (i = j = 0; i < sg->dma_len; ++i) {
+ unsigned int dma_len = ib_sg_dma_len(dev, &sg->list[i]);
+ u64 dma_addr = ib_sg_dma_address(dev, &sg->list[i]);
+ u64 end_addr;
+
+ end_addr = dma_addr + dma_len;
+ dma_addr &= ~dma_mask;
+ for (; dma_addr < end_addr; dma_addr += dma_page_size)
+ dma_pages[j++] = dma_addr;
+ }
+
+ return dma_pages;
+
+out_unmap:
+ ib_dma_unmap_sg(rds_ibdev->dev, sg->list, sg->len, DMA_BIDIRECTIONAL);
+ sg->dma_len = 0;
+ kfree(dma_pages);
+ return ERR_PTR(ret);
+}
+
+
+static struct rds_ib_mr_pool *__rds_ib_create_mr_pool(struct rds_ib_device *rds_ibdev,
+ unsigned int message_size, unsigned int pool_size,
+ struct rds_ib_mr_pool_ops *ops)
+{
+ struct rds_ib_mr_pool *pool;
pool = kzalloc(sizeof(*pool), GFP_KERNEL);
if (!pool)
return ERR_PTR(-ENOMEM);
- INIT_LIST_HEAD(&pool->free_list);
- INIT_LIST_HEAD(&pool->drop_list);
+ pool->op = ops;
+ pool->device = rds_ibdev;
+ INIT_LIST_HEAD(&pool->dirty_list);
INIT_LIST_HEAD(&pool->clean_list);
mutex_init(&pool->flush_lock);
spin_lock_init(&pool->list_lock);
INIT_WORK(&pool->flush_worker, rds_ib_mr_pool_flush_worker);
+ pool->max_message_size = message_size;
+ pool->max_items = pool_size;
+ pool->max_free_pinned = pool->max_items * pool->max_message_size / 4;
+
pool->fmr_attr.max_pages = fmr_message_size;
pool->fmr_attr.max_maps = rds_ibdev->fmr_max_remaps;
pool->fmr_attr.page_shift = rds_ibdev->fmr_page_shift;
@@ -154,8 +367,44 @@ struct rds_ib_mr_pool *rds_ib_create_mr_pool(struct rds_ib_device *rds_ibdev)
* items more aggressively.
* Make sure that max_items > max_items_soft > max_items / 2
*/
- pool->max_items_soft = rds_ibdev->max_fmrs * 3 / 4;
- pool->max_items = rds_ibdev->max_fmrs;
+ pool->max_items_soft = pool->max_items * 3 / 4;
+
+ return pool;
+}
+
+struct rds_ib_mr_pool *rds_ib_create_mr_pool(struct rds_ib_device *rds_ibdev)
+{
+ struct rds_ib_mr_pool *pool;
+ unsigned int pool_size;
+
+ if (!rds_ibdev->use_fastreg) {
+ /* Use FMRs to implement memory registrations */
+ pool_size = fmr_pool_size;
+
+ if (rds_ibdev->max_fmrs && rds_ibdev->max_fmrs < pool_size)
+ pool_size = rds_ibdev->max_fmrs;
+
+ pool = __rds_ib_create_mr_pool(rds_ibdev, fmr_message_size, pool_size,
+ &rds_ib_fmr_pool_ops);
+
+ if (!IS_ERR(pool)) {
+ pool->fmr_attr.max_pages = pool->max_message_size;
+ pool->fmr_attr.max_maps = rds_ibdev->fmr_max_remaps;
+ pool->fmr_attr.page_shift = rds_ibdev->fmr_page_shift;
+ }
+ } else {
+ /* Use fastregs to implement memory registrations */
+ pool_size = fastreg_pool_size;
+
+ pool = __rds_ib_create_mr_pool(rds_ibdev,
+ fastreg_message_size,
+ pool_size,
+ &rds_ib_fastreg_pool_ops);
+
+ if (IS_ERR(pool))
+ printk(KERN_WARNING "RDS/IB: __rds_ib_create_mr_pool failed\n");
+ }
return pool;
}
@@ -184,8 +433,8 @@ static inline struct rds_ib_mr *rds_ib_reuse_fmr(struct rds_ib_mr_pool *pool)
spin_lock_irqsave(&pool->list_lock, flags);
if (!list_empty(&pool->clean_list)) {
- ibmr = list_entry(pool->clean_list.next, struct rds_ib_mr, list);
- list_del_init(&ibmr->list);
+ ibmr = list_entry(pool->clean_list.next, struct rds_ib_mr, mapping.m_list);
+ list_del_init(&ibmr->mapping.m_list);
}
spin_unlock_irqrestore(&pool->list_lock, flags);
@@ -232,114 +481,26 @@ static struct rds_ib_mr *rds_ib_alloc_fmr(struct rds_ib_device *rds_ibdev)
goto out_no_cigar;
}
- ibmr->fmr = ib_alloc_fmr(rds_ibdev->pd,
- (IB_ACCESS_LOCAL_WRITE |
- IB_ACCESS_REMOTE_READ |
- IB_ACCESS_REMOTE_WRITE),
- &pool->fmr_attr);
- if (IS_ERR(ibmr->fmr)) {
- err = PTR_ERR(ibmr->fmr);
- ibmr->fmr = NULL;
- printk(KERN_WARNING "RDS/IB: ib_alloc_fmr failed (err=%d)\n", err);
+ spin_lock_init(&ibmr->mapping.m_lock);
+ INIT_LIST_HEAD(&ibmr->mapping.m_list);
+ ibmr->mapping.m_mr = ibmr;
+
+ err = pool->op->init(pool, ibmr);
+ if (err)
goto out_no_cigar;
- }
rds_ib_stats_inc(s_ib_rdma_mr_alloc);
return ibmr;
out_no_cigar:
if (ibmr) {
- if (ibmr->fmr)
- ib_dealloc_fmr(ibmr->fmr);
+ pool->op->destroy(pool, ibmr);
kfree(ibmr);
}
atomic_dec(&pool->item_count);
return ERR_PTR(err);
}
-static int rds_ib_map_fmr(struct rds_ib_device *rds_ibdev, struct rds_ib_mr *ibmr,
- struct scatterlist *sg, unsigned int nents)
-{
- struct ib_device *dev = rds_ibdev->dev;
- struct scatterlist *scat = sg;
- u64 io_addr = 0;
- u64 *dma_pages;
- u32 len;
- int page_cnt, sg_dma_len;
- int i, j;
- int ret;
-
- sg_dma_len = ib_dma_map_sg(dev, sg, nents,
- DMA_BIDIRECTIONAL);
- if (unlikely(!sg_dma_len)) {
- printk(KERN_WARNING "RDS/IB: dma_map_sg failed!\n");
- return -EBUSY;
- }
-
- len = 0;
- page_cnt = 0;
-
- for (i = 0; i < sg_dma_len; ++i) {
- unsigned int dma_len = ib_sg_dma_len(dev, &scat[i]);
- u64 dma_addr = ib_sg_dma_address(dev, &scat[i]);
-
- if (dma_addr & ~rds_ibdev->fmr_page_mask) {
- if (i > 0)
- return -EINVAL;
- else
- ++page_cnt;
- }
- if ((dma_addr + dma_len) & ~rds_ibdev->fmr_page_mask) {
- if (i < sg_dma_len - 1)
- return -EINVAL;
- else
- ++page_cnt;
- }
-
- len += dma_len;
- }
-
- page_cnt += len >> rds_ibdev->fmr_page_shift;
- if (page_cnt > fmr_message_size)
- return -EINVAL;
-
- dma_pages = kmalloc(sizeof(u64) * page_cnt, GFP_ATOMIC);
- if (!dma_pages)
- return -ENOMEM;
-
- page_cnt = 0;
- for (i = 0; i < sg_dma_len; ++i) {
- unsigned int dma_len = ib_sg_dma_len(dev, &scat[i]);
- u64 dma_addr = ib_sg_dma_address(dev, &scat[i]);
-
- for (j = 0; j < dma_len; j += rds_ibdev->fmr_page_size)
- dma_pages[page_cnt++] =
- (dma_addr & rds_ibdev->fmr_page_mask) + j;
- }
-
- ret = ib_map_phys_fmr(ibmr->fmr,
- dma_pages, page_cnt, io_addr);
- if (ret)
- goto out;
-
- /* Success - we successfully remapped the MR, so we can
- * safely tear down the old mapping. */
- rds_ib_teardown_mr(ibmr);
-
- ibmr->sg = scat;
- ibmr->sg_len = nents;
- ibmr->sg_dma_len = sg_dma_len;
- ibmr->remap_count++;
-
- rds_ib_stats_inc(s_ib_rdma_mr_used);
- ret = 0;
-
-out:
- kfree(dma_pages);
-
- return ret;
-}
-
void rds_ib_sync_mr(void *trans_private, int direction)
{
struct rds_ib_mr *ibmr = trans_private;
@@ -347,51 +508,21 @@ void rds_ib_sync_mr(void *trans_private, int direction)
switch (direction) {
case DMA_FROM_DEVICE:
- ib_dma_sync_sg_for_cpu(rds_ibdev->dev, ibmr->sg,
- ibmr->sg_dma_len, DMA_BIDIRECTIONAL);
+ ib_dma_sync_sg_for_cpu(rds_ibdev->dev, ibmr->mapping.m_sg.list,
+ ibmr->mapping.m_sg.dma_len, DMA_BIDIRECTIONAL);
break;
case DMA_TO_DEVICE:
- ib_dma_sync_sg_for_device(rds_ibdev->dev, ibmr->sg,
- ibmr->sg_dma_len, DMA_BIDIRECTIONAL);
+ ib_dma_sync_sg_for_device(rds_ibdev->dev, ibmr->mapping.m_sg.list,
+ ibmr->mapping.m_sg.dma_len, DMA_BIDIRECTIONAL);
break;
}
}
-static void __rds_ib_teardown_mr(struct rds_ib_mr *ibmr)
-{
- struct rds_ib_device *rds_ibdev = ibmr->device;
-
- if (ibmr->sg_dma_len) {
- ib_dma_unmap_sg(rds_ibdev->dev,
- ibmr->sg, ibmr->sg_len,
- DMA_BIDIRECTIONAL);
- ibmr->sg_dma_len = 0;
- }
-
- /* Release the s/g list */
- if (ibmr->sg_len) {
- unsigned int i;
-
- for (i = 0; i < ibmr->sg_len; ++i) {
- struct page *page = sg_page(&ibmr->sg[i]);
-
- /* FIXME we need a way to tell a r/w MR
- * from a r/o MR */
- set_page_dirty(page);
- put_page(page);
- }
- kfree(ibmr->sg);
-
- ibmr->sg = NULL;
- ibmr->sg_len = 0;
- }
-}
-
void rds_ib_teardown_mr(struct rds_ib_mr *ibmr)
{
- unsigned int pinned = ibmr->sg_len;
+ unsigned int pinned;
- __rds_ib_teardown_mr(ibmr);
+ pinned = rds_ib_drop_scatterlist(ibmr->device, &ibmr->mapping.m_sg);
if (pinned) {
struct rds_ib_device *rds_ibdev = ibmr->device;
struct rds_ib_mr_pool *pool = rds_ibdev->mr_pool;
@@ -424,8 +555,7 @@ int rds_ib_flush_mr_pool(struct rds_ib_mr_pool *pool, int free_all)
{
struct rds_ib_mr *ibmr, *next;
LIST_HEAD(unmap_list);
- LIST_HEAD(fmr_list);
- unsigned long unpinned = 0;
+ LIST_HEAD(kill_list);
unsigned long flags;
unsigned int nfreed = 0, ncleaned = 0, free_goal;
int ret = 0;
@@ -435,49 +565,50 @@ int rds_ib_flush_mr_pool(struct rds_ib_mr_pool *pool, int free_all)
mutex_lock(&pool->flush_lock);
spin_lock_irqsave(&pool->list_lock, flags);
- /* Get the list of all MRs to be dropped. Ordering matters -
- * we want to put drop_list ahead of free_list. */
- list_splice_init(&pool->free_list, &unmap_list);
- list_splice_init(&pool->drop_list, &unmap_list);
+ /* Get the list of all mappings to be destroyed */
+ list_splice_init(&pool->dirty_list, &unmap_list);
if (free_all)
- list_splice_init(&pool->clean_list, &unmap_list);
+ list_splice_init(&pool->clean_list, &kill_list);
spin_unlock_irqrestore(&pool->list_lock, flags);
free_goal = rds_ib_flush_goal(pool, free_all);
- if (list_empty(&unmap_list))
- goto out;
-
- /* String all ib_mr's onto one list and hand them to ib_unmap_fmr */
- list_for_each_entry(ibmr, &unmap_list, list)
- list_add(&ibmr->fmr->list, &fmr_list);
- ret = ib_unmap_fmr(&fmr_list);
- if (ret)
- printk(KERN_WARNING "RDS/IB: ib_unmap_fmr failed (err=%d)\n", ret);
+ /* Batched invalidate of dirty MRs.
+ * For FMR based MRs, the mappings on the unmap list are
+ * actually members of an ibmr (ibmr->mapping). They either
+ * migrate to the kill_list, or have been cleaned and should be
+ * moved to the clean_list.
+ * For fastregs, they will be dynamically allocated, and
+ * will be destroyed by the unmap function.
+ */
+ if (!list_empty(&unmap_list)) {
+ ncleaned = pool->op->unmap(pool, &unmap_list, &kill_list);
+ /* If we've been asked to destroy all MRs, move those
+ * that were simply cleaned to the kill list */
+ if (free_all)
+ list_splice_init(&unmap_list, &kill_list);
+ }
- /* Now we can destroy the DMA mapping and unpin any pages */
- list_for_each_entry_safe(ibmr, next, &unmap_list, list) {
- unpinned += ibmr->sg_len;
- __rds_ib_teardown_mr(ibmr);
- if (nfreed < free_goal || ibmr->remap_count >= pool->fmr_attr.max_maps) {
- rds_ib_stats_inc(s_ib_rdma_mr_free);
- list_del(&ibmr->list);
- ib_dealloc_fmr(ibmr->fmr);
- kfree(ibmr);
- nfreed++;
- }
- ncleaned++;
+ /* Destroy any MRs that are past their best before date */
+ list_for_each_entry_safe(ibmr, next, &kill_list, mapping.m_list) {
+ rds_ib_stats_inc(s_ib_rdma_mr_free);
+ list_del(&ibmr->mapping.m_list);
+ pool->op->destroy(pool, ibmr);
+ kfree(ibmr);
+ nfreed++;
}
- spin_lock_irqsave(&pool->list_lock, flags);
- list_splice(&unmap_list, &pool->clean_list);
- spin_unlock_irqrestore(&pool->list_lock, flags);
+ /* Anything that remains are laundered ibmrs, which we can add
+ * back to the clean list. */
+ if (!list_empty(&unmap_list)) {
+ spin_lock_irqsave(&pool->list_lock, flags);
+ list_splice(&unmap_list, &pool->clean_list);
+ spin_unlock_irqrestore(&pool->list_lock, flags);
+ }
- atomic_sub(unpinned, &pool->free_pinned);
atomic_sub(ncleaned, &pool->dirty_count);
atomic_sub(nfreed, &pool->item_count);
-out:
mutex_unlock(&pool->flush_lock);
return ret;
}
@@ -494,22 +625,13 @@ void rds_ib_free_mr(void *trans_private, int invalidate)
struct rds_ib_mr *ibmr = trans_private;
struct rds_ib_device *rds_ibdev = ibmr->device;
struct rds_ib_mr_pool *pool = rds_ibdev->mr_pool;
- unsigned long flags;
- rdsdebug("RDS/IB: free_mr nents %u\n", ibmr->sg_len);
+ rdsdebug("RDS/IB: free_mr nents %u\n", ibmr->mapping.m_sg.len);
if (!pool)
return;
/* Return it to the pool's free list */
- spin_lock_irqsave(&pool->list_lock, flags);
- if (ibmr->remap_count >= pool->fmr_attr.max_maps) {
- list_add(&ibmr->list, &pool->drop_list);
- } else {
- list_add(&ibmr->list, &pool->free_list);
- }
- atomic_add(ibmr->sg_len, &pool->free_pinned);
- atomic_inc(&pool->dirty_count);
- spin_unlock_irqrestore(&pool->list_lock, flags);
+ pool->op->free(pool, ibmr);
/* If we've pinned too many pages, request a flush */
if (atomic_read(&pool->free_pinned) >= pool->max_free_pinned
@@ -540,19 +662,21 @@ void rds_ib_flush_mrs(void)
}
void *rds_ib_get_mr(struct scatterlist *sg, unsigned long nents,
- __be32 ip_addr, u32 *key_ret)
+ struct rds_sock *rs, u32 *key_ret)
{
struct rds_ib_device *rds_ibdev;
+ struct rds_ib_mr_pool *pool;
struct rds_ib_mr *ibmr = NULL;
+ struct ib_qp *qp;
int ret;
- rds_ibdev = ib_get_device(ip_addr);
- if (!rds_ibdev) {
+ ret = ib_get_device(rs, &rds_ibdev, &qp);
+ if (ret || !qp) {
ret = -ENODEV;
goto out;
}
- if (!rds_ibdev->mr_pool) {
+ if (!(pool = rds_ibdev->mr_pool)) {
ret = -ENODEV;
goto out;
}
@@ -561,13 +685,14 @@ void *rds_ib_get_mr(struct scatterlist *sg, unsigned long nents,
if (IS_ERR(ibmr))
return ibmr;
- ret = rds_ib_map_fmr(rds_ibdev, ibmr, sg, nents);
+ ibmr->qp = qp;
+ ibmr->device = rds_ibdev;
+
+ ret = pool->op->map(pool, ibmr, sg, nents);
if (ret == 0)
- *key_ret = ibmr->fmr->rkey;
+ *key_ret = rds_ibdev->dev->node_type == RDMA_NODE_RNIC ? ibmr->fr_mr->rkey : ibmr->u.fmr->rkey;
else
- printk(KERN_WARNING "RDS/IB: map_fmr failed (errno=%d)\n", ret);
-
- ibmr->device = rds_ibdev;
+ printk(KERN_WARNING "RDS/IB: failed to map mr (errno=%d)\n", ret);
out:
if (ret) {
@@ -577,3 +702,357 @@ void *rds_ib_get_mr(struct scatterlist *sg, unsigned long nents,
}
return ibmr;
}
+
+/*
+ * This is the code that implements RDS memory registrations
+ * through FMRs.
+ */
+static int rds_ib_init_fmr(struct rds_ib_mr_pool *pool,
+ struct rds_ib_mr *ibmr)
+{
+ struct rds_ib_device *rds_ibdev = pool->device;
+ struct ib_fmr *fmr;
+
+ fmr = ib_alloc_fmr(rds_ibdev->pd,
+ (IB_ACCESS_LOCAL_WRITE |
+ IB_ACCESS_REMOTE_READ |
+ IB_ACCESS_REMOTE_WRITE),
+ &pool->fmr_attr);
+ if (IS_ERR(fmr)) {
+ int err = PTR_ERR(fmr);
+
+ printk(KERN_WARNING "RDS/IB: ib_alloc_fmr failed (err=%d)\n", err);
+ return err;
+ }
+
+ ibmr->u.fmr = fmr;
+ return 0;
+}
+
+static int rds_ib_map_fmr(struct rds_ib_mr_pool *pool, struct rds_ib_mr *ibmr,
+ struct scatterlist *sg, unsigned int nents)
+{
+ struct rds_ib_device *rds_ibdev = pool->device;
+ struct rds_ib_scatterlist ibsg;
+ u64 *dma_pages;
+ int ret;
+
+ rds_ib_set_scatterlist(&ibsg, sg, nents);
+
+ dma_pages = rds_ib_map_scatterlist(rds_ibdev, &ibsg, rds_ibdev->fmr_page_shift);
+ if (IS_ERR(dma_pages))
+ return PTR_ERR(dma_pages);
+
+ ret = ib_map_phys_fmr(ibmr->u.fmr, dma_pages, ibsg.dma_npages, 0);
+ if (ret) {
+ rds_ib_drop_scatterlist(rds_ibdev, &ibsg);
+ goto out;
+ }
+
+ /* Success - we successfully remapped the MR, so we can
+ * safely tear down the old mapping. */
+ rds_ib_teardown_mr(ibmr);
+
+ ibmr->mapping.m_sg = ibsg;
+ ibmr->remap_count++;
+
+ rds_ib_stats_inc(s_ib_rdma_mr_used);
+ ret = 0;
+
+out:
+ kfree(dma_pages);
+
+ return ret;
+}
+
+static void rds_ib_free_fmr(struct rds_ib_mr_pool *pool, struct rds_ib_mr *ibmr)
+{
+ unsigned long flags;
+
+ /* MRs that have reached their maximum remap count get queued
+ * to the head of the list.
+ */
+ spin_lock_irqsave(&pool->list_lock, flags);
+ if (ibmr->remap_count >= pool->fmr_attr.max_maps) {
+ list_add(&ibmr->mapping.m_list, &pool->dirty_list);
+ } else {
+ list_add_tail(&ibmr->mapping.m_list, &pool->dirty_list);
+ }
+ atomic_add(ibmr->mapping.m_sg.len, &pool->free_pinned);
+ atomic_inc(&pool->dirty_count);
+ spin_unlock_irqrestore(&pool->list_lock, flags);
+}
+
+static unsigned int rds_ib_unmap_fmr_list(struct rds_ib_mr_pool *pool,
+ struct list_head *unmap_list,
+ struct list_head *kill_list)
+{
+ struct rds_ib_mapping *mapping, *next;
+ struct rds_ib_mr *ibmr;
+ LIST_HEAD(fmr_list);
+ unsigned long unpinned = 0, ncleaned = 0;
+ int ret;
+
+ /* String all ib_mr's onto one list and hand them to ib_unmap_fmr */
+ list_for_each_entry_safe(mapping, next, unmap_list, m_list) {
+ ibmr = mapping->m_mr;
+
+ list_add(&ibmr->u.fmr->list, &fmr_list);
+ }
+ ret = ib_unmap_fmr(&fmr_list);
+ if (ret)
+ printk(KERN_WARNING "RDS/IB: ib_unmap_fmr failed (err=%d)\n", ret);
+
+ /* Now we can destroy the DMA mapping and unpin any pages */
+ list_for_each_entry_safe(mapping, next, unmap_list, m_list) {
+ ibmr = mapping->m_mr;
+
+ unpinned += rds_ib_drop_scatterlist(ibmr->device, &mapping->m_sg);
+ if (ibmr->remap_count >= pool->fmr_attr.max_maps)
+ list_move(&mapping->m_list, kill_list);
+ ncleaned++;
+ }
+
+ atomic_sub(unpinned, &pool->free_pinned);
+ return ncleaned;
+}
+
+static void rds_ib_destroy_fmr(struct rds_ib_mr_pool *pool,
+ struct rds_ib_mr *ibmr)
+{
+ if (ibmr->u.fmr)
+ ib_dealloc_fmr(ibmr->u.fmr);
+ ibmr->u.fmr = NULL;
+}
+
+/*
+ * iWARP fastreg handling
+ *
+ * The life cycle of a fastreg registration is a bit different from
+ * FMRs.
+ * The idea behind fastreg is to have one MR, to which we bind different
+ * mappings over time. To avoid stalling on the expensive map and invalidate
+ * operations, these operations are pipelined on the same send queue on
+ * which we want to send the message containing the r_key.
+ *
+ * This creates a bit of a problem for us, as we do not have the destination
+ * IP in GET_MR, so the connection must be set up prior to the GET_MR call
+ * for RDMA to work correctly. If a fastreg request is present, rds_ib_xmit
+ * will try to queue a LOCAL_INV (if needed) and a FAST_REG_MR work request
+ * before queuing the SEND. When the completions for these arrive, a bit is
+ * set on the MR showing that RDMA can be performed.
+ *
+ * There is another interesting aspect that's related to invalidation.
+ * The application can request that a mapping is invalidated in FREE_MR.
+ * The expectation there is that this invalidation step includes ALL
+ * PREVIOUSLY FREED MRs.
+ */
+static int rds_ib_init_fastreg(struct rds_ib_mr_pool *pool,
+ struct rds_ib_mr *ibmr)
+{
+ struct rds_ib_device *rds_ibdev = pool->device;
+ struct rds_ib_mapping *mapping = &ibmr->mapping;
+ struct ib_fast_reg_page_list *page_list = NULL;
+ struct ib_mr *mr;
+ int err;
+
+ mr = ib_alloc_fast_reg_mr(rds_ibdev->pd, pool->max_message_size);
+ if (IS_ERR(mr)) {
+ err = PTR_ERR(mr);
+
+ printk(KERN_WARNING "RDS/IB: ib_alloc_fast_reg_mr failed (err=%d)\n", err);
+ return err;
+ }
+
+ page_list = ib_alloc_fast_reg_page_list(rds_ibdev->dev, mapping->m_sg.dma_npages);
+ if (IS_ERR(page_list)) {
+ err = PTR_ERR(page_list);
+
+ printk(KERN_WARNING "RDS/IB: ib_alloc_fast_reg_page_list failed (err=%d)\n", err);
+ ib_dereg_mr(mr);
+ return err;
+ }
+
+ ibmr->fr_page_list = page_list;
+ ibmr->fr_mr = mr;
+ return 0;
+}
+
+static int rds_ib_rdma_fastreg_inv(struct rds_ib_mr *ibmr)
+{
+ struct ib_send_wr i_wr, *failed_wr;
+ int ret;
+
+ memset(&i_wr, 0, sizeof(i_wr));
+ i_wr.opcode = IB_WR_LOCAL_INV;
+ i_wr.ex.invalidate_rkey = ibmr->fr_mr->rkey;
+ i_wr.send_flags = IB_SEND_SIGNALED;
+
+ failed_wr = &i_wr;
+ ret = ib_post_send(ibmr->qp, &i_wr, &failed_wr);
+ BUG_ON(failed_wr != &i_wr);
+ if (ret) {
+ printk(KERN_WARNING "RDS/IB: %s %d ib_post_send returned %d\n",
+ __func__, __LINE__, ret);
+ goto out;
+ }
+out:
+ return ret;
+}
+
+static int rds_ib_rdma_build_fastreg(struct ib_qp *qp, struct rds_ib_mapping *mapping)
+{
+ struct rds_ib_mr *ibmr = mapping->m_mr;
+ struct ib_send_wr f_wr, *failed_wr;
+ int ret;
+
+ /*
+ * Perform a WR for the fast_reg_mr. Each individual page
+ * in the sg list is added to the fast reg page list and placed
+ * inside the fast_reg_mr WR. The key used is a rolling 8-bit
+ * counter, which should guarantee uniqueness.
+ */
+
+ ib_update_fast_reg_key(ibmr->fr_mr, ibmr->remap_count++);
+ mapping->m_rkey = ibmr->fr_mr->rkey;
+
+ memset(&f_wr, 0, sizeof(f_wr));
+ f_wr.opcode = IB_WR_FAST_REG_MR;
+ f_wr.wr.fast_reg.length = mapping->m_sg.bytes;
+ f_wr.wr.fast_reg.rkey = mapping->m_rkey;
+ f_wr.wr.fast_reg.page_list = ibmr->fr_page_list;
+ f_wr.wr.fast_reg.page_list_len = mapping->m_sg.dma_len;
+ f_wr.wr.fast_reg.page_shift = ibmr->device->fmr_page_shift;
+ f_wr.wr.fast_reg.access_flags = IB_ACCESS_LOCAL_WRITE |
+ IB_ACCESS_REMOTE_READ |
+ IB_ACCESS_REMOTE_WRITE;
+ f_wr.wr.fast_reg.iova_start = 0;
+ f_wr.send_flags = IB_SEND_SIGNALED;
+
+ failed_wr = &f_wr;
+ ret = ib_post_send(qp, &f_wr, &failed_wr);
+ BUG_ON(failed_wr != &f_wr);
+ if (ret) {
+ printk(KERN_WARNING "RDS/IB: %s %d ib_post_send returned %d\n",
+ __func__, __LINE__, ret);
+ goto out;
+ }
+
+out:
+ return ret;
+}
+
+static int rds_ib_map_fastreg(struct rds_ib_mr_pool *pool,
+ struct rds_ib_mr *ibmr,
+ struct scatterlist *sg,
+ unsigned int sg_len)
+{
+ struct rds_ib_device *rds_ibdev = pool->device;
+ struct rds_ib_mapping *mapping = &ibmr->mapping;
+ u64 *dma_pages;
+ int i, ret;
+
+ rds_ib_set_scatterlist(&mapping->m_sg, sg, sg_len);
+
+ dma_pages = rds_ib_map_scatterlist(rds_ibdev,
+ &mapping->m_sg,
+ rds_ibdev->fmr_page_shift);
+ if (IS_ERR(dma_pages)) {
+ ret = PTR_ERR(dma_pages);
+ dma_pages = NULL;
+ goto out;
+ }
+
+ if (mapping->m_sg.dma_len > pool->max_message_size) {
+ ret = -EMSGSIZE;
+ goto out;
+ }
+
+ for (i = 0; i < mapping->m_sg.dma_npages; ++i)
+ ibmr->fr_page_list->page_list[i] = dma_pages[i];
+
+ rds_ib_rdma_build_fastreg(ibmr->qp, mapping);
+
+ rds_ib_stats_inc(s_ib_rdma_mr_used);
+ ret = 0;
+
+out:
+ kfree(dma_pages);
+
+ return ret;
+}
+
+/*
+ * "Free" a fastreg MR.
+ */
+static void rds_ib_free_fastreg(struct rds_ib_mr_pool *pool,
+ struct rds_ib_mr *ibmr)
+{
+ unsigned long flags;
+
+ if (!ibmr->mapping.m_sg.dma_len)
+ return;
+
+ rds_ib_rdma_fastreg_inv(ibmr);
+
+ /* Try to post the LOCAL_INV WR to the queue. */
+ spin_lock_irqsave(&pool->list_lock, flags);
+
+ list_add_tail(&ibmr->mapping.m_list, &pool->dirty_list);
+ atomic_add(ibmr->mapping.m_sg.len, &pool->free_pinned);
+ atomic_inc(&pool->dirty_count);
+
+ spin_unlock_irqrestore(&pool->list_lock, flags);
+}
+
+static unsigned int rds_ib_unmap_fastreg_list(struct rds_ib_mr_pool *pool,
+ struct list_head *unmap_list,
+ struct list_head *kill_list)
+{
+ struct rds_ib_mapping *mapping, *next;
+ unsigned int ncleaned = 0;
+ LIST_HEAD(laundered);
+
+ /* Batched invalidation of fastreg MRs.
+ * Why do we do it this way, even though we could pipeline unmap
+ * and remap? The reason is the application semantics - when the
+ * application requests an invalidation of MRs, it expects all
+ * previously released R_Keys to become invalid.
+ *
+ * If we implement MR reuse naively, we risk memory corruption
+ * (this has actually been observed). So the default behavior
+ * requires that a MR goes through an explicit unmap operation before
+ * we can reuse it again.
+ *
+ * We could probably improve on this a little, by allowing immediate
+ * reuse of an MR on the same socket (e.g. you could add a small
+ * cache of unused MRs to struct rds_sock - GET_MR could grab one
+ * of these without requiring an explicit invalidate).
+ */
+ while (!list_empty(unmap_list)) {
+ unsigned long flags;
+
+ spin_lock_irqsave(&pool->list_lock, flags);
+ list_for_each_entry_safe(mapping, next, unmap_list, m_list) {
+ list_move(&mapping->m_list, &laundered);
+ ncleaned++;
+ }
+ spin_unlock_irqrestore(&pool->list_lock, flags);
+ }
+
+ /* Move all laundered mappings back to the unmap list.
+ * We do not kill any WRs right now - it doesn't seem the
+ * fastreg API has a max_remap limit. */
+ list_splice_init(&laundered, unmap_list);
+
+ return ncleaned;
+}
+
+static void rds_ib_destroy_fastreg(struct rds_ib_mr_pool *pool,
+ struct rds_ib_mr *ibmr)
+{
+ if (ibmr->u.fastreg.page_list)
+ ib_free_fast_reg_page_list(ibmr->u.fastreg.page_list);
+ if (ibmr->u.fastreg.mr)
+ ib_dereg_mr(ibmr->u.fastreg.mr);
+}
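(An aside on the arithmetic in rds_ib_map_scatterlist() above:
dma_npages just counts the page-aligned chunks spanned by each DMA
segment, with only the first and last segment allowed to start or end
unaligned. A standalone sketch of that page-count calculation --
assuming a 4KiB page shift, and purely illustrative:)

#include <stdio.h>
#include <stdint.h>

/* Pages covered by [addr, addr + len) for a given page shift. */
static uint64_t pages_spanned(uint64_t addr, uint64_t len, unsigned int shift)
{
	uint64_t mask  = ((uint64_t)1 << shift) - 1;
	uint64_t start = addr & ~mask;                /* round start down */
	uint64_t end   = (addr + len + mask) & ~mask; /* round end up */

	return (end - start) >> shift;
}

int main(void)
{
	/* 100 bytes straddling a 4KiB boundary -> 2 pages */
	printf("%llu\n", (unsigned long long)pages_spanned(0x0ffe, 100, 12));
	/* one aligned 4KiB chunk -> 1 page */
	printf("%llu\n", (unsigned long long)pages_spanned(0x2000, 4096, 12));
	return 0;
}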
diff --git a/net/rds/ib_recv.c b/net/rds/ib_recv.c
index 1da664e..6738758 100644
--- a/net/rds/ib_recv.c
+++ b/net/rds/ib_recv.c
@@ -402,9 +402,10 @@ static void rds_ib_set_ack(struct rds_ib_connection *ic, u64 seq,
unsigned long flags;
spin_lock_irqsave(&ic->i_ack_lock, flags);
- ic->i_ack_next = seq;
if (ack_required)
set_bit(IB_ACK_REQUESTED, &ic->i_ack_flags);
+ if (seq > ic->i_ack_next)
+ ic->i_ack_next = seq;
spin_unlock_irqrestore(&ic->i_ack_lock, flags);
}
@@ -749,13 +750,10 @@ static void rds_ib_process_recv(struct rds_connection *conn,
if (ibinc->ii_inc.i_hdr.h_flags == RDS_FLAG_CONG_BITMAP)
rds_ib_cong_recv(conn, ibinc);
- else {
+ else
rds_recv_incoming(conn, conn->c_faddr, conn->c_laddr,
&ibinc->ii_inc, GFP_ATOMIC,
KM_SOFTIRQ0);
- state->ack_next = be64_to_cpu(hdr->h_sequence);
- state->ack_next_valid = 1;
- }
/* Evaluate the ACK_REQUIRED flag *after* we received
* the complete frame, and after bumping the next_rx
@@ -765,6 +763,8 @@ static void rds_ib_process_recv(struct rds_connection *conn,
state->ack_required = 1;
}
+ state->ack_next = be64_to_cpu(hdr->h_sequence);
+ state->ack_next_valid = 1;
rds_inc_put(&ibinc->ii_inc);
}
}
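(The ib_recv.c hunk above turns i_ack_next into a high-water mark: the
ACK sequence can only move forward, even when fragments complete out of
order. The invariant as a tiny sketch, names illustrative:)

#include <stdio.h>
#include <stdint.h>

/* Never let the next-ACK sequence move backwards. */
static void set_ack(uint64_t *ack_next, uint64_t seq)
{
	if (seq > *ack_next)
		*ack_next = seq;
}

int main(void)
{
	uint64_t ack = 0;

	set_ack(&ack, 5);
	set_ack(&ack, 3);	/* late completion: ignored */
	printf("%llu\n", (unsigned long long)ack);	/* prints 5 */
	return 0;
}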
diff --git a/net/rds/ib_send.c b/net/rds/ib_send.c
index 6317ce3..798af51 100644
--- a/net/rds/ib_send.c
+++ b/net/rds/ib_send.c
@@ -135,7 +135,9 @@ void rds_ib_send_init_ring(struct rds_ib_connection *ic)
send->s_rm = NULL;
send->s_op = NULL;
+ send->s_mapping = NULL;
+ send->s_wr.next = NULL;
send->s_wr.wr_id = i;
send->s_wr.sg_list = send->s_sge;
send->s_wr.num_sge = 1;
@@ -192,12 +194,22 @@ void rds_ib_send_cq_comp_handler(struct ib_cq *cq, void *context)
rdsdebug("ib_req_notify_cq send failed: %d\n", ret);
}
- while (ib_poll_cq(cq, 1, &wc) > 0 ) {
+ while (ib_poll_cq(cq, 1, &wc) > 0) {
rdsdebug("wc wr_id 0x%llx status %u byte_len %u imm_data %u\n",
(unsigned long long)wc.wr_id, wc.status, wc.byte_len,
be32_to_cpu(wc.ex.imm_data));
rds_ib_stats_inc(s_ib_tx_cq_event);
+ if (wc.opcode == IB_WC_LOCAL_INV) {
+ ic->fastreg_posted = 0;
+ continue;
+ }
+
+ if (wc.opcode == IB_WC_FAST_REG_MR) {
+ ic->fastreg_posted = 1;
+ continue;
+ }
+
if (wc.wr_id == RDS_IB_ACK_WR_ID) {
if (ic->i_ack_queued + HZ/2 < jiffies)
rds_ib_stats_inc(s_ib_tx_stalled);
@@ -475,6 +487,14 @@ int rds_ib_xmit(struct rds_connection *conn, struct rds_message *rm,
BUG_ON(off % RDS_FRAG_SIZE);
BUG_ON(hdr_off != 0 && hdr_off != sizeof(struct rds_header));
+ /* Fastreg support */
+ if (rds_rdma_cookie_key(rm->m_rdma_cookie)
+ && ic->i_fastreg
+ && !ic->fastreg_posted) {
+ ret = -EAGAIN;
+ goto out;
+ }
+
/* FIXME we may overallocate here */
if (be32_to_cpu(rm->m_inc.i_hdr.h_len) == 0)
i = 1;
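(Taken together, the ib_send.c hunks implement a one-bit gate: a
message carrying an r_key is bounced with -EAGAIN until the
FAST_REG_MR completion has been reaped, and a LOCAL_INV completion
closes the gate again. A minimal sketch of that gating -- the function
names here are hypothetical:)

#include <stdio.h>
#include <errno.h>

/* Set by the FAST_REG_MR completion, cleared by LOCAL_INV. */
static int fastreg_posted;

static int xmit(int needs_rdma_key)
{
	if (needs_rdma_key && !fastreg_posted)
		return -EAGAIN;	/* caller requeues the message */
	printf("SEND posted\n");
	return 0;
}

static void cq_completion(int opcode_is_fast_reg_mr)
{
	fastreg_posted = opcode_is_fast_reg_mr;
}

int main(void)
{
	printf("xmit -> %d\n", xmit(1));	/* -EAGAIN, not registered yet */
	cq_completion(1);			/* IB_WC_FAST_REG_MR reaped */
	printf("xmit -> %d\n", xmit(1));	/* 0, SEND goes out */
	return 0;
}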
diff --git a/net/rds/message.c b/net/rds/message.c
index 9269b9a..ddeb95b 100644
--- a/net/rds/message.c
+++ b/net/rds/message.c
@@ -71,6 +71,8 @@ static void rds_message_purge(struct rds_message *rm)
if (rm->m_rdma_op)
rds_rdma_free_op(rm->m_rdma_op);
+ if (rm->m_rdma_mr)
+ rds_mr_put(rm->m_rdma_mr);
}
void rds_message_inc_purge(struct rds_incoming *inc)
diff --git a/net/rds/rdma.c b/net/rds/rdma.c
index 1f1039e..9e12e87 100644
--- a/net/rds/rdma.c
+++ b/net/rds/rdma.c
@@ -116,11 +116,8 @@ static void rds_destroy_mr(struct rds_mr *mr)
mr->r_trans->free_mr(trans_private, mr->r_invalidate);
}
-static void rds_mr_put(struct rds_mr *mr)
+void __rds_put_mr_final(struct rds_mr *mr)
{
- if (!atomic_dec_and_test(&mr->r_refcount))
- return;
-
rds_destroy_mr(mr);
kfree(mr);
}
@@ -169,7 +166,7 @@ static int rds_pin_pages(unsigned long user_addr, unsigned int nr_pages,
}
static int __rds_rdma_map(struct rds_sock *rs, struct rds_get_mr_args *args,
- u64 *cookie_ret)
+ u64 *cookie_ret, struct rds_mr **mr_ret)
{
struct rds_mr *mr = NULL, *found;
unsigned int nr_pages;
@@ -257,8 +254,7 @@ static int __rds_rdma_map(struct rds_sock *rs, struct rds_get_mr_args *args,
* s/g list is now owned by the MR.
* Note that dma_map() implies that pending writes are
* flushed to RAM, so no dma_sync is needed here. */
- trans_private = rs->rs_transport->get_mr(sg, nents,
- rs->rs_bound_addr,
+ trans_private = rs->rs_transport->get_mr(sg, nents, rs,
&mr->r_key);
if (IS_ERR(trans_private)) {
@@ -297,6 +293,11 @@ static int __rds_rdma_map(struct rds_sock *rs, struct rds_get_mr_args *args,
rdsdebug("RDS: get_mr key is %x\n", mr->r_key);
+ if (mr_ret) {
+ atomic_inc(&mr->r_refcount);
+ *mr_ret = mr;
+ }
+
ret = 0;
out:
if (pages)
@@ -317,7 +318,7 @@ int rds_get_mr(struct rds_sock *rs, char __user *optval, int optlen)
sizeof(struct rds_get_mr_args)))
return -EFAULT;
- return __rds_rdma_map(rs, &args, NULL);
+ return __rds_rdma_map(rs, &args, NULL, NULL);
}
/*
@@ -655,7 +656,7 @@ int rds_cmsg_rdma_dest(struct rds_sock *rs, struct rds_message *rm,
if (mr) {
mr->r_trans->sync_mr(mr->r_trans_private, DMA_TO_DEVICE);
- rds_mr_put(mr);
+ rm->m_rdma_mr = mr;
}
return err;
}
@@ -673,5 +674,5 @@ int rds_cmsg_rdma_map(struct rds_sock *rs, struct rds_message *rm,
|| rm->m_rdma_cookie != 0)
return -EINVAL;
- return __rds_rdma_map(rs, CMSG_DATA(cmsg), &rm->m_rdma_cookie);
+ return __rds_rdma_map(rs, CMSG_DATA(cmsg), &rm->m_rdma_cookie, &rm->m_rdma_mr);
}
diff --git a/net/rds/rdma.h b/net/rds/rdma.h
index b1734a0..b6249e4 100644
--- a/net/rds/rdma.h
+++ b/net/rds/rdma.h
@@ -74,4 +74,11 @@ int rds_cmsg_rdma_map(struct rds_sock *rs, struct rds_message *rm,
void rds_rdma_free_op(struct rds_rdma_op *ro);
void rds_rdma_send_complete(struct rds_message *rm, int);
+extern void __rds_put_mr_final(struct rds_mr *mr);
+static inline void rds_mr_put(struct rds_mr *mr)
+{
+ if (atomic_dec_and_test(&mr->r_refcount))
+ __rds_put_mr_final(mr);
+}
+
#endif
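(The rds_mr_put() inline above is the usual dec-and-test refcount
idiom: whoever drops the last reference does the teardown. The same
pattern as a small userspace sketch with C11 atomics, names
illustrative:)

#include <stdio.h>
#include <stdlib.h>
#include <stdatomic.h>

struct mr {
	atomic_int refcount;
};

static void put_mr_final(struct mr *mr)
{
	printf("last reference gone, destroying MR\n");
	free(mr);
}

/* Mirrors rds_mr_put(): free only when the count hits zero. */
static void put_mr(struct mr *mr)
{
	if (atomic_fetch_sub(&mr->refcount, 1) == 1)
		put_mr_final(mr);
}

int main(void)
{
	struct mr *mr = malloc(sizeof(*mr));

	if (!mr)
		return 1;
	atomic_init(&mr->refcount, 2);	/* e.g. socket + in-flight message */
	put_mr(mr);	/* message completes: 2 -> 1, no free */
	put_mr(mr);	/* socket releases:   1 -> 0, freed  */
	return 0;
}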
diff --git a/net/rds/rds.h b/net/rds/rds.h
index 235c951..fee481e 100644
--- a/net/rds/rds.h
+++ b/net/rds/rds.h
@@ -297,6 +297,7 @@ struct rds_message {
struct rds_sock *m_rs;
struct rds_rdma_op *m_rdma_op;
rds_rdma_cookie_t m_rdma_cookie;
+ struct rds_mr *m_rdma_mr;
unsigned int m_nents;
unsigned int m_count;
struct scatterlist m_sg[0];
@@ -373,7 +374,7 @@ struct rds_transport {
unsigned int avail);
void (*exit)(void);
void *(*get_mr)(struct scatterlist *sg, unsigned long nr_sg,
- __be32 ip_addr, u32 *key_ret);
+ struct rds_sock *rs, u32 *key_ret);
void (*sync_mr)(void *trans_private, int direction);
void (*free_mr)(void *trans_private, int invalidate);
void (*flush_mrs)(void);
diff --git a/net/rds/send.c b/net/rds/send.c
index b9d98c8..87237be 100644
--- a/net/rds/send.c
+++ b/net/rds/send.c
@@ -765,6 +765,9 @@ static int rds_cmsg_send(struct rds_sock *rs, struct rds_message *rm,
if (cmsg->cmsg_level != SOL_RDS)
continue;
+ /* As a side effect, RDMA_DEST and RDMA_MAP will set
+ * rm->m_rdma_cookie and rm->m_rdma_mr.
+ */
switch (cmsg->cmsg_type) {
case RDS_CMSG_RDMA_ARGS:
ret = rds_cmsg_rdma_args(rs, rm, cmsg);