[rds-devel] [PATCH RFC] RDS: Don't use mutex in interrupt contexts.

Steve Wise swise at opengridcomputing.com
Wed Jan 14 08:12:29 PST 2009


From: Steve Wise <swise at opengridcomputing.com>

This patch is a request for comments.  While it resolves the issue
with grabbing the ring mutex in interrupt context, it perhaps will
affect performance because the ring can no longer be refilled in the
interrupt context.

Thoughts?  

Is my rds_xx_ring_low() function correct?

Possible extensions:

- make the low water threshold a syscfg variable...

Alternative Design:

- redesign the serialization in such a way that both the thread -and- 
  the interrupt context can refill, but one at GFP_KERNEL and one
  at GFP_ATOMIC.  This would require removing the mutex in place of
  a spinlock -and- not holding the spinlock with irqs disabled when
  refilling from the thread context (so you can really do GFP_KERNEL).


Steve

------

RDS: Don't use mutex in interrupt contexts.

Can't grab a mutex in the CQ completion handler since it runs in the
interrupt context.  Instead, we define a "low water" mark as 1/4 of the
recv ring size, and schedule the worker thread to refill the ring if it
drops below this threshold.

Signed-off-by: Steve Wise <swise at opengridcomputing.com>
---

 drivers/infiniband/ulp/rds/ib.h      |    1 +
 drivers/infiniband/ulp/rds/ib_recv.c |   30 ++++--------------------------
 drivers/infiniband/ulp/rds/ib_ring.c |    5 +++++
 drivers/infiniband/ulp/rds/iw.h      |    1 +
 drivers/infiniband/ulp/rds/iw_recv.c |   30 ++++--------------------------
 drivers/infiniband/ulp/rds/iw_ring.c |    6 ++++++
 6 files changed, 21 insertions(+), 52 deletions(-)

diff --git a/drivers/infiniband/ulp/rds/ib.h b/drivers/infiniband/ulp/rds/ib.h
index 81d39a3..f7ee4d6 100644
--- a/drivers/infiniband/ulp/rds/ib.h
+++ b/drivers/infiniband/ulp/rds/ib.h
@@ -301,6 +301,7 @@ u32 rds_ib_ring_alloc(struct rds_ib_work_ring *ring, u32 val, u32 *pos);
 void rds_ib_ring_free(struct rds_ib_work_ring *ring, u32 val);
 void rds_ib_ring_unalloc(struct rds_ib_work_ring *ring, u32 val);
 int rds_ib_ring_empty(struct rds_ib_work_ring *ring);
+int rds_ib_ring_low(struct rds_ib_work_ring *ring);
 u32 rds_ib_ring_oldest(struct rds_ib_work_ring *ring);
 u32 rds_ib_ring_completed(struct rds_ib_work_ring *ring, u32 wr_id, u32 oldest);
 extern wait_queue_head_t rds_ib_ring_empty_wait;
diff --git a/drivers/infiniband/ulp/rds/ib_recv.c b/drivers/infiniband/ulp/rds/ib_recv.c
index 19dd508..d738a4b 100644
--- a/drivers/infiniband/ulp/rds/ib_recv.c
+++ b/drivers/infiniband/ulp/rds/ib_recv.c
@@ -828,38 +828,16 @@ void rds_ib_recv_cq_comp_handler(struct ib_cq *cq, void *context)
 	if (rds_conn_up(conn))
 		rds_ib_attempt_ack(ic);
 
-	/*
-	 * XXX atomic is bad as it drains reserve pools, we should really
-	 * do some non-blocking alloc that doesn't touch the pools but
-	 * will fail.  Then leave it to the thread to get to reclaim
-	 * and alloc.
-	 */
-
-	/*
-	 * If we fail to refill we assume it's a allocation failure
-	 * from our use of GFP_ATOMIC and we want the thread to try again
-	 * immediately.  Similarly, if the thread is already trying to
-	 * refill we want it to try again immediately as it may have missed
-	 * the ring entry we just completed before it released the
-	 * i_recv_mutex.
-	 */
 	/* If we ever end up with a really empty receive ring, we're
 	 * in deep trouble, as the sender will definitely see RNR
 	 * timeouts. */
 	if (rds_ib_ring_empty(&ic->i_recv_ring))
 		rds_ib_stats_inc(s_ib_rx_ring_empty);
 
-	if (mutex_trylock(&ic->i_recv_mutex)) {
-		if (rds_ib_recv_refill(conn, GFP_ATOMIC,
-					 GFP_ATOMIC | __GFP_HIGHMEM, 0))
-			ret = -EAGAIN;
-		else
-			rds_ib_stats_inc(s_ib_rx_refill_from_cq);
-		mutex_unlock(&ic->i_recv_mutex);
-	} else
-		ret = -EAGAIN;
-
-	if (ret)
+ 	/* 
+ 	 * If the ring is running low, then schedule the thread to refill.
+ 	 */
+ 	if (rds_ib_ring_low(&ic->i_recv_ring))
 		queue_delayed_work(rds_wq, &conn->c_recv_w, 0);
 }
 
