[rds-devel] Re: [PATCH] Implement our own RDMA MR pool

Or Gerlitz ogerlitz at voltaire.com
Wed Jan 30 07:06:07 PST 2008


>     Implement our own RDMA MR pool
> 
>     The RDS RDMA code currently uses the FMR pool provided by the IB
>     stack. There are a number of issues with our way of using this pool,

Hi Olaf, see my comments below

>     the worst of which is probably that we have no control about when the
>     FMR is destroyed. We tear down the mapping when we think we're done
>     with the MR, but the MR may continue to be active a long time after
>     that, and if the peer decides to do a RDMA write to that MR it will
>     scribble over random memory.

This is a hell of a good catch: the fmr pool exposes the system to a
peer scribbling over pages it was given access to earlier. I wonder if
there's a way for the fmr pool itself to handle that. Maybe the pool
could allocate some pages of its own, and remap any MR that has been
dirty and on the free list for more than 10 seconds onto those pages?
Roland?
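Something along these lines, perhaps (a rough, untested sketch against
the fmr_pool internals; the last_unmap timestamp is hypothetical and
would need to be added and stamped in ib_fmr_pool_unmap(), and locking
against the pool's cleanup thread is omitted):

	/* Hypothetical: one pool-owned page, allocated and DMA-mapped
	 * at pool creation time. */
	static u64 pool_scratch_dma;

	/* Remap any FMR that has sat on the dirty list for more than
	 * 10 seconds onto the scratch page, so that a late RDMA write
	 * from the peer scribbles there and not over pages that were
	 * already handed back to the application. */
	static void fmr_pool_quarantine_stale(struct ib_fmr_pool *pool)
	{
		struct ib_pool_fmr *fmr;

		list_for_each_entry(fmr, &pool->dirty_list, list) {
			if (time_before(jiffies, fmr->last_unmap + 10 * HZ))
				continue;
			/* Single-page map; the I/O virtual address is
			 * arbitrary since nobody should use this rkey. */
			ib_map_phys_fmr(fmr->fmr, &pool_scratch_dma, 1, 0);
		}
	}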

>     So in order to play things safely we would either keep MR mappings
>     around indefinitely, or invalidate each time we release a MR (which
>     is prohibitively costly) or write some code.

If you don't invalidate (fmr unmap) each time an MR is released, how
can you protect against the problem you point out above?

>     This patch implements our own very light-weight MR pool. It does not
>     provide any of the caching that the IB fmr_pool provides, but it does
>     do batched release, and keeps track of the mappings and destroys them
>     when appropriate.
> 
>     This code differs from the previous implementation in the following
>     areas:
> 

>      -	we never unpin pages before the FMR is either unmapped, or remapped

>      -	we keep an upper limit on the number of pages pinned by
>      	free MRs, and if that limit is exceeded, we start flushing

How can a free MR have pinned pages? Just one line above you say that
pages are never unpinned before the FMR is unmapped or remapped...

>      -	there is no longer a hard upper limit on the number of MRs,
>      	as was the case of the fmr_pool

Limits are good, and no limits is problematic... you say there's no
"hard upper limit", but is there any limit at all?

>      -	smaller memory footprint - the fmr_pool would allocate lots
>      	of memory in advance (in particular, one full sized address
>     	vector for each FMR, which was never used anyway).

Thanks for pointing this out. I sent Roland a patch yesterday that
fixes this in the fmr_pool: no page_list is allocated if the user does
not ask for caching.
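For reference, the gist of that change in ib_create_fmr_pool() is just
to size the allocation based on whether caching was requested, since
the page_list copy trailing each struct ib_pool_fmr is only ever used
for cache lookups:

	int bytes_per_fmr = sizeof *fmr;

	if (params->cache)
		bytes_per_fmr += params->max_pages_per_fmr * sizeof(u64);

	for (i = 0; i < params->pool_size; ++i) {
		fmr = kmalloc(bytes_per_fmr, GFP_KERNEL);
		...
	}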

>      -	there is no separate thread for flushing MRs. If we're in
>      	IRQ context, we put a work item on the rds work queue.
>     	Otherwise, the flush code is executed in the calling context

What problem does this solve?

>     Signed-off-by: Olaf Kirch <olaf.kirch at oracle.com>

Or


> 
> diff --git a/net/rds/ib.c b/net/rds/ib.c
> index 45569e0..8e38346 100644
> --- a/net/rds/ib.c
> +++ b/net/rds/ib.c
> @@ -40,7 +40,7 @@
>  #include "rds.h"
>  #include "ib.h"
> 
> -static int fmr_pool_size = RDS_FMR_POOL_SIZE;
> +unsigned int fmr_pool_size = RDS_FMR_POOL_SIZE;
>  unsigned int fmr_message_size = RDS_FMR_SIZE;
> 
>  module_param(fmr_pool_size, int, 0444);
> @@ -63,7 +63,6 @@ void rds_ib_add_one(struct ib_device *device)
>  {
>  	struct rds_ib_device *rds_ibdev;
>  	struct ib_device_attr *dev_attr;
> -	struct ib_fmr_pool_param fmr_param;
> 
>  	dev_attr = kmalloc(sizeof *dev_attr, GFP_KERNEL);
>  	if (!dev_attr)
> @@ -85,6 +84,7 @@ void rds_ib_add_one(struct ib_device *device)
>  	rds_ibdev->fmr_page_shift = max(9, ffs(dev_attr->page_size_cap) - 1);
>  	rds_ibdev->fmr_page_size  = 1 << rds_ibdev->fmr_page_shift;
>  	rds_ibdev->fmr_page_mask  = ~((u64) rds_ibdev->fmr_page_size - 1);
> +	rds_ibdev->fmr_max_remaps = dev_attr->max_map_per_fmr?: 32;
> 
>  	rds_ibdev->dev = device;
>  	rds_ibdev->pd = ib_alloc_pd(device);
> @@ -98,19 +98,9 @@ void rds_ib_add_one(struct ib_device *device)
>  	if (IS_ERR(rds_ibdev->mr))
>  		goto err_pd;
> 
> -	memset(&fmr_param, 0, sizeof fmr_param);
> -	fmr_param.pool_size	    = fmr_pool_size;
> -	fmr_param.dirty_watermark   = fmr_pool_size / 4;
> -	fmr_param.cache		    = 0;
> -	fmr_param.max_pages_per_fmr = fmr_message_size;
> -	fmr_param.page_shift	    = rds_ibdev->fmr_page_shift;
> -	fmr_param.access	    = (IB_ACCESS_LOCAL_WRITE |
> -				       IB_ACCESS_REMOTE_READ |
> -				       IB_ACCESS_REMOTE_WRITE);
> -
> -	rds_ibdev->fmr_pool = ib_create_fmr_pool(rds_ibdev->pd, &fmr_param);
> -	if (IS_ERR(rds_ibdev->fmr_pool)) {
> -		rds_ibdev->fmr_pool = NULL;
> +	rds_ibdev->mr_pool = rds_ib_create_mr_pool(rds_ibdev);
> +	if (IS_ERR(rds_ibdev->mr_pool)) {
> +		rds_ibdev->mr_pool = NULL;
>  		goto err_mr;
>  	}
> 
> @@ -143,8 +133,8 @@ void rds_ib_remove_one(struct ib_device *device)
>  		kfree(i_ipaddr);
>  	}
> 
> -	if (rds_ibdev->fmr_pool)
> -		ib_destroy_fmr_pool(rds_ibdev->fmr_pool);
> +	if (rds_ibdev->mr_pool)
> +		rds_ib_destroy_mr_pool(rds_ibdev->mr_pool);
> 
>  	ib_dereg_mr(rds_ibdev->mr);
>  	ib_dealloc_pd(rds_ibdev->pd);
> diff --git a/net/rds/ib.h b/net/rds/ib.h
> index 669ec7b..15be70d 100644
> --- a/net/rds/ib.h
> +++ b/net/rds/ib.h
> @@ -3,7 +3,6 @@
> 
>  #include <rdma/ib_verbs.h>
>  #include <rdma/rdma_cm.h>
> -#include <rdma/ib_fmr_pool.h>
>  #include "rds.h"
> 
>  #define RDS_IB_RESOLVE_TIMEOUT_MS	5000
> @@ -119,10 +118,11 @@ struct rds_ib_device {
>  	struct ib_device	*dev;
>  	struct ib_pd		*pd;
>  	struct ib_mr		*mr;
> -	struct ib_fmr_pool	*fmr_pool;
> +	struct rds_ib_mr_pool	*mr_pool;
>  	int			fmr_page_shift;
>  	int			fmr_page_size;
>  	u64			fmr_page_mask;
> +	unsigned int		fmr_max_remaps;
>  	int			max_sge;
>  	spinlock_t		spinlock;
>  };
> @@ -159,6 +159,7 @@ extern void rds_ib_add_one(struct ib_device *device);
>  extern void rds_ib_remove_one(struct ib_device *device);
>  extern struct ib_client rds_ib_client;
> 
> +extern unsigned int fmr_pool_size;
>  extern unsigned int fmr_message_size;
> 
>  /* ib_cm.c */
> @@ -176,6 +177,8 @@ void __rds_ib_conn_error(struct rds_connection *conn, const char *, ...);
> 
>  /* ib_rdma.c */
>  int ib_update_ipaddr_for_device(struct rds_ib_device *rds_ibdev, __be32 ipaddr);
> +struct rds_ib_mr_pool *rds_ib_create_mr_pool(struct rds_ib_device *);
> +void rds_ib_destroy_mr_pool(struct rds_ib_mr_pool *);
>  void *rds_ib_get_mr(struct scatterlist *sg, unsigned long nents,
>  		    __be32 ip_addr, u64 *key_ret, u64 *phyaddr);
>  void rds_ib_free_mr(void *trans_private, int invalidate,
> diff --git a/net/rds/ib_rdma.c b/net/rds/ib_rdma.c
> index e66b813..4f570ee 100644
> --- a/net/rds/ib_rdma.c
> +++ b/net/rds/ib_rdma.c
> @@ -44,10 +44,37 @@ extern struct list_head rds_ib_devices;
>   */
>  struct rds_ib_mr {
>  	struct rds_ib_device	*device;
> -	struct ib_pool_fmr	*fmr;
> -	int			count;
> +	struct rds_ib_mr_pool	*pool;
> +	struct ib_fmr		*fmr;
> +	struct list_head	list;
> +	unsigned int		remap_count;
> +
> +	struct scatterlist *	sg;
> +	unsigned int		sg_len;
> +	u64 *			dma;
> +	int			sg_dma_len;
>  };
> 
> +/*
> + * Our own little FMR pool
> + */
> +struct rds_ib_mr_pool {
> +	struct mutex		flush_lock;		/* serialize fmr invalidate */
> +	struct work_struct	flush_worker;		/* flush worker */
> +
> +	spinlock_t		list_lock;		/* protect variables below */
> +	unsigned int		item_count;		/* total # of MRs */
> +	struct list_head	drop_list;		/* MRs that have reached their max_maps limit */
> +	struct list_head	free_list;		/* unused MRs */
> +	unsigned long		free_pinned;		/* memory pinned by free MRs */
> +	unsigned long		max_free_pinned;
> +	struct ib_fmr_attr	fmr_attr;
> +};
> +
> +static int rds_ib_flush_mr_pool(struct rds_ib_mr_pool *pool, int free_all);
> +static void rds_ib_teardown_mr(struct rds_ib_mr *ibmr);
> +static void rds_ib_mr_pool_flush_worker(struct work_struct *work);
> +
>  int ib_update_ipaddr_for_device(struct rds_ib_device *rds_ibdev, __be32 ipaddr)
>  {
>  	struct rds_ib_ipaddr *i_ipaddr;
> @@ -93,22 +120,98 @@ struct rds_ib_device* ib_get_device(__be32 ipaddr)
>  	return NULL;
>  }
> 
> -int ib_map_fmr(struct rds_ib_device *rds_ibdev, struct rds_ib_mr *ibmr,
> -	       struct scatterlist *sg)
> +struct rds_ib_mr_pool *rds_ib_create_mr_pool(struct rds_ib_device *rds_ibdev)
> +{
> +	struct rds_ib_mr_pool *pool;
> +
> +	pool = kzalloc(sizeof(*pool), GFP_KERNEL);
> +	if (!pool)
> +		return ERR_PTR(-ENOMEM);
> +
> +	INIT_LIST_HEAD(&pool->free_list);
> +	INIT_LIST_HEAD(&pool->drop_list);
> +	mutex_init(&pool->flush_lock);
> +	spin_lock_init(&pool->list_lock);
> +	INIT_WORK(&pool->flush_worker, rds_ib_mr_pool_flush_worker);
> +
> +	pool->fmr_attr.max_pages = fmr_message_size;
> +	pool->fmr_attr.max_maps = rds_ibdev->fmr_max_remaps;
> +	pool->fmr_attr.page_shift = rds_ibdev->fmr_page_shift;
> +	pool->max_free_pinned = fmr_pool_size * fmr_message_size / 4;
> +
> +	return pool;
> +}
> +
> +void rds_ib_destroy_mr_pool(struct rds_ib_mr_pool *pool)
> +{
> +	flush_workqueue(rds_wq);
> +	rds_ib_flush_mr_pool(pool, 1);
> +	BUG_ON(pool->item_count);
> +	kfree(pool);
> +}
> +
> +static struct rds_ib_mr *rds_ib_alloc_fmr(struct rds_ib_device *rds_ibdev)
> +{
> +	struct rds_ib_mr_pool *pool = rds_ibdev->mr_pool;
> +	struct rds_ib_mr *ibmr;
> +	unsigned long flags;
> +
> +	spin_lock_irqsave(&pool->list_lock, flags);
> +	if (!list_empty(&pool->free_list)) {
> +		ibmr = list_entry(pool->free_list.next, struct rds_ib_mr, list);
> +
> +		list_del_init(&ibmr->list);
> +		pool->free_pinned -= ibmr->sg_len;
> +		goto out;
> +	}
> +	spin_unlock_irqrestore(&pool->list_lock, flags);
> +
> +	ibmr = kzalloc(sizeof(*ibmr), GFP_KERNEL);
> +	if (!ibmr)
> +		return ERR_PTR(-ENOMEM);
> +
> +	ibmr->fmr = ib_alloc_fmr(rds_ibdev->pd,
> +			(IB_ACCESS_LOCAL_WRITE |
> +			 IB_ACCESS_REMOTE_READ |
> +			 IB_ACCESS_REMOTE_WRITE),
> +			&pool->fmr_attr);
> +	if (IS_ERR(ibmr->fmr)) {
> +		int err = PTR_ERR(ibmr->fmr);
> +
> +		printk(KERN_WARNING "RDS/IB: ib_alloc_fmr failed (err=%d)\n", err);
> +		kfree(ibmr);
> +		return ERR_PTR(err);
> +	}
> +
> +	spin_lock_irqsave(&pool->list_lock, flags);
> +	pool->item_count++;
> +out:
> +	spin_unlock_irqrestore(&pool->list_lock, flags);
> +	return ibmr;
> +}
> +
> +static int rds_ib_map_fmr(struct rds_ib_device *rds_ibdev, struct rds_ib_mr *ibmr,
> +	       struct scatterlist *sg, unsigned int nents)
>  {
>  	struct scatterlist *scat = sg;
> -	struct ib_pool_fmr *fmr;
>  	u64 io_addr = 0;
>  	u64 *dma_pages;
>  	u32 len;
> -	int page_cnt;
> +	int page_cnt, sg_dma_len;
>  	int i, j;
>  	int ret;
> 
> +	sg_dma_len = dma_map_sg(rds_ibdev->dev->dma_device, sg, nents,
> +				 DMA_BIDIRECTIONAL);
> +	if (unlikely(!sg_dma_len)) {
> +	        printk(KERN_WARNING "RDS/IB: dma_map_sg failed!\n");
> +		return -EBUSY;
> +	}
> +
>  	len = 0;
>  	page_cnt = 0;
> 
> -	for (i = 0; i < ibmr->count; ++i) {
> +	for (i = 0; i < sg_dma_len; ++i) {
>  		unsigned int dma_len = sg_dma_len(&scat[i]);
> 
>  		if (sg_dma_address(&scat[i]) & ~rds_ibdev->fmr_page_mask) {
> @@ -119,7 +222,7 @@ int ib_map_fmr(struct rds_ib_device *rds_ibdev, struct rds_ib_mr *ibmr,
>  		}
>  		if ((sg_dma_address(&scat[i]) + dma_len) &
>  		    ~rds_ibdev->fmr_page_mask) {
> -			if (i < ibmr->count - 1)
> +			if (i < sg_dma_len - 1)
>  				return -EINVAL;
>  			else
>  				++page_cnt;
> @@ -137,7 +240,7 @@ int ib_map_fmr(struct rds_ib_device *rds_ibdev, struct rds_ib_mr *ibmr,
>  		return -ENOMEM;
> 
>  	page_cnt = 0;
> -	for (i = 0; i < ibmr->count; ++i) {
> +	for (i = 0; i < sg_dma_len; ++i) {
>  		unsigned int dma_len = sg_dma_len(&scat[i]);
> 
>  		for (j = 0; j < dma_len; j += rds_ibdev->fmr_page_size)
> @@ -146,14 +249,20 @@ int ib_map_fmr(struct rds_ib_device *rds_ibdev, struct rds_ib_mr *ibmr,
>  				 rds_ibdev->fmr_page_mask) + j;
>  	}
> 
> -	fmr = ib_fmr_pool_map_phys(rds_ibdev->fmr_pool,
> +	ret = ib_map_phys_fmr(ibmr->fmr,
>  				   dma_pages, page_cnt, io_addr);
> -	if (IS_ERR(fmr)) {
> -		ret = PTR_ERR(fmr);
> +	if (ret)
>  		goto out;
> -	}
> 
> -	ibmr->fmr = fmr;
> +	/* Success - we successfully remapped the MR, so we can
> +	 * safely tear down the old mapping. */
> +	rds_ib_teardown_mr(ibmr);
> +
> +	ibmr->sg = scat;
> +	ibmr->sg_len = nents;
> +	ibmr->sg_dma_len = sg_dma_len;
> +	ibmr->remap_count++;
> +
>  	ret = 0;
> 
>  out:
> @@ -162,25 +271,140 @@ out:
>  	return ret;
>  }
> 
> +void rds_ib_teardown_mr(struct rds_ib_mr *ibmr)
> +{
> +	struct rds_ib_device *rds_ibdev = ibmr->device;
> +
> +	if (ibmr->sg_dma_len) {
> +		dma_unmap_sg(rds_ibdev->dev->dma_device,
> +				ibmr->sg, ibmr->sg_len,
> +				DMA_BIDIRECTIONAL);
> +		ibmr->sg_dma_len = 0;
> +	}
> +
> +	/* Release the s/g list */
> +	if (ibmr->sg_len) {
> +		unsigned int i;
> +
> +		for (i = 0; i < ibmr->sg_len; ++i) {
> +			struct page *page = sg_page(&ibmr->sg[i]);
> +
> +			/* FIXME we need a way to tell a r/w MR
> +			 * from a r/o MR */
> +			set_page_dirty(page);
> +			put_page(page);
> +		}
> +		kfree(ibmr->sg);
> +
> +		ibmr->sg = NULL;
> +		ibmr->sg_len = 0;
> +	}
> +}
> +
> +/*
> + * Flush our pool of MRs.
> + * At a minimum, all currently unused MRs are unmapped.
> + * If the number of MRs allocated exceeds fmr_pool_size, we also try
> + * to free as many MRs as needed to get back to this limit.
> + */
> +int rds_ib_flush_mr_pool(struct rds_ib_mr_pool *pool, int free_all)
> +{
> +	struct rds_ib_mr *ibmr, *next;
> +	LIST_HEAD(unmap_list);
> +	LIST_HEAD(fmr_list);
> +	unsigned long unpinned = 0;
> +	unsigned long flags;
> +	unsigned int nfreed = 0, free_goal = 0;
> +	int ret = 0;
> +
> +	mutex_lock(&pool->flush_lock);
> +
> +	spin_lock_irqsave(&pool->list_lock, flags);
> +	/* Get the list of all MRs to be dropped. Ordering matters -
> +	 * we want to put drop_list ahead of free_list. */
> +	list_splice_init(&pool->free_list, &unmap_list);
> +	list_splice_init(&pool->drop_list, &unmap_list);
> +
> +	if (free_all)
> +		free_goal = pool->item_count;
> +	else if (pool->item_count > fmr_pool_size)
> +		free_goal = pool->item_count - fmr_pool_size;
> +	spin_unlock_irqrestore(&pool->list_lock, flags);
> +
> +	if (list_empty(&unmap_list))
> +		goto out;
> +
> +	/* String all ib_mr's onto one list and hand them to ib_unmap_fmr */
> +	list_for_each_entry(ibmr, &unmap_list, list)
> +		list_add(&ibmr->fmr->list, &fmr_list);
> +	ret = ib_unmap_fmr(&fmr_list);
> +	if (ret)
> +		printk(KERN_WARNING "RDS/IB: ib_unmap_fmr failed (err=%d)\n", ret);
> +
> +	/* Now we can destroy the DMA mapping and unpin any pages */
> +	list_for_each_entry_safe(ibmr, next, &unmap_list, list) {
> +		unpinned += ibmr->sg_len;
> +		rds_ib_teardown_mr(ibmr);
> +		if (nfreed < free_goal || ibmr->remap_count >= pool->fmr_attr.max_maps) {
> +			list_del(&ibmr->list);
> +			ib_dealloc_fmr(ibmr->fmr);
> +			kfree(ibmr);
> +			nfreed++;
> +		}
> +	}
> +
> +	spin_lock_irqsave(&pool->list_lock, flags);
> +	list_splice(&unmap_list, &pool->free_list);
> +	pool->free_pinned -= unpinned;
> +	pool->item_count -= nfreed;
> +	spin_unlock_irqrestore(&pool->list_lock, flags);
> +
> +out:
> +	mutex_unlock(&pool->flush_lock);
> +	return ret;
> +}
> +
> +void rds_ib_mr_pool_flush_worker(struct work_struct *work)
> +{
> +	struct rds_ib_mr_pool *pool = container_of(work, struct rds_ib_mr_pool, flush_worker);
> +
> +	rds_ib_flush_mr_pool(pool, 0);
> +}
> +
>  void rds_ib_free_mr(void *trans_private, int invalidate,
>  		    struct scatterlist *sg, int nents)
>  {
>  	struct rds_ib_mr *ibmr = trans_private;
>  	struct rds_ib_device *rds_ibdev = ibmr->device;
> +	struct rds_ib_mr_pool *pool = rds_ibdev->mr_pool;
> +	unsigned long flags;
> 
>  	rdsdebug("RDS/IB: free_mr nents %u\n",nents);
> 
> -	if (ibmr->fmr)
> -		ib_fmr_pool_unmap(ibmr->fmr);
> -
> -	if (ibmr->count)
> -		dma_unmap_sg(rds_ibdev->dev->dma_device, sg,
> -			     nents, DMA_BIDIRECTIONAL);
> -
> -	if (invalidate)
> -		ib_flush_fmr_pool(rds_ibdev->fmr_pool);
> +	/* Return it to the pool's free list */
> +	spin_lock_irqsave(&pool->list_lock, flags);
> +	if (ibmr->remap_count >= pool->fmr_attr.max_maps) {
> +		list_add(&ibmr->list, &pool->drop_list);
> +	} else {
> +		list_add(&ibmr->list, &pool->free_list);
> +	}
> +	pool->free_pinned += ibmr->sg_len;
> 
> -	kfree(ibmr);
> +	/* If we've pinned too many pages, or if the remap
> +	 * count of this MR was reached, request a flush */
> +	if (pool->free_pinned >= pool->max_free_pinned
> +	 || ibmr->remap_count >= pool->fmr_attr.max_maps) {
> +		queue_work(rds_wq, &pool->flush_worker);
> +	}
> +	spin_unlock_irqrestore(&pool->list_lock, flags);
> +
> +	if (invalidate) {
> +		if (likely(!in_interrupt()))
> +			rds_ib_flush_mr_pool(pool, 0);
> +		else if (printk_ratelimit())
> +			printk(KERN_WARNING "RDS/IB: rds_ib_free_mr(invalidate) request "
> +					"in interrupt!\n");
> +	}
>  }
> 
>  void *rds_ib_get_mr(struct scatterlist *sg, unsigned long nents,
> @@ -196,32 +420,22 @@ void *rds_ib_get_mr(struct scatterlist *sg, unsigned long nents,
>  		goto out;
>  	}
> 
> -	if (!rds_ibdev->fmr_pool) {
> +	if (!rds_ibdev->mr_pool) {
>  		ret = -ENODEV;
>  		goto out;
>  	}
> 
> -	ibmr = kzalloc(sizeof(struct rds_ib_mr), GFP_KERNEL);
> -	if (ibmr == NULL) {
> -		ret = -ENOMEM;
> -		goto out;
> -	}
> -
> -	ibmr->count = dma_map_sg(rds_ibdev->dev->dma_device, sg, nents,
> -				 DMA_BIDIRECTIONAL);
> -	if (unlikely(!ibmr->count)) {
> -	        printk(KERN_WARNING "RDS/IB: no dma_sg\n");
> -		ret = -EBUSY;
> -		goto out;
> -	}
> +	ibmr = rds_ib_alloc_fmr(rds_ibdev);
> +	if (IS_ERR(ibmr))
> +		return ibmr;
> 
>  	/* Never return physical addresses and keys to user space.
>  	 * This part of the API is going away. */
>  	*phyaddr = 0;
> 
> -	ret = ib_map_fmr(rds_ibdev, ibmr, sg);
> +	ret = rds_ib_map_fmr(rds_ibdev, ibmr, sg, nents);
>  	if (ret == 0)
> -		*key_ret = ibmr->fmr->fmr->rkey;
> +		*key_ret = ibmr->fmr->rkey;
>  	else
>  		printk(KERN_WARNING "RDS/IB: map_fmr failed (errno=%d)\n", ret);
> 
> diff --git a/net/rds/rdma.c b/net/rds/rdma.c
> index a9720af..380c9c4 100644
> --- a/net/rds/rdma.c
> +++ b/net/rds/rdma.c
> @@ -169,28 +169,15 @@ static void rds_destroy_mr(struct rds_mr *mr)
>  	if (trans_private)
>  		mr->r_trans->free_mr(trans_private,
>  			mr->r_invalidate,
> -			mr->r_sg, mr->r_nents);
> +			NULL, 0);
>  }
> 
>  static void rds_mr_put(struct rds_mr *mr)
>  {
> -	unsigned int i;
> -
>  	if (!atomic_dec_and_test(&mr->r_refcount))
>  		return;
> 
>  	rds_destroy_mr(mr);
> -
> -	for (i = 0; i < mr->r_nents; i++) {
> -		struct page *page = sg_page(&mr->r_sg[i]);
> -
> -		/* FIXME we need a way to tell a r/w MR
> -		 * from a r/o MR */
> -		set_page_dirty(page);
> -
> -		put_page(page);
> -	}
> -
>  	kfree(mr);
>  }
> 
> @@ -243,6 +230,7 @@ int rds_get_mr(struct rds_sock *rs, char __user *optval, int optlen)
>  	struct scatterlist *sg;
>  	void *trans_private;
>  	unsigned long flags;
> +	unsigned int nents;
>  	long i;
>  	int ret;
> 
> @@ -283,7 +271,7 @@ int rds_get_mr(struct rds_sock *rs, char __user *optval, int optlen)
>  		goto out;
>  	}
> 
> -	mr = kzalloc(offsetof(struct rds_mr, r_sg[nr_pages]), GFP_KERNEL);
> +	mr = kzalloc(sizeof(struct rds_mr), GFP_KERNEL);
>  	if (mr == NULL) {
>  		ret = -ENOMEM;
>  		goto out;
> @@ -311,12 +299,13 @@ int rds_get_mr(struct rds_sock *rs, char __user *optval, int optlen)
>  	if (ret < 0)
>  		goto out;
> 
> -	mr->r_nents = ret;
> -	for (i = 0 ; i < mr->r_nents; i++) {
> +	nents = ret;
> +	sg = kcalloc(nents, sizeof(*sg), GFP_KERNEL);
> +
> +	for (i = 0 ; i < nents; i++) {
>  		unsigned int offset = args.vec.addr & ~PAGE_MASK;
> 
> -		sg = &mr->r_sg[i];
> -		sg_set_page(sg, pages[i],
> +		sg_set_page(&sg[i], pages[i],
>  				min_t(unsigned int, args.vec.bytes, PAGE_SIZE - offset),
>  				offset);
> 
> @@ -324,14 +313,17 @@ int rds_get_mr(struct rds_sock *rs, char __user *optval, int optlen)
>  		args.vec.bytes -= sg->length;
>  	}
> 
> -	rdsdebug(KERN_WARNING "RDS: trans_private nents is %lu\n",
> -	       mr->r_nents);
> +	rdsdebug(KERN_WARNING "RDS: trans_private nents is %u\n", nents);
> 
> -	trans_private = rs->rs_transport->get_mr(mr->r_sg, mr->r_nents,
> +	/* Obtain a transport specific MR. If this succeeds, the
> +	 * s/g list is now owned by the MR */
> +	trans_private = rs->rs_transport->get_mr(sg, nents,
>  						 rs->rs_bound_addr,
>  						 &mr->r_key, &mr->r_phyaddr);
> 
>  	if (IS_ERR(trans_private)) {
> +		for (i = 0 ; i < nents; i++)
> +			put_page(sg_page(&sg[i]));
>  		ret = PTR_ERR(trans_private);
>  		goto out;
>  	}
> diff --git a/net/rds/rdma.h b/net/rds/rdma.h
> index 5baa0bd..ec5f037 100644
> --- a/net/rds/rdma.h
> +++ b/net/rds/rdma.h
> @@ -23,9 +23,6 @@ struct rds_mr {
>  	struct rds_sock *	r_sock;		/* back pointer to the socket that owns us */
>  	struct rds_transport	*r_trans;
>  	void			*r_trans_private;
> -	unsigned long		r_nents;
> -	unsigned long		r_count;
> -	struct scatterlist	r_sg[0];
>  };
> 
>  #define RDS_MR_DEAD		0




