[rds-devel] [PATCH RFC] RDS: Don't use mutex in interrupt contexts.
Richard Frank
richard.frank at oracle.com
Wed Jan 14 13:15:51 PST 2009
Prior to 1.4, for IB we want to refill as fast as possible, as it depends
on IB flow control...
After 1.4, which now has flow control, it's probably not an issue...
On the other hand, we have virtually no experience running RDS on 1.4...
Andy Grover wrote:
> This looks good to me...
>
> Your low water mark is pretty high so assuming ring size is reasonable
> then it should be refilled properly by the thread in time. I looked
> through the history to see if there was a specific issue that needed
> refills from cq, but didn't see anything.
>
> I'll give Rick and other RDS veterans a few days to review, but I'm
> inclined to apply it.
>
> Thanks -- Regards -- Andy
>
> Steve Wise wrote:
>
>> From: Steve Wise <swise at opengridcomputing.com>
>>
>> This patch is a request for comments. While it resolves the issue
>> with grabbing the ring mutex in interrupt context, it perhaps will
>> affect performance because the ring can no longer be refilled in the
>> interrupt context.
>>
>> Thoughts?
>>
>> Is my rds_xx_ring_low() function correct?
>>
>> Possible extensions:
>>
>> - make the low water threshold a sysctl variable...
>>
>> Alternative Design:
>>
>> - redesign the serialization in such a way that both the thread -and-
>> the interrupt context can refill, but one at GFP_KERNEL and one
>> at GFP_ATOMIC. This would require replacing the mutex with
>> a spinlock -and- not holding the spinlock with irqs disabled when
>> refilling from the thread context (so you can really do GFP_KERNEL).
>>
>>
>> Steve
>>
>> ------
>>
>> RDS: Don't use mutex in interrupt contexts.
>>
>> Can't grab a mutex in the CQ completion handler since it runs in the
>> interrupt context. Instead, we define a "low water" mark as 1/4 of the
>> recv ring size, and schedule the worker thread to refill the ring if it
>> drops below this threshold.
>>
>> Signed-off-by: Steve Wise <swise at opengridcomputing.com>
>> ---
>>
>> drivers/infiniband/ulp/rds/ib.h | 1 +
>> drivers/infiniband/ulp/rds/ib_recv.c | 30 ++++--------------------------
>> drivers/infiniband/ulp/rds/ib_ring.c | 5 +++++
>> drivers/infiniband/ulp/rds/iw.h | 1 +
>> drivers/infiniband/ulp/rds/iw_recv.c | 30 ++++--------------------------
>> drivers/infiniband/ulp/rds/iw_ring.c | 6 ++++++
>> 6 files changed, 21 insertions(+), 52 deletions(-)
>>
>> diff --git a/drivers/infiniband/ulp/rds/ib.h b/drivers/infiniband/ulp/rds/ib.h
>> index 81d39a3..f7ee4d6 100644
>> --- a/drivers/infiniband/ulp/rds/ib.h
>> +++ b/drivers/infiniband/ulp/rds/ib.h
>> @@ -301,6 +301,7 @@ u32 rds_ib_ring_alloc(struct rds_ib_work_ring *ring, u32 val, u32 *pos);
>> void rds_ib_ring_free(struct rds_ib_work_ring *ring, u32 val);
>> void rds_ib_ring_unalloc(struct rds_ib_work_ring *ring, u32 val);
>> int rds_ib_ring_empty(struct rds_ib_work_ring *ring);
>> +int rds_ib_ring_low(struct rds_ib_work_ring *ring);
>> u32 rds_ib_ring_oldest(struct rds_ib_work_ring *ring);
>> u32 rds_ib_ring_completed(struct rds_ib_work_ring *ring, u32 wr_id, u32 oldest);
>> extern wait_queue_head_t rds_ib_ring_empty_wait;
>> diff --git a/drivers/infiniband/ulp/rds/ib_recv.c b/drivers/infiniband/ulp/rds/ib_recv.c
>> index 19dd508..d738a4b 100644
>> --- a/drivers/infiniband/ulp/rds/ib_recv.c
>> +++ b/drivers/infiniband/ulp/rds/ib_recv.c
>> @@ -828,38 +828,16 @@ void rds_ib_recv_cq_comp_handler(struct ib_cq *cq, void *context)
>> if (rds_conn_up(conn))
>> rds_ib_attempt_ack(ic);
>>
>> - /*
>> - * XXX atomic is bad as it drains reserve pools, we should really
>> - * do some non-blocking alloc that doesn't touch the pools but
>> - * will fail. Then leave it to the thread to get to reclaim
>> - * and alloc.
>> - */
>> -
>> - /*
>> - * If we fail to refill we assume it's a allocation failure
>> - * from our use of GFP_ATOMIC and we want the thread to try again
>> - * immediately. Similarly, if the thread is already trying to
>> - * refill we want it to try again immediately as it may have missed
>> - * the ring entry we just completed before it released the
>> - * i_recv_mutex.
>> - */
>> /* If we ever end up with a really empty receive ring, we're
>> * in deep trouble, as the sender will definitely see RNR
>> * timeouts. */
>> if (rds_ib_ring_empty(&ic->i_recv_ring))
>> rds_ib_stats_inc(s_ib_rx_ring_empty);
>>
>> - if (mutex_trylock(&ic->i_recv_mutex)) {
>> - if (rds_ib_recv_refill(conn, GFP_ATOMIC,
>> - GFP_ATOMIC | __GFP_HIGHMEM, 0))
>> - ret = -EAGAIN;
>> - else
>> - rds_ib_stats_inc(s_ib_rx_refill_from_cq);
>> - mutex_unlock(&ic->i_recv_mutex);
>> - } else
>> - ret = -EAGAIN;
>> -
>> - if (ret)
>> + /*
>> + * If the ring is running low, then schedule the thread to refill.
>> + */
>> + if (rds_ib_ring_low(&ic->i_recv_ring))
>> queue_delayed_work(rds_wq, &conn->c_recv_w, 0);
>> }
>>
>> diff --git a/drivers/infiniband/ulp/rds/ib_ring.c b/drivers/infiniband/ulp/rds/ib_ring.c
>> index bba1932..d23cc59 100644
>> --- a/drivers/infiniband/ulp/rds/ib_ring.c
>> +++ b/drivers/infiniband/ulp/rds/ib_ring.c
>> @@ -135,6 +135,11 @@ int rds_ib_ring_empty(struct rds_ib_work_ring *ring)
>> return __rds_ib_ring_empty(ring);
>> }
>>
>> +int rds_ib_ring_low(struct rds_ib_work_ring *ring)
>> +{
>> + return __rds_ib_ring_used(ring) <= (ring->w_nr >> 2);
>> +}
>> +
>> /*
>> * returns the oldest alloced ring entry. This will be the next one
>> * freed. This can't be called if there are none allocated.
>> diff --git a/drivers/infiniband/ulp/rds/iw.h b/drivers/infiniband/ulp/rds/iw.h
>> index 66bef36..10eebe4 100644
>> --- a/drivers/infiniband/ulp/rds/iw.h
>> +++ b/drivers/infiniband/ulp/rds/iw.h
>> @@ -327,6 +327,7 @@ u32 rds_iw_ring_alloc(struct rds_iw_work_ring *ring, u32 val, u32 *pos);
>> void rds_iw_ring_free(struct rds_iw_work_ring *ring, u32 val);
>> void rds_iw_ring_unalloc(struct rds_iw_work_ring *ring, u32 val);
>> int rds_iw_ring_empty(struct rds_iw_work_ring *ring);
>> +int rds_iw_ring_low(struct rds_iw_work_ring *ring);
>> u32 rds_iw_ring_oldest(struct rds_iw_work_ring *ring);
>> u32 rds_iw_ring_completed(struct rds_iw_work_ring *ring, u32 wr_id, u32 oldest);
>> extern wait_queue_head_t rds_iw_ring_empty_wait;
>> diff --git a/drivers/infiniband/ulp/rds/iw_recv.c b/drivers/infiniband/ulp/rds/iw_recv.c
>> index 4553eb0..0368c68 100644
>> --- a/drivers/infiniband/ulp/rds/iw_recv.c
>> +++ b/drivers/infiniband/ulp/rds/iw_recv.c
>> @@ -828,38 +828,16 @@ void rds_iw_recv_cq_comp_handler(struct ib_cq *cq, void *context)
>> if (rds_conn_up(conn))
>> rds_iw_attempt_ack(ic);
>>
>> - /*
>> - * XXX atomic is bad as it drains reserve pools, we should really
>> - * do some non-blocking alloc that doesn't touch the pools but
>> - * will fail. Then leave it to the thread to get to reclaim
>> - * and alloc.
>> - */
>> -
>> - /*
>> - * If we fail to refill we assume it's a allocation failure
>> - * from our use of GFP_ATOMIC and we want the thread to try again
>> - * immediately. Similarly, if the thread is already trying to
>> - * refill we want it to try again immediately as it may have missed
>> - * the ring entry we just completed before it released the
>> - * i_recv_mutex.
>> - */
>> /* If we ever end up with a really empty receive ring, we're
>> * in deep trouble, as the sender will definitely see RNR
>> * timeouts. */
>> if (rds_iw_ring_empty(&ic->i_recv_ring))
>> rds_iw_stats_inc(s_iw_rx_ring_empty);
>>
>> - if (mutex_trylock(&ic->i_recv_mutex)) {
>> - if (rds_iw_recv_refill(conn, GFP_ATOMIC,
>> - GFP_ATOMIC | __GFP_HIGHMEM, 0))
>> - ret = -EAGAIN;
>> - else
>> - rds_iw_stats_inc(s_iw_rx_refill_from_cq);
>> - mutex_unlock(&ic->i_recv_mutex);
>> - } else
>> - ret = -EAGAIN;
>> -
>> - if (ret)
>> + /*
>> + * If the ring is running low, then schedule the thread to refill.
>> + */
>> + if (rds_iw_ring_low(&ic->i_recv_ring))
>> queue_delayed_work(rds_wq, &conn->c_recv_w, 0);
>> }
>>
>> diff --git a/drivers/infiniband/ulp/rds/iw_ring.c b/drivers/infiniband/ulp/rds/iw_ring.c
>> index 158e9b1..d422d4b 100644
>> --- a/drivers/infiniband/ulp/rds/iw_ring.c
>> +++ b/drivers/infiniband/ulp/rds/iw_ring.c
>> @@ -135,6 +135,12 @@ int rds_iw_ring_empty(struct rds_iw_work_ring *ring)
>> return __rds_iw_ring_empty(ring);
>> }
>>
>> +int rds_iw_ring_low(struct rds_iw_work_ring *ring)
>> +{
>> + return __rds_iw_ring_used(ring) <= (ring->w_nr >> 2);
>> +}
>> +
>> +
>> /*
>> * returns the oldest alloced ring entry. This will be the next one
>> * freed. This can't be called if there are none allocated.
>>
>
>
> _______________________________________________
> rds-devel mailing list
> rds-devel at oss.oracle.com
> http://oss.oracle.com/mailman/listinfo/rds-devel
>