[rds-devel] RDS iWARP enablement

Jon Mason jon at opengridcomputing.com
Tue Sep 16 09:31:04 PDT 2008


Hey Andy,
Per our conversation last week regarding your code migration from
net/rds to drivers/infiniband, I am sending you a quick tree dump of
what I currently have.

This patch contains all of the changes needed to get rds-rdma working on
iWARP (with a few FIXMEs left).  It applies to a stock OFED-1.4 kernel
and doesn't require any of the experimental patches Olaf had me using
earlier, so it should be much easier to read and understand.  Some of
the changes in here come from Olaf's experimental patches (credit where
it's due).  If there are any questions, feel free to ask.
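
The biggest structural change is that the FMR-specific code in ib_rdma.c
now sits behind a small ops table, so the fastreg backend can slot in
beside it.  For reference, the table looks like this (copied from the
patch below); rds_ib_create_mr_pool() picks rds_ib_fmr_pool_ops or
rds_ib_fastreg_pool_ops per device based on IB_DEVICE_MEM_MGT_EXTENSIONS:

struct rds_ib_mr_pool_ops {
	int		(*init)(struct rds_ib_mr_pool *, struct rds_ib_mr *);
	int		(*map)(struct rds_ib_mr_pool *pool, struct rds_ib_mr *ibmr,
				struct scatterlist *sg, unsigned int sg_len);
	void		(*free)(struct rds_ib_mr_pool *pool, struct rds_ib_mr *ibmr);
	unsigned int	(*unmap)(struct rds_ib_mr_pool *, struct list_head *,
				struct list_head *);
	void		(*destroy)(struct rds_ib_mr_pool *, struct rds_ib_mr *);
};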

There are two major FIXMEs in the code, which I aim to squash sometime
this week: removing dma_mrs for iWARP devices and replacing them with a
new fastreg_mr infrastructure, and allowing multiple RDS connections
(i.e., QPs) from the same host.
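
To give a flavor of what the fastreg replacement looks like, the
registration path in ib_rdma.c below boils down to roughly the sketch
that follows.  It is heavily trimmed (error handling, locking, and the
page list fill are omitted), and the helper name is mine, not something
in the patch:

#include <linux/err.h>
#include <linux/string.h>
#include <rdma/ib_verbs.h>

/* Condensed from rds_ib_init_fastreg() and rds_ib_rdma_build_fastreg():
 * allocate the MR once, then re-register it by posting a FAST_REG_MR WR
 * on the connection's QP whenever the mapping changes.
 */
static int fastreg_sketch(struct ib_pd *pd, struct ib_qp *qp,
			  struct ib_fast_reg_page_list *page_list,
			  unsigned int max_pages, u32 length, u8 key)
{
	struct ib_mr *mr;
	struct ib_send_wr wr, *bad_wr;

	mr = ib_alloc_fast_reg_mr(pd, max_pages);
	if (IS_ERR(mr))
		return PTR_ERR(mr);

	/* Roll the 8-bit key so rkeys from prior mappings go stale */
	ib_update_fast_reg_key(mr, key);

	/* Bind the page list to the MR on the send queue itself, so the
	 * registration is ordered ahead of the SEND carrying the rkey. */
	memset(&wr, 0, sizeof(wr));
	wr.opcode = IB_WR_FAST_REG_MR;
	wr.send_flags = IB_SEND_SIGNALED;
	wr.wr.fast_reg.rkey = mr->rkey;
	wr.wr.fast_reg.page_list = page_list;
	wr.wr.fast_reg.page_list_len = page_list->max_page_list_len;
	wr.wr.fast_reg.page_shift = PAGE_SHIFT;
	wr.wr.fast_reg.length = length;
	wr.wr.fast_reg.iova_start = 0;
	wr.wr.fast_reg.access_flags = IB_ACCESS_LOCAL_WRITE |
				      IB_ACCESS_REMOTE_READ |
				      IB_ACCESS_REMOTE_WRITE;

	return ib_post_send(qp, &wr, &bad_wr);
}

The matching LOCAL_INV (see rds_ib_rdma_fastreg_inv()) is posted on the
same QP when the MR is freed, and the send completion handler flips
ic->fastreg_posted so rds_ib_xmit knows whether the rkey is usable yet.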

Let me know what you think.

Thanks,
Jon

diff --git a/net/rds/ib.c b/net/rds/ib.c
index c7fdc82..d5b20f9 100644
--- a/net/rds/ib.c
+++ b/net/rds/ib.c
@@ -42,11 +42,17 @@
 
 unsigned int fmr_pool_size = RDS_FMR_POOL_SIZE;
 unsigned int fmr_message_size = RDS_FMR_SIZE + 1; /* +1 allows for unaligned MRs */
+unsigned int fastreg_pool_size = RDS_FASTREG_POOL_SIZE;
+unsigned int fastreg_message_size = RDS_FASTREG_SIZE + 1; /* +1 allows for unaligned MRs */
 
 module_param(fmr_pool_size, int, 0444);
 MODULE_PARM_DESC(fmr_pool_size, " Max number of fmr per HCA");
 module_param(fmr_message_size, int, 0444);
 MODULE_PARM_DESC(fmr_message_size, " Max size of a RDMA transfer");
+module_param(fastreg_pool_size, int, 0444);
+MODULE_PARM_DESC(fastreg_pool_size, " Max number of fastreg MRs per device");
+module_param(fastreg_message_size, int, 0444);
+MODULE_PARM_DESC(fastreg_message_size, " Max size of a RDMA transfer (fastreg MRs)");
 
 struct list_head rds_ib_devices;
 
@@ -79,7 +85,14 @@ void rds_ib_add_one(struct ib_device *device)
 
 	spin_lock_init(&rds_ibdev->spinlock);
 
+#if IWARP_DMA_MR_REPLACEMENT
 	rds_ibdev->dma_local_lkey = !!(dev_attr->device_cap_flags & IB_DEVICE_LOCAL_DMA_LKEY);
+#else
+	/* FIXME - dma_mr's do not work on iWARP.  They need to be wholly replaced with
+	 * fastreg_mr's.  Until that task has been completed, simply disable dma_mr's.
+	 */
+	rds_ibdev->dma_local_lkey = 0;
+#endif
 	rds_ibdev->max_wrs = dev_attr->max_qp_wr;
 	rds_ibdev->max_sge = min(dev_attr->max_sge, RDS_IB_MAX_SGE);
 
@@ -87,9 +100,7 @@ void rds_ib_add_one(struct ib_device *device)
 	rds_ibdev->fmr_page_size  = 1 << rds_ibdev->fmr_page_shift;
 	rds_ibdev->fmr_page_mask  = ~((u64) rds_ibdev->fmr_page_size - 1);
 	rds_ibdev->fmr_max_remaps = dev_attr->max_map_per_fmr?: 32;
-	rds_ibdev->max_fmrs = dev_attr->max_fmr?
-			min_t(unsigned int, dev_attr->max_fmr, fmr_pool_size) :
-			fmr_pool_size;
+	rds_ibdev->max_fmrs = dev_attr->max_fmr;
 
 	rds_ibdev->dev = device;
 	rds_ibdev->pd = ib_alloc_pd(device);
@@ -112,13 +123,17 @@ void rds_ib_add_one(struct ib_device *device)
 	} else
 		rds_ibdev->mr = NULL;
 
+	/* Tell the RDMA code to use the fastreg API */
+	if (dev_attr->device_cap_flags & IB_DEVICE_MEM_MGT_EXTENSIONS)
+		rds_ibdev->use_fastreg = 1;
+
 	rds_ibdev->mr_pool = rds_ib_create_mr_pool(rds_ibdev);
 	if (IS_ERR(rds_ibdev->mr_pool)) {
 		rds_ibdev->mr_pool = NULL;
 		goto err_mr;
 	}
 
-	INIT_LIST_HEAD(&rds_ibdev->ipaddr_list);
+	INIT_LIST_HEAD(&rds_ibdev->cm_id_list);
 	list_add_tail(&rds_ibdev->list, &rds_ib_devices);
 
 	ib_set_client_data(device, &rds_ib_client, rds_ibdev);
@@ -139,21 +154,23 @@ free_attr:
 void rds_ib_remove_one(struct ib_device *device)
 {
 	struct rds_ib_device *rds_ibdev;
-	struct rds_ib_ipaddr *i_ipaddr, *next;
+	struct rds_ib_cm_id *i_cm_id, *next;
 
 	rds_ibdev = ib_get_client_data(device, &rds_ib_client);
 	if (!rds_ibdev)
 		return;
 
-	list_for_each_entry_safe(i_ipaddr, next, &rds_ibdev->ipaddr_list, list) {
-		list_del(&i_ipaddr->list);
-		kfree(i_ipaddr);
+	list_for_each_entry_safe(i_cm_id, next, &rds_ibdev->cm_id_list, list) {
+		list_del(&i_cm_id->list);
+		kfree(i_cm_id);
 	}
 
 	if (rds_ibdev->mr_pool)
 		rds_ib_destroy_mr_pool(rds_ibdev->mr_pool);
 
-	ib_dereg_mr(rds_ibdev->mr);
+	if (rds_ibdev->mr)
+		ib_dereg_mr(rds_ibdev->mr);
+
 	ib_dealloc_pd(rds_ibdev->pd);
 	
 	list_del(&rds_ibdev->list);
diff --git a/net/rds/ib.h b/net/rds/ib.h
index 26c23cb..564bf39 100644
--- a/net/rds/ib.h
+++ b/net/rds/ib.h
@@ -8,7 +8,9 @@
 #define RDS_IB_RESOLVE_TIMEOUT_MS	5000
 
 #define RDS_FMR_SIZE			256
-#define RDS_FMR_POOL_SIZE		4096
+#define RDS_FMR_POOL_SIZE		2048
+#define RDS_FASTREG_SIZE		20
+#define RDS_FASTREG_POOL_SIZE		2048
 
 #define RDS_IB_MAX_SGE			8
 #define RDS_IB_RECV_SGE 		2
@@ -49,9 +51,29 @@ struct rds_ib_connect_private {
 	__be32			dp_credit;		/* non-zero enables flow ctl */
 };
 
+struct rds_ib_scatterlist {
+	struct scatterlist *	list;
+	unsigned int		len;
+	int			dma_len;
+	unsigned int		dma_npages;
+	unsigned int		bytes;
+};
+
+struct rds_ib_mapping {
+	spinlock_t		m_lock;
+	struct list_head	m_list;
+	struct rds_ib_mr *	m_mr;
+	uint32_t		m_rkey;
+	struct rds_ib_scatterlist m_sg;
+};
+
 struct rds_ib_send_work {
 	struct rds_message	*s_rm;
+
+	/* We should really put these into a union: */
 	struct rds_rdma_op	*s_op;
+	struct rds_ib_mapping	*s_mapping;
+
 	struct ib_send_wr	s_wr;
 	struct ib_sge		s_sge[RDS_IB_MAX_SGE];
 	unsigned long		s_queued;
@@ -127,7 +149,7 @@ struct rds_ib_connection {
 				i_iwarp   : 1,	/* this is actually iWARP not IB */
 				i_fastreg : 1,	/* device supports fastreg */
 				i_dma_local_lkey : 1;
-
+	unsigned int		fastreg_posted;
 	/* Batched completions */
 	unsigned int		i_unsignaled_wrs;
 	long			i_unsignaled_bytes;
@@ -139,14 +161,14 @@ struct rds_ib_connection {
 #define IB_SET_SEND_CREDITS(v)	((v) & 0xffff)
 #define IB_SET_POST_CREDITS(v)	((v) << 16)
 
-struct rds_ib_ipaddr {
+struct rds_ib_cm_id {
 	struct list_head	list;
-	__be32			ipaddr;
+	struct rdma_cm_id	*cm_id;
 };
 
 struct rds_ib_device {
 	struct list_head	list;
-	struct list_head	ipaddr_list;
+	struct list_head	cm_id_list;
 	struct ib_device	*dev;
 	struct ib_pd		*pd;
 	struct ib_mr		*mr;
@@ -247,6 +269,8 @@ extern struct ib_client rds_ib_client;
 
 extern unsigned int fmr_pool_size;
 extern unsigned int fmr_message_size;
+extern unsigned int fastreg_pool_size;
+extern unsigned int fastreg_message_size;
 
 /* ib_cm.c */
 int rds_ib_conn_alloc(struct rds_connection *conn, gfp_t gfp);
@@ -262,12 +286,12 @@ void __rds_ib_conn_error(struct rds_connection *conn, const char *, ...);
 	__rds_ib_conn_error(conn, KERN_WARNING "RDS/IB: " fmt )
 
 /* ib_rdma.c */
-int ib_update_ipaddr_for_device(struct rds_ib_device *rds_ibdev, __be32 ipaddr);
+int ib_update_cm_id_for_device(struct rds_ib_device *rds_ibdev, struct rdma_cm_id *cm_id);
 struct rds_ib_mr_pool *rds_ib_create_mr_pool(struct rds_ib_device *);
 void rds_ib_get_mr_info(struct rds_ib_device *rds_ibdev, struct rds_info_ib_connection *iinfo);
 void rds_ib_destroy_mr_pool(struct rds_ib_mr_pool *);
 void *rds_ib_get_mr(struct scatterlist *sg, unsigned long nents,
-		    __be32 ip_addr, u32 *key_ret);
+		    struct rds_sock *rs, u32 *key_ret);
 void rds_ib_sync_mr(void *trans_private, int dir);
 void rds_ib_free_mr(void *trans_private, int invalidate);
 void rds_ib_flush_mrs(void);
diff --git a/net/rds/ib_cm.c b/net/rds/ib_cm.c
index 33d2b08..c08dff8 100644
--- a/net/rds/ib_cm.c
+++ b/net/rds/ib_cm.c
@@ -32,7 +32,6 @@
  */
 #include <linux/kernel.h>
 #include <linux/in.h>
-#include <linux/vmalloc.h>
 
 #include "rds.h"
 #include "ib.h"
@@ -140,7 +139,7 @@ static void rds_ib_connect_complete(struct rds_connection *conn, struct rdma_cm_
 
 	/* update ib_device with this local ipaddr */
 	rds_ibdev = ib_get_client_data(ic->i_cm_id->device, &rds_ib_client);
-	ib_update_ipaddr_for_device(rds_ibdev, conn->c_laddr);
+	ib_update_cm_id_for_device(rds_ibdev, ic->i_cm_id);
 
 	/* If the peer gave us the last packet it saw, process this as if
 	 * we had received a regular ACK. */
@@ -206,7 +205,7 @@ static void rds_ib_qp_event_handler(struct ib_event *event, void *data)
 			rdma_notify(ic->i_cm_id, IB_EVENT_COMM_EST);
 			break;
 		default:
-			printk(KERN_WARNING "RDS/ib: unhandled QP event %u "
+			printk(KERN_WARNING "RDS/IB: unhandled QP event %u "
 			       "on connection to %u.%u.%u.%u\n", event->event,
 			       NIPQUAD(conn->c_faddr));
 			break;
@@ -214,6 +213,79 @@ static void rds_ib_qp_event_handler(struct ib_event *event, void *data)
 }
 
 /*
+ * Create a QP
+ */
+static int rds_ib_init_qp_attrs(struct ib_qp_init_attr *attr,
+		struct rds_ib_device *rds_ibdev,
+		struct rds_ib_work_ring *send_ring,
+		void (*send_cq_handler)(struct ib_cq *, void *),
+		struct rds_ib_work_ring *recv_ring,
+		void (*recv_cq_handler)(struct ib_cq *, void *),
+		void *context)
+{
+	struct ib_device *dev = rds_ibdev->dev;
+	unsigned int send_size, recv_size;
+	int ret;
+
+	/* The offset of 1 is to accommodate the additional ACK WR. */
+	send_size = min_t(unsigned int, rds_ibdev->max_wrs, rds_ib_sysctl_max_send_wr + 1);
+	recv_size = min_t(unsigned int, rds_ibdev->max_wrs, rds_ib_sysctl_max_recv_wr + 1);
+	rds_ib_ring_resize(send_ring, send_size - 1);
+	rds_ib_ring_resize(recv_ring, recv_size - 1);
+
+	memset(attr, 0, sizeof(*attr));
+	attr->event_handler = rds_ib_qp_event_handler;
+	attr->qp_context = context;
+	attr->cap.max_send_wr = send_size;
+	attr->cap.max_recv_wr = recv_size;
+	attr->cap.max_send_sge = rds_ibdev->max_sge;
+	attr->cap.max_recv_sge = RDS_IB_RECV_SGE;
+	attr->sq_sig_type = IB_SIGNAL_REQ_WR;
+	attr->qp_type = IB_QPT_RC;
+
+	attr->send_cq = ib_create_cq(dev, send_cq_handler,
+				     rds_ib_cq_event_handler,
+				     context, send_size, 0);
+	if (IS_ERR(attr->send_cq)) {
+		ret = PTR_ERR(attr->send_cq);
+		attr->send_cq = NULL;
+		rdsdebug("ib_create_cq send failed: %d\n", ret);
+		goto out;
+	}
+
+	attr->recv_cq = ib_create_cq(dev, recv_cq_handler,
+				     rds_ib_cq_event_handler,
+				     context, recv_size, 0);
+	if (IS_ERR(attr->recv_cq)) {
+		ret = PTR_ERR(attr->recv_cq);
+		attr->recv_cq = NULL;
+		rdsdebug("ib_create_cq recv failed: %d\n", ret);
+		goto out;
+	}
+
+	ret = ib_req_notify_cq(attr->send_cq, IB_CQ_NEXT_COMP);
+	if (ret) {
+		rdsdebug("ib_req_notify_cq send failed: %d\n", ret);
+		goto out;
+	}
+
+	ret = ib_req_notify_cq(attr->recv_cq, IB_CQ_SOLICITED);
+	if (ret) {
+		rdsdebug("ib_req_notify_cq recv failed: %d\n", ret);
+		goto out;
+	}
+
+out:
+	if (ret) {
+		if (attr->send_cq)
+			ib_destroy_cq(attr->send_cq);
+		if (attr->recv_cq)
+			ib_destroy_cq(attr->recv_cq);
+	}
+	return ret;
+}
+
+/*
  * This needs to be very careful to not leave IS_ERR pointers around for
  * cleanup to trip over.
  */
@@ -238,60 +310,19 @@ static int rds_ib_setup_qp(struct rds_connection *conn)
 		return -EOPNOTSUPP;
 	}
 
-	if (rds_ibdev->max_wrs < ic->i_send_ring.w_nr + 1)
-		rds_ib_ring_resize(&ic->i_send_ring, rds_ibdev->max_wrs - 1);
-	if (rds_ibdev->max_wrs < ic->i_recv_ring.w_nr + 1)
-		rds_ib_ring_resize(&ic->i_recv_ring, rds_ibdev->max_wrs - 1);
-
 	/* Protection domain and memory range */
 	ic->i_pd = rds_ibdev->pd;
 	ic->i_mr = rds_ibdev->mr;
 
-	ic->i_send_cq = ib_create_cq(dev, rds_ib_send_cq_comp_handler,
-				     rds_ib_cq_event_handler, conn,
-				     ic->i_send_ring.w_nr + 1, 0);
-	if (IS_ERR(ic->i_send_cq)) {
-		ret = PTR_ERR(ic->i_send_cq);
-		ic->i_send_cq = NULL;
-		rdsdebug("ib_create_cq send failed: %d\n", ret);
-		goto out;
-	}
-
-	ic->i_recv_cq = ib_create_cq(dev, rds_ib_recv_cq_comp_handler,
-				     rds_ib_cq_event_handler, conn,
-				     ic->i_recv_ring.w_nr, 0);
-	if (IS_ERR(ic->i_recv_cq)) {
-		ret = PTR_ERR(ic->i_recv_cq);
-		ic->i_recv_cq = NULL;
-		rdsdebug("ib_create_cq recv failed: %d\n", ret);
-		goto out;
-	}
-
-	ret = ib_req_notify_cq(ic->i_send_cq, IB_CQ_NEXT_COMP);
-	if (ret) {
-		rdsdebug("ib_req_notify_cq send failed: %d\n", ret);
-		goto out;
-	}
-
-	ret = ib_req_notify_cq(ic->i_recv_cq, IB_CQ_SOLICITED);
-	if (ret) {
-		rdsdebug("ib_req_notify_cq recv failed: %d\n", ret);
+	ret = rds_ib_init_qp_attrs(&attr, rds_ibdev,
+			&ic->i_send_ring, rds_ib_send_cq_comp_handler,
+			&ic->i_recv_ring, rds_ib_recv_cq_comp_handler,
+			conn);
+	if (ret < 0)
 		goto out;
-	}
 
-	/* XXX negotiate max send/recv with remote? */
-	memset(&attr, 0, sizeof(attr));
-	attr.event_handler = rds_ib_qp_event_handler;
-	attr.qp_context = conn;
-	/* + 1 to allow for the single ack message */
-	attr.cap.max_send_wr = ic->i_send_ring.w_nr + 1;
-	attr.cap.max_recv_wr = ic->i_recv_ring.w_nr + 1;
-	attr.cap.max_send_sge = rds_ibdev->max_sge;
-	attr.cap.max_recv_sge = RDS_IB_RECV_SGE;
-	attr.sq_sig_type = IB_SIGNAL_REQ_WR;
-	attr.qp_type = IB_QPT_RC;
-	attr.send_cq = ic->i_send_cq;
-	attr.recv_cq = ic->i_recv_cq;
+	ic->i_send_cq = attr.send_cq;
+	ic->i_recv_cq = attr.recv_cq;
 
 	/* 
 	 * XXX this can fail if max_*_wr is too large?  Are we supposed
@@ -437,8 +468,7 @@ static int rds_ib_cm_handle_connect(struct rdma_cm_id *cm_id,
 	 */
 	if (!rds_conn_transition(conn, RDS_CONN_DOWN, RDS_CONN_CONNECTING)) {
 		if (rds_conn_state(conn) == RDS_CONN_UP) {
-			rdsdebug("incoming connect while connecting\n");
-			rds_conn_drop(conn);
+			rds_ib_conn_error(conn, "incoming connect while connecting\n");
 			rds_ib_stats_inc(s_ib_listen_closed_stale);
 		} else
 		if (rds_conn_state(conn) == RDS_CONN_CONNECTING) {
@@ -480,8 +510,7 @@ static int rds_ib_cm_handle_connect(struct rdma_cm_id *cm_id,
  		goto out;
  	}
 
-	/* update ib_device with this local ipaddr */
-	ib_update_ipaddr_for_device(rds_ibdev, dp->dp_saddr);
+	ib_update_cm_id_for_device(rds_ibdev, cm_id);
 
  	return 0;
 
@@ -582,7 +611,6 @@ static int rds_ib_cm_event_handler(struct rdma_cm_id *cm_id,
 		case RDMA_CM_EVENT_UNREACHABLE:
 		case RDMA_CM_EVENT_REJECTED:
 		case RDMA_CM_EVENT_DEVICE_REMOVAL:
-		case RDMA_CM_EVENT_ADDR_CHANGE:
 			if (conn)
 				rds_conn_drop(conn);
 			break;
@@ -693,7 +721,7 @@ void rds_ib_conn_shutdown(struct rds_connection *conn)
 			/* Actually this may happen quite frequently, when
 			 * an outgoing connect raced with an incoming connect.
 			 */
-			rdsdebug("rds_ib_conn_shutdown: failed to disconnect,"
+			printk(KERN_DEBUG "rds_ib_conn_shutdown: failed to disconnect,"
 				   " cm: %p err %d\n", ic->i_cm_id, err);
 		}
 
@@ -835,7 +863,7 @@ int __init rds_ib_listen_init(void)
 	cm_id = rdma_create_id(rds_ib_cm_event_handler, NULL, RDMA_PS_TCP);
 	if (IS_ERR(cm_id)) {
 		ret = PTR_ERR(cm_id);
-		printk(KERN_ERR "RDS/ib: failed to setup listener, "
+		printk(KERN_ERR "RDS/IB: failed to setup listener, "
 		       "rdma_create_id() returned %d\n", ret);
 		goto out;
 	}
@@ -850,14 +878,14 @@ int __init rds_ib_listen_init(void)
 	 */
 	ret = rdma_bind_addr(cm_id, (struct sockaddr *)&sin);
 	if (ret) {
-		printk(KERN_ERR "RDS/ib: failed to setup listener, "
+		printk(KERN_ERR "RDS/IB: failed to setup listener, "
 		       "rdma_bind_addr() returned %d\n", ret);
 		goto out;
 	}
 
 	ret = rdma_listen(cm_id, 128);
 	if (ret) {
-		printk(KERN_ERR "RDS/ib: failed to setup listener, "
+		printk(KERN_ERR "RDS/IB: failed to setup listener, "
 		       "rdma_listen() returned %d\n", ret);
 		goto out;
 	}
diff --git a/net/rds/ib_rdma.c b/net/rds/ib_rdma.c
index 5a0fba3..de732b5 100644
--- a/net/rds/ib_rdma.c
+++ b/net/rds/ib_rdma.c
@@ -45,105 +45,318 @@ extern struct list_head rds_ib_devices;
 struct rds_ib_mr {
 	struct rds_ib_device	*device;
 	struct rds_ib_mr_pool	*pool;
-	struct ib_fmr		*fmr;
-	struct list_head	list;
-	unsigned int		remap_count;
 
-	struct scatterlist *	sg;
-	unsigned int		sg_len;
-	u64 *			dma;
-	int			sg_dma_len;
+	struct ib_qp *qp;
+
+	union {
+	    struct ib_fmr	*fmr;
+	    /* fastreg stuff and maybe others go here */
+	    struct {
+		struct ib_mr	*mr;
+		struct ib_fast_reg_page_list *page_list;
+	    } fastreg;
+	} u;
+	struct rds_ib_mapping	mapping;
+	unsigned int		remap_count;
 };
 
+#define fr_mr			u.fastreg.mr
+#define fr_page_list		u.fastreg.page_list
+
 /*
  * Our own little FMR pool
  */
 struct rds_ib_mr_pool {
+	struct rds_ib_device	*device;		/* back ptr to the device that owns us */
+
 	struct mutex		flush_lock;		/* serialize fmr invalidate */
 	struct work_struct	flush_worker;		/* flush worker */
 
 	spinlock_t		list_lock;		/* protect variables below */
 	atomic_t		item_count;		/* total # of MRs */
 	atomic_t		dirty_count;		/* # dirty of MRs */
-	struct list_head	drop_list;		/* MRs that have reached their max_maps limit */
-	struct list_head	free_list;		/* unused MRs */
+	struct list_head	dirty_list;		/* dirty mappings */
 	struct list_head	clean_list;		/* unused & unamapped MRs */
 	atomic_t		free_pinned;		/* memory pinned by free MRs */
+	unsigned long		max_message_size;	/* in pages */
 	unsigned long		max_items;
 	unsigned long		max_items_soft;
 	unsigned long		max_free_pinned;
 	struct ib_fmr_attr	fmr_attr;
+
+	struct rds_ib_mr_pool_ops *op;
 };
 
+struct rds_ib_mr_pool_ops {
+	int			(*init)(struct rds_ib_mr_pool *, struct rds_ib_mr *);
+	int			(*map)(struct rds_ib_mr_pool *pool, struct rds_ib_mr *ibmr,
+					struct scatterlist *sg, unsigned int sg_len);
+	void			(*free)(struct rds_ib_mr_pool *pool, struct rds_ib_mr *ibmr);
+	unsigned int		(*unmap)(struct rds_ib_mr_pool *, struct list_head *,
+					struct list_head *);
+	void			(*destroy)(struct rds_ib_mr_pool *, struct rds_ib_mr *);
+};
+
+
 static int rds_ib_flush_mr_pool(struct rds_ib_mr_pool *pool, int free_all);
 static void rds_ib_teardown_mr(struct rds_ib_mr *ibmr);
 static void rds_ib_mr_pool_flush_worker(struct work_struct *work);
+static int rds_ib_init_fmr(struct rds_ib_mr_pool *pool, struct rds_ib_mr *ibmr);
+static int rds_ib_map_fmr(struct rds_ib_mr_pool *pool,
+			  struct rds_ib_mr *ibmr,
+			  struct scatterlist *sg, unsigned int nents);
+static void rds_ib_free_fmr(struct rds_ib_mr_pool *pool, struct rds_ib_mr *ibmr);
+static unsigned int rds_ib_unmap_fmr_list(struct rds_ib_mr_pool *pool,
+			struct list_head *unmap_list,
+			struct list_head *kill_list);
+static void rds_ib_destroy_fmr(struct rds_ib_mr_pool *pool, struct rds_ib_mr *ibmr);
+static int rds_ib_init_fastreg(struct rds_ib_mr_pool *pool, struct rds_ib_mr *ibmr);
+static int rds_ib_map_fastreg(struct rds_ib_mr_pool *pool,
+			  struct rds_ib_mr *ibmr,
+	       		  struct scatterlist *sg, unsigned int nents);
+static void rds_ib_free_fastreg(struct rds_ib_mr_pool *pool, struct rds_ib_mr *ibmr);
+static unsigned int rds_ib_unmap_fastreg_list(struct rds_ib_mr_pool *pool,
+			struct list_head *unmap_list,
+			struct list_head *kill_list);
+static void rds_ib_destroy_fastreg(struct rds_ib_mr_pool *pool, struct rds_ib_mr *ibmr);
+
+static struct rds_ib_mr_pool_ops rds_ib_fmr_pool_ops = {
+	.init		= rds_ib_init_fmr,
+	.map		= rds_ib_map_fmr,
+	.free		= rds_ib_free_fmr,
+	.unmap		= rds_ib_unmap_fmr_list,
+	.destroy	= rds_ib_destroy_fmr,
+};
+
+static struct rds_ib_mr_pool_ops rds_ib_fastreg_pool_ops = {
+	.init		= rds_ib_init_fastreg,
+	.map		= rds_ib_map_fastreg,
+	.free		= rds_ib_free_fastreg,
+	.unmap		= rds_ib_unmap_fastreg_list,
+	.destroy	= rds_ib_destroy_fastreg,
+};
 
-int ib_update_ipaddr_for_device(struct rds_ib_device *rds_ibdev, __be32 ipaddr)
+int ib_update_cm_id_for_device(struct rds_ib_device *rds_ibdev, struct rdma_cm_id *cm_id)
 {
-	struct rds_ib_ipaddr *i_ipaddr;
+	struct rds_ib_cm_id *i_cm_id;
 
 	spin_lock_irq(&rds_ibdev->spinlock);
-	list_for_each_entry(i_ipaddr, &rds_ibdev->ipaddr_list, list) {
-		if (i_ipaddr->ipaddr == ipaddr) {
+	list_for_each_entry(i_cm_id, &rds_ibdev->cm_id_list, list) {
+		if (i_cm_id->cm_id == cm_id) {
 			spin_unlock_irq(&rds_ibdev->spinlock);
 			return 0;
 		}
 	}
 	spin_unlock_irq(&rds_ibdev->spinlock);
 
-	i_ipaddr = kmalloc(sizeof *i_ipaddr, GFP_KERNEL);
-	if (!i_ipaddr)
+	i_cm_id = kmalloc(sizeof *i_cm_id, GFP_KERNEL);
+	if (!i_cm_id)
 		return -ENOMEM;
 
-	i_ipaddr->ipaddr = ipaddr;
+	i_cm_id->cm_id = cm_id;
 
 	spin_lock_irq(&rds_ibdev->spinlock);
-	list_add_tail(&i_ipaddr->list, &rds_ibdev->ipaddr_list);
+	list_add_tail(&i_cm_id->list, &rds_ibdev->cm_id_list);
 	spin_unlock_irq(&rds_ibdev->spinlock);
 
 	return 0;
 }
 
-struct rds_ib_device* ib_get_device(__be32 ipaddr)
+static int ib_get_device(struct rds_sock *rs, struct rds_ib_device **rds_ibdev, struct ib_qp **qp)
 {
-	struct rds_ib_device *rds_ibdev;
-	struct rds_ib_ipaddr *i_ipaddr;
-
-	list_for_each_entry(rds_ibdev, &rds_ib_devices, list) {
-		spin_lock_irq(&rds_ibdev->spinlock);
-		list_for_each_entry(i_ipaddr, &rds_ibdev->ipaddr_list, list) {
-			if (i_ipaddr->ipaddr == ipaddr) {
-				spin_unlock_irq(&rds_ibdev->spinlock);
-				return rds_ibdev;
+	struct rds_ib_device *ibdev;
+	struct rds_ib_cm_id *i_cm_id;
+
+	list_for_each_entry(ibdev, &rds_ib_devices, list) {
+		spin_lock_irq(&ibdev->spinlock);
+		list_for_each_entry(i_cm_id, &ibdev->cm_id_list, list) {
+			struct sockaddr_in *src_addr, *dst_addr;
+
+			src_addr = (struct sockaddr_in *)&i_cm_id->cm_id->route.addr.src_addr;
+			dst_addr = (struct sockaddr_in *)&i_cm_id->cm_id->route.addr.dst_addr;
+
+			rdsdebug("%s: local ipaddr = %x port %d, remote ipaddr = %x port %d"
+				 "....looking for %x port %d, remote ipaddr = %x port %d\n",
+				 __func__,
+				src_addr->sin_addr.s_addr,
+				src_addr->sin_port,
+				dst_addr->sin_addr.s_addr,
+				dst_addr->sin_port,
+				rs->rs_bound_addr,
+				rs->rs_bound_port,
+				rs->rs_conn_addr,
+				rs->rs_conn_port);
+#if 0
+			if (src_addr->sin_addr.s_addr == rs->rs_bound_addr &&
+			    src_addr->sin_port == rs->rs_bound_port &&
+			    dst_addr->sin_addr.s_addr == rs->rs_conn_addr && 
+			    dst_addr->sin_port == rs->rs_conn_port) {
+#else
+			/* FIXME - needs to compare the local and remote ipaddr/port tuple, but the
+			 * ipaddr is the only available information in the rds_sock (the rest are
+			 * zeroed).  It doesn't appear to be properly populated during connection
+			 * setup...
+			 */
+			if (src_addr->sin_addr.s_addr == rs->rs_bound_addr) {
+#endif
+				spin_unlock_irq(&ibdev->spinlock);
+				*rds_ibdev = ibdev;
+				*qp = i_cm_id->cm_id->qp;
+				return 0;
 			}
 		}
-		spin_unlock_irq(&rds_ibdev->spinlock);
+		spin_unlock_irq(&ibdev->spinlock);
 	}
 
-	return NULL;
+	return 1;
 }
 
-struct rds_ib_mr_pool *rds_ib_create_mr_pool(struct rds_ib_device *rds_ibdev)
+static void rds_ib_set_scatterlist(struct rds_ib_scatterlist *sg,
+		struct scatterlist *list, unsigned int sg_len)
 {
-	struct rds_ib_mr_pool *pool;
+	sg->list = list;
+	sg->len = sg_len;
+	sg->dma_len = 0;
+	sg->dma_npages = 0;
+	sg->bytes = 0;
+}
+
+static int rds_ib_drop_scatterlist(struct rds_ib_device *rds_ibdev,
+		struct rds_ib_scatterlist *sg)
+{
+	int unpinned = 0;
+
+	if (sg->dma_len) {
+		ib_dma_unmap_sg(rds_ibdev->dev,
+				sg->list, sg->len,
+				DMA_BIDIRECTIONAL);
+		sg->dma_len = 0;
+	}
+
+	/* Release the s/g list */
+	if (sg->len) {
+		unsigned int i;
+
+		for (i = 0; i < sg->len; ++i) {
+			struct page *page = sg_page(&sg->list[i]);
+
+			/* FIXME we need a way to tell a r/w MR
+			 * from a r/o MR */
+			set_page_dirty(page);
+			put_page(page);
+		}
+
+		unpinned = sg->len;
+		sg->len = 0;
+
+		kfree(sg->list);
+		sg->list = NULL;
+	}
+
+	return unpinned;
+}
+
+static u64 *rds_ib_map_scatterlist(struct rds_ib_device *rds_ibdev,
+			struct rds_ib_scatterlist *sg,
+			unsigned int dma_page_shift)
+{
+	struct ib_device *dev = rds_ibdev->dev;
+	u64 *dma_pages = NULL;
+	u64 dma_mask;
+	unsigned int dma_page_size;
+	int i, j, ret;
 
-	/* For now, disable all RDMA service on iWARP. This check will
-	 * go away when we have a working patch. */
-	if (rds_ibdev->dev->node_type == RDMA_NODE_RNIC)
-		return NULL;
+	dma_page_size = 1 << dma_page_shift;
+	dma_mask = dma_page_size - 1;
+
+	WARN_ON(sg->dma_len);
+
+	sg->dma_len = ib_dma_map_sg(dev, sg->list, sg->len, DMA_BIDIRECTIONAL);
+	if (unlikely(!sg->dma_len)) {
+	        printk(KERN_WARNING "RDS/IB: dma_map_sg failed!\n");
+		return ERR_PTR(-EBUSY);
+	}
+
+	sg->bytes = 0;
+	sg->dma_npages = 0;
+
+	ret = -EINVAL;
+	for (i = 0; i < sg->dma_len; ++i) {
+		unsigned int dma_len = ib_sg_dma_len(dev, &sg->list[i]);
+		u64 dma_addr = ib_sg_dma_address(dev, &sg->list[i]);
+		u64 end_addr;
+
+		sg->bytes += dma_len;
+
+		end_addr = dma_addr + dma_len;
+		if (dma_addr & dma_mask) {
+			if (i > 0)
+				goto out_unmap;
+			dma_addr &= ~dma_mask;
+		}
+		if (end_addr & dma_mask) {
+			if (i < sg->dma_len - 1)
+				goto out_unmap;
+			end_addr = (end_addr + dma_mask) & ~dma_mask;
+		}
+
+		sg->dma_npages += (end_addr - dma_addr) >> dma_page_shift;
+	}
+
+	/* Now gather the dma addrs into one list */
+	if (sg->dma_npages > fmr_message_size)
+		goto out_unmap;
+
+	dma_pages = kmalloc(sizeof(u64) * sg->dma_npages, GFP_ATOMIC);
+	if (!dma_pages) {
+		ret = -ENOMEM;
+		goto out_unmap;
+	}
+
+	for (i = j = 0; i < sg->dma_len; ++i) {
+		unsigned int dma_len = ib_sg_dma_len(dev, &sg->list[i]);
+		u64 dma_addr = ib_sg_dma_address(dev, &sg->list[i]);
+		u64 end_addr;
+
+		end_addr = dma_addr + dma_len;
+		dma_addr &= ~dma_mask;
+		for (; dma_addr < end_addr; dma_addr += dma_page_size)
+			dma_pages[j++] = dma_addr;
+	}
+
+	return dma_pages;
+
+out_unmap:
+	ib_dma_unmap_sg(rds_ibdev->dev, sg->list, sg->len, DMA_BIDIRECTIONAL);
+	sg->dma_len = 0;
+	if (dma_pages)
+		kfree(dma_pages);
+	return ERR_PTR(ret);
+}
+
+
+static struct rds_ib_mr_pool *__rds_ib_create_mr_pool(struct rds_ib_device *rds_ibdev,
+		unsigned int message_size, unsigned int pool_size,
+		struct rds_ib_mr_pool_ops *ops)
+{
+	struct rds_ib_mr_pool *pool;
 
 	pool = kzalloc(sizeof(*pool), GFP_KERNEL);
 	if (!pool)
 		return ERR_PTR(-ENOMEM);
 
-	INIT_LIST_HEAD(&pool->free_list);
-	INIT_LIST_HEAD(&pool->drop_list);
+	pool->op = ops;
+	pool->device = rds_ibdev;
+	INIT_LIST_HEAD(&pool->dirty_list);
 	INIT_LIST_HEAD(&pool->clean_list);
 	mutex_init(&pool->flush_lock);
 	spin_lock_init(&pool->list_lock);
 	INIT_WORK(&pool->flush_worker, rds_ib_mr_pool_flush_worker);
 
+	pool->max_message_size = message_size;
+	pool->max_items = pool_size;
+	pool->max_free_pinned = pool->max_items * pool->max_message_size / 4;
+
 	pool->fmr_attr.max_pages = fmr_message_size;
 	pool->fmr_attr.max_maps = rds_ibdev->fmr_max_remaps;
 	pool->fmr_attr.page_shift = rds_ibdev->fmr_page_shift;
@@ -154,8 +367,44 @@ struct rds_ib_mr_pool *rds_ib_create_mr_pool(struct rds_ib_device *rds_ibdev)
 	 * items more aggressively.
 	 * Make sure that max_items > max_items_soft > max_items / 2
 	 */
-	pool->max_items_soft = rds_ibdev->max_fmrs * 3 / 4;
-	pool->max_items = rds_ibdev->max_fmrs;
+	pool->max_items_soft = pool->max_items * 3 / 4;
+
+	return pool;
+}
+
+struct rds_ib_mr_pool *rds_ib_create_mr_pool(struct rds_ib_device *rds_ibdev)
+{
+	struct rds_ib_mr_pool *pool;
+	unsigned int pool_size;
+
+	if (!rds_ibdev->use_fastreg) {
+		/* Use FMRs to implement memory registrations */
+		pool_size = fmr_pool_size;
+
+		if (rds_ibdev->max_fmrs && rds_ibdev->max_fmrs < pool_size)
+			pool_size = rds_ibdev->max_fmrs;
+
+		pool = __rds_ib_create_mr_pool(rds_ibdev, fmr_message_size, pool_size,
+					&rds_ib_fmr_pool_ops);
+
+		if (!IS_ERR(pool)) {
+			pool->fmr_attr.max_pages = pool->max_message_size;
+			pool->fmr_attr.max_maps = rds_ibdev->fmr_max_remaps;
+			pool->fmr_attr.page_shift = rds_ibdev->fmr_page_shift;
+		}
+	} else {
+		/* Use fastregs to implement memory registrations */
+		pool_size = fastreg_pool_size;
+
+		pool = __rds_ib_create_mr_pool(rds_ibdev,
+					fastreg_message_size,
+					pool_size,
+					&rds_ib_fastreg_pool_ops);
+
+		if (IS_ERR(pool)) {
+			printk(KERN_WARNING "RDS/IB: __rds_ib_create_mr_pool failed\n");
+		}
+	}
 
 	return pool;
 }
@@ -184,8 +433,8 @@ static inline struct rds_ib_mr *rds_ib_reuse_fmr(struct rds_ib_mr_pool *pool)
 
 	spin_lock_irqsave(&pool->list_lock, flags);
 	if (!list_empty(&pool->clean_list)) {
-		ibmr = list_entry(pool->clean_list.next, struct rds_ib_mr, list);
-		list_del_init(&ibmr->list);
+		ibmr = list_entry(pool->clean_list.next, struct rds_ib_mr, mapping.m_list);
+		list_del_init(&ibmr->mapping.m_list);
 	}
 	spin_unlock_irqrestore(&pool->list_lock, flags);
 
@@ -232,114 +481,26 @@ static struct rds_ib_mr *rds_ib_alloc_fmr(struct rds_ib_device *rds_ibdev)
 		goto out_no_cigar;
 	}
 
-	ibmr->fmr = ib_alloc_fmr(rds_ibdev->pd,
-			(IB_ACCESS_LOCAL_WRITE |
-			 IB_ACCESS_REMOTE_READ |
-			 IB_ACCESS_REMOTE_WRITE),
-			&pool->fmr_attr);
-	if (IS_ERR(ibmr->fmr)) {
-		err = PTR_ERR(ibmr->fmr);
-		ibmr->fmr = NULL;
-		printk(KERN_WARNING "RDS/IB: ib_alloc_fmr failed (err=%d)\n", err);
+	spin_lock_init(&ibmr->mapping.m_lock);
+	INIT_LIST_HEAD(&ibmr->mapping.m_list);
+	ibmr->mapping.m_mr = ibmr;
+
+	err = pool->op->init(pool, ibmr);
+	if (err)
 		goto out_no_cigar;
-	}
 
 	rds_ib_stats_inc(s_ib_rdma_mr_alloc);
 	return ibmr;
 
 out_no_cigar:
 	if (ibmr) {
-		if (ibmr->fmr)
-			ib_dealloc_fmr(ibmr->fmr);
+		pool->op->destroy(pool, ibmr);
 		kfree(ibmr);
 	}
 	atomic_dec(&pool->item_count);
 	return ERR_PTR(err);
 }
 
-static int rds_ib_map_fmr(struct rds_ib_device *rds_ibdev, struct rds_ib_mr *ibmr,
-	       struct scatterlist *sg, unsigned int nents)
-{
-	struct ib_device *dev = rds_ibdev->dev;
-	struct scatterlist *scat = sg;
-	u64 io_addr = 0;
-	u64 *dma_pages;
-	u32 len;
-	int page_cnt, sg_dma_len;
-	int i, j;
-	int ret;
-
-	sg_dma_len = ib_dma_map_sg(dev, sg, nents,
-				 DMA_BIDIRECTIONAL);
-	if (unlikely(!sg_dma_len)) {
-	        printk(KERN_WARNING "RDS/IB: dma_map_sg failed!\n");
-		return -EBUSY;
-	}
-
-	len = 0;
-	page_cnt = 0;
-
-	for (i = 0; i < sg_dma_len; ++i) {
-		unsigned int dma_len = ib_sg_dma_len(dev, &scat[i]);
-		u64 dma_addr = ib_sg_dma_address(dev, &scat[i]);
-	
-		if (dma_addr & ~rds_ibdev->fmr_page_mask) {
-			if (i > 0)
-				return -EINVAL;
-			else
-				++page_cnt;
-		}
-		if ((dma_addr + dma_len) & ~rds_ibdev->fmr_page_mask) {
-			if (i < sg_dma_len - 1)
-				return -EINVAL;
-			else
-				++page_cnt;
-		}
-
-		len += dma_len;
-	}
-
-	page_cnt += len >> rds_ibdev->fmr_page_shift;
-	if (page_cnt > fmr_message_size)
-		return -EINVAL;
-
-	dma_pages = kmalloc(sizeof(u64) * page_cnt, GFP_ATOMIC);
-	if (!dma_pages)
-		return -ENOMEM;
-
-	page_cnt = 0;
-	for (i = 0; i < sg_dma_len; ++i) {
-		unsigned int dma_len = ib_sg_dma_len(dev, &scat[i]);
-		u64 dma_addr = ib_sg_dma_address(dev, &scat[i]);
-	
-		for (j = 0; j < dma_len; j += rds_ibdev->fmr_page_size)
-			dma_pages[page_cnt++] = 
-				(dma_addr & rds_ibdev->fmr_page_mask) + j;
-	}
-				
-	ret = ib_map_phys_fmr(ibmr->fmr,
-				   dma_pages, page_cnt, io_addr);	
-	if (ret)
-		goto out;
-
-	/* Success - we successfully remapped the MR, so we can
-	 * safely tear down the old mapping. */
-	rds_ib_teardown_mr(ibmr);
-
-	ibmr->sg = scat;
-	ibmr->sg_len = nents;
-	ibmr->sg_dma_len = sg_dma_len;
-	ibmr->remap_count++;
-
-	rds_ib_stats_inc(s_ib_rdma_mr_used);
-	ret = 0;
-
-out:
-	kfree(dma_pages);
-
-	return ret;
-}
-
 void rds_ib_sync_mr(void *trans_private, int direction)
 {
 	struct rds_ib_mr *ibmr = trans_private;
@@ -347,51 +508,21 @@ void rds_ib_sync_mr(void *trans_private, int direction)
 
 	switch (direction) {
 	case DMA_FROM_DEVICE:
-		ib_dma_sync_sg_for_cpu(rds_ibdev->dev, ibmr->sg,
-			ibmr->sg_dma_len, DMA_BIDIRECTIONAL);
+		ib_dma_sync_sg_for_cpu(rds_ibdev->dev, ibmr->mapping.m_sg.list,
+			ibmr->mapping.m_sg.dma_len, DMA_BIDIRECTIONAL);
 		break;
 	case DMA_TO_DEVICE:
-		ib_dma_sync_sg_for_device(rds_ibdev->dev, ibmr->sg,
-			ibmr->sg_dma_len, DMA_BIDIRECTIONAL);
+		ib_dma_sync_sg_for_device(rds_ibdev->dev, ibmr->mapping.m_sg.list,
+			ibmr->mapping.m_sg.dma_len, DMA_BIDIRECTIONAL);
 		break;
 	}
 }
 
-static void __rds_ib_teardown_mr(struct rds_ib_mr *ibmr)
-{
-	struct rds_ib_device *rds_ibdev = ibmr->device;
-
-	if (ibmr->sg_dma_len) {
-		ib_dma_unmap_sg(rds_ibdev->dev,
-				ibmr->sg, ibmr->sg_len,
-				DMA_BIDIRECTIONAL);
-		ibmr->sg_dma_len = 0;
-	}
-
-	/* Release the s/g list */
-	if (ibmr->sg_len) {
-		unsigned int i;
-
-		for (i = 0; i < ibmr->sg_len; ++i) {
-			struct page *page = sg_page(&ibmr->sg[i]);
-
-			/* FIXME we need a way to tell a r/w MR
-			 * from a r/o MR */
-			set_page_dirty(page);
-			put_page(page);
-		}
-		kfree(ibmr->sg);
-
-		ibmr->sg = NULL;
-		ibmr->sg_len = 0;
-	}
-}
-
 void rds_ib_teardown_mr(struct rds_ib_mr *ibmr)
 {
-	unsigned int pinned = ibmr->sg_len;
+	unsigned int pinned;
 
-	__rds_ib_teardown_mr(ibmr);
+	pinned = rds_ib_drop_scatterlist(ibmr->device, &ibmr->mapping.m_sg);
 	if (pinned) {
 		struct rds_ib_device *rds_ibdev = ibmr->device;
 		struct rds_ib_mr_pool *pool = rds_ibdev->mr_pool;
@@ -424,8 +555,7 @@ int rds_ib_flush_mr_pool(struct rds_ib_mr_pool *pool, int free_all)
 {
 	struct rds_ib_mr *ibmr, *next;
 	LIST_HEAD(unmap_list);
-	LIST_HEAD(fmr_list);
-	unsigned long unpinned = 0;
+	LIST_HEAD(kill_list);
 	unsigned long flags;
 	unsigned int nfreed = 0, ncleaned = 0, free_goal;
 	int ret = 0;
@@ -435,49 +565,50 @@ int rds_ib_flush_mr_pool(struct rds_ib_mr_pool *pool, int free_all)
 	mutex_lock(&pool->flush_lock);
 
 	spin_lock_irqsave(&pool->list_lock, flags);
-	/* Get the list of all MRs to be dropped. Ordering matters -
-	 * we want to put drop_list ahead of free_list. */
-	list_splice_init(&pool->free_list, &unmap_list);
-	list_splice_init(&pool->drop_list, &unmap_list);
+	/* Get the list of all mappings to be destroyed */
+	list_splice_init(&pool->dirty_list, &unmap_list);
 	if (free_all)
-		list_splice_init(&pool->clean_list, &unmap_list);
+		list_splice_init(&pool->clean_list, &kill_list);
 	spin_unlock_irqrestore(&pool->list_lock, flags);
 
 	free_goal = rds_ib_flush_goal(pool, free_all);
 
-	if (list_empty(&unmap_list))
-		goto out;
-
-	/* String all ib_mr's onto one list and hand them to ib_unmap_fmr */
-	list_for_each_entry(ibmr, &unmap_list, list)
-		list_add(&ibmr->fmr->list, &fmr_list);
-	ret = ib_unmap_fmr(&fmr_list);
-	if (ret)
-		printk(KERN_WARNING "RDS/IB: ib_unmap_fmr failed (err=%d)\n", ret);
+	/* Batched invalidate of dirty MRs.
+	 * For FMR based MRs, the mappings on the unmap list are
+	 * actually members of an ibmr (ibmr->mapping). They either
+	 * migrate to the kill_list, or have been cleaned and should be
+	 * moved to the clean_list.
+	 * For fastregs, they will be dynamically allocated, and
+	 * will be destroyed by the unmap function.
+	 */
+	if (!list_empty(&unmap_list)) {
+		ncleaned = pool->op->unmap(pool, &unmap_list, &kill_list);
+		/* If we've been asked to destroy all MRs, move those
+		 * that were simply cleaned to the kill list */
+		if (free_all)
+			list_splice_init(&unmap_list, &kill_list);
+	}
 
-	/* Now we can destroy the DMA mapping and unpin any pages */
-	list_for_each_entry_safe(ibmr, next, &unmap_list, list) {
-		unpinned += ibmr->sg_len;
-		__rds_ib_teardown_mr(ibmr);
-		if (nfreed < free_goal || ibmr->remap_count >= pool->fmr_attr.max_maps) {
-			rds_ib_stats_inc(s_ib_rdma_mr_free);
-			list_del(&ibmr->list);
-			ib_dealloc_fmr(ibmr->fmr);
-			kfree(ibmr);
-			nfreed++;
-		}
-		ncleaned++;
+	/* Destroy any MRs that are past their best before date */
+	list_for_each_entry_safe(ibmr, next, &kill_list, mapping.m_list) {
+		rds_ib_stats_inc(s_ib_rdma_mr_free);
+		list_del(&ibmr->mapping.m_list);
+		pool->op->destroy(pool, ibmr);
+		kfree(ibmr);
+		nfreed++;
 	}
 
-	spin_lock_irqsave(&pool->list_lock, flags);
-	list_splice(&unmap_list, &pool->clean_list);
-	spin_unlock_irqrestore(&pool->list_lock, flags);
+	/* Anything that remains are laundered ibmrs, which we can add
+	 * back to the clean list. */
+	if (!list_empty(&unmap_list)) {
+		spin_lock_irqsave(&pool->list_lock, flags);
+		list_splice(&unmap_list, &pool->clean_list);
+		spin_unlock_irqrestore(&pool->list_lock, flags);
+	}
 
-	atomic_sub(unpinned, &pool->free_pinned);
 	atomic_sub(ncleaned, &pool->dirty_count);
 	atomic_sub(nfreed, &pool->item_count);
 
-out:
 	mutex_unlock(&pool->flush_lock);
 	return ret;
 }
@@ -494,22 +625,13 @@ void rds_ib_free_mr(void *trans_private, int invalidate)
 	struct rds_ib_mr *ibmr = trans_private;
 	struct rds_ib_device *rds_ibdev = ibmr->device;
 	struct rds_ib_mr_pool *pool = rds_ibdev->mr_pool;
-	unsigned long flags;
 
-	rdsdebug("RDS/IB: free_mr nents %u\n", ibmr->sg_len);
+	rdsdebug("RDS/IB: free_mr nents %u\n", ibmr->mapping.m_sg.len);
 	if (!pool)
 		return;
 
 	/* Return it to the pool's free list */
-	spin_lock_irqsave(&pool->list_lock, flags);
-	if (ibmr->remap_count >= pool->fmr_attr.max_maps) {
-		list_add(&ibmr->list, &pool->drop_list);
-	} else {
-		list_add(&ibmr->list, &pool->free_list);
-	}
-	atomic_add(ibmr->sg_len, &pool->free_pinned);
-	atomic_inc(&pool->dirty_count);
-	spin_unlock_irqrestore(&pool->list_lock, flags);
+	pool->op->free(pool, ibmr);
 
 	/* If we've pinned too many pages, request a flush */
 	if (atomic_read(&pool->free_pinned) >= pool->max_free_pinned
@@ -540,19 +662,21 @@ void rds_ib_flush_mrs(void)
 }
 
 void *rds_ib_get_mr(struct scatterlist *sg, unsigned long nents,
-		    __be32 ip_addr, u32 *key_ret)
+		    struct rds_sock *rs, u32 *key_ret)
 {
 	struct rds_ib_device *rds_ibdev;
+	struct rds_ib_mr_pool *pool;
 	struct rds_ib_mr *ibmr = NULL;
+	struct ib_qp *qp;
 	int ret;
 
-	rds_ibdev = ib_get_device(ip_addr);
-	if (!rds_ibdev) {
+	ret = ib_get_device(rs, &rds_ibdev, &qp);
+	if (ret || !qp) {
 		ret = -ENODEV;
 		goto out;
 	}
 
-	if (!rds_ibdev->mr_pool) {
+	if (!(pool = rds_ibdev->mr_pool)) {
 		ret = -ENODEV;
 		goto out;
 	}
@@ -561,13 +685,14 @@ void *rds_ib_get_mr(struct scatterlist *sg, unsigned long nents,
 	if (IS_ERR(ibmr))
 		return ibmr;
 
-	ret = rds_ib_map_fmr(rds_ibdev, ibmr, sg, nents);
+	ibmr->qp = qp;
+	ibmr->device = rds_ibdev;
+
+	ret = pool->op->map(pool, ibmr, sg, nents);
 	if (ret == 0)
-		*key_ret = ibmr->fmr->rkey;
+		*key_ret = rds_ibdev->dev->node_type == RDMA_NODE_RNIC ? ibmr->fr_mr->rkey : ibmr->u.fmr->rkey;
 	else
-		printk(KERN_WARNING "RDS/IB: map_fmr failed (errno=%d)\n", ret);
-
-	ibmr->device = rds_ibdev;
+		printk(KERN_WARNING "RDS/IB: failed to map mr (errno=%d)\n", ret);
 
  out:   
 	if (ret) {
@@ -577,3 +702,357 @@ void *rds_ib_get_mr(struct scatterlist *sg, unsigned long nents,
 	}
 	return ibmr;
 }
+
+/*
+ * This is the code that implements RDS memory registrations
+ * through FMRs.
+ */
+static int rds_ib_init_fmr(struct rds_ib_mr_pool *pool,
+			struct rds_ib_mr *ibmr)
+{
+	struct rds_ib_device *rds_ibdev = pool->device;
+	struct ib_fmr *fmr;
+
+	fmr = ib_alloc_fmr(rds_ibdev->pd,
+			(IB_ACCESS_LOCAL_WRITE |
+			 IB_ACCESS_REMOTE_READ |
+			 IB_ACCESS_REMOTE_WRITE),
+			&pool->fmr_attr);
+	if (IS_ERR(fmr)) {
+		int err = PTR_ERR(fmr);
+
+		printk(KERN_WARNING "RDS/IB: ib_alloc_fmr failed (err=%d)\n", err);
+		return err;
+	}
+
+	ibmr->u.fmr = fmr;
+	return 0;
+}
+
+static int rds_ib_map_fmr(struct rds_ib_mr_pool *pool, struct rds_ib_mr *ibmr,
+	       struct scatterlist *sg, unsigned int nents)
+{
+	struct rds_ib_device *rds_ibdev = pool->device;
+	struct rds_ib_scatterlist ibsg;
+	u64 *dma_pages;
+	int ret;
+
+	rds_ib_set_scatterlist(&ibsg, sg, nents);
+
+	dma_pages = rds_ib_map_scatterlist(rds_ibdev, &ibsg, rds_ibdev->fmr_page_shift);
+	if (IS_ERR(dma_pages))
+		return PTR_ERR(dma_pages);
+
+	ret = ib_map_phys_fmr(ibmr->u.fmr, dma_pages, ibsg.dma_npages, 0);
+	if (ret) {
+		rds_ib_drop_scatterlist(rds_ibdev, &ibsg);
+		goto out;
+	}
+
+	/* Success - we successfully remapped the MR, so we can
+	 * safely tear down the old mapping. */
+	rds_ib_teardown_mr(ibmr);
+
+	ibmr->mapping.m_sg = ibsg;
+	ibmr->remap_count++;
+
+	rds_ib_stats_inc(s_ib_rdma_mr_used);
+	ret = 0;
+
+out:
+	kfree(dma_pages);
+
+	return ret;
+}
+
+static void rds_ib_free_fmr(struct rds_ib_mr_pool *pool, struct rds_ib_mr *ibmr)
+{
+	unsigned long flags;
+
+	/* MRs that have reached their maximum remap count get queued
+	 * to the head of the list.
+	 */
+	spin_lock_irqsave(&pool->list_lock, flags);
+	if (ibmr->remap_count >= pool->fmr_attr.max_maps) {
+		list_add(&ibmr->mapping.m_list, &pool->dirty_list);
+	} else {
+		list_add_tail(&ibmr->mapping.m_list, &pool->dirty_list);
+	}
+	atomic_add(ibmr->mapping.m_sg.len, &pool->free_pinned);
+	atomic_inc(&pool->dirty_count);
+	spin_unlock_irqrestore(&pool->list_lock, flags);
+}
+
+static unsigned int rds_ib_unmap_fmr_list(struct rds_ib_mr_pool *pool,
+			       struct list_head *unmap_list,
+			       struct list_head *kill_list)
+{
+	struct rds_ib_mapping *mapping, *next;
+	struct rds_ib_mr *ibmr;
+	LIST_HEAD(fmr_list);
+	unsigned long unpinned = 0, ncleaned = 0;
+	int ret;
+
+	/* String all ib_mr's onto one list and hand them to ib_unmap_fmr */
+	list_for_each_entry_safe(mapping, next, unmap_list, m_list) {
+		ibmr = mapping->m_mr;
+
+		list_add(&ibmr->u.fmr->list, &fmr_list);
+	}
+	ret = ib_unmap_fmr(&fmr_list);
+	if (ret)
+		printk(KERN_WARNING "RDS/IB: ib_unmap_fmr failed (err=%d)\n", ret);
+
+	/* Now we can destroy the DMA mapping and unpin any pages */
+	list_for_each_entry_safe(mapping, next, unmap_list, m_list) {
+		ibmr = mapping->m_mr;
+
+		unpinned += rds_ib_drop_scatterlist(ibmr->device, &mapping->m_sg);
+		if (ibmr->remap_count >= pool->fmr_attr.max_maps)
+			list_move(&mapping->m_list, kill_list);
+		ncleaned++;
+	}
+
+	atomic_sub(unpinned, &pool->free_pinned);
+	return ncleaned;
+}
+
+static void rds_ib_destroy_fmr(struct rds_ib_mr_pool *pool,
+			       struct rds_ib_mr *ibmr)
+{
+	if (ibmr->u.fmr)
+		ib_dealloc_fmr(ibmr->u.fmr);
+	ibmr->u.fmr = NULL;
+}
+
+/*
+ * iWARP fastreg handling
+ *
+ * The life cycle of a fastreg registration is a bit different from
+ * FMRs.
+ * The idea behind fastreg is to have one MR, to which we bind different
+ * mappings over time. To avoid stalling on the expensive map and invalidate
+ * operations, these operations are pipelined on the same send queue on
+ * which we want to send the message containing the r_key.
+ *
+ * This creates a bit of a problem for us, as we do not have the destination
+ * IP in GET_MR, so the connection must be set up prior to the GET_MR call for
+ * RDMA to be set up correctly.  If a fastreg request is present, rds_ib_xmit
+ * will try to queue a LOCAL_INV (if needed) and a FAST_REG_MR work request
+ * before queuing the SEND.  When completions for these arrive, a bit is set
+ * on the MR showing that RDMA can be performed.
+ *
+ * There is another interesting aspect that's related to invalidation.
+ * The application can request that a mapping is invalidated in FREE_MR.
+ * The expectation there is that this invalidation step includes ALL
+ * PREVIOUSLY FREED MRs.
+ */
+static int rds_ib_init_fastreg(struct rds_ib_mr_pool *pool,
+				struct rds_ib_mr *ibmr)
+{
+	struct rds_ib_device *rds_ibdev = pool->device;
+	struct rds_ib_mapping *mapping = &ibmr->mapping;
+	struct ib_fast_reg_page_list *page_list = NULL;
+	struct ib_mr *mr;
+	int err;
+
+	mr = ib_alloc_fast_reg_mr(rds_ibdev->pd, pool->max_message_size);
+	if (IS_ERR(mr)) {
+		err = PTR_ERR(mr);
+
+		printk(KERN_WARNING "RDS/IB: ib_alloc_fast_reg_mr failed (err=%d)\n", err);
+		return err;
+	}
+
+	page_list = ib_alloc_fast_reg_page_list(rds_ibdev->dev, mapping->m_sg.dma_npages);
+	if (IS_ERR(page_list)) {
+		err = PTR_ERR(page_list);
+
+		printk(KERN_WARNING "RDS/IB: ib_alloc_fast_reg_page_list failed (err=%d)\n", err);
+		ib_dereg_mr(mr);
+		return err;
+	}
+
+	ibmr->fr_page_list = page_list;
+	ibmr->fr_mr = mr;
+	return 0;
+}
+
+static int rds_ib_rdma_fastreg_inv(struct rds_ib_mr *ibmr)
+{
+	struct ib_send_wr i_wr, *failed_wr;
+	int ret;
+
+	memset(&i_wr, 0, sizeof(i_wr));
+	i_wr.opcode = IB_WR_LOCAL_INV;
+	i_wr.ex.invalidate_rkey = ibmr->fr_mr->rkey;
+	i_wr.send_flags = IB_SEND_SIGNALED;
+
+	failed_wr = &i_wr;
+	ret = ib_post_send(ibmr->qp, &i_wr, &failed_wr);
+	BUG_ON(failed_wr != &i_wr);
+	if (ret) {
+		printk(KERN_WARNING "RDS/IB: %s %d ib_post_send returned %d\n",
+			__func__, __LINE__, ret);
+		goto out;
+	}
+out:
+	return ret;
+}
+
+static int rds_ib_rdma_build_fastreg(struct ib_qp *qp, struct rds_ib_mapping *mapping)
+{
+	struct rds_ib_mr *ibmr = mapping->m_mr;
+	struct ib_send_wr f_wr, *failed_wr;
+	int ret;
+
+	/*
+	 * Perform a WR for the fast_reg_mr. Each individual page
+	 * in the sg list is added to the fast reg page list and placed
+	 * inside the fast_reg_mr WR.  The key used is a rolling 8bit
+	 * counter, which should guarantee uniqueness.
+	 */
+
+	ib_update_fast_reg_key(ibmr->fr_mr, ibmr->remap_count++);
+	mapping->m_rkey = ibmr->fr_mr->rkey;
+
+	memset(&f_wr, 0, sizeof(f_wr));
+	f_wr.opcode = IB_WR_FAST_REG_MR;
+	f_wr.wr.fast_reg.length = mapping->m_sg.bytes;
+	f_wr.wr.fast_reg.rkey = mapping->m_rkey;
+	f_wr.wr.fast_reg.page_list = ibmr->fr_page_list;
+	f_wr.wr.fast_reg.page_list_len = mapping->m_sg.dma_len;
+	f_wr.wr.fast_reg.page_shift = ibmr->device->fmr_page_shift;
+	f_wr.wr.fast_reg.access_flags = IB_ACCESS_LOCAL_WRITE |
+				IB_ACCESS_REMOTE_READ |
+				IB_ACCESS_REMOTE_WRITE;
+	f_wr.wr.fast_reg.iova_start = 0;
+	f_wr.send_flags = IB_SEND_SIGNALED;
+
+	failed_wr = &f_wr;
+	ret = ib_post_send(qp, &f_wr, &failed_wr);
+	BUG_ON(failed_wr != &f_wr);
+	if (ret) {
+		printk(KERN_WARNING "RDS/IB: %s %d ib_post_send returned %d\n",
+			__func__, __LINE__, ret);
+		goto out;
+	}
+
+out:
+	return ret;
+}
+
+static int rds_ib_map_fastreg(struct rds_ib_mr_pool *pool,
+			struct rds_ib_mr *ibmr,
+	       		struct scatterlist *sg,
+			unsigned int sg_len)
+{
+	struct rds_ib_device *rds_ibdev = pool->device;
+	struct rds_ib_mapping *mapping = &ibmr->mapping;
+	u64 *dma_pages;
+	int i, ret;
+
+	rds_ib_set_scatterlist(&mapping->m_sg, sg, sg_len);
+
+	dma_pages = rds_ib_map_scatterlist(rds_ibdev,
+				&mapping->m_sg,
+				rds_ibdev->fmr_page_shift);
+	if (IS_ERR(dma_pages)) {
+		ret = PTR_ERR(dma_pages);
+		dma_pages = NULL;
+		goto out;
+	}
+
+	if (mapping->m_sg.dma_len > pool->max_message_size) {
+		ret = -EMSGSIZE;
+		goto out;
+	}
+
+	for (i = 0; i < mapping->m_sg.dma_npages; ++i)
+		ibmr->fr_page_list->page_list[i] = dma_pages[i];
+
+	rds_ib_rdma_build_fastreg(ibmr->qp, mapping);
+
+	rds_ib_stats_inc(s_ib_rdma_mr_used);
+	ret = 0;
+
+out:
+	kfree(dma_pages);
+
+	return ret;
+}
+
+/*
+ * "Free" a fastreg MR.
+ */
+static void rds_ib_free_fastreg(struct rds_ib_mr_pool *pool,
+		struct rds_ib_mr *ibmr)
+{
+	unsigned long flags;
+
+	if (!ibmr->mapping.m_sg.dma_len)
+		return;
+
+	rds_ib_rdma_fastreg_inv(ibmr);
+
+	/* Try to post the LOCAL_INV WR to the queue. */
+	spin_lock_irqsave(&pool->list_lock, flags);
+
+	list_add_tail(&ibmr->mapping.m_list, &pool->dirty_list);
+	atomic_add(ibmr->mapping.m_sg.len, &pool->free_pinned);
+	atomic_inc(&pool->dirty_count);
+
+	spin_unlock_irqrestore(&pool->list_lock, flags);
+}
+
+static unsigned int rds_ib_unmap_fastreg_list(struct rds_ib_mr_pool *pool,
+				struct list_head *unmap_list,
+				struct list_head *kill_list)
+{
+	struct rds_ib_mapping *mapping, *next;
+	unsigned int ncleaned = 0;
+	LIST_HEAD(laundered);
+
+	/* Batched invalidation of fastreg MRs.
+	 * Why do we do it this way, even though we could pipeline unmap
+	 * and remap? The reason is the application semantics - when the
+	 * application requests an invalidation of MRs, it expects all
+	 * previously released R_Keys to become invalid.
+	 *
+	 * If we implement MR reuse naively, we risk memory corruption
+	 * (this has actually been observed). So the default behavior
+	 * requires that an MR goes through an explicit unmap operation before
+	 * we can reuse it again.
+	 *
+	 * We could probably improve on this a little, by allowing immediate
+	 * reuse of an MR on the same socket (e.g. you could add a small
+	 * cache of unused MRs to struct rds_socket - GET_MR could grab one
+	 * of these without requiring an explicit invalidate).
+	 */
+	while (!list_empty(unmap_list)) {
+		unsigned long flags;
+
+		spin_lock_irqsave(&pool->list_lock, flags);
+		list_for_each_entry_safe(mapping, next, unmap_list, m_list) {
+			list_move(&mapping->m_list, &laundered);
+			ncleaned++;
+		}
+		spin_unlock_irqrestore(&pool->list_lock, flags);
+	}
+
+	/* Move all laundered mappings back to the unmap list.
+	 * We do not kill any WRs right now - it doesn't seem the
+	 * fastreg API has a max_remap limit. */
+	list_splice_init(&laundered, unmap_list);
+
+	return ncleaned;
+}
+
+static void rds_ib_destroy_fastreg(struct rds_ib_mr_pool *pool,
+		struct rds_ib_mr *ibmr)
+{
+	if (ibmr->u.fastreg.page_list)
+		ib_free_fast_reg_page_list(ibmr->u.fastreg.page_list);
+	if (ibmr->u.fastreg.mr)
+		ib_dereg_mr(ibmr->u.fastreg.mr);
+}
diff --git a/net/rds/ib_recv.c b/net/rds/ib_recv.c
index 1da664e..6738758 100644
--- a/net/rds/ib_recv.c
+++ b/net/rds/ib_recv.c
@@ -402,9 +402,10 @@ static void rds_ib_set_ack(struct rds_ib_connection *ic, u64 seq,
 	unsigned long flags;
 
 	spin_lock_irqsave(&ic->i_ack_lock, flags);
-	ic->i_ack_next = seq;
 	if (ack_required)
 		set_bit(IB_ACK_REQUESTED, &ic->i_ack_flags);
+	if (seq > ic->i_ack_next)
+		ic->i_ack_next = seq;
 	spin_unlock_irqrestore(&ic->i_ack_lock, flags);
 }
 
@@ -749,13 +750,10 @@ static void rds_ib_process_recv(struct rds_connection *conn,
 
 		if (ibinc->ii_inc.i_hdr.h_flags == RDS_FLAG_CONG_BITMAP)
 			rds_ib_cong_recv(conn, ibinc);
-		else {
+		else
 			rds_recv_incoming(conn, conn->c_faddr, conn->c_laddr,
 					  &ibinc->ii_inc, GFP_ATOMIC,
 					  KM_SOFTIRQ0);
-			state->ack_next = be64_to_cpu(hdr->h_sequence);
-			state->ack_next_valid = 1;
-		}
 
 		/* Evaluate the ACK_REQUIRED flag *after* we received
 		 * the complete frame, and after bumping the next_rx
@@ -765,6 +763,8 @@ static void rds_ib_process_recv(struct rds_connection *conn,
 			state->ack_required = 1;
 		}
 
+		state->ack_next = be64_to_cpu(hdr->h_sequence);
+		state->ack_next_valid = 1;
 		rds_inc_put(&ibinc->ii_inc);
 	}
 }
diff --git a/net/rds/ib_send.c b/net/rds/ib_send.c
index 6317ce3..798af51 100644
--- a/net/rds/ib_send.c
+++ b/net/rds/ib_send.c
@@ -135,7 +135,9 @@ void rds_ib_send_init_ring(struct rds_ib_connection *ic)
 
 		send->s_rm = NULL;
 		send->s_op = NULL;
+		send->s_mapping = NULL;
 
+		send->s_wr.next = NULL;
 		send->s_wr.wr_id = i;
 		send->s_wr.sg_list = send->s_sge;
 		send->s_wr.num_sge = 1;
@@ -192,12 +194,22 @@ void rds_ib_send_cq_comp_handler(struct ib_cq *cq, void *context)
 		rdsdebug("ib_req_notify_cq send failed: %d\n", ret);
 	}
 
-	while (ib_poll_cq(cq, 1, &wc) > 0 ) {
+	while (ib_poll_cq(cq, 1, &wc) > 0) {
 		rdsdebug("wc wr_id 0x%llx status %u byte_len %u imm_data %u\n",
 			 (unsigned long long)wc.wr_id, wc.status, wc.byte_len,
 			 be32_to_cpu(wc.ex.imm_data));
 		rds_ib_stats_inc(s_ib_tx_cq_event);
 
+		if (wc.opcode == IB_WC_LOCAL_INV) { 
+			ic->fastreg_posted = 0;
+			continue;
+		}
+
+		if (wc.opcode == IB_WC_FAST_REG_MR) {
+			ic->fastreg_posted = 1;
+			continue;
+		}
+
 		if (wc.wr_id == RDS_IB_ACK_WR_ID) {
 			if (ic->i_ack_queued + HZ/2 < jiffies)
 				rds_ib_stats_inc(s_ib_tx_stalled);
@@ -475,6 +487,14 @@ int rds_ib_xmit(struct rds_connection *conn, struct rds_message *rm,
 	BUG_ON(off % RDS_FRAG_SIZE);
 	BUG_ON(hdr_off != 0 && hdr_off != sizeof(struct rds_header));
 
+	/* Fastreg support */
+	if (rds_rdma_cookie_key(rm->m_rdma_cookie)
+	 && ic->i_fastreg
+	 && !ic->fastreg_posted) {
+		ret = -EAGAIN;
+		goto out;
+	}
+
 	/* FIXME we may overallocate here */
 	if (be32_to_cpu(rm->m_inc.i_hdr.h_len) == 0)
 		i = 1;
diff --git a/net/rds/message.c b/net/rds/message.c
index 9269b9a..ddeb95b 100644
--- a/net/rds/message.c
+++ b/net/rds/message.c
@@ -71,6 +71,8 @@ static void rds_message_purge(struct rds_message *rm)
 
 	if (rm->m_rdma_op)
 		rds_rdma_free_op(rm->m_rdma_op);
+	if (rm->m_rdma_mr)
+		rds_mr_put(rm->m_rdma_mr);
 }
 
 void rds_message_inc_purge(struct rds_incoming *inc)
diff --git a/net/rds/rdma.c b/net/rds/rdma.c
index 1f1039e..9e12e87 100644
--- a/net/rds/rdma.c
+++ b/net/rds/rdma.c
@@ -116,11 +116,8 @@ static void rds_destroy_mr(struct rds_mr *mr)
 		mr->r_trans->free_mr(trans_private, mr->r_invalidate);
 }
 
-static void rds_mr_put(struct rds_mr *mr)
+void __rds_put_mr_final(struct rds_mr *mr)
 {
-	if (!atomic_dec_and_test(&mr->r_refcount))
-		return;
-
 	rds_destroy_mr(mr);
 	kfree(mr);
 }
@@ -169,7 +166,7 @@ static int rds_pin_pages(unsigned long user_addr, unsigned int nr_pages,
 }
 
 static int __rds_rdma_map(struct rds_sock *rs, struct rds_get_mr_args *args,
-				u64 *cookie_ret)
+				u64 *cookie_ret, struct rds_mr **mr_ret)
 {
 	struct rds_mr *mr = NULL, *found;
 	unsigned int nr_pages;
@@ -257,8 +254,7 @@ static int __rds_rdma_map(struct rds_sock *rs, struct rds_get_mr_args *args,
 	 * s/g list is now owned by the MR.
 	 * Note that dma_map() implies that pending writes are
 	 * flushed to RAM, so no dma_sync is needed here. */
-	trans_private = rs->rs_transport->get_mr(sg, nents,
-						 rs->rs_bound_addr, 
+	trans_private = rs->rs_transport->get_mr(sg, nents, rs, 
 						 &mr->r_key);
 
 	if (IS_ERR(trans_private)) {
@@ -297,6 +293,11 @@ static int __rds_rdma_map(struct rds_sock *rs, struct rds_get_mr_args *args,
 
 	rdsdebug("RDS: get_mr key is %x\n", mr->r_key);
 
+	if (mr_ret) {
+		atomic_inc(&mr->r_refcount);
+		*mr_ret = mr;
+	}
+
 	ret = 0;
 out:
 	if (pages)
@@ -317,7 +318,7 @@ int rds_get_mr(struct rds_sock *rs, char __user *optval, int optlen)
 			   sizeof(struct rds_get_mr_args)))
 		return -EFAULT;
 
-	return __rds_rdma_map(rs, &args, NULL);
+	return __rds_rdma_map(rs, &args, NULL, NULL);
 }
 
 /*
@@ -655,7 +656,7 @@ int rds_cmsg_rdma_dest(struct rds_sock *rs, struct rds_message *rm,
 
 	if (mr) {
 		mr->r_trans->sync_mr(mr->r_trans_private, DMA_TO_DEVICE);
-		rds_mr_put(mr);
+		rm->m_rdma_mr = mr;
 	}
 	return err;
 }
@@ -673,5 +674,5 @@ int rds_cmsg_rdma_map(struct rds_sock *rs, struct rds_message *rm,
 	 || rm->m_rdma_cookie != 0)
 		return -EINVAL;
 
-	return __rds_rdma_map(rs, CMSG_DATA(cmsg), &rm->m_rdma_cookie);
+	return __rds_rdma_map(rs, CMSG_DATA(cmsg), &rm->m_rdma_cookie, &rm->m_rdma_mr);
 }
diff --git a/net/rds/rdma.h b/net/rds/rdma.h
index b1734a0..b6249e4 100644
--- a/net/rds/rdma.h
+++ b/net/rds/rdma.h
@@ -74,4 +74,11 @@ int rds_cmsg_rdma_map(struct rds_sock *rs, struct rds_message *rm,
 void rds_rdma_free_op(struct rds_rdma_op *ro);
 void rds_rdma_send_complete(struct rds_message *rm, int);
 
+extern void __rds_put_mr_final(struct rds_mr *mr);
+static inline void rds_mr_put(struct rds_mr *mr)
+{
+	if (atomic_dec_and_test(&mr->r_refcount))
+		__rds_put_mr_final(mr);
+}
+
 #endif
diff --git a/net/rds/rds.h b/net/rds/rds.h
index 235c951..fee481e 100644
--- a/net/rds/rds.h
+++ b/net/rds/rds.h
@@ -297,6 +297,7 @@ struct rds_message {
 	struct rds_sock		*m_rs;
 	struct rds_rdma_op	*m_rdma_op;
 	rds_rdma_cookie_t	m_rdma_cookie;
+	struct rds_mr		*m_rdma_mr;
 	unsigned int		m_nents;
 	unsigned int		m_count;
 	struct scatterlist	m_sg[0];
@@ -373,7 +374,7 @@ struct rds_transport {
 					unsigned int avail);
 	void (*exit)(void);
 	void *(*get_mr)(struct scatterlist *sg, unsigned long nr_sg,
-			__be32 ip_addr, u32 *key_ret);
+			struct rds_sock *rs, u32 *key_ret);
 	void (*sync_mr)(void *trans_private, int direction);
 	void (*free_mr)(void *trans_private, int invalidate);
 	void (*flush_mrs)(void);
diff --git a/net/rds/send.c b/net/rds/send.c
index b9d98c8..87237be 100644
--- a/net/rds/send.c
+++ b/net/rds/send.c
@@ -765,6 +765,9 @@ static int rds_cmsg_send(struct rds_sock *rs, struct rds_message *rm,
 		if (cmsg->cmsg_level != SOL_RDS)
 			continue;
 
+		/* As a side effect, RDMA_DEST and RDMA_MAP will set
+		 * rm->m_rdma_cookie and rm->m_rdma_mr.
+		 */
 		switch (cmsg->cmsg_type) {
 		case RDS_CMSG_RDMA_ARGS:
 			ret = rds_cmsg_rdma_args(rs, rm, cmsg);


