[rds-devel] [ofa-general] RDS flow control
Richard Frank
richard.frank at oracle.com
Tue May 20 13:54:22 PDT 2008
Did you see RNR retry without the flow control?
Jon Mason wrote:
> On Mon, May 19, 2008 at 10:05:59AM +0200, Olaf Kirch wrote:
>
>>> However, I'm still seeing performance degradation of ~5% with some packet
>>> sizes. And that is *just* the overhead from exchanging the credit information
>>> and checking it - at some point we need to take a spinlock, and that seems
>>> to delay things just enough to make a dent in my throughput graph.
>>>
>> Here's an updated version of the flow control patch - which is now completely
>> lockless, and uses a single atomic_t to hold both credit counters. This has
>> given me back close to full performance in my testing (throughput seems to be
>> down less than 1%, which is almost within the noise range).
>>
>> I'll push it to my git tree a little later today, so folks can test it if
>> they like.
>>
>
> Works well on my setup.
>
> With proper flow control, there should no longer be a need for rnr_retry (as there
> should always be a posted recv buffer waiting for the incoming data). I did a
> quick test and removed it and everything seemed to be happy on my rds-stress run.
>
> Thanks for pulling this in.
> Jon
>
>
>> Olaf
>> --
>> Olaf Kirch | --- o --- Nous sommes du soleil we love when we play
>> okir at lst.de | / | \ sol.dhoop.naytheet.ah kin.ir.samse.qurax
>> ----
>> From: Olaf Kirch <olaf.kirch at oracle.com>
>> Subject: RDS: Implement IB flow control
>>
>> Here it is - flow control for RDS/IB.
>>
>> This patch is still very much experimental. Here are the essentials:
>>
>> - The approach chosen here uses a credit-based flow control
>> mechanism. Every SEND WR (including ACKs) consumes one credit,
>> and if the sender runs out of credits, it stalls.
>>
>> - As new receive buffers are posted, credits are transferred to the
>> remote node (using yet another RDS header byte for this).
>>
>> - Flow control is negotiated during connection setup. Initial credits
>> are exchanged in the rds_ib_connect_private struct - sending a value
>> of zero (which is also the default for older protocol versions)
>> means no flow control.
>>
>> - We avoid deadlock (both nodes depleting their credits, and being
>> unable to inform the peer of newly posted buffers) by requiring
>> that the last credit can only be used if we're posting new credits
>> to the peer.
>>
>> The approach implemented here is lock-free; preliminary tests show
>> the impact on throughput to be less than 1%, and the impact on RTT,
>> CPU, TX delay and other metrics to be below the noise threshold.
>>
>> Flow control is configurable via sysctl. It only affects newly created
>> connections however - so your best bet is to set this right after loading
>> the RDS module.
>>
>> Signed-off-by: Olaf Kirch <olaf.kirch at oracle.com>
>> ---
>> net/rds/ib.c | 1
>> net/rds/ib.h | 30 ++++++++
>> net/rds/ib_cm.c | 49 ++++++++++++-
>> net/rds/ib_recv.c | 48 +++++++++---
>> net/rds/ib_send.c | 194 ++++++++++++++++++++++++++++++++++++++++++++++++++++
>> net/rds/ib_stats.c | 3
>> net/rds/ib_sysctl.c | 10 ++
>> net/rds/rds.h | 4 -
>> 8 files changed, 325 insertions(+), 14 deletions(-)
>>
>> Index: ofa_kernel-1.3/net/rds/ib.h
>> ===================================================================
>> --- ofa_kernel-1.3.orig/net/rds/ib.h
>> +++ ofa_kernel-1.3/net/rds/ib.h
>> @@ -46,6 +46,7 @@ struct rds_ib_connect_private {
>> __be16 dp_protocol_minor_mask; /* bitmask */
>> __be32 dp_reserved1;
>> __be64 dp_ack_seq;
>> + __be32 dp_credit; /* non-zero enables flow ctl */
>> };
>>
>> struct rds_ib_send_work {
>> @@ -110,15 +111,32 @@ struct rds_ib_connection {
>> struct ib_sge i_ack_sge;
>> u64 i_ack_dma;
>> unsigned long i_ack_queued;
>> +
>> + /* Flow control related information
>> + *
>> + * Our algorithm uses a pair variables that we need to access
>> + * atomically - one for the send credits, and one posted
>> + * recv credits we need to transfer to remote.
>> + * Rather than protect them using a slow spinlock, we put both into
>> + * a single atomic_t and update it using cmpxchg
>> + */
>> + atomic_t i_credits;
>>
>> /* Protocol version specific information */
>> unsigned int i_hdr_idx; /* 1 (old) or 0 (3.1 or later) */
>> + unsigned int i_flowctl : 1; /* enable/disable flow ctl */
>>
>> /* Batched completions */
>> unsigned int i_unsignaled_wrs;
>> long i_unsignaled_bytes;
>> };
>>
>> +/* This assumes that atomic_t is at least 32 bits */
>> +#define IB_GET_SEND_CREDITS(v) ((v) & 0xffff)
>> +#define IB_GET_POST_CREDITS(v) ((v) >> 16)
>> +#define IB_SET_SEND_CREDITS(v) ((v) & 0xffff)
>> +#define IB_SET_POST_CREDITS(v) ((v) << 16)
>> +
>> struct rds_ib_ipaddr {
>> struct list_head list;
>> __be32 ipaddr;
>> @@ -153,14 +171,17 @@ struct rds_ib_statistics {
>> unsigned long s_ib_tx_cq_call;
>> unsigned long s_ib_tx_cq_event;
>> unsigned long s_ib_tx_ring_full;
>> + unsigned long s_ib_tx_throttle;
>> unsigned long s_ib_tx_sg_mapping_failure;
>> unsigned long s_ib_tx_stalled;
>> + unsigned long s_ib_tx_credit_updates;
>> unsigned long s_ib_rx_cq_call;
>> unsigned long s_ib_rx_cq_event;
>> unsigned long s_ib_rx_ring_empty;
>> unsigned long s_ib_rx_refill_from_cq;
>> unsigned long s_ib_rx_refill_from_thread;
>> unsigned long s_ib_rx_alloc_limit;
>> + unsigned long s_ib_rx_credit_updates;
>> unsigned long s_ib_ack_sent;
>> unsigned long s_ib_ack_send_failure;
>> unsigned long s_ib_ack_send_delayed;
>> @@ -244,6 +265,8 @@ void rds_ib_flush_mrs(void);
>> int __init rds_ib_recv_init(void);
>> void rds_ib_recv_exit(void);
>> int rds_ib_recv(struct rds_connection *conn);
>> +int rds_ib_recv_refill(struct rds_connection *conn, gfp_t kptr_gfp,
>> + gfp_t page_gfp, int prefill);
>> void rds_ib_inc_purge(struct rds_incoming *inc);
>> void rds_ib_inc_free(struct rds_incoming *inc);
>> int rds_ib_inc_copy_to_user(struct rds_incoming *inc, struct iovec *iov,
>> @@ -252,6 +275,7 @@ void rds_ib_recv_cq_comp_handler(struct
>> void rds_ib_recv_init_ring(struct rds_ib_connection *ic);
>> void rds_ib_recv_clear_ring(struct rds_ib_connection *ic);
>> void rds_ib_recv_init_ack(struct rds_ib_connection *ic);
>> +void rds_ib_attempt_ack(struct rds_ib_connection *ic);
>> void rds_ib_ack_send_complete(struct rds_ib_connection *ic);
>> u64 rds_ib_piggyb_ack(struct rds_ib_connection *ic);
>>
>> @@ -266,12 +290,17 @@ u32 rds_ib_ring_completed(struct rds_ib_
>> extern wait_queue_head_t rds_ib_ring_empty_wait;
>>
>> /* ib_send.c */
>> +void rds_ib_xmit_complete(struct rds_connection *conn);
>> int rds_ib_xmit(struct rds_connection *conn, struct rds_message *rm,
>> unsigned int hdr_off, unsigned int sg, unsigned int off);
>> void rds_ib_send_cq_comp_handler(struct ib_cq *cq, void *context);
>> void rds_ib_send_init_ring(struct rds_ib_connection *ic);
>> void rds_ib_send_clear_ring(struct rds_ib_connection *ic);
>> int rds_ib_xmit_rdma(struct rds_connection *conn, struct rds_rdma_op *op);
>> +void rds_ib_send_add_credits(struct rds_connection *conn, unsigned int credits);
>> +void rds_ib_advertise_credits(struct rds_connection *conn, unsigned int posted);
>> +int rds_ib_send_grab_credits(struct rds_ib_connection *ic, u32 wanted,
>> + u32 *adv_credits);
>>
>> /* ib_stats.c */
>> RDS_DECLARE_PER_CPU(struct rds_ib_statistics, rds_ib_stats);
>> @@ -287,6 +316,7 @@ extern unsigned long rds_ib_sysctl_max_r
>> extern unsigned long rds_ib_sysctl_max_unsig_wrs;
>> extern unsigned long rds_ib_sysctl_max_unsig_bytes;
>> extern unsigned long rds_ib_sysctl_max_recv_allocation;
>> +extern unsigned int rds_ib_sysctl_flow_control;
>> extern ctl_table rds_ib_sysctl_table[];
>>
>> /*
>> Index: ofa_kernel-1.3/net/rds/ib_cm.c
>> ===================================================================
>> --- ofa_kernel-1.3.orig/net/rds/ib_cm.c
>> +++ ofa_kernel-1.3/net/rds/ib_cm.c
>> @@ -55,6 +55,22 @@ static void rds_ib_set_protocol(struct r
>> }
>>
>> /*
>> + * Set up flow control
>> + */
>> +static void rds_ib_set_flow_control(struct rds_connection *conn, u32 credits)
>> +{
>> + struct rds_ib_connection *ic = conn->c_transport_data;
>> +
>> + if (rds_ib_sysctl_flow_control && credits != 0) {
>> + /* We're doing flow control */
>> + ic->i_flowctl = 1;
>> + rds_ib_send_add_credits(conn, credits);
>> + } else {
>> + ic->i_flowctl = 0;
>> + }
>> +}
>> +
>> +/*
>> * Connection established.
>> * We get here for both outgoing and incoming connection.
>> */
>> @@ -72,12 +88,16 @@ static void rds_ib_connect_complete(stru
>> rds_ib_set_protocol(conn,
>> RDS_PROTOCOL(dp->dp_protocol_major,
>> dp->dp_protocol_minor));
>> + rds_ib_set_flow_control(conn, be32_to_cpu(dp->dp_credit));
>> }
>>
>> - rdsdebug("RDS/IB: ib conn complete on %u.%u.%u.%u version %u.%u\n",
>> + printk(KERN_NOTICE "RDS/IB: connected to %u.%u.%u.%u version %u.%u%s\n",
>> NIPQUAD(conn->c_laddr),
>> RDS_PROTOCOL_MAJOR(conn->c_version),
>> - RDS_PROTOCOL_MINOR(conn->c_version));
>> + RDS_PROTOCOL_MINOR(conn->c_version),
>> + ic->i_flowctl? ", flow control" : "");
>> +
>> + rds_ib_recv_refill(conn, GFP_KERNEL, GFP_HIGHUSER, 1);
>>
>> /* Tune the RNR timeout. We use a rather low timeout, but
>> * not the absolute minimum - this should be tunable.
>> @@ -129,6 +149,24 @@ static void rds_ib_cm_fill_conn_param(st
>> dp->dp_protocol_minor_mask = cpu_to_be16(RDS_IB_SUPPORTED_PROTOCOLS);
>> dp->dp_ack_seq = rds_ib_piggyb_ack(ic);
>>
>> + /* Advertise flow control.
>> + *
>> + * Major chicken and egg alert!
>> + * We would like to post receive buffers before we get here (eg.
>> + * in rds_ib_setup_qp), so that we can give the peer an accurate
>> + * credit value.
>> + * Unfortunately we can't post receive buffers until we've finished
>> + * protocol negotiation, and know in which order data and payload
>> + * are arranged.
>> + *
>> + * What we do here is we give the peer a small initial credit, and
>> + * initialize the number of posted buffers to a negative value.
>> + */
>> + if (ic->i_flowctl) {
>> + atomic_set(&ic->i_credits, IB_SET_POST_CREDITS(-4));
>> + dp->dp_credit = cpu_to_be32(4);
>> + }
>> +
>> conn_param->private_data = dp;
>> conn_param->private_data_len = sizeof(*dp);
>> }
>> @@ -363,6 +401,7 @@ static int rds_ib_cm_handle_connect(stru
>> ic = conn->c_transport_data;
>>
>> rds_ib_set_protocol(conn, version);
>> + rds_ib_set_flow_control(conn, be32_to_cpu(dp->dp_credit));
>>
>> /* If the peer gave us the last packet it saw, process this as if
>> * we had received a regular ACK. */
>> @@ -428,6 +467,7 @@ out:
>> static int rds_ib_cm_initiate_connect(struct rdma_cm_id *cm_id)
>> {
>> struct rds_connection *conn = cm_id->context;
>> + struct rds_ib_connection *ic = conn->c_transport_data;
>> struct rdma_conn_param conn_param;
>> struct rds_ib_connect_private dp;
>> int ret;
>> @@ -435,6 +475,7 @@ static int rds_ib_cm_initiate_connect(st
>> /* If the peer doesn't do protocol negotiation, we must
>> * default to RDSv3.0 */
>> rds_ib_set_protocol(conn, RDS_PROTOCOL_3_0);
>> + ic->i_flowctl = rds_ib_sysctl_flow_control; /* advertise flow control */
>>
>> ret = rds_ib_setup_qp(conn);
>> if (ret) {
>> @@ -688,6 +729,10 @@ void rds_ib_conn_shutdown(struct rds_con
>> #endif
>> ic->i_ack_recv = 0;
>>
>> + /* Clear flow control state */
>> + ic->i_flowctl = 0;
>> + atomic_set(&ic->i_credits, 0);
>> +
>> if (ic->i_ibinc) {
>> rds_inc_put(&ic->i_ibinc->ii_inc);
>> ic->i_ibinc = NULL;
>> Index: ofa_kernel-1.3/net/rds/ib_recv.c
>> ===================================================================
>> --- ofa_kernel-1.3.orig/net/rds/ib_recv.c
>> +++ ofa_kernel-1.3/net/rds/ib_recv.c
>> @@ -220,16 +220,17 @@ out:
>> * -1 is returned if posting fails due to temporary resource exhaustion.
>> */
>> int rds_ib_recv_refill(struct rds_connection *conn, gfp_t kptr_gfp,
>> - gfp_t page_gfp)
>> + gfp_t page_gfp, int prefill)
>> {
>> struct rds_ib_connection *ic = conn->c_transport_data;
>> struct rds_ib_recv_work *recv;
>> struct ib_recv_wr *failed_wr;
>> + unsigned int posted = 0;
>> int ret = 0;
>> u32 pos;
>>
>> - while (rds_conn_up(conn) && rds_ib_ring_alloc(&ic->i_recv_ring, 1, &pos)) {
>> -
>> + while ((prefill || rds_conn_up(conn))
>> + && rds_ib_ring_alloc(&ic->i_recv_ring, 1, &pos)) {
>> if (pos >= ic->i_recv_ring.w_nr) {
>> printk(KERN_NOTICE "Argh - ring alloc returned pos=%u\n",
>> pos);
>> @@ -257,8 +258,14 @@ int rds_ib_recv_refill(struct rds_connec
>> ret = -1;
>> break;
>> }
>> +
>> + posted++;
>> }
>>
>> + /* We're doing flow control - update the window. */
>> + if (ic->i_flowctl && posted)
>> + rds_ib_advertise_credits(conn, posted);
>> +
>> if (ret)
>> rds_ib_ring_unalloc(&ic->i_recv_ring, 1);
>> return ret;
>> @@ -436,7 +443,7 @@ static u64 rds_ib_get_ack(struct rds_ib_
>> #endif
>>
>>
>> -static void rds_ib_send_ack(struct rds_ib_connection *ic)
>> +static void rds_ib_send_ack(struct rds_ib_connection *ic, unsigned int adv_credits)
>> {
>> struct rds_header *hdr = ic->i_ack;
>> struct ib_send_wr *failed_wr;
>> @@ -448,6 +455,7 @@ static void rds_ib_send_ack(struct rds_i
>> rdsdebug("send_ack: ic %p ack %llu\n", ic, (unsigned long long) seq);
>> rds_message_populate_header(hdr, 0, 0, 0);
>> hdr->h_ack = cpu_to_be64(seq);
>> + hdr->h_credit = adv_credits;
>> rds_message_make_checksum(hdr);
>> ic->i_ack_queued = jiffies;
>>
>> @@ -460,6 +468,8 @@ static void rds_ib_send_ack(struct rds_i
>> set_bit(IB_ACK_REQUESTED, &ic->i_ack_flags);
>>
>> rds_ib_stats_inc(s_ib_ack_send_failure);
>> + /* Need to finesse this later. */
>> + BUG();
>> } else
>> rds_ib_stats_inc(s_ib_ack_sent);
>> }
>> @@ -502,15 +512,27 @@ static void rds_ib_send_ack(struct rds_i
>> * When we get here, we're called from the recv queue handler.
>> * Check whether we ought to transmit an ACK.
>> */
>> -static void rds_ib_attempt_ack(struct rds_ib_connection *ic)
>> +void rds_ib_attempt_ack(struct rds_ib_connection *ic)
>> {
>> + unsigned int adv_credits;
>> +
>> if (!test_bit(IB_ACK_REQUESTED, &ic->i_ack_flags))
>> return;
>> - if (!test_and_set_bit(IB_ACK_IN_FLIGHT, &ic->i_ack_flags)) {
>> - clear_bit(IB_ACK_REQUESTED, &ic->i_ack_flags);
>> - rds_ib_send_ack(ic);
>> - } else
>> +
>> + if (test_and_set_bit(IB_ACK_IN_FLIGHT, &ic->i_ack_flags)) {
>> rds_ib_stats_inc(s_ib_ack_send_delayed);
>> + return;
>> + }
>> +
>> + /* Can we get a send credit? */
>> + if (!rds_ib_send_grab_credits(ic, 1, &adv_credits)) {
>> + rds_ib_stats_inc(s_ib_tx_throttle);
>> + clear_bit(IB_ACK_IN_FLIGHT, &ic->i_ack_flags);
>> + return;
>> + }
>> +
>> + clear_bit(IB_ACK_REQUESTED, &ic->i_ack_flags);
>> + rds_ib_send_ack(ic, adv_credits);
>> }
>>
>> /*
>> @@ -706,6 +728,10 @@ void rds_ib_process_recv(struct rds_conn
>> state->ack_recv = be64_to_cpu(ihdr->h_ack);
>> state->ack_recv_valid = 1;
>>
>> + /* Process the credits update if there was one */
>> + if (ihdr->h_credit)
>> + rds_ib_send_add_credits(conn, ihdr->h_credit);
>> +
>> if (ihdr->h_sport == 0 && ihdr->h_dport == 0 && byte_len == 0) {
>> /* This is an ACK-only packet. The fact that it gets
>> * special treatment here is that historically, ACKs
>> @@ -877,7 +903,7 @@ void rds_ib_recv_cq_comp_handler(struct
>>
>> if (mutex_trylock(&ic->i_recv_mutex)) {
>> if (rds_ib_recv_refill(conn, GFP_ATOMIC,
>> - GFP_ATOMIC | __GFP_HIGHMEM))
>> + GFP_ATOMIC | __GFP_HIGHMEM, 0))
>> ret = -EAGAIN;
>> else
>> rds_ib_stats_inc(s_ib_rx_refill_from_cq);
>> @@ -901,7 +927,7 @@ int rds_ib_recv(struct rds_connection *c
>> * we're really low and we want the caller to back off for a bit.
>> */
>> mutex_lock(&ic->i_recv_mutex);
>> - if (rds_ib_recv_refill(conn, GFP_KERNEL, GFP_HIGHUSER))
>> + if (rds_ib_recv_refill(conn, GFP_KERNEL, GFP_HIGHUSER, 0))
>> ret = -ENOMEM;
>> else
>> rds_ib_stats_inc(s_ib_rx_refill_from_thread);
>> Index: ofa_kernel-1.3/net/rds/ib.c
>> ===================================================================
>> --- ofa_kernel-1.3.orig/net/rds/ib.c
>> +++ ofa_kernel-1.3/net/rds/ib.c
>> @@ -187,6 +187,7 @@ static void rds_ib_exit(void)
>>
>> struct rds_transport rds_ib_transport = {
>> .laddr_check = rds_ib_laddr_check,
>> + .xmit_complete = rds_ib_xmit_complete,
>> .xmit = rds_ib_xmit,
>> .xmit_cong_map = NULL,
>> .xmit_rdma = rds_ib_xmit_rdma,
>> Index: ofa_kernel-1.3/net/rds/ib_send.c
>> ===================================================================
>> --- ofa_kernel-1.3.orig/net/rds/ib_send.c
>> +++ ofa_kernel-1.3/net/rds/ib_send.c
>> @@ -245,6 +245,144 @@ void rds_ib_send_cq_comp_handler(struct
>> }
>> }
>>
>> +/*
>> + * This is the main function for allocating credits when sending
>> + * messages.
>> + *
>> + * Conceptually, we have two counters:
>> + * - send credits: this tells us how many WRs we're allowed
>> + * to submit without overruning the reciever's queue. For
>> + * each SEND WR we post, we decrement this by one.
>> + *
>> + * - posted credits: this tells us how many WRs we recently
>> + * posted to the receive queue. This value is transferred
>> + * to the peer as a "credit update" in a RDS header field.
>> + * Every time we transmit credits to the peer, we subtract
>> + * the amount of transferred credits from this counter.
>> + *
>> + * It is essential that we avoid situations where both sides have
>> + * exhausted their send credits, and are unable to send new credits
>> + * to the peer. We achieve this by requiring that we send at least
>> + * one credit update to the peer before exhausting our credits.
>> + * When new credits arrive, we subtract one credit that is withheld
>> + * until we've posted new buffers and are ready to transmit these
>> + * credits (see rds_ib_send_add_credits below).
>> + *
>> + * The RDS send code is essentially single-threaded; rds_send_xmit
>> + * grabs c_send_sem to ensure exclusive access to the send ring.
>> + * However, the ACK sending code is independent and can race with
>> + * message SENDs.
>> + *
>> + * In the send path, we need to update the counters for send credits
>> + * and the counter of posted buffers atomically - when we use the
>> + * last available credit, we cannot allow another thread to race us
>> + * and grab the posted credits counter. Hence, we have to use a
>> + * spinlock to protect the credit counter, or use atomics.
>> + *
>> + * Spinlocks shared between the send and the receive path are bad,
>> + * because they create unnecessary delays. An early implementation
>> + * using a spinlock showed a 5% degradation in throughput at some
>> + * loads.
>> + *
>> + * This implementation avoids spinlocks completely, putting both
>> + * counters into a single atomic, and updating that atomic using
>> + * atomic_add (in the receive path, when receiving fresh credits),
>> + * and using atomic_cmpxchg when updating the two counters.
>> + */
>> +int rds_ib_send_grab_credits(struct rds_ib_connection *ic,
>> + u32 wanted, u32 *adv_credits)
>> +{
>> + unsigned int avail, posted, got = 0, advertise;
>> + long oldval, newval;
>> +
>> + *adv_credits = 0;
>> + if (!ic->i_flowctl)
>> + return wanted;
>> +
>> +try_again:
>> + advertise = 0;
>> + oldval = newval = atomic_read(&ic->i_credits);
>> + posted = IB_GET_POST_CREDITS(oldval);
>> + avail = IB_GET_SEND_CREDITS(oldval);
>> +
>> + rdsdebug("rds_ib_send_grab_credits(%u): credits=%u posted=%u\n",
>> + wanted, avail, posted);
>> +
>> + /* The last credit must be used to send a credit updated. */
>> + if (avail && !posted)
>> + avail--;
>> +
>> + if (avail < wanted) {
>> + struct rds_connection *conn = ic->i_cm_id->context;
>> +
>> + /* Oops, there aren't that many credits left! */
>> + set_bit(RDS_LL_SEND_FULL, &conn->c_flags);
>> + got = avail;
>> + } else {
>> + /* Sometimes you get what you want, lalala. */
>> + got = wanted;
>> + }
>> + newval -= IB_SET_SEND_CREDITS(got);
>> +
>> + if (got && posted) {
>> + advertise = min_t(unsigned int, posted, RDS_MAX_ADV_CREDIT);
>> + newval -= IB_SET_POST_CREDITS(advertise);
>> + }
>> +
>> + /* Finally bill everything */
>> + if (atomic_cmpxchg(&ic->i_credits, oldval, newval) != oldval)
>> + goto try_again;
>> +
>> + *adv_credits = advertise;
>> + return got;
>> +}
>> +
>> +void rds_ib_send_add_credits(struct rds_connection *conn, unsigned int credits)
>> +{
>> + struct rds_ib_connection *ic = conn->c_transport_data;
>> +
>> + if (credits == 0)
>> + return;
>> +
>> + rdsdebug("rds_ib_send_add_credits(%u): current=%u%s\n",
>> + credits,
>> + IB_GET_SEND_CREDITS(atomic_read(&ic->i_credits)),
>> + test_bit(RDS_LL_SEND_FULL, &conn->c_flags)? ", ll_send_full" : "");
>> +
>> + atomic_add(IB_SET_SEND_CREDITS(credits), &ic->i_credits);
>> + if (test_and_clear_bit(RDS_LL_SEND_FULL, &conn->c_flags))
>> + queue_delayed_work(rds_wq, &conn->c_send_w, 0);
>> +
>> + WARN_ON(IB_GET_SEND_CREDITS(credits) >= 16384);
>> +
>> + rds_ib_stats_inc(s_ib_rx_credit_updates);
>> +}
>> +
>> +void rds_ib_advertise_credits(struct rds_connection *conn, unsigned int posted)
>> +{
>> + struct rds_ib_connection *ic = conn->c_transport_data;
>> +
>> + if (posted == 0)
>> + return;
>> +
>> + atomic_add(IB_SET_POST_CREDITS(posted), &ic->i_credits);
>> +
>> + /* Decide whether to send an update to the peer now.
>> + * If we would send a credit update for every single buffer we
>> + * post, we would end up with an ACK storm (ACK arrives,
>> + * consumes buffer, we refill the ring, send ACK to remote
>> + * advertising the newly posted buffer... ad inf)
>> + *
>> + * Performance pretty much depends on how often we send
>> + * credit updates - too frequent updates mean lots of ACKs.
>> + * Too infrequent updates, and the peer will run out of
>> + * credits and has to throttle.
>> + * For the time being, 16 seems to be a good compromise.
>> + */
>> + if (IB_GET_POST_CREDITS(atomic_read(&ic->i_credits)) >= 16)
>> + set_bit(IB_ACK_REQUESTED, &ic->i_ack_flags);
>> +}
>> +
>> static inline void
>> rds_ib_xmit_populate_wr(struct rds_ib_connection *ic,
>> struct rds_ib_send_work *send, unsigned int pos,
>> @@ -307,6 +445,8 @@ int rds_ib_xmit(struct rds_connection *c
>> u32 pos;
>> u32 i;
>> u32 work_alloc;
>> + u32 credit_alloc;
>> + u32 adv_credits = 0;
>> int send_flags = 0;
>> int sent;
>> int ret;
>> @@ -314,6 +454,7 @@ int rds_ib_xmit(struct rds_connection *c
>> BUG_ON(off % RDS_FRAG_SIZE);
>> BUG_ON(hdr_off != 0 && hdr_off != sizeof(struct rds_header));
>>
>> + /* FIXME we may overallocate here */
>> if (be32_to_cpu(rm->m_inc.i_hdr.h_len) == 0)
>> i = 1;
>> else
>> @@ -327,8 +468,29 @@ int rds_ib_xmit(struct rds_connection *c
>> goto out;
>> }
>>
>> + credit_alloc = work_alloc;
>> + if (ic->i_flowctl) {
>> + credit_alloc = rds_ib_send_grab_credits(ic, work_alloc, &adv_credits);
>> + if (credit_alloc < work_alloc) {
>> + rds_ib_ring_unalloc(&ic->i_send_ring, work_alloc - credit_alloc);
>> + work_alloc = credit_alloc;
>> + }
>> + if (work_alloc == 0) {
>> + rds_ib_ring_unalloc(&ic->i_send_ring, work_alloc);
>> + rds_ib_stats_inc(s_ib_tx_throttle);
>> + ret = -ENOMEM;
>> + goto out;
>> + }
>> + }
>> +
>> /* map the message the first time we see it */
>> if (ic->i_rm == NULL) {
>> + /*
>> + printk(KERN_NOTICE "rds_ib_xmit prep msg dport=%u flags=0x%x len=%d\n",
>> + be16_to_cpu(rm->m_inc.i_hdr.h_dport),
>> + rm->m_inc.i_hdr.h_flags,
>> + be32_to_cpu(rm->m_inc.i_hdr.h_len));
>> + */
>> if (rm->m_nents) {
>> rm->m_count = ib_dma_map_sg(dev,
>> rm->m_sg, rm->m_nents, DMA_TO_DEVICE);
>> @@ -449,6 +611,24 @@ add_header:
>> * have been set up to point to the right header buffer. */
>> memcpy(&ic->i_send_hdrs[pos], &rm->m_inc.i_hdr, sizeof(struct rds_header));
>>
>> + if (0) {
>> + struct rds_header *hdr = &ic->i_send_hdrs[pos];
>> +
>> + printk(KERN_NOTICE "send WR dport=%u flags=0x%x len=%d\n",
>> + be16_to_cpu(hdr->h_dport),
>> + hdr->h_flags,
>> + be32_to_cpu(hdr->h_len));
>> + }
>> + if (adv_credits) {
>> + struct rds_header *hdr = &ic->i_send_hdrs[pos];
>> +
>> + /* add credit and redo the header checksum */
>> + hdr->h_credit = adv_credits;
>> + rds_message_make_checksum(hdr);
>> + adv_credits = 0;
>> + rds_ib_stats_inc(s_ib_tx_credit_updates);
>> + }
>> +
>> if (prev)
>> prev->s_wr.next = &send->s_wr;
>> prev = send;
>> @@ -472,6 +652,8 @@ add_header:
>> rds_ib_ring_unalloc(&ic->i_send_ring, work_alloc - i);
>> work_alloc = i;
>> }
>> + if (ic->i_flowctl && i < credit_alloc)
>> + rds_ib_send_add_credits(conn, credit_alloc - i);
>>
>> /* XXX need to worry about failed_wr and partial sends. */
>> failed_wr = &first->s_wr;
>> @@ -487,11 +669,14 @@ add_header:
>> ic->i_rm = prev->s_rm;
>> prev->s_rm = NULL;
>> }
>> + /* Finesse this later */
>> + BUG();
>> goto out;
>> }
>>
>> ret = sent;
>> out:
>> + BUG_ON(adv_credits);
>> return ret;
>> }
>>
>> @@ -630,3 +815,12 @@ int rds_ib_xmit_rdma(struct rds_connecti
>> out:
>> return ret;
>> }
>> +
>> +void rds_ib_xmit_complete(struct rds_connection *conn)
>> +{
>> + struct rds_ib_connection *ic = conn->c_transport_data;
>> +
>> + /* We may have a pending ACK or window update we were unable
>> + * to send previously (due to flow control). Try again. */
>> + rds_ib_attempt_ack(ic);
>> +}
>> Index: ofa_kernel-1.3/net/rds/ib_stats.c
>> ===================================================================
>> --- ofa_kernel-1.3.orig/net/rds/ib_stats.c
>> +++ ofa_kernel-1.3/net/rds/ib_stats.c
>> @@ -46,14 +46,17 @@ static char *rds_ib_stat_names[] = {
>> "ib_tx_cq_call",
>> "ib_tx_cq_event",
>> "ib_tx_ring_full",
>> + "ib_tx_throttle",
>> "ib_tx_sg_mapping_failure",
>> "ib_tx_stalled",
>> + "ib_tx_credit_updates",
>> "ib_rx_cq_call",
>> "ib_rx_cq_event",
>> "ib_rx_ring_empty",
>> "ib_rx_refill_from_cq",
>> "ib_rx_refill_from_thread",
>> "ib_rx_alloc_limit",
>> + "ib_rx_credit_updates",
>> "ib_ack_sent",
>> "ib_ack_send_failure",
>> "ib_ack_send_delayed",
>> Index: ofa_kernel-1.3/net/rds/rds.h
>> ===================================================================
>> --- ofa_kernel-1.3.orig/net/rds/rds.h
>> +++ ofa_kernel-1.3/net/rds/rds.h
>> @@ -170,6 +170,7 @@ struct rds_connection {
>> #define RDS_FLAG_CONG_BITMAP 0x01
>> #define RDS_FLAG_ACK_REQUIRED 0x02
>> #define RDS_FLAG_RETRANSMITTED 0x04
>> +#define RDS_MAX_ADV_CREDIT 255
>>
>> /*
>> * Maximum space available for extension headers.
>> @@ -183,7 +184,8 @@ struct rds_header {
>> __be16 h_sport;
>> __be16 h_dport;
>> u8 h_flags;
>> - u8 h_padding[5];
>> + u8 h_credit;
>> + u8 h_padding[4];
>> __sum16 h_csum;
>>
>> u8 h_exthdr[RDS_HEADER_EXT_SPACE];
>> Index: ofa_kernel-1.3/net/rds/ib_sysctl.c
>> ===================================================================
>> --- ofa_kernel-1.3.orig/net/rds/ib_sysctl.c
>> +++ ofa_kernel-1.3/net/rds/ib_sysctl.c
>> @@ -53,6 +53,8 @@ unsigned long rds_ib_sysctl_max_unsig_by
>> static unsigned long rds_ib_sysctl_max_unsig_bytes_min = 1;
>> static unsigned long rds_ib_sysctl_max_unsig_bytes_max = ~0UL;
>>
>> +unsigned int rds_ib_sysctl_flow_control = 1;
>> +
>> ctl_table rds_ib_sysctl_table[] = {
>> {
>> .ctl_name = 1,
>> @@ -102,6 +104,14 @@ ctl_table rds_ib_sysctl_table[] = {
>> .mode = 0644,
>> .proc_handler = &proc_doulongvec_minmax,
>> },
>> + {
>> + .ctl_name = 6,
>> + .procname = "flow_control",
>> + .data = &rds_ib_sysctl_flow_control,
>> + .maxlen = sizeof(rds_ib_sysctl_flow_control),
>> + .mode = 0644,
>> + .proc_handler = &proc_dointvec,
>> + },
>> { .ctl_name = 0}
>> };
>>
>> _______________________________________________
>> general mailing list
>> general at lists.openfabrics.org
>> http://lists.openfabrics.org/cgi-bin/mailman/listinfo/general
>>
>> To unsubscribe, please visit http://openib.org/mailman/listinfo/openib-general
>>
> _______________________________________________
> general mailing list
> general at lists.openfabrics.org
> http://lists.openfabrics.org/cgi-bin/mailman/listinfo/general
>
> To unsubscribe, please visit http://openib.org/mailman/listinfo/openib-general
>
More information about the rds-devel
mailing list