[rds-devel] [ofa-general] RDS flow control

Jon Mason jon at opengridcomputing.com
Tue May 20 16:43:27 PDT 2008


On Tue, May 20, 2008 at 04:54:22PM -0400, Richard Frank wrote:
> Did you see RNR retry with out the flow control ?

No, I did not try this.  I removed it based on the fact that it SHOULD no longer be
necessary.  Any disconnections would expose a problem in the rds credit schema, which
is a good thing.  Of course, I'll leave it to Olaf to decide whether removing that code
is what he wants (or maybe make it a module param and let the end user decide).

If you like, I can give rnr_retry = 0 without the flow control a try but I fully expect
it to pass. 

Thanks,
Jon

>
> Jon Mason wrote:
>> On Mon, May 19, 2008 at 10:05:59AM +0200, Olaf Kirch wrote:
>>   
>>>> However, I'm still seeing performance degradation of ~5% with some packet
>>>> sizes. And that is *just* the overhead from exchanging the credit information
>>>> and checking it - at some point we need to take a spinlock, and that seems
>>>> to delay things just enough to make a dent in my throughput graph.
>>>>       
>>> Here's an updated version of the flow control patch - which is now completely
>>> lockless, and uses a single atomic_t to hold both credit counters. This has
>>> given me back close to full performance in my testing (throughput seems to be
>>> down less than 1%, which is almost within the noise range).
>>>
>>> I'll push it to my git tree a little later today, so folks can test it if
>>> they like.
>>>     
>>
>> Works well on my setup.
>>
>> With proper flow control, there should no longer be a need for rnr_retry (as there
>> should always be a posted recv buffer waiting for the incoming data).  I did a
>> quick test and removed it and everything seemed to be happy on my rds-stress run.
>>
>> Thanks for pulling this in.
>> Jon
>>
>>   
>>> Olaf
>>> -- 
>>> Olaf Kirch  |  --- o --- Nous sommes du soleil we love when we play
>>> okir at lst.de |    / | \   sol.dhoop.naytheet.ah kin.ir.samse.qurax
>>> ----
>>> From: Olaf Kirch <olaf.kirch at oracle.com>
>>> Subject: RDS: Implement IB flow control
>>>
>>> Here it is - flow control for RDS/IB.
>>>
>>> This patch is still very much experimental. Here's the essentials
>>>
>>>  -	The approach chosen here uses a credit-based flow control
>>> 	mechanism. Every SEND WR (including ACKs) consumes one credit,
>>> 	and if the sender runs out of credits, it stalls.
>>>
>>>  -	As new receive buffers are posted, credits are transferred to the
>>> 	remote node (using yet another RDS header byte for this).
>>>
>>>  -	Flow control is negotiated during connection setup. Initial credits
>>>  	are exchanged in the rds_ib_connect_private struct - sending a value
>>> 	of zero (which is also the default for older protocol versions)
>>> 	means no flow control.
>>>
>>>  -	We avoid deadlock (both nodes depleting their credits, and being
>>>  	unable to inform the peer of newly posted buffers) by requiring
>>> 	that the last credit can only be used if we're posting new credits
>>> 	to the peer.
>>>
>>> The approach implemented here is lock-free; preliminary tests show
>>> the impact on throughput to be less than 1%, and the impact on RTT,
>>> CPU, TX delay and other metrics to be below the noise threshold.
>>>
>>> Flow control is configurable via sysctl. It only affects newly created
>>> connections however - so your best bet is to set this right after loading
>>> the RDS module.
>>>
>>> Signed-off-by: Olaf Kirch <olaf.kirch at oracle.com>
>>> ---
>>>  net/rds/ib.c        |    1  net/rds/ib.h        |   30 ++++++++
>>>  net/rds/ib_cm.c     |   49 ++++++++++++-
>>>  net/rds/ib_recv.c   |   48 +++++++++---
>>>  net/rds/ib_send.c   |  194 ++++++++++++++++++++++++++++++++++++++++++++++++++++
>>>  net/rds/ib_stats.c  |    3  net/rds/ib_sysctl.c |   10 ++
>>>  net/rds/rds.h       |    4 -
>>>  8 files changed, 325 insertions(+), 14 deletions(-)
>>>
>>> Index: ofa_kernel-1.3/net/rds/ib.h
>>> ===================================================================
>>> --- ofa_kernel-1.3.orig/net/rds/ib.h
>>> +++ ofa_kernel-1.3/net/rds/ib.h
>>> @@ -46,6 +46,7 @@ struct rds_ib_connect_private {
>>>  	__be16			dp_protocol_minor_mask; /* bitmask */
>>>  	__be32			dp_reserved1;
>>>  	__be64			dp_ack_seq;
>>> +	__be32			dp_credit;		/* non-zero enables flow ctl */
>>>  };
>>>   struct rds_ib_send_work {
>>> @@ -110,15 +111,32 @@ struct rds_ib_connection {
>>>  	struct ib_sge		i_ack_sge;
>>>  	u64			i_ack_dma;
>>>  	unsigned long		i_ack_queued;
>>> +
>>> +	/* Flow control related information
>>> +	 *
>>> +	 * Our algorithm uses a pair of variables that we need to access
>>> +	 * atomically - one for the send credits, and one for the posted
>>> +	 * recv credits we need to transfer to the remote node.
>>> +	 * Rather than protect them using a slow spinlock, we put both into
>>> +	 * a single atomic_t and update it using cmpxchg
>>> +	 */
>>> +	atomic_t		i_credits;
>>>    	/* Protocol version specific information */
>>>  	unsigned int		i_hdr_idx;	/* 1 (old) or 0 (3.1 or later) */
>>> +	unsigned int		i_flowctl : 1;	/* enable/disable flow ctl */
>>>   	/* Batched completions */
>>>  	unsigned int		i_unsignaled_wrs;
>>>  	long			i_unsignaled_bytes;
>>>  };
>>>  +/* This assumes that atomic_t is at least 32 bits */
>>> +#define IB_GET_SEND_CREDITS(v)	((v) & 0xffff)
>>> +#define IB_GET_POST_CREDITS(v)	((v) >> 16)
>>> +#define IB_SET_SEND_CREDITS(v)	((v) & 0xffff)
>>> +#define IB_SET_POST_CREDITS(v)	((v) << 16)
>>> +
>>>  struct rds_ib_ipaddr {
>>>  	struct list_head	list;
>>>  	__be32			ipaddr;
>>> @@ -153,14 +171,17 @@ struct rds_ib_statistics {
>>>  	unsigned long	s_ib_tx_cq_call;
>>>  	unsigned long	s_ib_tx_cq_event;
>>>  	unsigned long	s_ib_tx_ring_full;
>>> +	unsigned long	s_ib_tx_throttle;
>>>  	unsigned long	s_ib_tx_sg_mapping_failure;
>>>  	unsigned long	s_ib_tx_stalled;
>>> +	unsigned long	s_ib_tx_credit_updates;
>>>  	unsigned long	s_ib_rx_cq_call;
>>>  	unsigned long	s_ib_rx_cq_event;
>>>  	unsigned long	s_ib_rx_ring_empty;
>>>  	unsigned long	s_ib_rx_refill_from_cq;
>>>  	unsigned long	s_ib_rx_refill_from_thread;
>>>  	unsigned long	s_ib_rx_alloc_limit;
>>> +	unsigned long	s_ib_rx_credit_updates;
>>>  	unsigned long	s_ib_ack_sent;
>>>  	unsigned long	s_ib_ack_send_failure;
>>>  	unsigned long	s_ib_ack_send_delayed;
>>> @@ -244,6 +265,8 @@ void rds_ib_flush_mrs(void);
>>>  int __init rds_ib_recv_init(void);
>>>  void rds_ib_recv_exit(void);
>>>  int rds_ib_recv(struct rds_connection *conn);
>>> +int rds_ib_recv_refill(struct rds_connection *conn, gfp_t kptr_gfp,
>>> +		       gfp_t page_gfp, int prefill);
>>>  void rds_ib_inc_purge(struct rds_incoming *inc);
>>>  void rds_ib_inc_free(struct rds_incoming *inc);
>>>  int rds_ib_inc_copy_to_user(struct rds_incoming *inc, struct iovec *iov,
>>> @@ -252,6 +275,7 @@ void rds_ib_recv_cq_comp_handler(struct  void 
>>> rds_ib_recv_init_ring(struct rds_ib_connection *ic);
>>>  void rds_ib_recv_clear_ring(struct rds_ib_connection *ic);
>>>  void rds_ib_recv_init_ack(struct rds_ib_connection *ic);
>>> +void rds_ib_attempt_ack(struct rds_ib_connection *ic);
>>>  void rds_ib_ack_send_complete(struct rds_ib_connection *ic);
>>>  u64 rds_ib_piggyb_ack(struct rds_ib_connection *ic);
>>>  @@ -266,12 +290,17 @@ u32 rds_ib_ring_completed(struct rds_ib_
>>>  extern wait_queue_head_t rds_ib_ring_empty_wait;
>>>   /* ib_send.c */
>>> +void rds_ib_xmit_complete(struct rds_connection *conn);
>>>  int rds_ib_xmit(struct rds_connection *conn, struct rds_message *rm,
>>>  	        unsigned int hdr_off, unsigned int sg, unsigned int off);
>>>  void rds_ib_send_cq_comp_handler(struct ib_cq *cq, void *context);
>>>  void rds_ib_send_init_ring(struct rds_ib_connection *ic);
>>>  void rds_ib_send_clear_ring(struct rds_ib_connection *ic);
>>>  int rds_ib_xmit_rdma(struct rds_connection *conn, struct rds_rdma_op *op);
>>> +void rds_ib_send_add_credits(struct rds_connection *conn, unsigned int credits);
>>> +void rds_ib_advertise_credits(struct rds_connection *conn, unsigned int posted);
>>> +int rds_ib_send_grab_credits(struct rds_ib_connection *ic, u32 wanted,
>>> +			     u32 *adv_credits);
>>>   /* ib_stats.c */
>>>  RDS_DECLARE_PER_CPU(struct rds_ib_statistics, rds_ib_stats);
>>> @@ -287,6 +316,7 @@ extern unsigned long rds_ib_sysctl_max_r
>>>  extern unsigned long rds_ib_sysctl_max_unsig_wrs;
>>>  extern unsigned long rds_ib_sysctl_max_unsig_bytes;
>>>  extern unsigned long rds_ib_sysctl_max_recv_allocation;
>>> +extern unsigned int rds_ib_sysctl_flow_control;
>>>  extern ctl_table rds_ib_sysctl_table[];
>>>   /*
>>> Index: ofa_kernel-1.3/net/rds/ib_cm.c
>>> ===================================================================
>>> --- ofa_kernel-1.3.orig/net/rds/ib_cm.c
>>> +++ ofa_kernel-1.3/net/rds/ib_cm.c
>>> @@ -55,6 +55,22 @@ static void rds_ib_set_protocol(struct r
>>>  }
>>>   /*
>>> + * Set up flow control
>>> + */
>>> +static void rds_ib_set_flow_control(struct rds_connection *conn, u32 credits)
>>> +{
>>> +	struct rds_ib_connection *ic = conn->c_transport_data;
>>> +
>>> +	if (rds_ib_sysctl_flow_control && credits != 0) {
>>> +		/* We're doing flow control */
>>> +		ic->i_flowctl = 1;
>>> +		rds_ib_send_add_credits(conn, credits);
>>> +	} else {
>>> +		ic->i_flowctl = 0;
>>> +	}
>>> +}
>>> +
>>> +/*
>>>   * Connection established.
>>>   * We get here for both outgoing and incoming connection.
>>>   */
>>> @@ -72,12 +88,16 @@ static void rds_ib_connect_complete(stru
>>>  		rds_ib_set_protocol(conn,
>>>  				RDS_PROTOCOL(dp->dp_protocol_major,
>>>  					dp->dp_protocol_minor));
>>> +		rds_ib_set_flow_control(conn, be32_to_cpu(dp->dp_credit));
>>>  	}
>>>  -	rdsdebug("RDS/IB: ib conn complete on %u.%u.%u.%u version 
>>> %u.%u\n",
>>> +	printk(KERN_NOTICE "RDS/IB: connected to %u.%u.%u.%u version %u.%u%s\n",
>>>  			NIPQUAD(conn->c_laddr),
>>>  			RDS_PROTOCOL_MAJOR(conn->c_version),
>>> -			RDS_PROTOCOL_MINOR(conn->c_version));
>>> +			RDS_PROTOCOL_MINOR(conn->c_version),
>>> +			ic->i_flowctl? ", flow control" : "");
>>> +
>>> +	rds_ib_recv_refill(conn, GFP_KERNEL, GFP_HIGHUSER, 1);
>>>   	/* Tune the RNR timeout. We use a rather low timeout, but
>>>  	 * not the absolute minimum - this should be tunable.
>>> @@ -129,6 +149,24 @@ static void rds_ib_cm_fill_conn_param(st
>>>  		dp->dp_protocol_minor_mask = cpu_to_be16(RDS_IB_SUPPORTED_PROTOCOLS);
>>>  		dp->dp_ack_seq = rds_ib_piggyb_ack(ic);
>>>  +		/* Advertise flow control.
>>> +		 *
>>> +		 * Major chicken and egg alert!
>>> +		 * We would like to post receive buffers before we get here (eg.
>>> +		 * in rds_ib_setup_qp), so that we can give the peer an accurate
>>> +		 * credit value.
>>> +		 * Unfortunately we can't post receive buffers until we've finished
>>> +		 * protocol negotiation, and know in which order data and payload
>>> +		 * are arranged.
>>> +		 *
>>> +		 * What we do here is we give the peer a small initial credit, and
>>> +		 * initialize the number of posted buffers to a negative value.
>>> +		 */
>>> +		if (ic->i_flowctl) {
>>> +			atomic_set(&ic->i_credits, IB_SET_POST_CREDITS(-4));
>>> +			dp->dp_credit = cpu_to_be32(4);
>>> +		}
>>> +
>>>  		conn_param->private_data = dp;
>>>  		conn_param->private_data_len = sizeof(*dp);
>>>  	}
>>> @@ -363,6 +401,7 @@ static int rds_ib_cm_handle_connect(stru
>>>  	ic = conn->c_transport_data;
>>>   	rds_ib_set_protocol(conn, version);
>>> +	rds_ib_set_flow_control(conn, be32_to_cpu(dp->dp_credit));
>>>   	/* If the peer gave us the last packet it saw, process this as if
>>>  	 * we had received a regular ACK. */
>>> @@ -428,6 +467,7 @@ out:
>>>  static int rds_ib_cm_initiate_connect(struct rdma_cm_id *cm_id)
>>>  {
>>>  	struct rds_connection *conn = cm_id->context;
>>> +	struct rds_ib_connection *ic = conn->c_transport_data;
>>>  	struct rdma_conn_param conn_param;
>>>  	struct rds_ib_connect_private dp;
>>>  	int ret;
>>> @@ -435,6 +475,7 @@ static int rds_ib_cm_initiate_connect(st
>>>  	/* If the peer doesn't do protocol negotiation, we must
>>>  	 * default to RDSv3.0 */
>>>  	rds_ib_set_protocol(conn, RDS_PROTOCOL_3_0);
>>> +	ic->i_flowctl = rds_ib_sysctl_flow_control;	/* advertise flow control */
>>>   	ret = rds_ib_setup_qp(conn);
>>>  	if (ret) {
>>> @@ -688,6 +729,10 @@ void rds_ib_conn_shutdown(struct rds_con
>>>  #endif
>>>  	ic->i_ack_recv = 0;
>>>  +	/* Clear flow control state */
>>> +	ic->i_flowctl = 0;
>>> +	atomic_set(&ic->i_credits, 0);
>>> +
>>>  	if (ic->i_ibinc) {
>>>  		rds_inc_put(&ic->i_ibinc->ii_inc);
>>>  		ic->i_ibinc = NULL;
>>> Index: ofa_kernel-1.3/net/rds/ib_recv.c
>>> ===================================================================
>>> --- ofa_kernel-1.3.orig/net/rds/ib_recv.c
>>> +++ ofa_kernel-1.3/net/rds/ib_recv.c
>>> @@ -220,16 +220,17 @@ out:
>>>   * -1 is returned if posting fails due to temporary resource exhaustion.
>>>   */
>>>  int rds_ib_recv_refill(struct rds_connection *conn, gfp_t kptr_gfp,
>>> -		       gfp_t page_gfp)
>>> +		       gfp_t page_gfp, int prefill)
>>>  {
>>>  	struct rds_ib_connection *ic = conn->c_transport_data;
>>>  	struct rds_ib_recv_work *recv;
>>>  	struct ib_recv_wr *failed_wr;
>>> +	unsigned int posted = 0;
>>>  	int ret = 0;
>>>  	u32 pos;
>>>  -	while (rds_conn_up(conn) && rds_ib_ring_alloc(&ic->i_recv_ring, 1, 
>>> &pos)) {
>>> -
>>> +	while ((prefill || rds_conn_up(conn))
>>> +			&& rds_ib_ring_alloc(&ic->i_recv_ring, 1, &pos)) {
>>>  		if (pos >= ic->i_recv_ring.w_nr) {
>>>  			printk(KERN_NOTICE "Argh - ring alloc returned pos=%u\n",
>>>  					pos);
>>> @@ -257,8 +258,14 @@ int rds_ib_recv_refill(struct rds_connec
>>>  			ret = -1;
>>>  			break;
>>>  		}
>>> +
>>> +		posted++;
>>>  	}
>>>  +	/* We're doing flow control - update the window. */
>>> +	if (ic->i_flowctl && posted)
>>> +		rds_ib_advertise_credits(conn, posted);
>>> +
>>>  	if (ret)
>>>  		rds_ib_ring_unalloc(&ic->i_recv_ring, 1);
>>>  	return ret;
>>> @@ -436,7 +443,7 @@ static u64 rds_ib_get_ack(struct rds_ib_
>>>  #endif
>>>   -static void rds_ib_send_ack(struct rds_ib_connection *ic)
>>> +static void rds_ib_send_ack(struct rds_ib_connection *ic, unsigned int adv_credits)
>>>  {
>>>  	struct rds_header *hdr = ic->i_ack;
>>>  	struct ib_send_wr *failed_wr;
>>> @@ -448,6 +455,7 @@ static void rds_ib_send_ack(struct rds_i
>>>  	rdsdebug("send_ack: ic %p ack %llu\n", ic, (unsigned long long) seq);
>>>  	rds_message_populate_header(hdr, 0, 0, 0);
>>>  	hdr->h_ack = cpu_to_be64(seq);
>>> +	hdr->h_credit = adv_credits;
>>>  	rds_message_make_checksum(hdr);
>>>  	ic->i_ack_queued = jiffies;
>>>  @@ -460,6 +468,8 @@ static void rds_ib_send_ack(struct rds_i
>>>  		set_bit(IB_ACK_REQUESTED, &ic->i_ack_flags);
>>>    		rds_ib_stats_inc(s_ib_ack_send_failure);
>>> +		/* Need to finesse this later. */
>>> +		BUG();
>>>  	} else
>>>  		rds_ib_stats_inc(s_ib_ack_sent);
>>>  }
>>> @@ -502,15 +512,27 @@ static void rds_ib_send_ack(struct rds_i
>>>   * When we get here, we're called from the recv queue handler.
>>>   * Check whether we ought to transmit an ACK.
>>>   */
>>> -static void rds_ib_attempt_ack(struct rds_ib_connection *ic)
>>> +void rds_ib_attempt_ack(struct rds_ib_connection *ic)
>>>  {
>>> +	unsigned int adv_credits;
>>> +
>>>  	if (!test_bit(IB_ACK_REQUESTED, &ic->i_ack_flags))
>>>  		return;
>>> -	if (!test_and_set_bit(IB_ACK_IN_FLIGHT, &ic->i_ack_flags)) {
>>> -		clear_bit(IB_ACK_REQUESTED, &ic->i_ack_flags);
>>> -		rds_ib_send_ack(ic);
>>> -	} else
>>> +
>>> +	if (test_and_set_bit(IB_ACK_IN_FLIGHT, &ic->i_ack_flags)) {
>>>  		rds_ib_stats_inc(s_ib_ack_send_delayed);
>>> +		return;
>>> +	}
>>> +
>>> +	/* Can we get a send credit? */
>>> +	if (!rds_ib_send_grab_credits(ic, 1, &adv_credits)) {
>>> +		rds_ib_stats_inc(s_ib_tx_throttle);
>>> +		clear_bit(IB_ACK_IN_FLIGHT, &ic->i_ack_flags);
>>> +		return;
>>> +	}
>>> +
>>> +	clear_bit(IB_ACK_REQUESTED, &ic->i_ack_flags);
>>> +	rds_ib_send_ack(ic, adv_credits);
>>>  }
>>>   /*
>>> @@ -706,6 +728,10 @@ void rds_ib_process_recv(struct rds_conn
>>>  	state->ack_recv = be64_to_cpu(ihdr->h_ack);
>>>  	state->ack_recv_valid = 1;
>>>  +	/* Process the credits update if there was one */
>>> +	if (ihdr->h_credit)
>>> +		rds_ib_send_add_credits(conn, ihdr->h_credit);
>>> +
>>>  	if (ihdr->h_sport == 0 && ihdr->h_dport == 0 && byte_len == 0) {
>>>  		/* This is an ACK-only packet. The fact that it gets
>>>  		 * special treatment here is that historically, ACKs
>>> @@ -877,7 +903,7 @@ void rds_ib_recv_cq_comp_handler(struct   	if 
>>> (mutex_trylock(&ic->i_recv_mutex)) {
>>>  		if (rds_ib_recv_refill(conn, GFP_ATOMIC,
>>> -					 GFP_ATOMIC | __GFP_HIGHMEM))
>>> +					 GFP_ATOMIC | __GFP_HIGHMEM, 0))
>>>  			ret = -EAGAIN;
>>>  		else
>>>  			rds_ib_stats_inc(s_ib_rx_refill_from_cq);
>>> @@ -901,7 +927,7 @@ int rds_ib_recv(struct rds_connection *c
>>>  	 * we're really low and we want the caller to back off for a bit.
>>>  	 */
>>>  	mutex_lock(&ic->i_recv_mutex);
>>> -	if (rds_ib_recv_refill(conn, GFP_KERNEL, GFP_HIGHUSER))
>>> +	if (rds_ib_recv_refill(conn, GFP_KERNEL, GFP_HIGHUSER, 0))
>>>  		ret = -ENOMEM;
>>>  	else
>>>  		rds_ib_stats_inc(s_ib_rx_refill_from_thread);
>>> Index: ofa_kernel-1.3/net/rds/ib.c
>>> ===================================================================
>>> --- ofa_kernel-1.3.orig/net/rds/ib.c
>>> +++ ofa_kernel-1.3/net/rds/ib.c
>>> @@ -187,6 +187,7 @@ static void rds_ib_exit(void)
>>>   struct rds_transport rds_ib_transport = {
>>>  	.laddr_check		= rds_ib_laddr_check,
>>> +	.xmit_complete		= rds_ib_xmit_complete,
>>>  	.xmit			= rds_ib_xmit,
>>>  	.xmit_cong_map		= NULL,
>>>  	.xmit_rdma		= rds_ib_xmit_rdma,
>>> Index: ofa_kernel-1.3/net/rds/ib_send.c
>>> ===================================================================
>>> --- ofa_kernel-1.3.orig/net/rds/ib_send.c
>>> +++ ofa_kernel-1.3/net/rds/ib_send.c
>>> @@ -245,6 +245,144 @@ void rds_ib_send_cq_comp_handler(struct  	}
>>>  }
>>>  +/*
>>> + * This is the main function for allocating credits when sending
>>> + * messages.
>>> + *
>>> + * Conceptually, we have two counters:
>>> + *  -	send credits: this tells us how many WRs we're allowed
>>> + *	to submit without overrunning the receiver's queue. For
>>> + *	each SEND WR we post, we decrement this by one.
>>> + *
>>> + *  -	posted credits: this tells us how many WRs we recently
>>> + *	posted to the receive queue. This value is transferred
>>> + *	to the peer as a "credit update" in a RDS header field.
>>> + *	Every time we transmit credits to the peer, we subtract
>>> + *	the amount of transferred credits from this counter.
>>> + *
>>> + * It is essential that we avoid situations where both sides have
>>> + * exhausted their send credits, and are unable to send new credits
>>> + * to the peer. We achieve this by requiring that we send at least
>>> + * one credit update to the peer before exhausting our credits.
>>> + * When new credits arrive, we subtract one credit that is withheld
>>> + * until we've posted new buffers and are ready to transmit these
>>> + * credits (see rds_ib_send_add_credits below).
>>> + *
>>> + * The RDS send code is essentially single-threaded; rds_send_xmit
>>> + * grabs c_send_sem to ensure exclusive access to the send ring.
>>> + * However, the ACK sending code is independent and can race with
>>> + * message SENDs.
>>> + *
>>> + * In the send path, we need to update the counters for send credits
>>> + * and the counter of posted buffers atomically - when we use the
>>> + * last available credit, we cannot allow another thread to race us
>>> + * and grab the posted credits counter.  Hence, we have to use a
>>> + * spinlock to protect the credit counter, or use atomics.
>>> + *
>>> + * Spinlocks shared between the send and the receive path are bad,
>>> + * because they create unnecessary delays. An early implementation
>>> + * using a spinlock showed a 5% degradation in throughput at some
>>> + * loads.
>>> + *
>>> + * This implementation avoids spinlocks completely, putting both
>>> + * counters into a single atomic, and updating that atomic using
>>> + * atomic_add (in the receive path, when receiving fresh credits),
>>> + * and using atomic_cmpxchg when updating the two counters.
>>> + */
>>> +int rds_ib_send_grab_credits(struct rds_ib_connection *ic,
>>> +			     u32 wanted, u32 *adv_credits)
>>> +{
>>> +	unsigned int avail, posted, got = 0, advertise;
>>> +	long oldval, newval;
>>> +
>>> +	*adv_credits = 0;
>>> +	if (!ic->i_flowctl)
>>> +		return wanted;
>>> +
>>> +try_again:
>>> +	advertise = 0;
>>> +	oldval = newval = atomic_read(&ic->i_credits);
>>> +	posted = IB_GET_POST_CREDITS(oldval);
>>> +	avail = IB_GET_SEND_CREDITS(oldval);
>>> +
>>> +	rdsdebug("rds_ib_send_grab_credits(%u): credits=%u posted=%u\n",
>>> +			wanted, avail, posted);
>>> +
>>> +	/* The last credit must be used to send a credit update. */
>>> +	if (avail && !posted)
>>> +		avail--;
>>> +
>>> +	if (avail < wanted) {
>>> +		struct rds_connection *conn = ic->i_cm_id->context;
>>> +
>>> +		/* Oops, there aren't that many credits left! */
>>> +		set_bit(RDS_LL_SEND_FULL, &conn->c_flags);
>>> +		got = avail;
>>> +	} else {
>>> +		/* Sometimes you get what you want, lalala. */
>>> +		got = wanted;
>>> +	}
>>> +	newval -= IB_SET_SEND_CREDITS(got);
>>> +
>>> +	if (got && posted) {
>>> +		advertise = min_t(unsigned int, posted, RDS_MAX_ADV_CREDIT);
>>> +		newval -= IB_SET_POST_CREDITS(advertise);
>>> +	}
>>> +
>>> +	/* Finally bill everything */
>>> +	if (atomic_cmpxchg(&ic->i_credits, oldval, newval) != oldval)
>>> +		goto try_again;
>>> +
>>> +	*adv_credits = advertise;
>>> +	return got;
>>> +}
>>> +
>>> +void rds_ib_send_add_credits(struct rds_connection *conn, unsigned int credits)
>>> +{
>>> +	struct rds_ib_connection *ic = conn->c_transport_data;
>>> +
>>> +	if (credits == 0)
>>> +		return;
>>> +
>>> +	rdsdebug("rds_ib_send_add_credits(%u): current=%u%s\n",
>>> +			credits,
>>> +			IB_GET_SEND_CREDITS(atomic_read(&ic->i_credits)),
>>> +			test_bit(RDS_LL_SEND_FULL, &conn->c_flags)? ", ll_send_full" : "");
>>> +
>>> +	atomic_add(IB_SET_SEND_CREDITS(credits), &ic->i_credits);
>>> +	if (test_and_clear_bit(RDS_LL_SEND_FULL, &conn->c_flags))
>>> +		queue_delayed_work(rds_wq, &conn->c_send_w, 0);
>>> +
>>> +	WARN_ON(IB_GET_SEND_CREDITS(credits) >= 16384);
>>> +
>>> +	rds_ib_stats_inc(s_ib_rx_credit_updates);
>>> +}
>>> +
>>> +void rds_ib_advertise_credits(struct rds_connection *conn, unsigned int posted)
>>> +{
>>> +	struct rds_ib_connection *ic = conn->c_transport_data;
>>> +
>>> +	if (posted == 0)
>>> +		return;
>>> +
>>> +	atomic_add(IB_SET_POST_CREDITS(posted), &ic->i_credits);
>>> +
>>> +	/* Decide whether to send an update to the peer now.
>>> +	 * If we would send a credit update for every single buffer we
>>> +	 * post, we would end up with an ACK storm (ACK arrives,
>>> +	 * consumes buffer, we refill the ring, send ACK to remote
>>> +	 * advertising the newly posted buffer... ad inf)
>>> +	 *
>>> +	 * Performance pretty much depends on how often we send
>>> +	 * credit updates - too frequent updates mean lots of ACKs.
>>> +	 * Too infrequent updates, and the peer will run out of
>>> +	 * credits and has to throttle.
>>> +	 * For the time being, 16 seems to be a good compromise.
>>> +	 */
>>> +	if (IB_GET_POST_CREDITS(atomic_read(&ic->i_credits)) >= 16)
>>> +		set_bit(IB_ACK_REQUESTED, &ic->i_ack_flags);
>>> +}
>>> +
>>>  static inline void
>>>  rds_ib_xmit_populate_wr(struct rds_ib_connection *ic,
>>>  		struct rds_ib_send_work *send, unsigned int pos,
>>> @@ -307,6 +445,8 @@ int rds_ib_xmit(struct rds_connection *c
>>>  	u32 pos;
>>>  	u32 i;
>>>  	u32 work_alloc;
>>> +	u32 credit_alloc;
>>> +	u32 adv_credits = 0;
>>>  	int send_flags = 0;
>>>  	int sent;
>>>  	int ret;
>>> @@ -314,6 +454,7 @@ int rds_ib_xmit(struct rds_connection *c
>>>  	BUG_ON(off % RDS_FRAG_SIZE);
>>>  	BUG_ON(hdr_off != 0 && hdr_off != sizeof(struct rds_header));
>>>  +	/* FIXME we may overallocate here */
>>>  	if (be32_to_cpu(rm->m_inc.i_hdr.h_len) == 0)
>>>  		i = 1;
>>>  	else
>>> @@ -327,8 +468,29 @@ int rds_ib_xmit(struct rds_connection *c
>>>  		goto out;
>>>  	}
>>>  +	credit_alloc = work_alloc;
>>> +	if (ic->i_flowctl) {
>>> +		credit_alloc = rds_ib_send_grab_credits(ic, work_alloc, &adv_credits);
>>> +		if (credit_alloc < work_alloc) {
>>> +			rds_ib_ring_unalloc(&ic->i_send_ring, work_alloc - credit_alloc);
>>> +			work_alloc = credit_alloc;
>>> +		}
>>> +		if (work_alloc == 0) {
>>> +			rds_ib_ring_unalloc(&ic->i_send_ring, work_alloc);
>>> +			rds_ib_stats_inc(s_ib_tx_throttle);
>>> +			ret = -ENOMEM;
>>> +			goto out;
>>> +		}
>>> +	}
>>> +
>>>  	/* map the message the first time we see it */
>>>  	if (ic->i_rm == NULL) {
>>> +		/*
>>> +		printk(KERN_NOTICE "rds_ib_xmit prep msg dport=%u flags=0x%x len=%d\n",
>>> +				be16_to_cpu(rm->m_inc.i_hdr.h_dport),
>>> +				rm->m_inc.i_hdr.h_flags,
>>> +				be32_to_cpu(rm->m_inc.i_hdr.h_len));
>>> +		   */
>>>  		if (rm->m_nents) {
>>>  			rm->m_count = ib_dma_map_sg(dev,
>>>  					 rm->m_sg, rm->m_nents, DMA_TO_DEVICE);
>>> @@ -449,6 +611,24 @@ add_header:
>>>  		 * have been set up to point to the right header buffer. */
>>>  		memcpy(&ic->i_send_hdrs[pos], &rm->m_inc.i_hdr, sizeof(struct rds_header));
>>>  +		if (0) {
>>> +			struct rds_header *hdr = &ic->i_send_hdrs[pos];
>>> +
>>> +			printk(KERN_NOTICE "send WR dport=%u flags=0x%x len=%d\n",
>>> +				be16_to_cpu(hdr->h_dport),
>>> +				hdr->h_flags,
>>> +				be32_to_cpu(hdr->h_len));
>>> +		}
>>> +		if (adv_credits) {
>>> +			struct rds_header *hdr = &ic->i_send_hdrs[pos];
>>> +
>>> +			/* add credit and redo the header checksum */
>>> +			hdr->h_credit = adv_credits;
>>> +			rds_message_make_checksum(hdr);
>>> +			adv_credits = 0;
>>> +			rds_ib_stats_inc(s_ib_tx_credit_updates);
>>> +		}
>>> +
>>>  		if (prev)
>>>  			prev->s_wr.next = &send->s_wr;
>>>  		prev = send;
>>> @@ -472,6 +652,8 @@ add_header:
>>>  		rds_ib_ring_unalloc(&ic->i_send_ring, work_alloc - i);
>>>  		work_alloc = i;
>>>  	}
>>> +	if (ic->i_flowctl && i < credit_alloc)
>>> +		rds_ib_send_add_credits(conn, credit_alloc - i);
>>>   	/* XXX need to worry about failed_wr and partial sends. */
>>>  	failed_wr = &first->s_wr;
>>> @@ -487,11 +669,14 @@ add_header:
>>>  			ic->i_rm = prev->s_rm;
>>>  			prev->s_rm = NULL;
>>>  		}
>>> +		/* Finesse this later */
>>> +		BUG();
>>>  		goto out;
>>>  	}
>>>   	ret = sent;
>>>  out:
>>> +	BUG_ON(adv_credits);
>>>  	return ret;
>>>  }
>>>  @@ -630,3 +815,12 @@ int rds_ib_xmit_rdma(struct rds_connecti
>>>  out:
>>>  	return ret;
>>>  }
>>> +
>>> +void rds_ib_xmit_complete(struct rds_connection *conn)
>>> +{
>>> +	struct rds_ib_connection *ic = conn->c_transport_data;
>>> +
>>> +	/* We may have a pending ACK or window update we were unable
>>> +	 * to send previously (due to flow control). Try again. */
>>> +	rds_ib_attempt_ack(ic);
>>> +}
>>> Index: ofa_kernel-1.3/net/rds/ib_stats.c
>>> ===================================================================
>>> --- ofa_kernel-1.3.orig/net/rds/ib_stats.c
>>> +++ ofa_kernel-1.3/net/rds/ib_stats.c
>>> @@ -46,14 +46,17 @@ static char *rds_ib_stat_names[] = {
>>>  	"ib_tx_cq_call",
>>>  	"ib_tx_cq_event",
>>>  	"ib_tx_ring_full",
>>> +	"ib_tx_throttle",
>>>  	"ib_tx_sg_mapping_failure",
>>>  	"ib_tx_stalled",
>>> +	"ib_tx_credit_updates",
>>>  	"ib_rx_cq_call",
>>>  	"ib_rx_cq_event",
>>>  	"ib_rx_ring_empty",
>>>  	"ib_rx_refill_from_cq",
>>>  	"ib_rx_refill_from_thread",
>>>  	"ib_rx_alloc_limit",
>>> +	"ib_rx_credit_updates",
>>>  	"ib_ack_sent",
>>>  	"ib_ack_send_failure",
>>>  	"ib_ack_send_delayed",
>>> Index: ofa_kernel-1.3/net/rds/rds.h
>>> ===================================================================
>>> --- ofa_kernel-1.3.orig/net/rds/rds.h
>>> +++ ofa_kernel-1.3/net/rds/rds.h
>>> @@ -170,6 +170,7 @@ struct rds_connection {
>>>  #define RDS_FLAG_CONG_BITMAP	0x01
>>>  #define RDS_FLAG_ACK_REQUIRED	0x02
>>>  #define RDS_FLAG_RETRANSMITTED	0x04
>>> +#define RDS_MAX_ADV_CREDIT	255
>>>   /*
>>>   * Maximum space available for extension headers.
>>> @@ -183,7 +184,8 @@ struct rds_header {
>>>  	__be16	h_sport;
>>>  	__be16	h_dport;
>>>  	u8	h_flags;
>>> -	u8	h_padding[5];
>>> +	u8	h_credit;
>>> +	u8	h_padding[4];
>>>  	__sum16	h_csum;
>>>   	u8	h_exthdr[RDS_HEADER_EXT_SPACE];
>>> Index: ofa_kernel-1.3/net/rds/ib_sysctl.c
>>> ===================================================================
>>> --- ofa_kernel-1.3.orig/net/rds/ib_sysctl.c
>>> +++ ofa_kernel-1.3/net/rds/ib_sysctl.c
>>> @@ -53,6 +53,8 @@ unsigned long rds_ib_sysctl_max_unsig_by
>>>  static unsigned long rds_ib_sysctl_max_unsig_bytes_min = 1;
>>>  static unsigned long rds_ib_sysctl_max_unsig_bytes_max = ~0UL;
>>>  +unsigned int rds_ib_sysctl_flow_control = 1;
>>> +
>>>  ctl_table rds_ib_sysctl_table[] = {
>>>  	{
>>>  		.ctl_name       = 1,
>>> @@ -102,6 +104,14 @@ ctl_table rds_ib_sysctl_table[] = {
>>>  		.mode           = 0644,
>>>  		.proc_handler   = &proc_doulongvec_minmax,
>>>  	},
>>> +	{
>>> +		.ctl_name	= 6,
>>> +		.procname	= "flow_control",
>>> +		.data		= &rds_ib_sysctl_flow_control,
>>> +		.maxlen		= sizeof(rds_ib_sysctl_flow_control),
>>> +		.mode		= 0644,
>>> +		.proc_handler	= &proc_dointvec,
>>> +	},
>>>  	{ .ctl_name = 0}
>>>  };
>>>  _______________________________________________
>>> general mailing list
>>> general at lists.openfabrics.org
>>> http://lists.openfabrics.org/cgi-bin/mailman/listinfo/general
>>>
>>> To unsubscribe, please visit http://openib.org/mailman/listinfo/openib-general
>>>     
>> _______________________________________________
>> general mailing list
>> general at lists.openfabrics.org
>> http://lists.openfabrics.org/cgi-bin/mailman/listinfo/general
>>
>> To unsubscribe, please visit http://openib.org/mailman/listinfo/openib-general
>>   



More information about the rds-devel mailing list