diff --git a/drivers/infiniband/ulp/rds/ib_ring.c b/drivers/infiniband/ulp/rds/ib_ring.c
index bba1932..d23cc59 100644
--- a/drivers/infiniband/ulp/rds/ib_ring.c
+++ b/drivers/infiniband/ulp/rds/ib_ring.c
@@ -135,6 +135,11 @@ int rds_ib_ring_empty(struct rds_ib_work_ring *ring)
 	return __rds_ib_ring_empty(ring);
 }
 
+int rds_ib_ring_low(struct rds_ib_work_ring *ring)
+{
+        return __rds_ib_ring_used(ring) <= (ring->w_nr >> 2);
+}
+
 /*
  * returns the oldest alloced ring entry.  This will be the next one
  * freed.  This can't be called if there are none allocated.
diff --git a/drivers/infiniband/ulp/rds/iw.h b/drivers/infiniband/ulp/rds/iw.h
index 66bef36..10eebe4 100644
--- a/drivers/infiniband/ulp/rds/iw.h
+++ b/drivers/infiniband/ulp/rds/iw.h
@@ -327,6 +327,7 @@ u32 rds_iw_ring_alloc(struct rds_iw_work_ring *ring, u32 val, u32 *pos);
 void rds_iw_ring_free(struct rds_iw_work_ring *ring, u32 val);
 void rds_iw_ring_unalloc(struct rds_iw_work_ring *ring, u32 val);
 int rds_iw_ring_empty(struct rds_iw_work_ring *ring);
+int rds_iw_ring_low(struct rds_iw_work_ring *ring);
 u32 rds_iw_ring_oldest(struct rds_iw_work_ring *ring);
 u32 rds_iw_ring_completed(struct rds_iw_work_ring *ring, u32 wr_id, u32 oldest);
 extern wait_queue_head_t rds_iw_ring_empty_wait;
diff --git a/drivers/infiniband/ulp/rds/iw_recv.c b/drivers/infiniband/ulp/rds/iw_recv.c
index 4553eb0..0368c68 100644
--- a/drivers/infiniband/ulp/rds/iw_recv.c
+++ b/drivers/infiniband/ulp/rds/iw_recv.c
@@ -828,38 +828,16 @@ void rds_iw_recv_cq_comp_handler(struct ib_cq *cq, void *context)
 	if (rds_conn_up(conn))
 		rds_iw_attempt_ack(ic);
 
-	/*
-	 * XXX atomic is bad as it drains reserve pools, we should really
-	 * do some non-blocking alloc that doesn't touch the pools but
-	 * will fail.  Then leave it to the thread to get to reclaim
-	 * and alloc.
-	 */
-
-	/*
-	 * If we fail to refill we assume it's a allocation failure
-	 * from our use of GFP_ATOMIC and we want the thread to try again
-	 * immediately.  Similarly, if the thread is already trying to
-	 * refill we want it to try again immediately as it may have missed
-	 * the ring entry we just completed before it released the
-	 * i_recv_mutex.
-	 */
 	/* If we ever end up with a really empty receive ring, we're
 	 * in deep trouble, as the sender will definitely see RNR
 	 * timeouts. */
 	if (rds_iw_ring_empty(&ic->i_recv_ring))
 		rds_iw_stats_inc(s_iw_rx_ring_empty);
 
-	if (mutex_trylock(&ic->i_recv_mutex)) {
-		if (rds_iw_recv_refill(conn, GFP_ATOMIC,
-					 GFP_ATOMIC | __GFP_HIGHMEM, 0))
-			ret = -EAGAIN;
-		else
-			rds_iw_stats_inc(s_iw_rx_refill_from_cq);
-		mutex_unlock(&ic->i_recv_mutex);
-	} else
-		ret = -EAGAIN;
-
-	if (ret)
+ 	/* 
+ 	 * If the ring is running low, then schedule the thread to refill.
+ 	 */
+ 	if (rds_iw_ring_low(&ic->i_recv_ring))
 		queue_delayed_work(rds_wq, &conn->c_recv_w, 0);
 }
 
diff --git a/drivers/infiniband/ulp/rds/iw_ring.c b/drivers/infiniband/ulp/rds/iw_ring.c
index 158e9b1..d422d4b 100644
--- a/drivers/infiniband/ulp/rds/iw_ring.c
+++ b/drivers/infiniband/ulp/rds/iw_ring.c
@@ -135,6 +135,12 @@ int rds_iw_ring_empty(struct rds_iw_work_ring *ring)
 	return __rds_iw_ring_empty(ring);
 }
 
+int rds_iw_ring_low(struct rds_iw_work_ring *ring)
+{
+	return __rds_iw_ring_used(ring) <= (ring->w_nr >> 2);
+}
+
+
 /*
  * returns the oldest alloced ring entry.  This will be the next one
  * freed.  This can't be called if there are none allocated.



More information about the rds-devel mailing list