[rds-devel] [RFC] rds: iWARP RDMA enablement

Jon Mason jon at opengridcomputing.com
Fri Oct 10 14:17:54 PDT 2008


Hey Andy,
This patch contains all of the changes needed to get rds-rdma working on
iWARP (with one FIXME left).  It applies to a stock OFED-1.4 kernel and
includes the patch I sent out previously to enable RDMA READs.  While
not complete, I wanted to send out the code for review to help catch any
coding, design, or style errors.

The remaining FIXME in the code is support for multiple RDS connections
(i.e., QPs) from the same host.
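
For anyone skimming the diff, the heart of the ib_rdma.c changes is a
small per-pool ops table, so that the FMR and fastreg paths can share the
generic pool alloc/flush logic.  A condensed sketch (the struct and field
names are the ones introduced below; this is not a standalone snippet):

struct rds_ib_mr_pool_ops {
	int		(*init)(struct rds_ib_mr_pool *pool, struct rds_ib_mr *ibmr);
	int		(*map)(struct rds_ib_mr_pool *pool, struct rds_ib_mr *ibmr,
				struct scatterlist *sg, unsigned int sg_len);
	void		(*free)(struct rds_ib_mr_pool *pool, struct rds_ib_mr *ibmr);
	unsigned int	(*unmap)(struct rds_ib_mr_pool *pool,
				struct list_head *unmap_list,
				struct list_head *kill_list);
	void		(*destroy)(struct rds_ib_mr_pool *pool, struct rds_ib_mr *ibmr);
};

/* rds_ib_create_mr_pool() selects rds_ib_fastreg_pool_ops when
 * rds_ibdev->use_fastreg is set (IB_DEVICE_MEM_MGT_EXTENSIONS), and
 * rds_ib_fmr_pool_ops otherwise; rds_ib_get_mr() then simply calls
 * pool->op->map(pool, ibmr, sg, nents). */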

The patch also has a stress bug whose root cause I have yet to
determine.  Under rds-stress, large RDMA payloads can cause memory
corruption.  This is most likely caused by running past the bounds of
one of the RDS rings under stress, since my rds-simple tests run without
problems for long runs.
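
If it helps whoever takes a look, one way to chase the suspected ring
overrun would be a bounds check wrapped around the ring allocator.  A
rough sketch (the wrapper name is made up; rds_ib_ring_alloc() and w_nr
are the existing ring interfaces, with the types mirroring how the
allocator is called in ib_send.c):

static u32 rds_ib_ring_alloc_checked(struct rds_ib_work_ring *ring,
				     u32 val, u32 *posp)
{
	u32 got = rds_ib_ring_alloc(ring, val, posp);

	/* A start slot at or past w_nr would mean the caller is about
	 * to index outside the i_sends[]/i_recvs[] arrays. */
	WARN_ON(got && *posp >= ring->w_nr);
	return got;
}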

Let me know what you think.

Thanks,
Jon

Signed-Off-By: Jon Mason <jon at opengridcomputing.com>

diff --git a/net/rds/ib.c b/net/rds/ib.c
index 926de1e..437ef2a 100644
--- a/net/rds/ib.c
+++ b/net/rds/ib.c
@@ -43,11 +43,17 @@
 
 unsigned int fmr_pool_size = RDS_FMR_POOL_SIZE;
 unsigned int fmr_message_size = RDS_FMR_SIZE + 1; /* +1 allows for unaligned MRs */
+unsigned int fastreg_pool_size = RDS_FASTREG_POOL_SIZE;
+unsigned int fastreg_message_size = RDS_FASTREG_SIZE + 1; /* +1 allows for unaligned MRs */
 
 module_param(fmr_pool_size, int, 0444);
 MODULE_PARM_DESC(fmr_pool_size, " Max number of fmr per HCA");
 module_param(fmr_message_size, int, 0444);
 MODULE_PARM_DESC(fmr_message_size, " Max size of a RDMA transfer");
+module_param(fastreg_pool_size, int, 0444);
+MODULE_PARM_DESC(fastreg_pool_size, " Max number of fastreg MRs per device");
+module_param(fastreg_message_size, int, 0444);
+MODULE_PARM_DESC(fastreg_message_size, " Max size of a RDMA transfer (fastreg MRs)");
 
 struct list_head rds_ib_devices;
 
@@ -113,13 +119,17 @@ void rds_ib_add_one(struct ib_device *device)
 	} else
 		rds_ibdev->mr = NULL;
 
+	/* Tell the RDMA code to use the fastreg API */
+	if (dev_attr->device_cap_flags & IB_DEVICE_MEM_MGT_EXTENSIONS)
+		rds_ibdev->use_fastreg = 1;
+
 	rds_ibdev->mr_pool = rds_ib_create_mr_pool(rds_ibdev);
 	if (IS_ERR(rds_ibdev->mr_pool)) {
 		rds_ibdev->mr_pool = NULL;
 		goto err_mr;
 	}
 
-	INIT_LIST_HEAD(&rds_ibdev->ipaddr_list);
+	INIT_LIST_HEAD(&rds_ibdev->cm_id_list);
 	INIT_LIST_HEAD(&rds_ibdev->conn_list);
 	list_add_tail(&rds_ibdev->list, &rds_ib_devices);
 
@@ -128,7 +138,7 @@ void rds_ib_add_one(struct ib_device *device)
 	goto free_attr;
 
 err_mr:
-	if (!rds_ibdev->dma_local_lkey)
+	if (rds_ibdev->mr)
 		ib_dereg_mr(rds_ibdev->mr);
 err_pd:
 	ib_dealloc_pd(rds_ibdev->pd);
@@ -141,15 +151,15 @@ free_attr:
 void rds_ib_remove_one(struct ib_device *device)
 {
 	struct rds_ib_device *rds_ibdev;
-	struct rds_ib_ipaddr *i_ipaddr, *i_next;
+	struct rds_ib_cm_id *i_cm_id, *next;
 
 	rds_ibdev = ib_get_client_data(device, &rds_ib_client);
 	if (!rds_ibdev)
 		return;
 
-	list_for_each_entry_safe(i_ipaddr, i_next, &rds_ibdev->ipaddr_list, list) {
-		list_del(&i_ipaddr->list);
-		kfree(i_ipaddr);
+	list_for_each_entry_safe(i_cm_id, next, &rds_ibdev->cm_id_list, list) {
+		list_del(&i_cm_id->list);
+		kfree(i_cm_id);
 	}
 
 	rds_ib_remove_conns(rds_ibdev);
@@ -157,7 +167,7 @@ void rds_ib_remove_one(struct ib_device *device)
 	if (rds_ibdev->mr_pool)
 		rds_ib_destroy_mr_pool(rds_ibdev->mr_pool);
 
-	if (!rds_ibdev->dma_local_lkey)
+	if (rds_ibdev->mr)
 		ib_dereg_mr(rds_ibdev->mr);
 
 	while (ib_dealloc_pd(rds_ibdev->pd)) {
diff --git a/net/rds/ib.h b/net/rds/ib.h
index 382c396..efba6fa 100644
--- a/net/rds/ib.h
+++ b/net/rds/ib.h
@@ -9,6 +9,8 @@
 
 #define RDS_FMR_SIZE			256
 #define RDS_FMR_POOL_SIZE		4096
+#define RDS_FASTREG_SIZE		20
+#define RDS_FASTREG_POOL_SIZE		2048
 
 #define RDS_IB_MAX_SGE			8
 #define RDS_IB_RECV_SGE 		2
@@ -49,9 +51,32 @@ struct rds_ib_connect_private {
 	__be32			dp_credit;		/* non-zero enables flow ctl */
 };
 
+struct rds_ib_scatterlist {
+	struct scatterlist	*list;
+	unsigned int		len;
+	int			dma_len;
+	unsigned int		dma_npages;
+	unsigned int		bytes;
+};
+
+struct rds_ib_mapping {
+	spinlock_t		m_lock;
+	struct list_head	m_list;
+	struct rds_ib_mr	*m_mr;
+	uint32_t		m_rkey;
+	struct rds_ib_scatterlist m_sg;
+};
+
 struct rds_ib_send_work {
 	struct rds_message	*s_rm;
+
+	/* We should really put these into a union: */
 	struct rds_rdma_op	*s_op;
+	struct rds_ib_mapping	*s_mapping;
+	struct ib_mr		*s_mr;
+	struct ib_fast_reg_page_list *s_page_list;
+	unsigned char		s_remap_count;
+
 	struct ib_send_wr	s_wr;
 	struct ib_sge		s_sge[RDS_IB_MAX_SGE];
 	unsigned long		s_queued;
@@ -126,8 +151,8 @@ struct rds_ib_connection {
 	unsigned int		i_flowctl : 1,	/* enable/disable flow ctl */
 				i_iwarp   : 1,	/* this is actually iWARP not IB */
 				i_fastreg : 1,	/* device supports fastreg */
-				i_dma_local_lkey : 1;
-
+				i_dma_local_lkey : 1,
+				i_fastreg_posted : 1; /* fastreg posted on this connection */
 	/* Batched completions */
 	unsigned int		i_unsignaled_wrs;
 	long			i_unsignaled_bytes;
@@ -139,9 +164,9 @@ struct rds_ib_connection {
 #define IB_SET_SEND_CREDITS(v)	((v) & 0xffff)
 #define IB_SET_POST_CREDITS(v)	((v) << 16)
 
-struct rds_ib_ipaddr {
+struct rds_ib_cm_id {
 	struct list_head	list;
-	__be32			ipaddr;
+	struct rdma_cm_id	*cm_id;
 };
 
 struct rds_ib_devconn {
@@ -151,7 +176,7 @@ struct rds_ib_devconn {
 
 struct rds_ib_device {
 	struct list_head	list;
-	struct list_head	ipaddr_list;
+	struct list_head	cm_id_list;
 	struct list_head	conn_list;
 	struct ib_device	*dev;
 	struct ib_pd		*pd;
@@ -253,6 +278,8 @@ extern struct ib_client rds_ib_client;
 
 extern unsigned int fmr_pool_size;
 extern unsigned int fmr_message_size;
+extern unsigned int fastreg_pool_size;
+extern unsigned int fastreg_message_size;
 
 /* ib_cm.c */
 int rds_ib_conn_alloc(struct rds_connection *conn, gfp_t gfp);
@@ -268,14 +295,14 @@ void __rds_ib_conn_error(struct rds_connection *conn, const char *, ...);
 	__rds_ib_conn_error(conn, KERN_WARNING "RDS/IB: " fmt )
 
 /* ib_rdma.c */
-int rds_ib_update_ipaddr(struct rds_ib_device *rds_ibdev, __be32 ipaddr);
+int rds_ib_update_cm_id(struct rds_ib_device *rds_ibdev, struct rdma_cm_id *cm_id);
 int rds_ib_add_conn(struct rds_ib_device *rds_ibdev, struct rds_connection *conn);
 void rds_ib_remove_conns(struct rds_ib_device *rds_ibdev);
 struct rds_ib_mr_pool *rds_ib_create_mr_pool(struct rds_ib_device *);
 void rds_ib_get_mr_info(struct rds_ib_device *rds_ibdev, struct rds_info_ib_connection *iinfo);
 void rds_ib_destroy_mr_pool(struct rds_ib_mr_pool *);
 void *rds_ib_get_mr(struct scatterlist *sg, unsigned long nents,
-		    __be32 ip_addr, u32 *key_ret);
+		    struct rds_sock *rs, u32 *key_ret);
 void rds_ib_sync_mr(void *trans_private, int dir);
 void rds_ib_free_mr(void *trans_private, int invalidate);
 void rds_ib_flush_mrs(void);
diff --git a/net/rds/ib_cm.c b/net/rds/ib_cm.c
index 5b47d72..ffa9f39 100644
--- a/net/rds/ib_cm.c
+++ b/net/rds/ib_cm.c
@@ -32,7 +32,6 @@
  */
 #include <linux/kernel.h>
 #include <linux/in.h>
-#include <linux/vmalloc.h>
 
 #include "rds.h"
 #include "ib.h"
@@ -140,7 +139,7 @@ static void rds_ib_connect_complete(struct rds_connection *conn, struct rdma_cm_
 
 	/* update ib_device with this local ipaddr & conn */
 	rds_ibdev = ib_get_client_data(ic->i_cm_id->device, &rds_ib_client);
-	err = rds_ib_update_ipaddr(rds_ibdev, conn->c_laddr);
+	err = rds_ib_update_cm_id(rds_ibdev, ic->i_cm_id);
 	if (err)
 		printk(KERN_ERR "rds_ib_update_ipaddr failed (%d)\n", err);
 	err = rds_ib_add_conn(rds_ibdev, conn);
@@ -210,8 +209,12 @@ static void rds_ib_qp_event_handler(struct ib_event *event, void *data)
 		case IB_EVENT_COMM_EST:
 			rdma_notify(ic->i_cm_id, IB_EVENT_COMM_EST);
 			break;
+		case IB_EVENT_QP_REQ_ERR:
+			printk(KERN_WARNING "RDS/IB: got IB_EVENT_QP_REQ_ERR, clearing QP\n");
+			ic->i_cm_id->qp = NULL;
+			break;
 		default:
-			printk(KERN_WARNING "RDS/ib: unhandled QP event %u "
+			printk(KERN_WARNING "RDS/IB: unhandled QP event %u "
 			       "on connection to %u.%u.%u.%u\n", event->event,
 			       NIPQUAD(conn->c_faddr));
 			break;
@@ -219,6 +222,79 @@ static void rds_ib_qp_event_handler(struct ib_event *event, void *data)
 }
 
 /*
+ * Create a QP
+ */
+static int rds_ib_init_qp_attrs(struct ib_qp_init_attr *attr,
+		struct rds_ib_device *rds_ibdev,
+		struct rds_ib_work_ring *send_ring,
+		void (*send_cq_handler)(struct ib_cq *, void *),
+		struct rds_ib_work_ring *recv_ring,
+		void (*recv_cq_handler)(struct ib_cq *, void *),
+		void *context)
+{
+	struct ib_device *dev = rds_ibdev->dev;
+	unsigned int send_size, recv_size;
+	int ret;
+
+	/* The offset of 1 is to accommodate the additional ACK WR. */
+	send_size = min_t(unsigned int, rds_ibdev->max_wrs, rds_ib_sysctl_max_send_wr + 1);
+	recv_size = min_t(unsigned int, rds_ibdev->max_wrs, rds_ib_sysctl_max_recv_wr + 1);
+	rds_ib_ring_resize(send_ring, send_size - 1);
+	rds_ib_ring_resize(recv_ring, recv_size - 1);
+
+	memset(attr, 0, sizeof(*attr));
+	attr->event_handler = rds_ib_qp_event_handler;
+	attr->qp_context = context;
+	attr->cap.max_send_wr = send_size;
+	attr->cap.max_recv_wr = recv_size;
+	attr->cap.max_send_sge = rds_ibdev->max_sge;
+	attr->cap.max_recv_sge = RDS_IB_RECV_SGE;
+	attr->sq_sig_type = IB_SIGNAL_REQ_WR;
+	attr->qp_type = IB_QPT_RC;
+
+	attr->send_cq = ib_create_cq(dev, send_cq_handler,
+				     rds_ib_cq_event_handler,
+				     context, send_size, 0);
+	if (IS_ERR(attr->send_cq)) {
+		ret = PTR_ERR(attr->send_cq);
+		attr->send_cq = NULL;
+		rdsdebug("ib_create_cq send failed: %d\n", ret);
+		goto out;
+	}
+
+	attr->recv_cq = ib_create_cq(dev, recv_cq_handler,
+				     rds_ib_cq_event_handler,
+				     context, recv_size, 0);
+	if (IS_ERR(attr->recv_cq)) {
+		ret = PTR_ERR(attr->recv_cq);
+		attr->recv_cq = NULL;
+		rdsdebug("ib_create_cq recv failed: %d\n", ret);
+		goto out;
+	}
+
+	ret = ib_req_notify_cq(attr->send_cq, IB_CQ_NEXT_COMP);
+	if (ret) {
+		rdsdebug("ib_req_notify_cq send failed: %d\n", ret);
+		goto out;
+	}
+
+	ret = ib_req_notify_cq(attr->recv_cq, IB_CQ_SOLICITED);
+	if (ret) {
+		rdsdebug("ib_req_notify_cq recv failed: %d\n", ret);
+		goto out;
+	}
+
+out:
+	if (ret) {
+		if (attr->send_cq)
+			ib_destroy_cq(attr->send_cq);
+		if (attr->recv_cq)
+			ib_destroy_cq(attr->recv_cq);
+	}
+	return ret;
+}
+
+/*
  * This needs to be very careful to not leave IS_ERR pointers around for
  * cleanup to trip over.
  */
@@ -243,60 +319,19 @@ static int rds_ib_setup_qp(struct rds_connection *conn)
 		return -EOPNOTSUPP;
 	}
 
-	if (rds_ibdev->max_wrs < ic->i_send_ring.w_nr + 1)
-		rds_ib_ring_resize(&ic->i_send_ring, rds_ibdev->max_wrs - 1);
-	if (rds_ibdev->max_wrs < ic->i_recv_ring.w_nr + 1)
-		rds_ib_ring_resize(&ic->i_recv_ring, rds_ibdev->max_wrs - 1);
-
 	/* Protection domain and memory range */
 	ic->i_pd = rds_ibdev->pd;
 	ic->i_mr = rds_ibdev->mr;
 
-	ic->i_send_cq = ib_create_cq(dev, rds_ib_send_cq_comp_handler,
-				     rds_ib_cq_event_handler, conn,
-				     ic->i_send_ring.w_nr + 1, 0);
-	if (IS_ERR(ic->i_send_cq)) {
-		ret = PTR_ERR(ic->i_send_cq);
-		ic->i_send_cq = NULL;
-		rdsdebug("ib_create_cq send failed: %d\n", ret);
-		goto out;
-	}
-
-	ic->i_recv_cq = ib_create_cq(dev, rds_ib_recv_cq_comp_handler,
-				     rds_ib_cq_event_handler, conn,
-				     ic->i_recv_ring.w_nr, 0);
-	if (IS_ERR(ic->i_recv_cq)) {
-		ret = PTR_ERR(ic->i_recv_cq);
-		ic->i_recv_cq = NULL;
-		rdsdebug("ib_create_cq recv failed: %d\n", ret);
-		goto out;
-	}
-
-	ret = ib_req_notify_cq(ic->i_send_cq, IB_CQ_NEXT_COMP);
-	if (ret) {
-		rdsdebug("ib_req_notify_cq send failed: %d\n", ret);
+	ret = rds_ib_init_qp_attrs(&attr, rds_ibdev,
+			&ic->i_send_ring, rds_ib_send_cq_comp_handler,
+			&ic->i_recv_ring, rds_ib_recv_cq_comp_handler,
+			conn);
+	if (ret < 0)
 		goto out;
-	}
-
-	ret = ib_req_notify_cq(ic->i_recv_cq, IB_CQ_SOLICITED);
-	if (ret) {
-		rdsdebug("ib_req_notify_cq recv failed: %d\n", ret);
-		goto out;
-	}
 
-	/* XXX negotiate max send/recv with remote? */
-	memset(&attr, 0, sizeof(attr));
-	attr.event_handler = rds_ib_qp_event_handler;
-	attr.qp_context = conn;
-	/* + 1 to allow for the single ack message */
-	attr.cap.max_send_wr = ic->i_send_ring.w_nr + 1;
-	attr.cap.max_recv_wr = ic->i_recv_ring.w_nr + 1;
-	attr.cap.max_send_sge = rds_ibdev->max_sge;
-	attr.cap.max_recv_sge = RDS_IB_RECV_SGE;
-	attr.sq_sig_type = IB_SIGNAL_REQ_WR;
-	attr.qp_type = IB_QPT_RC;
-	attr.send_cq = ic->i_send_cq;
-	attr.recv_cq = ic->i_recv_cq;
+	ic->i_send_cq = attr.send_cq;
+	ic->i_recv_cq = attr.recv_cq;
 
 	/* 
 	 * XXX this can fail if max_*_wr is too large?  Are we supposed
@@ -487,7 +522,7 @@ static int rds_ib_cm_handle_connect(struct rdma_cm_id *cm_id,
 
 	/* update ib_device with this local ipaddr & conn */
 	rds_ibdev = ib_get_client_data(ic->i_cm_id->device, &rds_ib_client);
-	err = rds_ib_update_ipaddr(rds_ibdev, dp->dp_saddr);
+	err = rds_ib_update_cm_id(rds_ibdev, cm_id);
 	if (err) {
 		rds_ib_conn_error(conn, "rds_ib_update_ipaddr failed (%d)\n", err);
 		goto out;
@@ -853,7 +888,7 @@ int __init rds_ib_listen_init(void)
 	cm_id = rdma_create_id(rds_ib_cm_event_handler, NULL, RDMA_PS_TCP);
 	if (IS_ERR(cm_id)) {
 		ret = PTR_ERR(cm_id);
-		printk(KERN_ERR "RDS/ib: failed to setup listener, "
+		printk(KERN_ERR "RDS/IB: failed to setup listener, "
 		       "rdma_create_id() returned %d\n", ret);
 		goto out;
 	}
@@ -868,14 +903,14 @@ int __init rds_ib_listen_init(void)
 	 */
 	ret = rdma_bind_addr(cm_id, (struct sockaddr *)&sin);
 	if (ret) {
-		printk(KERN_ERR "RDS/ib: failed to setup listener, "
+		printk(KERN_ERR "RDS/IB: failed to setup listener, "
 		       "rdma_bind_addr() returned %d\n", ret);
 		goto out;
 	}
 
 	ret = rdma_listen(cm_id, 128);
 	if (ret) {
-		printk(KERN_ERR "RDS/ib: failed to setup listener, "
+		printk(KERN_ERR "RDS/IB: failed to setup listener, "
 		       "rdma_listen() returned %d\n", ret);
 		goto out;
 	}
diff --git a/net/rds/ib_rdma.c b/net/rds/ib_rdma.c
index 89e293a..89d1b24 100644
--- a/net/rds/ib_rdma.c
+++ b/net/rds/ib_rdma.c
@@ -45,100 +45,203 @@ extern struct list_head rds_ib_devices;
 struct rds_ib_mr {
 	struct rds_ib_device	*device;
 	struct rds_ib_mr_pool	*pool;
-	struct ib_fmr		*fmr;
-	struct list_head	list;
-	unsigned int		remap_count;
-
-	struct scatterlist *	sg;
-	unsigned int		sg_len;
-	u64 *			dma;
-	int			sg_dma_len;
+
+	struct ib_qp *qp;
+
+	union {
+	    struct ib_fmr	*fmr;
+	    /* fastreg stuff and maybe others go here */
+	    struct {
+		struct ib_mr	*mr;
+		struct ib_fast_reg_page_list *page_list;
+	    } fastreg;
+	} u;
+	struct rds_ib_mapping	mapping;
+	unsigned char		remap_count;
 };
 
+#define fr_mr			u.fastreg.mr
+#define fr_page_list		u.fastreg.page_list
+
 /*
  * Our own little FMR pool
  */
 struct rds_ib_mr_pool {
+	struct rds_ib_device	*device;		/* back ptr to the device that owns us */
+
 	struct mutex		flush_lock;		/* serialize fmr invalidate */
 	struct work_struct	flush_worker;		/* flush worker */
 
 	spinlock_t		list_lock;		/* protect variables below */
 	atomic_t		item_count;		/* total # of MRs */
 	atomic_t		dirty_count;		/* # dirty of MRs */
-	struct list_head	drop_list;		/* MRs that have reached their max_maps limit */
-	struct list_head	free_list;		/* unused MRs */
+	struct list_head	dirty_list;		/* dirty mappings */
 	struct list_head	clean_list;		/* unused & unamapped MRs */
 	atomic_t		free_pinned;		/* memory pinned by free MRs */
+	unsigned long		max_message_size;	/* in pages */
 	unsigned long		max_items;
 	unsigned long		max_items_soft;
 	unsigned long		max_free_pinned;
 	struct ib_fmr_attr	fmr_attr;
+
+	struct rds_ib_mr_pool_ops *op;
+};
+
+struct rds_ib_mr_pool_ops {
+	int			(*init)(struct rds_ib_mr_pool *, struct rds_ib_mr *);
+	int			(*map)(struct rds_ib_mr_pool *pool, struct rds_ib_mr *ibmr,
+					struct scatterlist *sg, unsigned int sg_len);
+	void			(*free)(struct rds_ib_mr_pool *pool, struct rds_ib_mr *ibmr);
+	unsigned int		(*unmap)(struct rds_ib_mr_pool *, struct list_head *,
+					struct list_head *);
+	void			(*destroy)(struct rds_ib_mr_pool *, struct rds_ib_mr *);
 };
 
+
 static int rds_ib_flush_mr_pool(struct rds_ib_mr_pool *pool, int free_all);
 static void rds_ib_teardown_mr(struct rds_ib_mr *ibmr);
 static void rds_ib_mr_pool_flush_worker(struct work_struct *work);
+static int rds_ib_init_fmr(struct rds_ib_mr_pool *pool, struct rds_ib_mr *ibmr);
+static int rds_ib_map_fmr(struct rds_ib_mr_pool *pool,
+			  struct rds_ib_mr *ibmr,
+			  struct scatterlist *sg, unsigned int nents);
+static void rds_ib_free_fmr(struct rds_ib_mr_pool *pool, struct rds_ib_mr *ibmr);
+static unsigned int rds_ib_unmap_fmr_list(struct rds_ib_mr_pool *pool,
+			struct list_head *unmap_list,
+			struct list_head *kill_list);
+static void rds_ib_destroy_fmr(struct rds_ib_mr_pool *pool, struct rds_ib_mr *ibmr);
+static int rds_ib_init_fastreg(struct rds_ib_mr_pool *pool, struct rds_ib_mr *ibmr);
+static int rds_ib_map_fastreg(struct rds_ib_mr_pool *pool,
+			  struct rds_ib_mr *ibmr,
+	       		  struct scatterlist *sg, unsigned int nents);
+static void rds_ib_free_fastreg(struct rds_ib_mr_pool *pool, struct rds_ib_mr *ibmr);
+static unsigned int rds_ib_unmap_fastreg_list(struct rds_ib_mr_pool *pool,
+			struct list_head *unmap_list,
+			struct list_head *kill_list);
+static void rds_ib_destroy_fastreg(struct rds_ib_mr_pool *pool, struct rds_ib_mr *ibmr);
+
+static struct rds_ib_mr_pool_ops rds_ib_fmr_pool_ops = {
+	.init		= rds_ib_init_fmr,
+	.map		= rds_ib_map_fmr,
+	.free		= rds_ib_free_fmr,
+	.unmap		= rds_ib_unmap_fmr_list,
+	.destroy	= rds_ib_destroy_fmr,
+};
 
-static struct rds_ib_device* rds_ib_get_device(__be32 ipaddr)
-{
-	struct rds_ib_device *rds_ibdev;
-	struct rds_ib_ipaddr *i_ipaddr;
+static struct rds_ib_mr_pool_ops rds_ib_fastreg_pool_ops = {
+	.init		= rds_ib_init_fastreg,
+	.map		= rds_ib_map_fastreg,
+	.free		= rds_ib_free_fastreg,
+	.unmap		= rds_ib_unmap_fastreg_list,
+	.destroy	= rds_ib_destroy_fastreg,
+};
 
-	list_for_each_entry(rds_ibdev, &rds_ib_devices, list) {
-		spin_lock_irq(&rds_ibdev->spinlock);
-		list_for_each_entry(i_ipaddr, &rds_ibdev->ipaddr_list, list) {
-			if (i_ipaddr->ipaddr == ipaddr) {
-				spin_unlock_irq(&rds_ibdev->spinlock);
-				return rds_ibdev;
+static int rds_ib_get_device(struct rds_sock *rs, struct rds_ib_device **rds_ibdev, struct ib_qp **qp)
+{
+	struct rds_ib_device *ibdev;
+	struct rds_ib_cm_id *i_cm_id;
+
+	*rds_ibdev = NULL;
+	*qp = NULL;
+
+	list_for_each_entry(ibdev, &rds_ib_devices, list) {
+		spin_lock_irq(&ibdev->spinlock);
+		list_for_each_entry(i_cm_id, &ibdev->cm_id_list, list) {
+			struct sockaddr_in *src_addr, *dst_addr;
+
+			src_addr = (struct sockaddr_in *)&i_cm_id->cm_id->route.addr.src_addr;
+			dst_addr = (struct sockaddr_in *)&i_cm_id->cm_id->route.addr.dst_addr;
+
+			rdsdebug("%s: local ipaddr = %x port %d, remote ipaddr = %x port %d"
+				 "....looking for %x port %d, remote ipaddr = %x port %d\n",
+				 __func__,
+				src_addr->sin_addr.s_addr,
+				src_addr->sin_port,
+				dst_addr->sin_addr.s_addr,
+				dst_addr->sin_port,
+				rs->rs_bound_addr,
+				rs->rs_bound_port,
+				rs->rs_conn_addr,
+				rs->rs_conn_port);
+#if WORKING_TUPLE_DETECTION 
+			if (src_addr->sin_addr.s_addr == rs->rs_bound_addr &&
+			    src_addr->sin_port == rs->rs_bound_port &&
+			    dst_addr->sin_addr.s_addr == rs->rs_conn_addr && 
+			    dst_addr->sin_port == rs->rs_conn_port) {
+#else
+			/* FIXME - needs to compare the local and remote ipaddr/port tuple, but the
+			 * ipaddr is the only available information in the rds_sock (as the rest are
+			 * zeroed).  It doesn't appear to be properly populated during connection
+			 * setup...
+			 */
+			if (src_addr->sin_addr.s_addr == rs->rs_bound_addr) {
+#endif
+				spin_unlock_irq(&ibdev->spinlock);
+				*rds_ibdev = ibdev;
+				*qp = i_cm_id->cm_id->qp;
+				return 0;
 			}
 		}
-		spin_unlock_irq(&rds_ibdev->spinlock);
+		spin_unlock_irq(&ibdev->spinlock);
 	}
 
-	return NULL;
+	return 1;
 }
 
-static int rds_ib_add_ipaddr(struct rds_ib_device *rds_ibdev, __be32 ipaddr)
+static int rds_ib_add_cm_id(struct rds_ib_device *rds_ibdev, struct rdma_cm_id *cm_id)
 {
-	struct rds_ib_ipaddr *i_ipaddr;
+	struct rds_ib_cm_id *i_cm_id;
 
-	i_ipaddr = kmalloc(sizeof *i_ipaddr, GFP_KERNEL);
-	if (!i_ipaddr)
+	i_cm_id = kmalloc(sizeof *i_cm_id, GFP_KERNEL);
+	if (!i_cm_id)
 		return -ENOMEM;
 
-	i_ipaddr->ipaddr = ipaddr;
+	i_cm_id->cm_id = cm_id;
 
 	spin_lock_irq(&rds_ibdev->spinlock);
-	list_add_tail(&i_ipaddr->list, &rds_ibdev->ipaddr_list);
+	list_add_tail(&i_cm_id->list, &rds_ibdev->cm_id_list);
 	spin_unlock_irq(&rds_ibdev->spinlock);
 
 	return 0;
 }
 
-static void rds_ib_remove_ipaddr(struct rds_ib_device *rds_ibdev, __be32 ipaddr)
+static void rds_ib_remove_cm_id(struct rds_ib_device *rds_ibdev, struct rdma_cm_id *cm_id)
 {
-	struct rds_ib_ipaddr *i_ipaddr, *next;
+	struct rds_ib_cm_id *i_cm_id;
 
 	spin_lock_irq(&rds_ibdev->spinlock);
-	list_for_each_entry_safe(i_ipaddr, next, &rds_ibdev->ipaddr_list, list) {
-		if (i_ipaddr->ipaddr == ipaddr) {
-			list_del(&i_ipaddr->list);
-			kfree(i_ipaddr);
+	list_for_each_entry(i_cm_id, &rds_ibdev->cm_id_list, list) {
+		if (i_cm_id->cm_id == cm_id) {
+			list_del(&i_cm_id->list);
+			kfree(i_cm_id);
 			break;
 		}
 	}
 	spin_unlock_irq(&rds_ibdev->spinlock);
 }
 
-int rds_ib_update_ipaddr(struct rds_ib_device *rds_ibdev, __be32 ipaddr)
+
+int rds_ib_update_cm_id(struct rds_ib_device *rds_ibdev, struct rdma_cm_id *cm_id)
 {
-	struct rds_ib_device *rds_ibdev_old;
+	struct sockaddr_in *src_addr, *dst_addr;
+	struct rds_ib_device *rds_ibdev_old;
+	struct rds_sock rs;
+	struct ib_qp *qp;
+	int rc;
+
+	src_addr = (struct sockaddr_in *)&cm_id->route.addr.src_addr;
+	dst_addr = (struct sockaddr_in *)&cm_id->route.addr.dst_addr;
+
+	rs.rs_bound_addr = src_addr->sin_addr.s_addr;
+	rs.rs_bound_port = src_addr->sin_port;
+	rs.rs_conn_addr = dst_addr->sin_addr.s_addr;
+	rs.rs_conn_port = dst_addr->sin_port;
 
-	rds_ibdev_old = rds_ib_get_device(ipaddr);
-	if (rds_ibdev_old)
-		rds_ib_remove_ipaddr(rds_ibdev_old, ipaddr);
+	rc = rds_ib_get_device(&rs, &rds_ibdev_old, &qp);
+	if (rc == 0)
+		rds_ib_remove_cm_id(rds_ibdev_old, cm_id);
 
-	return rds_ib_add_ipaddr(rds_ibdev, ipaddr);
+	return rds_ib_add_cm_id(rds_ibdev, cm_id);
 }
 
 int rds_ib_add_conn(struct rds_ib_device *rds_ibdev, struct rds_connection *conn)
@@ -172,26 +275,152 @@ void rds_ib_remove_conns(struct rds_ib_device *rds_ibdev)
 	spin_unlock_irq(&rds_ibdev->spinlock);
 }
 
-struct rds_ib_mr_pool *rds_ib_create_mr_pool(struct rds_ib_device *rds_ibdev)
+static void rds_ib_set_scatterlist(struct rds_ib_scatterlist *sg,
+		struct scatterlist *list, unsigned int sg_len)
 {
-	struct rds_ib_mr_pool *pool;
+	sg->list = list;
+	sg->len = sg_len;
+	sg->dma_len = 0;
+	sg->dma_npages = 0;
+	sg->bytes = 0;
+}
+
+static int rds_ib_drop_scatterlist(struct rds_ib_device *rds_ibdev,
+		struct rds_ib_scatterlist *sg)
+{
+	int unpinned = 0;
+
+	if (sg->dma_len) {
+		ib_dma_unmap_sg(rds_ibdev->dev,
+				sg->list, sg->len,
+				DMA_BIDIRECTIONAL);
+		sg->dma_len = 0;
+	}
+
+	/* Release the s/g list */
+	if (sg->len) {
+		unsigned int i;
+
+		for (i = 0; i < sg->len; ++i) {
+			struct page *page = sg_page(&sg->list[i]);
+
+			/* FIXME we need a way to tell a r/w MR
+			 * from a r/o MR */
+			set_page_dirty(page);
+			put_page(page);
+		}
+
+		unpinned = sg->len;
+		sg->len = 0;
+
+		kfree(sg->list);
+		sg->list = NULL;
+	}
+
+	return unpinned;
+}
+
+static u64 *rds_ib_map_scatterlist(struct rds_ib_device *rds_ibdev,
+			struct rds_ib_scatterlist *sg,
+			unsigned int dma_page_shift)
+{
+	struct ib_device *dev = rds_ibdev->dev;
+	u64 *dma_pages = NULL;
+	u64 dma_mask;
+	unsigned int dma_page_size;
+	int i, j, ret;
 
-	/* For now, disable all RDMA service on iWARP. This check will
-	 * go away when we have a working patch. */
-	if (rds_ibdev->dev->node_type == RDMA_NODE_RNIC)
-		return NULL;
+	dma_page_size = 1 << dma_page_shift;
+	dma_mask = dma_page_size - 1;
+
+	WARN_ON(sg->dma_len);
+
+	sg->dma_len = ib_dma_map_sg(dev, sg->list, sg->len, DMA_BIDIRECTIONAL);
+	if (unlikely(!sg->dma_len)) {
+	        printk(KERN_WARNING "RDS/IB: dma_map_sg failed!\n");
+		return ERR_PTR(-EBUSY);
+	}
+
+	sg->bytes = 0;
+	sg->dma_npages = 0;
+
+	ret = -EINVAL;
+	for (i = 0; i < sg->dma_len; ++i) {
+		unsigned int dma_len = ib_sg_dma_len(dev, &sg->list[i]);
+		u64 dma_addr = ib_sg_dma_address(dev, &sg->list[i]);
+		u64 end_addr;
+
+		sg->bytes += dma_len;
+
+		end_addr = dma_addr + dma_len;
+		if (dma_addr & dma_mask) {
+			if (i > 0)
+				goto out_unmap;
+			dma_addr &= ~dma_mask;
+		}
+		if (end_addr & dma_mask) {
+			if (i < sg->dma_len - 1)
+				goto out_unmap;
+			end_addr = (end_addr + dma_mask) & ~dma_mask;
+		}
+
+		sg->dma_npages += (end_addr - dma_addr) >> dma_page_shift;
+	}
+
+	/* Now gather the dma addrs into one list */
+	if (sg->dma_npages > fmr_message_size)
+		goto out_unmap;
+
+	dma_pages = kmalloc(sizeof(u64) * sg->dma_npages, GFP_ATOMIC);
+	if (!dma_pages) {
+		ret = -ENOMEM;
+		goto out_unmap;
+	}
+
+	for (i = j = 0; i < sg->dma_len; ++i) {
+		unsigned int dma_len = ib_sg_dma_len(dev, &sg->list[i]);
+		u64 dma_addr = ib_sg_dma_address(dev, &sg->list[i]);
+		u64 end_addr;
+
+		end_addr = dma_addr + dma_len;
+		dma_addr &= ~dma_mask;
+		for (; dma_addr < end_addr; dma_addr += dma_page_size)
+			dma_pages[j++] = dma_addr;
+	}
+
+	return dma_pages;
+
+out_unmap:
+	ib_dma_unmap_sg(rds_ibdev->dev, sg->list, sg->len, DMA_BIDIRECTIONAL);
+	sg->dma_len = 0;
+	if (dma_pages)
+		kfree(dma_pages);
+	return ERR_PTR(ret);
+}
+
+
+static struct rds_ib_mr_pool *__rds_ib_create_mr_pool(struct rds_ib_device *rds_ibdev,
+		unsigned int message_size, unsigned int pool_size,
+		struct rds_ib_mr_pool_ops *ops)
+{
+	struct rds_ib_mr_pool *pool;
 
 	pool = kzalloc(sizeof(*pool), GFP_KERNEL);
 	if (!pool)
 		return ERR_PTR(-ENOMEM);
 
-	INIT_LIST_HEAD(&pool->free_list);
-	INIT_LIST_HEAD(&pool->drop_list);
+	pool->op = ops;
+	pool->device = rds_ibdev;
+	INIT_LIST_HEAD(&pool->dirty_list);
 	INIT_LIST_HEAD(&pool->clean_list);
 	mutex_init(&pool->flush_lock);
 	spin_lock_init(&pool->list_lock);
 	INIT_WORK(&pool->flush_worker, rds_ib_mr_pool_flush_worker);
 
+	pool->max_message_size = message_size;
+	pool->max_items = pool_size;
+	pool->max_free_pinned = pool->max_items * pool->max_message_size / 4;
+
 	pool->fmr_attr.max_pages = fmr_message_size;
 	pool->fmr_attr.max_maps = rds_ibdev->fmr_max_remaps;
 	pool->fmr_attr.page_shift = rds_ibdev->fmr_page_shift;
@@ -202,8 +431,44 @@ struct rds_ib_mr_pool *rds_ib_create_mr_pool(struct rds_ib_device *rds_ibdev)
 	 * items more aggressively.
 	 * Make sure that max_items > max_items_soft > max_items / 2
 	 */
-	pool->max_items_soft = rds_ibdev->max_fmrs * 3 / 4;
-	pool->max_items = rds_ibdev->max_fmrs;
+	pool->max_items_soft = pool->max_items * 3 / 4;
+
+	return pool;
+}
+
+struct rds_ib_mr_pool *rds_ib_create_mr_pool(struct rds_ib_device *rds_ibdev)
+{
+	struct rds_ib_mr_pool *pool;
+	unsigned int pool_size;
+
+	if (!rds_ibdev->use_fastreg) {
+		/* Use FMRs to implement memory registrations */
+		pool_size = fmr_pool_size;
+
+		if (rds_ibdev->max_fmrs && rds_ibdev->max_fmrs < pool_size)
+			pool_size = rds_ibdev->max_fmrs;
+
+		pool = __rds_ib_create_mr_pool(rds_ibdev, fmr_message_size, pool_size,
+					&rds_ib_fmr_pool_ops);
+
+		if (!IS_ERR(pool)) {
+			pool->fmr_attr.max_pages = pool->max_message_size;
+			pool->fmr_attr.max_maps = rds_ibdev->fmr_max_remaps;
+			pool->fmr_attr.page_shift = rds_ibdev->fmr_page_shift;
+		}
+	} else {
+		/* Use fastregs to implement memory registrations */
+		pool_size = fastreg_pool_size;
+
+		pool = __rds_ib_create_mr_pool(rds_ibdev,
+					fastreg_message_size,
+					pool_size,
+					&rds_ib_fastreg_pool_ops);
+
+		if (IS_ERR(pool)) {
+			printk(KERN_WARNING "RDS/IB: __rds_ib_create_mr_pool failed\n");
+		}
+	}
 
 	return pool;
 }
@@ -232,15 +497,15 @@ static inline struct rds_ib_mr *rds_ib_reuse_fmr(struct rds_ib_mr_pool *pool)
 
 	spin_lock_irqsave(&pool->list_lock, flags);
 	if (!list_empty(&pool->clean_list)) {
-		ibmr = list_entry(pool->clean_list.next, struct rds_ib_mr, list);
-		list_del_init(&ibmr->list);
+		ibmr = list_entry(pool->clean_list.next, struct rds_ib_mr, mapping.m_list);
+		list_del_init(&ibmr->mapping.m_list);
 	}
 	spin_unlock_irqrestore(&pool->list_lock, flags);
 
 	return ibmr;
 }
 
-static struct rds_ib_mr *rds_ib_alloc_fmr(struct rds_ib_device *rds_ibdev)
+static struct rds_ib_mr *rds_ib_alloc_mr(struct rds_ib_device *rds_ibdev)
 {
 	struct rds_ib_mr_pool *pool = rds_ibdev->mr_pool;
 	struct rds_ib_mr *ibmr = NULL;
@@ -280,114 +545,26 @@ static struct rds_ib_mr *rds_ib_alloc_fmr(struct rds_ib_device *rds_ibdev)
 		goto out_no_cigar;
 	}
 
-	ibmr->fmr = ib_alloc_fmr(rds_ibdev->pd,
-			(IB_ACCESS_LOCAL_WRITE |
-			 IB_ACCESS_REMOTE_READ |
-			 IB_ACCESS_REMOTE_WRITE),
-			&pool->fmr_attr);
-	if (IS_ERR(ibmr->fmr)) {
-		err = PTR_ERR(ibmr->fmr);
-		ibmr->fmr = NULL;
-		printk(KERN_WARNING "RDS/IB: ib_alloc_fmr failed (err=%d)\n", err);
+	spin_lock_init(&ibmr->mapping.m_lock);
+	INIT_LIST_HEAD(&ibmr->mapping.m_list);
+	ibmr->mapping.m_mr = ibmr;
+
+	err = pool->op->init(pool, ibmr);
+	if (err)
 		goto out_no_cigar;
-	}
 
 	rds_ib_stats_inc(s_ib_rdma_mr_alloc);
 	return ibmr;
 
 out_no_cigar:
 	if (ibmr) {
-		if (ibmr->fmr)
-			ib_dealloc_fmr(ibmr->fmr);
+		pool->op->destroy(pool, ibmr);
 		kfree(ibmr);
 	}
 	atomic_dec(&pool->item_count);
 	return ERR_PTR(err);
 }
 
-static int rds_ib_map_fmr(struct rds_ib_device *rds_ibdev, struct rds_ib_mr *ibmr,
-	       struct scatterlist *sg, unsigned int nents)
-{
-	struct ib_device *dev = rds_ibdev->dev;
-	struct scatterlist *scat = sg;
-	u64 io_addr = 0;
-	u64 *dma_pages;
-	u32 len;
-	int page_cnt, sg_dma_len;
-	int i, j;
-	int ret;
-
-	sg_dma_len = ib_dma_map_sg(dev, sg, nents,
-				 DMA_BIDIRECTIONAL);
-	if (unlikely(!sg_dma_len)) {
-	        printk(KERN_WARNING "RDS/IB: dma_map_sg failed!\n");
-		return -EBUSY;
-	}
-
-	len = 0;
-	page_cnt = 0;
-
-	for (i = 0; i < sg_dma_len; ++i) {
-		unsigned int dma_len = ib_sg_dma_len(dev, &scat[i]);
-		u64 dma_addr = ib_sg_dma_address(dev, &scat[i]);
-	
-		if (dma_addr & ~rds_ibdev->fmr_page_mask) {
-			if (i > 0)
-				return -EINVAL;
-			else
-				++page_cnt;
-		}
-		if ((dma_addr + dma_len) & ~rds_ibdev->fmr_page_mask) {
-			if (i < sg_dma_len - 1)
-				return -EINVAL;
-			else
-				++page_cnt;
-		}
-
-		len += dma_len;
-	}
-
-	page_cnt += len >> rds_ibdev->fmr_page_shift;
-	if (page_cnt > fmr_message_size)
-		return -EINVAL;
-
-	dma_pages = kmalloc(sizeof(u64) * page_cnt, GFP_ATOMIC);
-	if (!dma_pages)
-		return -ENOMEM;
-
-	page_cnt = 0;
-	for (i = 0; i < sg_dma_len; ++i) {
-		unsigned int dma_len = ib_sg_dma_len(dev, &scat[i]);
-		u64 dma_addr = ib_sg_dma_address(dev, &scat[i]);
-	
-		for (j = 0; j < dma_len; j += rds_ibdev->fmr_page_size)
-			dma_pages[page_cnt++] = 
-				(dma_addr & rds_ibdev->fmr_page_mask) + j;
-	}
-				
-	ret = ib_map_phys_fmr(ibmr->fmr,
-				   dma_pages, page_cnt, io_addr);	
-	if (ret)
-		goto out;
-
-	/* Success - we successfully remapped the MR, so we can
-	 * safely tear down the old mapping. */
-	rds_ib_teardown_mr(ibmr);
-
-	ibmr->sg = scat;
-	ibmr->sg_len = nents;
-	ibmr->sg_dma_len = sg_dma_len;
-	ibmr->remap_count++;
-
-	rds_ib_stats_inc(s_ib_rdma_mr_used);
-	ret = 0;
-
-out:
-	kfree(dma_pages);
-
-	return ret;
-}
-
 void rds_ib_sync_mr(void *trans_private, int direction)
 {
 	struct rds_ib_mr *ibmr = trans_private;
@@ -395,51 +572,21 @@ void rds_ib_sync_mr(void *trans_private, int direction)
 
 	switch (direction) {
 	case DMA_FROM_DEVICE:
-		ib_dma_sync_sg_for_cpu(rds_ibdev->dev, ibmr->sg,
-			ibmr->sg_dma_len, DMA_BIDIRECTIONAL);
+		ib_dma_sync_sg_for_cpu(rds_ibdev->dev, ibmr->mapping.m_sg.list,
+			ibmr->mapping.m_sg.dma_len, DMA_BIDIRECTIONAL);
 		break;
 	case DMA_TO_DEVICE:
-		ib_dma_sync_sg_for_device(rds_ibdev->dev, ibmr->sg,
-			ibmr->sg_dma_len, DMA_BIDIRECTIONAL);
+		ib_dma_sync_sg_for_device(rds_ibdev->dev, ibmr->mapping.m_sg.list,
+			ibmr->mapping.m_sg.dma_len, DMA_BIDIRECTIONAL);
 		break;
 	}
 }
 
-static void __rds_ib_teardown_mr(struct rds_ib_mr *ibmr)
-{
-	struct rds_ib_device *rds_ibdev = ibmr->device;
-
-	if (ibmr->sg_dma_len) {
-		ib_dma_unmap_sg(rds_ibdev->dev,
-				ibmr->sg, ibmr->sg_len,
-				DMA_BIDIRECTIONAL);
-		ibmr->sg_dma_len = 0;
-	}
-
-	/* Release the s/g list */
-	if (ibmr->sg_len) {
-		unsigned int i;
-
-		for (i = 0; i < ibmr->sg_len; ++i) {
-			struct page *page = sg_page(&ibmr->sg[i]);
-
-			/* FIXME we need a way to tell a r/w MR
-			 * from a r/o MR */
-			set_page_dirty(page);
-			put_page(page);
-		}
-		kfree(ibmr->sg);
-
-		ibmr->sg = NULL;
-		ibmr->sg_len = 0;
-	}
-}
-
 void rds_ib_teardown_mr(struct rds_ib_mr *ibmr)
 {
-	unsigned int pinned = ibmr->sg_len;
+	unsigned int pinned;
 
-	__rds_ib_teardown_mr(ibmr);
+	pinned = rds_ib_drop_scatterlist(ibmr->device, &ibmr->mapping.m_sg);
 	if (pinned) {
 		struct rds_ib_device *rds_ibdev = ibmr->device;
 		struct rds_ib_mr_pool *pool = rds_ibdev->mr_pool;
@@ -472,8 +619,7 @@ int rds_ib_flush_mr_pool(struct rds_ib_mr_pool *pool, int free_all)
 {
 	struct rds_ib_mr *ibmr, *next;
 	LIST_HEAD(unmap_list);
-	LIST_HEAD(fmr_list);
-	unsigned long unpinned = 0;
+	LIST_HEAD(kill_list);
 	unsigned long flags;
 	unsigned int nfreed = 0, ncleaned = 0, free_goal;
 	int ret = 0;
@@ -483,49 +629,50 @@ int rds_ib_flush_mr_pool(struct rds_ib_mr_pool *pool, int free_all)
 	mutex_lock(&pool->flush_lock);
 
 	spin_lock_irqsave(&pool->list_lock, flags);
-	/* Get the list of all MRs to be dropped. Ordering matters -
-	 * we want to put drop_list ahead of free_list. */
-	list_splice_init(&pool->free_list, &unmap_list);
-	list_splice_init(&pool->drop_list, &unmap_list);
+	/* Get the list of all mappings to be destroyed */
+	list_splice_init(&pool->dirty_list, &unmap_list);
 	if (free_all)
-		list_splice_init(&pool->clean_list, &unmap_list);
+		list_splice_init(&pool->clean_list, &kill_list);
 	spin_unlock_irqrestore(&pool->list_lock, flags);
 
 	free_goal = rds_ib_flush_goal(pool, free_all);
 
-	if (list_empty(&unmap_list))
-		goto out;
-
-	/* String all ib_mr's onto one list and hand them to ib_unmap_fmr */
-	list_for_each_entry(ibmr, &unmap_list, list)
-		list_add(&ibmr->fmr->list, &fmr_list);
-	ret = ib_unmap_fmr(&fmr_list);
-	if (ret)
-		printk(KERN_WARNING "RDS/IB: ib_unmap_fmr failed (err=%d)\n", ret);
+	/* Batched invalidate of dirty MRs.
+	 * For FMR based MRs, the mappings on the unmap list are
+	 * actually members of an ibmr (ibmr->mapping). They either
+	 * migrate to the kill_list, or have been cleaned and should be
+	 * moved to the clean_list.
+	 * For fastregs, they will be dynamically allocated, and
+	 * will be destroyed by the unmap function.
+	 */
+	if (!list_empty(&unmap_list)) {
+		ncleaned = pool->op->unmap(pool, &unmap_list, &kill_list);
+		/* If we've been asked to destroy all MRs, move those
+		 * that were simply cleaned to the kill list */
+		if (free_all)
+			list_splice_init(&unmap_list, &kill_list);
+	}
 
-	/* Now we can destroy the DMA mapping and unpin any pages */
-	list_for_each_entry_safe(ibmr, next, &unmap_list, list) {
-		unpinned += ibmr->sg_len;
-		__rds_ib_teardown_mr(ibmr);
-		if (nfreed < free_goal || ibmr->remap_count >= pool->fmr_attr.max_maps) {
-			rds_ib_stats_inc(s_ib_rdma_mr_free);
-			list_del(&ibmr->list);
-			ib_dealloc_fmr(ibmr->fmr);
-			kfree(ibmr);
-			nfreed++;
-		}
-		ncleaned++;
+	/* Destroy any MRs that are past their best before date */
+	list_for_each_entry_safe(ibmr, next, &kill_list, mapping.m_list) {
+		rds_ib_stats_inc(s_ib_rdma_mr_free);
+		list_del(&ibmr->mapping.m_list);
+		pool->op->destroy(pool, ibmr);
+		kfree(ibmr);
+		nfreed++;
 	}
 
-	spin_lock_irqsave(&pool->list_lock, flags);
-	list_splice(&unmap_list, &pool->clean_list);
-	spin_unlock_irqrestore(&pool->list_lock, flags);
+	/* Anything that remains are laundered ibmrs, which we can add
+	 * back to the clean list. */
+	if (!list_empty(&unmap_list)) {
+		spin_lock_irqsave(&pool->list_lock, flags);
+		list_splice(&unmap_list, &pool->clean_list);
+		spin_unlock_irqrestore(&pool->list_lock, flags);
+	}
 
-	atomic_sub(unpinned, &pool->free_pinned);
 	atomic_sub(ncleaned, &pool->dirty_count);
 	atomic_sub(nfreed, &pool->item_count);
 
-out:
 	mutex_unlock(&pool->flush_lock);
 	return ret;
 }
@@ -540,24 +687,14 @@ void rds_ib_mr_pool_flush_worker(struct work_struct *work)
 void rds_ib_free_mr(void *trans_private, int invalidate)
 {
 	struct rds_ib_mr *ibmr = trans_private;
-	struct rds_ib_device *rds_ibdev = ibmr->device;
-	struct rds_ib_mr_pool *pool = rds_ibdev->mr_pool;
-	unsigned long flags;
+	struct rds_ib_mr_pool *pool = ibmr->device->mr_pool;
 
-	rdsdebug("RDS/IB: free_mr nents %u\n", ibmr->sg_len);
+	rdsdebug("RDS/IB: free_mr nents %u\n", ibmr->mapping.m_sg.len);
 	if (!pool)
 		return;
 
 	/* Return it to the pool's free list */
-	spin_lock_irqsave(&pool->list_lock, flags);
-	if (ibmr->remap_count >= pool->fmr_attr.max_maps) {
-		list_add(&ibmr->list, &pool->drop_list);
-	} else {
-		list_add(&ibmr->list, &pool->free_list);
-	}
-	atomic_add(ibmr->sg_len, &pool->free_pinned);
-	atomic_inc(&pool->dirty_count);
-	spin_unlock_irqrestore(&pool->list_lock, flags);
+	pool->op->free(pool, ibmr);
 
 	/* If we've pinned too many pages, request a flush */
 	if (atomic_read(&pool->free_pinned) >= pool->max_free_pinned
@@ -588,36 +725,39 @@ void rds_ib_flush_mrs(void)
 }
 
 void *rds_ib_get_mr(struct scatterlist *sg, unsigned long nents,
-		    __be32 ip_addr, u32 *key_ret)
+		    struct rds_sock *rs, u32 *key_ret)
 {
 	struct rds_ib_device *rds_ibdev;
+	struct rds_ib_mr_pool *pool;
 	struct rds_ib_mr *ibmr = NULL;
+	struct ib_qp *qp;
 	int ret;
 
-	rds_ibdev = rds_ib_get_device(ip_addr);
-	if (!rds_ibdev) {
+	ret = rds_ib_get_device(rs, &rds_ibdev, &qp);
+	if (ret || !qp) {
 		ret = -ENODEV;
 		goto out;
 	}
 
-	if (!rds_ibdev->mr_pool) {
+	if (!(pool = rds_ibdev->mr_pool)) {
 		ret = -ENODEV;
 		goto out;
 	}
 
-	ibmr = rds_ib_alloc_fmr(rds_ibdev);
+	ibmr = rds_ib_alloc_mr(rds_ibdev);
 	if (IS_ERR(ibmr))
 		return ibmr;
 
-	ret = rds_ib_map_fmr(rds_ibdev, ibmr, sg, nents);
+	ibmr->qp = qp;
+	ibmr->device = rds_ibdev;
+
+	ret = pool->op->map(pool, ibmr, sg, nents);
 	if (ret == 0)
-		*key_ret = ibmr->fmr->rkey;
+		*key_ret = rds_ibdev->use_fastreg ? ibmr->fr_mr->rkey : ibmr->u.fmr->rkey;
 	else
-		printk(KERN_WARNING "RDS/IB: map_fmr failed (errno=%d)\n", ret);
-
-	ibmr->device = rds_ibdev;
+		printk(KERN_WARNING "RDS/IB: failed to map mr (errno=%d)\n", ret);
 
- out:   
+out:
 	if (ret) {
 	         if (ibmr) 
 		         rds_ib_free_mr(ibmr, 0);
@@ -625,3 +765,359 @@ void *rds_ib_get_mr(struct scatterlist *sg, unsigned long nents,
 	}
 	return ibmr;
 }
+
+/*
+ * This is the code that implements RDS memory registrations
+ * through FMRs.
+ */
+static int rds_ib_init_fmr(struct rds_ib_mr_pool *pool,
+			struct rds_ib_mr *ibmr)
+{
+	struct rds_ib_device *rds_ibdev = pool->device;
+	struct ib_fmr *fmr;
+
+	fmr = ib_alloc_fmr(rds_ibdev->pd,
+			(IB_ACCESS_LOCAL_WRITE |
+			 IB_ACCESS_REMOTE_READ |
+			 IB_ACCESS_REMOTE_WRITE),
+			&pool->fmr_attr);
+	if (IS_ERR(fmr)) {
+		int err = PTR_ERR(fmr);
+
+		printk(KERN_WARNING "RDS/IB: ib_alloc_fmr failed (err=%d)\n", err);
+		return err;
+	}
+
+	ibmr->u.fmr = fmr;
+	return 0;
+}
+
+static int rds_ib_map_fmr(struct rds_ib_mr_pool *pool, struct rds_ib_mr *ibmr,
+	       struct scatterlist *sg, unsigned int nents)
+{
+	struct rds_ib_device *rds_ibdev = pool->device;
+	struct rds_ib_scatterlist ibsg;
+	u64 *dma_pages;
+	int ret;
+
+	rds_ib_set_scatterlist(&ibsg, sg, nents);
+
+	dma_pages = rds_ib_map_scatterlist(rds_ibdev, &ibsg, rds_ibdev->fmr_page_shift);
+	if (IS_ERR(dma_pages))
+		return PTR_ERR(dma_pages);
+
+	ret = ib_map_phys_fmr(ibmr->u.fmr, dma_pages, ibsg.dma_npages, 0);
+	if (ret) {
+		rds_ib_drop_scatterlist(rds_ibdev, &ibsg);
+		goto out;
+	}
+
+	/* Success - we successfully remapped the MR, so we can
+	 * safely tear down the old mapping. */
+	rds_ib_teardown_mr(ibmr);
+
+	ibmr->mapping.m_sg = ibsg;
+	ibmr->remap_count++;
+
+	rds_ib_stats_inc(s_ib_rdma_mr_used);
+	ret = 0;
+
+out:
+	kfree(dma_pages);
+
+	return ret;
+}
+
+static void rds_ib_free_fmr(struct rds_ib_mr_pool *pool, struct rds_ib_mr *ibmr)
+{
+	unsigned long flags;
+
+	/* MRs that have reached their maximum remap count get queued
+	 * to the head of the list.
+	 */
+	spin_lock_irqsave(&pool->list_lock, flags);
+	if (ibmr->remap_count >= pool->fmr_attr.max_maps) {
+		list_add(&ibmr->mapping.m_list, &pool->dirty_list);
+	} else {
+		list_add_tail(&ibmr->mapping.m_list, &pool->dirty_list);
+	}
+	atomic_add(ibmr->mapping.m_sg.len, &pool->free_pinned);
+	atomic_inc(&pool->dirty_count);
+	spin_unlock_irqrestore(&pool->list_lock, flags);
+}
+
+static unsigned int rds_ib_unmap_fmr_list(struct rds_ib_mr_pool *pool,
+			       struct list_head *unmap_list,
+			       struct list_head *kill_list)
+{
+	struct rds_ib_mapping *mapping, *next;
+	struct rds_ib_mr *ibmr;
+	LIST_HEAD(fmr_list);
+	unsigned long unpinned = 0, ncleaned = 0;
+	int ret;
+
+	/* String all ib_mr's onto one list and hand them to ib_unmap_fmr */
+	list_for_each_entry_safe(mapping, next, unmap_list, m_list) {
+		ibmr = mapping->m_mr;
+
+		list_add(&ibmr->u.fmr->list, &fmr_list);
+	}
+	ret = ib_unmap_fmr(&fmr_list);
+	if (ret)
+		printk(KERN_WARNING "RDS/IB: ib_unmap_fmr failed (err=%d)\n", ret);
+
+	/* Now we can destroy the DMA mapping and unpin any pages */
+	list_for_each_entry_safe(mapping, next, unmap_list, m_list) {
+		ibmr = mapping->m_mr;
+
+		unpinned += rds_ib_drop_scatterlist(ibmr->device, &mapping->m_sg);
+		if (ibmr->remap_count >= pool->fmr_attr.max_maps)
+			list_move(&mapping->m_list, kill_list);
+		ncleaned++;
+	}
+
+	atomic_sub(unpinned, &pool->free_pinned);
+	return ncleaned;
+}
+
+static void rds_ib_destroy_fmr(struct rds_ib_mr_pool *pool,
+			       struct rds_ib_mr *ibmr)
+{
+	if (ibmr->u.fmr)
+		ib_dealloc_fmr(ibmr->u.fmr);
+	ibmr->u.fmr = NULL;
+}
+
+/*
+ * iWARP fastreg handling
+ *
+ * The life cycle of a fastreg registration is a bit different from
+ * FMRs.
+ * The idea behind fastreg is to have one MR, to which we bind different
+ * mappings over time. To avoid stalling on the expensive map and invalidate
+ * operations, these operations are pipelined on the same send queue on
+ * which we want to send the message containing the r_key.
+ *
+ * This creates a bit of a problem for us: we do not have the destination
+ * IP in GET_MR, so the connection must be set up prior to the GET_MR call
+ * for RDMA to work correctly.  If a fastreg request is present, rds_ib_xmit
+ * will try to queue a LOCAL_INV (if needed) and a FAST_REG_MR work request
+ * before queuing the SEND. When completions for these arrive, a bit is set
+ * on the MR to indicate that RDMA can now be performed.
+ *
+ * There is another interesting aspect that's related to invalidation.
+ * The application can request that a mapping is invalidated in FREE_MR.
+ * The expectation there is that this invalidation step includes ALL
+ * PREVIOUSLY FREED MRs.
+ */
+static int rds_ib_init_fastreg(struct rds_ib_mr_pool *pool,
+				struct rds_ib_mr *ibmr)
+{
+	struct rds_ib_device *rds_ibdev = pool->device;
+	struct rds_ib_mapping *mapping = &ibmr->mapping;
+	struct ib_fast_reg_page_list *page_list = NULL;
+	struct ib_mr *mr;
+	int err;
+
+	mr = ib_alloc_fast_reg_mr(rds_ibdev->pd, pool->max_message_size);
+	if (IS_ERR(mr)) {
+		err = PTR_ERR(mr);
+
+		printk(KERN_WARNING "RDS/IB: ib_alloc_fast_reg_mr failed (err=%d)\n", err);
+		return err;
+	}
+
+	page_list = ib_alloc_fast_reg_page_list(rds_ibdev->dev, mapping->m_sg.dma_npages);
+	if (IS_ERR(page_list)) {
+		err = PTR_ERR(page_list);
+
+		printk(KERN_WARNING "RDS/IB: ib_alloc_fast_reg_page_list failed (err=%d)\n", err);
+		ib_dereg_mr(mr);
+		return err;
+	}
+
+	ibmr->fr_page_list = page_list;
+	ibmr->fr_mr = mr;
+	return 0;
+}
+
+static int rds_ib_rdma_build_fastreg(struct ib_qp *qp, struct rds_ib_mapping *mapping)
+{
+	struct rds_ib_mr *ibmr = mapping->m_mr;
+	struct ib_send_wr f_wr, *failed_wr;
+	int ret;
+
+	/*
+	 * Perform a WR for the fast_reg_mr. Each individual page
+	 * in the sg list is added to the fast reg page list and placed
+	 * inside the fast_reg_mr WR.  The key used is a rolling 8-bit
+	 * counter, which should be enough to avoid reusing a stale key.
+	 */
+	ib_update_fast_reg_key(ibmr->fr_mr, ibmr->remap_count++);
+	mapping->m_rkey = ibmr->fr_mr->rkey;
+
+	memset(&f_wr, 0, sizeof(f_wr));
+	f_wr.opcode = IB_WR_FAST_REG_MR;
+	f_wr.wr.fast_reg.length = mapping->m_sg.bytes;
+	f_wr.wr.fast_reg.rkey = mapping->m_rkey;
+	f_wr.wr.fast_reg.page_list = ibmr->fr_page_list;
+	f_wr.wr.fast_reg.page_list_len = mapping->m_sg.dma_len;
+	f_wr.wr.fast_reg.page_shift = ibmr->device->fmr_page_shift;
+	f_wr.wr.fast_reg.access_flags = IB_ACCESS_LOCAL_WRITE |
+				IB_ACCESS_REMOTE_READ |
+				IB_ACCESS_REMOTE_WRITE;
+	f_wr.wr.fast_reg.iova_start = 0;
+	f_wr.send_flags = IB_SEND_SIGNALED;
+
+	failed_wr = &f_wr;
+	ret = ib_post_send(qp, &f_wr, &failed_wr);
+	BUG_ON(failed_wr != &f_wr);
+	if (ret) {
+		printk(KERN_WARNING "RDS/IB: %s %d ib_post_send returned %d\n",
+			__func__, __LINE__, ret);
+		goto out;
+	}
+
+out:
+	return ret;
+}
+
+int rds_ib_rdma_fastreg_inv(struct rds_ib_mr *ibmr)
+{
+	struct ib_send_wr s_wr, *failed_wr;
+	int ret;
+
+	if (!ibmr->qp || !ibmr->fr_mr)
+		goto out;
+
+	memset(&s_wr, 0, sizeof(s_wr));
+	s_wr.opcode = IB_WR_LOCAL_INV;
+	s_wr.ex.invalidate_rkey = ibmr->fr_mr->rkey;
+	s_wr.send_flags = IB_SEND_SIGNALED;
+
+	failed_wr = &s_wr;
+	ret = ib_post_send(ibmr->qp, &s_wr, &failed_wr);
+	if (ret) {
+		printk(KERN_WARNING "RDS/IB: %s %d ib_post_send returned %d\n",
+			__func__, __LINE__, ret);
+		goto out;
+	}
+out:
+	return ret;
+}
+
+static int rds_ib_map_fastreg(struct rds_ib_mr_pool *pool,
+			struct rds_ib_mr *ibmr,
+	       		struct scatterlist *sg,
+			unsigned int sg_len)
+{
+	struct rds_ib_device *rds_ibdev = pool->device;
+	struct rds_ib_mapping *mapping = &ibmr->mapping;
+	u64 *dma_pages;
+	int i, ret;
+
+	rds_ib_set_scatterlist(&mapping->m_sg, sg, sg_len);
+
+	dma_pages = rds_ib_map_scatterlist(rds_ibdev,
+				&mapping->m_sg,
+				rds_ibdev->fmr_page_shift);
+	if (IS_ERR(dma_pages)) {
+		ret = PTR_ERR(dma_pages);
+		dma_pages = NULL;
+		goto out;
+	}
+
+	if (mapping->m_sg.dma_len > pool->max_message_size) {
+		printk(KERN_WARNING "RDS/IB: fastreg mapping exceeds pool max_message_size\n");
+		ret = -EMSGSIZE;
+		goto out;
+	}
+
+	for (i = 0; i < mapping->m_sg.dma_npages; ++i)
+		ibmr->fr_page_list->page_list[i] = dma_pages[i];
+
+	ret = rds_ib_rdma_build_fastreg(ibmr->qp, mapping);
+	if (ret)
+		goto out;
+	rds_ib_stats_inc(s_ib_rdma_mr_used);
+
+out:
+	kfree(dma_pages);
+
+	return ret;
+}
+
+/*
+ * "Free" a fastreg MR.
+ */
+static void rds_ib_free_fastreg(struct rds_ib_mr_pool *pool,
+		struct rds_ib_mr *ibmr)
+{
+	unsigned long flags;
+
+	if (!ibmr->mapping.m_sg.dma_len)
+		return;
+
+	rds_ib_rdma_fastreg_inv(ibmr);
+
+	/* Try to post the LOCAL_INV WR to the queue. */
+	spin_lock_irqsave(&pool->list_lock, flags);
+
+	list_add_tail(&ibmr->mapping.m_list, &pool->dirty_list);
+	atomic_add(ibmr->mapping.m_sg.len, &pool->free_pinned);
+	atomic_inc(&pool->dirty_count);
+
+	spin_unlock_irqrestore(&pool->list_lock, flags);
+}
+
+static unsigned int rds_ib_unmap_fastreg_list(struct rds_ib_mr_pool *pool,
+				struct list_head *unmap_list,
+				struct list_head *kill_list)
+{
+	struct rds_ib_mapping *mapping, *next;
+	unsigned int ncleaned = 0;
+	LIST_HEAD(laundered);
+
+	/* Batched invalidation of fastreg MRs.
+	 * Why do we do it this way, even though we could pipeline unmap
+	 * and remap? The reason is the application semantics - when the
+	 * application requests an invalidation of MRs, it expects all
+	 * previously released R_Keys to become invalid.
+	 *
+	 * If we implement MR reuse naively, we risk memory corruption
+	 * (this has actually been observed). So the default behavior
+	 * requires that a MR goes through an explicit unmap operation before
+	 * we can reuse it again.
+	 *
+	 * We could probably improve on this a little, by allowing immediate
+	 * reuse of an MR on the same socket (e.g. you could add a small
+	 * cache of unused MRs to struct rds_sock - GET_MR could grab one
+	 * of these without requiring an explicit invalidate).
+	 */
+	while (!list_empty(unmap_list)) {
+		unsigned long flags;
+
+		spin_lock_irqsave(&pool->list_lock, flags);
+		list_for_each_entry_safe(mapping, next, unmap_list, m_list) {
+			list_move(&mapping->m_list, &laundered);
+			ncleaned++;
+		}
+		spin_unlock_irqrestore(&pool->list_lock, flags);
+	}
+
+	/* Move all laundered mappings back to the unmap list.
+	 * We do not kill any WRs right now - it doesn't seem the
+	 * fastreg API has a max_remap limit. */
+	list_splice_init(&laundered, unmap_list);
+
+	return ncleaned;
+}
+
+static void rds_ib_destroy_fastreg(struct rds_ib_mr_pool *pool,
+		struct rds_ib_mr *ibmr)
+{
+	if (ibmr->u.fastreg.page_list)
+		ib_free_fast_reg_page_list(ibmr->u.fastreg.page_list);
+	if (ibmr->u.fastreg.mr)
+		ib_dereg_mr(ibmr->u.fastreg.mr);
+}
diff --git a/net/rds/ib_recv.c b/net/rds/ib_recv.c
index 1da664e..2ca48d5 100644
--- a/net/rds/ib_recv.c
+++ b/net/rds/ib_recv.c
@@ -97,12 +97,12 @@ void rds_ib_recv_init_ring(struct rds_ib_connection *ic)
 		sge = rds_ib_data_sge(ic, recv->r_sge);
 		sge->addr = 0;
 		sge->length = RDS_FRAG_SIZE;
-		sge->lkey = rds_ib_local_dma_lkey(ic);
+		sge->lkey = 0;
 
 		sge = rds_ib_header_sge(ic, recv->r_sge);
 		sge->addr = ic->i_recv_hdrs_dma + (i * sizeof(struct rds_header));
 		sge->length = sizeof(struct rds_header);
-		sge->lkey = rds_ib_local_dma_lkey(ic);
+		sge->lkey = 0;
 	}
 }
 
diff --git a/net/rds/ib_send.c b/net/rds/ib_send.c
index 9058aaf..ecefc04 100644
--- a/net/rds/ib_send.c
+++ b/net/rds/ib_send.c
@@ -135,7 +135,9 @@ void rds_ib_send_init_ring(struct rds_ib_connection *ic)
 
 		send->s_rm = NULL;
 		send->s_op = NULL;
+		send->s_mapping = NULL;
 
+		send->s_wr.next = NULL;
 		send->s_wr.wr_id = i;
 		send->s_wr.sg_list = send->s_sge;
 		send->s_wr.num_sge = 1;
@@ -144,12 +146,29 @@ void rds_ib_send_init_ring(struct rds_ib_connection *ic)
 		send->s_wr.ex.imm_data = 0;
 
 		sge = rds_ib_data_sge(ic, send->s_sge);
-		sge->lkey = rds_ib_local_dma_lkey(ic);
+		sge->lkey = 0;
 
 		sge = rds_ib_header_sge(ic, send->s_sge);
 		sge->addr = ic->i_send_hdrs_dma + (i * sizeof(struct rds_header));
 		sge->length = sizeof(struct rds_header);
-		sge->lkey = rds_ib_local_dma_lkey(ic);
+		sge->lkey = 0;
+
+		if (ic->i_iwarp) {
+			send->s_mr = ib_alloc_fast_reg_mr(ic->i_pd, fmr_message_size);
+			if (IS_ERR(send->s_mr)) {
+				printk(KERN_WARNING "RDS/IB: ib_alloc_fast_reg_mr failed\n");
+				break;
+			}
+
+			send->s_page_list = ib_alloc_fast_reg_page_list(ic->i_cm_id->device, RDS_IB_MAX_SGE);
+			if (IS_ERR(send->s_page_list)) {
+				printk(KERN_WARNING "RDS/IB: ib_alloc_fast_reg_page_list failed\n");
+				break;
+			}
+		} else {
+			send->s_mr = NULL;
+			send->s_page_list = NULL;
+		}
 	}
 }
 
@@ -165,6 +184,11 @@ void rds_ib_send_clear_ring(struct rds_ib_connection *ic)
 			rds_ib_send_unmap_rm(ic, send, IB_WC_WR_FLUSH_ERR);
 		if (send->s_op)
 			rds_ib_send_unmap_rdma(ic, send->s_op);
+		if (send->s_mr)
+			ib_dereg_mr(send->s_mr);
+		if (send->s_page_list)
+			ib_free_fast_reg_page_list(send->s_page_list);
+
 	}
 }
 
@@ -192,12 +216,27 @@ void rds_ib_send_cq_comp_handler(struct ib_cq *cq, void *context)
 		rdsdebug("ib_req_notify_cq send failed: %d\n", ret);
 	}
 
-	while (ib_poll_cq(cq, 1, &wc) > 0 ) {
+	while (ib_poll_cq(cq, 1, &wc) > 0) {
 		rdsdebug("wc wr_id 0x%llx status %u byte_len %u imm_data %u\n",
 			 (unsigned long long)wc.wr_id, wc.status, wc.byte_len,
 			 be32_to_cpu(wc.ex.imm_data));
 		rds_ib_stats_inc(s_ib_tx_cq_event);
 
+		if (wc.status != IB_WC_SUCCESS) {
+			printk(KERN_WARNING "RDS/IB: send WC error: status = %d opcode = %d\n", wc.status, wc.opcode);
+			break;
+		}
+
+		if (wc.opcode == IB_WC_LOCAL_INV && wc.wr_id == 0) {
+			ic->i_fastreg_posted = 0;
+			continue;
+		}
+
+		if (wc.opcode == IB_WC_FAST_REG_MR && wc.wr_id == 0) {
+			ic->i_fastreg_posted = 1;
+			continue;
+		}
+
 		if (wc.wr_id == RDS_IB_ACK_WR_ID) {
 			if (ic->i_ack_queued + HZ/2 < jiffies)
 				rds_ib_stats_inc(s_ib_tx_stalled);
@@ -218,8 +257,10 @@ void rds_ib_send_cq_comp_handler(struct ib_cq *cq, void *context)
 				if (send->s_rm)
 					rds_ib_send_unmap_rm(ic, send, wc.status);
 				break;
+			case IB_WR_FAST_REG_MR:
 			case IB_WR_RDMA_WRITE:
 			case IB_WR_RDMA_READ:
+			case IB_WR_RDMA_READ_WITH_INV:
 				/* Nothing to be done - the SG list will be unmapped
 				 * when the SEND completes. */
 				break;
@@ -475,6 +516,14 @@ int rds_ib_xmit(struct rds_connection *conn, struct rds_message *rm,
 	BUG_ON(off % RDS_FRAG_SIZE);
 	BUG_ON(hdr_off != 0 && hdr_off != sizeof(struct rds_header));
 
+	/* Fastreg support */
+	if (rds_rdma_cookie_key(rm->m_rdma_cookie)
+	 && ic->i_fastreg
+	 && !ic->i_fastreg_posted) {
+		ret = -EAGAIN;
+		goto out;
+	}
+
 	/* FIXME we may overallocate here */
 	if (be32_to_cpu(rm->m_inc.i_hdr.h_len) == 0)
 		i = 1;
@@ -483,6 +532,7 @@ int rds_ib_xmit(struct rds_connection *conn, struct rds_message *rm,
 
 	work_alloc = rds_ib_ring_alloc(&ic->i_send_ring, i, &pos);
 	if (work_alloc == 0) {
+		printk("%s line %d: ENOMEM\n", __func__, __LINE__);
 		set_bit(RDS_LL_SEND_FULL, &conn->c_flags);
 		rds_ib_stats_inc(s_ib_tx_ring_full);
 		ret = -ENOMEM;
@@ -702,6 +752,40 @@ out:
 	return ret;
 }
 
+static int rds_ib_build_send_fastreg(struct rds_ib_device *rds_ibdev, struct rds_ib_connection *ic, struct rds_ib_send_work *send, int nent, int len, u64 sg_addr)
+{
+	struct ib_send_wr *failed_wr;
+	int ret;
+
+	/*
+	 * Perform a WR for the fast_reg_mr. Each individual page
+	 * in the sg list is added to the fast reg page list and placed
+	 * inside the fast_reg_mr WR.
+	 */
+	send->s_wr.opcode = IB_WR_FAST_REG_MR;
+	send->s_wr.wr.fast_reg.length = len;
+	send->s_wr.wr.fast_reg.rkey = send->s_mr->rkey;
+	send->s_wr.wr.fast_reg.page_list = send->s_page_list;
+	send->s_wr.wr.fast_reg.page_list_len = nent;
+	send->s_wr.wr.fast_reg.page_shift = rds_ibdev->fmr_page_shift;
+	send->s_wr.wr.fast_reg.access_flags = IB_ACCESS_REMOTE_WRITE; 
+	send->s_wr.wr.fast_reg.iova_start = sg_addr;
+
+	failed_wr = &send->s_wr;
+	ret = ib_post_send(ic->i_cm_id->qp, &send->s_wr, &failed_wr);
+	BUG_ON(failed_wr != &send->s_wr);
+	if (ret) {
+		printk(KERN_WARNING "RDS/IB: %s %d ib_post_send returned %d\n",
+			__func__, __LINE__, ret);
+		goto out;
+	}
+
+	ib_update_fast_reg_key(send->s_mr, send->s_remap_count++);
+
+out:
+	return ret;
+}
+
 int rds_ib_xmit_rdma(struct rds_connection *conn, struct rds_rdma_op *op)
 {
 	struct rds_ib_connection *ic = conn->c_transport_data;
@@ -713,7 +797,7 @@ int rds_ib_xmit_rdma(struct rds_connection *conn, struct rds_rdma_op *op)
 	struct scatterlist *scat;
 	unsigned long len;
 	u64 remote_addr = op->r_remote_addr;
-	u32 pos;
+	u32 pos, fr_pos;
 	u32 work_alloc;
 	u32 i;
 	u32 j;
@@ -738,6 +822,18 @@ int rds_ib_xmit_rdma(struct rds_connection *conn, struct rds_rdma_op *op)
 		op->r_mapped = 1;
 	}
 
+	if (!op->r_write && ic->i_iwarp) {
+		/* Alloc space on the send queue for the fastreg */
+		work_alloc = rds_ib_ring_alloc(&ic->i_send_ring, 1, &fr_pos);
+		if (work_alloc != 1) {
+			printk("%s line %d: ENOMEM\n", __func__, __LINE__);
+			rds_ib_ring_unalloc(&ic->i_send_ring, work_alloc);
+			rds_ib_stats_inc(s_ib_tx_ring_full);
+			ret = -ENOMEM;
+			goto out;
+		}
+	}
+
 	/*
 	 * Instead of knowing how to return a partial rdma read/write we insist that there
 	 * be enough work requests to send the entire message.
@@ -746,6 +842,7 @@ int rds_ib_xmit_rdma(struct rds_connection *conn, struct rds_rdma_op *op)
 
 	work_alloc = rds_ib_ring_alloc(&ic->i_send_ring, i, &pos);
 	if (work_alloc != i) {
+		printk("%s line %d: ENOMEM\n", __func__, __LINE__);
 		rds_ib_ring_unalloc(&ic->i_send_ring, work_alloc);
 		rds_ib_stats_inc(s_ib_tx_ring_full);
 		ret = -ENOMEM;
@@ -759,9 +856,10 @@ int rds_ib_xmit_rdma(struct rds_connection *conn, struct rds_rdma_op *op)
 	sent = 0;
 	num_sge = op->r_count;
 
-	for ( i = 0; i < work_alloc && scat != &op->r_sg[op->r_count]; i++ ) {
+	for (i = 0; i < work_alloc && scat != &op->r_sg[op->r_count]; i++) {
 		send->s_wr.send_flags = 0;
 		send->s_queued = jiffies;
+
 		/*
 		 * We want to delay signaling completions just enough to get
 		 * the batching benefits but not so much that we create dead time on the wire.
@@ -771,7 +869,17 @@ int rds_ib_xmit_rdma(struct rds_connection *conn, struct rds_rdma_op *op)
 			send->s_wr.send_flags = IB_SEND_SIGNALED;
 		}
 
-		send->s_wr.opcode = op->r_write ? IB_WR_RDMA_WRITE : IB_WR_RDMA_READ;
+		/* Rather than adding plumbing to invalidate the fastreg MR used for
+		 * local access once RDS is finished with it, use IB_WR_RDMA_READ_WITH_INV,
+		 * which invalidates the MR automatically when the read completes.
+		 */
+		if (op->r_write)
+			send->s_wr.opcode = IB_WR_RDMA_WRITE;
+		else if (ic->i_iwarp)
+			send->s_wr.opcode = IB_WR_RDMA_READ_WITH_INV;
+		else
+			send->s_wr.opcode = IB_WR_RDMA_READ;
+
 		send->s_wr.wr.rdma.remote_addr = remote_addr;
 		send->s_wr.wr.rdma.rkey = op->r_key;
 		send->s_op = op;
@@ -779,8 +887,7 @@ int rds_ib_xmit_rdma(struct rds_connection *conn, struct rds_rdma_op *op)
 		if (num_sge > rds_ibdev->max_sge) {
 			send->s_wr.num_sge = rds_ibdev->max_sge;
 			num_sge -= rds_ibdev->max_sge;
-		}
-		else
+		} else
 			send->s_wr.num_sge = num_sge;
 
 		send->s_wr.next = NULL;
@@ -792,15 +899,25 @@ int rds_ib_xmit_rdma(struct rds_connection *conn, struct rds_rdma_op *op)
 			len = sg_dma_len(scat);
 			send->s_sge[j].addr = sg_dma_address(scat);
 			send->s_sge[j].length = len;
-			send->s_sge[j].lkey = rds_ib_local_dma_lkey(ic);
+
+			if (send->s_wr.opcode == IB_WR_RDMA_READ_WITH_INV)
+				send->s_page_list->page_list[j] = sg_dma_address(scat);
+			else
+				send->s_sge[j].lkey = rds_ib_local_dma_lkey(ic);
 
 			sent += len;
 			rdsdebug("ic %p sent %d remote_addr %llu\n", ic, sent, remote_addr);
-
 			remote_addr += sg_dma_len(scat);
+
 			scat++;
 		}
 
+		if (send->s_wr.opcode == IB_WR_RDMA_READ_WITH_INV) {
+			send->s_wr.num_sge = 1;
+			send->s_sge[0].addr = conn->c_xmit_rm->m_rs->rs_user_addr;
+			send->s_sge[0].lkey = ic->i_sends[fr_pos].s_mr->lkey;
+		}
+
 		rdsdebug("send %p wr %p num_sge %u next %p\n", send,
 			&send->s_wr, send->s_wr.num_sge, send->s_wr.next);
 
@@ -809,6 +926,15 @@ int rds_ib_xmit_rdma(struct rds_connection *conn, struct rds_rdma_op *op)
 			send = ic->i_sends;
 	}
 
+	/* On iWARP, the sink of an RDMA Read must not use the global lkey: putting
+	 * the lkey on the wire is a security hole, since it can give the remote
+	 * system access to all of this node's memory, and some adapters do not
+	 * allow using the lkey for this at all.  To avoid this, cover the sink
+	 * pages with a fastreg MR (or possibly a dma_mr) instead.
+	 */
+	if (!op->r_write && ic->i_iwarp)
+		rds_ib_build_send_fastreg(rds_ibdev, ic, &ic->i_sends[fr_pos], op->r_count, sent, conn->c_xmit_rm->m_rs->rs_user_addr);
+
 	/* if we finished the message then send completion owns it */
 	if (scat == &op->r_sg[op->r_count]) {
 		prev->s_wr.send_flags = IB_SEND_SIGNALED;
@@ -831,12 +957,6 @@ int rds_ib_xmit_rdma(struct rds_connection *conn, struct rds_rdma_op *op)
 		goto out;
 	}
 
-	if (unlikely(failed_wr != &first->s_wr)) {
-		printk(KERN_WARNING "RDS/IB: ib_post_send() rc=%d, but failed_wqe updated!\n", ret);
-		BUG_ON(failed_wr != &first->s_wr);
-	}
-
-
 out:
 	return ret;
 }
diff --git a/net/rds/message.c b/net/rds/message.c
index 9269b9a..ddeb95b 100644
--- a/net/rds/message.c
+++ b/net/rds/message.c
@@ -71,6 +71,8 @@ static void rds_message_purge(struct rds_message *rm)
 
 	if (rm->m_rdma_op)
 		rds_rdma_free_op(rm->m_rdma_op);
+	if (rm->m_rdma_mr)
+		rds_mr_put(rm->m_rdma_mr);
 }
 
 void rds_message_inc_purge(struct rds_incoming *inc)
diff --git a/net/rds/rdma.c b/net/rds/rdma.c
index 1f1039e..4d26246 100644
--- a/net/rds/rdma.c
+++ b/net/rds/rdma.c
@@ -116,11 +116,8 @@ static void rds_destroy_mr(struct rds_mr *mr)
 		mr->r_trans->free_mr(trans_private, mr->r_invalidate);
 }
 
-static void rds_mr_put(struct rds_mr *mr)
+void __rds_put_mr_final(struct rds_mr *mr)
 {
-	if (!atomic_dec_and_test(&mr->r_refcount))
-		return;
-
 	rds_destroy_mr(mr);
 	kfree(mr);
 }
@@ -169,7 +166,7 @@ static int rds_pin_pages(unsigned long user_addr, unsigned int nr_pages,
 }
 
 static int __rds_rdma_map(struct rds_sock *rs, struct rds_get_mr_args *args,
-				u64 *cookie_ret)
+				u64 *cookie_ret, struct rds_mr **mr_ret)
 {
 	struct rds_mr *mr = NULL, *found;
 	unsigned int nr_pages;
@@ -257,8 +254,7 @@ static int __rds_rdma_map(struct rds_sock *rs, struct rds_get_mr_args *args,
 	 * s/g list is now owned by the MR.
 	 * Note that dma_map() implies that pending writes are
 	 * flushed to RAM, so no dma_sync is needed here. */
-	trans_private = rs->rs_transport->get_mr(sg, nents,
-						 rs->rs_bound_addr, 
+	trans_private = rs->rs_transport->get_mr(sg, nents, rs,
 						 &mr->r_key);
 
 	if (IS_ERR(trans_private)) {
@@ -296,6 +292,10 @@ static int __rds_rdma_map(struct rds_sock *rs, struct rds_get_mr_args *args,
 	BUG_ON(found && found != mr);
 
 	rdsdebug("RDS: get_mr key is %x\n", mr->r_key);
+	if (mr_ret) {
+		atomic_inc(&mr->r_refcount);
+		*mr_ret = mr;
+	}
 
 	ret = 0;
 out:
@@ -317,7 +317,7 @@ int rds_get_mr(struct rds_sock *rs, char __user *optval, int optlen)
 			   sizeof(struct rds_get_mr_args)))
 		return -EFAULT;
 
-	return __rds_rdma_map(rs, &args, NULL);
+	return __rds_rdma_map(rs, &args, NULL, NULL);
 }
 
 /*
@@ -542,6 +542,8 @@ static struct rds_rdma_op *rds_rdma_prepare(struct rds_sock *rs,
 			goto out;
 		}
 
+		rs->rs_user_addr = vec.addr;
+
 		/* did the user change the vec under us? */
 		if (nr > max_pages || op->r_nents + nr > nr_pages) {
 			ret = -EINVAL;
@@ -655,7 +657,7 @@ int rds_cmsg_rdma_dest(struct rds_sock *rs, struct rds_message *rm,
 
 	if (mr) {
 		mr->r_trans->sync_mr(mr->r_trans_private, DMA_TO_DEVICE);
-		rds_mr_put(mr);
+		rm->m_rdma_mr = mr;
 	}
 	return err;
 }
@@ -673,5 +675,5 @@ int rds_cmsg_rdma_map(struct rds_sock *rs, struct rds_message *rm,
 	 || rm->m_rdma_cookie != 0)
 		return -EINVAL;
 
-	return __rds_rdma_map(rs, CMSG_DATA(cmsg), &rm->m_rdma_cookie);
+	return __rds_rdma_map(rs, CMSG_DATA(cmsg), &rm->m_rdma_cookie, &rm->m_rdma_mr);
 }
diff --git a/net/rds/rdma.h b/net/rds/rdma.h
index b1734a0..4878db6 100644
--- a/net/rds/rdma.h
+++ b/net/rds/rdma.h
@@ -22,7 +22,7 @@ struct rds_mr {
 	 * bit field here, but we need to use test_and_set_bit.
 	 */
 	unsigned long		r_state;
-	struct rds_sock *	r_sock;		/* back pointer to the socket that owns us */
+	struct rds_sock		*r_sock; /* back pointer to the socket that owns us */
 	struct rds_transport	*r_trans;
 	void			*r_trans_private;
 };
@@ -74,4 +74,11 @@ int rds_cmsg_rdma_map(struct rds_sock *rs, struct rds_message *rm,
 void rds_rdma_free_op(struct rds_rdma_op *ro);
 void rds_rdma_send_complete(struct rds_message *rm, int);
 
+extern void __rds_put_mr_final(struct rds_mr *mr);
+static inline void rds_mr_put(struct rds_mr *mr)
+{
+	if (atomic_dec_and_test(&mr->r_refcount))
+		__rds_put_mr_final(mr);
+}
+
 #endif
diff --git a/net/rds/rds.h b/net/rds/rds.h
index 235c951..68726ee 100644
--- a/net/rds/rds.h
+++ b/net/rds/rds.h
@@ -122,7 +122,7 @@ struct rds_connection {
 	__be32			c_laddr;
 	__be32			c_faddr;
 	unsigned int		c_loopback : 1;
-	struct rds_connection *	c_passive;
+	struct rds_connection	*c_passive;
 
 	struct rds_cong_map	*c_lcong;
 	struct rds_cong_map	*c_fcong;
@@ -297,6 +297,7 @@ struct rds_message {
 	struct rds_sock		*m_rs;
 	struct rds_rdma_op	*m_rdma_op;
 	rds_rdma_cookie_t	m_rdma_cookie;
+	struct rds_mr		*m_rdma_mr;
 	unsigned int		m_nents;
 	unsigned int		m_count;
 	struct scatterlist	m_sg[0];
@@ -373,7 +374,7 @@ struct rds_transport {
 					unsigned int avail);
 	void (*exit)(void);
 	void *(*get_mr)(struct scatterlist *sg, unsigned long nr_sg,
-			__be32 ip_addr, u32 *key_ret);
+			struct rds_sock *rs, u32 *key_ret);
 	void (*sync_mr)(void *trans_private, int direction);
 	void (*free_mr)(void *trans_private, int invalidate);
 	void (*flush_mrs)(void);
@@ -387,6 +388,7 @@ struct rds_sock {
 	struct sock		*rs_sk;
 #endif
 
+	u64			rs_user_addr;
 	/*
 	 * bound_addr used for both incoming and outgoing, no INADDR_ANY
 	 * support.
diff --git a/net/rds/send.c b/net/rds/send.c
index 20d3e52..406ff64 100644
--- a/net/rds/send.c
+++ b/net/rds/send.c
@@ -772,6 +772,9 @@ static int rds_cmsg_send(struct rds_sock *rs, struct rds_message *rm,
 		if (cmsg->cmsg_level != SOL_RDS)
 			continue;
 
+		/* As a side effect, RDMA_DEST and RDMA_MAP will set
+		 * rm->m_rdma_cookie and rm->m_rdma_mr.
+		 */
 		switch (cmsg->cmsg_type) {
 		case RDS_CMSG_RDMA_ARGS:
 			ret = rds_cmsg_rdma_args(rs, rm, cmsg);
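
To make the new read path easier to review, here is a condensed sketch of what
the diff above does for an iWARP RDMA READ.  It is illustration only, not part
of the patch: ring accounting, WR signaling, multi-WR messages, and error
handling are omitted; nr_pages/user_addr stand in for the values the real code
takes from the rds_rdma_op and rds_sock; and for brevity the sketch fills the
page list hanging off the slot whose MR is being registered.

static void rds_ib_iwarp_read_outline(struct rds_ib_device *rds_ibdev,
				      struct rds_ib_connection *ic,
				      struct rds_ib_send_work *send,
				      struct scatterlist *scat,
				      int nr_pages, u64 user_addr,
				      struct rds_rdma_op *op)
{
	struct rds_ib_send_work *fr_send;
	u32 fr_pos;
	u32 len = 0;
	int i;

	/* Reserve one extra send slot; its preallocated MR and page list
	 * will describe the local buffer the read lands in. */
	rds_ib_ring_alloc(&ic->i_send_ring, 1, &fr_pos);
	fr_send = &ic->i_sends[fr_pos];

	/* Collect the DMA address of each sink page into the fastreg page
	 * list instead of exposing the global lkey in the sge. */
	for (i = 0; i < nr_pages; i++, scat++) {
		fr_send->s_page_list->page_list[i] = sg_dma_address(scat);
		len += sg_dma_len(scat);
	}

	/* The read targets the fastreg MR and invalidates it on completion,
	 * so no separate invalidate WR is needed afterwards. */
	send->s_wr.opcode = IB_WR_RDMA_READ_WITH_INV;
	send->s_wr.wr.rdma.remote_addr = op->r_remote_addr;
	send->s_wr.wr.rdma.rkey = op->r_key;
	send->s_wr.num_sge = 1;
	send->s_sge[0].addr = user_addr;
	send->s_sge[0].length = len;
	send->s_sge[0].lkey = fr_send->s_mr->lkey;

	/* Post the IB_WR_FAST_REG_MR WR that arms the MR; the caller then
	 * posts the read WR chain exactly as before. */
	rds_ib_build_send_fastreg(rds_ibdev, ic, fr_send, nr_pages, len,
				  user_addr);
}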
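
The MR lifetime change is spread across message.c, rdma.c, rdma.h, and send.c,
so here is the intended flow in one place.  Again just a sketch using the names
from the patch, not compilable on its own.

static void rds_rdma_map_lifetime_outline(struct rds_sock *rs,
					  struct rds_message *rm,
					  struct rds_get_mr_args *args)
{
	/* RDS_CMSG_RDMA_MAP: __rds_rdma_map() now takes an extra reference
	 * and hands the MR back through the new mr_ret argument, so the
	 * message pins the MR instead of dropping it right away. */
	__rds_rdma_map(rs, args, &rm->m_rdma_cookie, &rm->m_rdma_mr);

	/* ... the message is queued and transmitted with the MR pinned ... */

	/* rds_message_purge(): the message's reference is dropped here; the
	 * new inline rds_mr_put() calls __rds_put_mr_final() only when the
	 * refcount hits zero. */
	if (rm->m_rdma_mr)
		rds_mr_put(rm->m_rdma_mr);
}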


