[PATCH] RPC: transport-specific timeouts Prepare the way to remove the "xprt->nocong" variable by adding callouts to the RPC client transport switch API to handle setting RPC retransmit timeouts. Note that we move __xprt_{get,put}_cong to work around a compiler inlining bug. [ gcc version 3.4.2 20041017 (Red Hat 3.4.2-6.fc3) ] Test-plan: Use WAN simulation to cause sporadic bursty packet loss. Look for significant regression in performance or client stability. Version: Thu, 10 Mar 2005 18:50:50 -0500 Signed-off-by: Chuck Lever --- include/linux/sunrpc/xprt.h | 4 + net/sunrpc/xprt.c | 104 +++++++++++++----------------------- net/sunrpc/xprtsock.c | 66 ++++++++++++++++++++++ 3 files changed, 109 insertions(+), 65 deletions(-) diff -X /home/cel/src/linux/dont-diff -Naurp 37-xprt-flush-connects/include/linux/sunrpc/xprt.h 40-rpc-congestion/include/linux/sunrpc/xprt.h --- 37-xprt-flush-connects/include/linux/sunrpc/xprt.h 2005-03-07 16:39:15.721976000 -0500 +++ 40-rpc-congestion/include/linux/sunrpc/xprt.h 2005-03-07 16:41:05.814966000 -0500 @@ -41,7 +41,6 @@ extern unsigned int xprt_tcp_slot_table_ #define RPC_CWNDSCALE (1U << RPC_CWNDSHIFT) #define RPC_INITCWND RPC_CWNDSCALE #define RPC_MAXCWND(xprt) ((xprt)->max_reqs << RPC_CWNDSHIFT) -#define RPCXPRT_CONGESTED(xprt) ((xprt)->cong >= (xprt)->cwnd) /* * RPC call and reply header size as number of 32bit words (verifier @@ -121,6 +120,8 @@ struct rpc_xprt_ops { void (*setbufsize)(struct rpc_xprt *); void (*connect)(struct rpc_task *); int (*send_request)(struct rpc_task *); + void (*set_receive_timeout)(struct rpc_task *); + int (*is_congested)(struct rpc_xprt *); void (*timeout)(struct rpc_xprt *); void (*close)(struct rpc_xprt *); void (*destroy)(struct rpc_xprt *); @@ -216,6 +217,7 @@ void xprt_set_timeout(struct rpc_timeo struct rpc_rqst * xprt_lookup_rqst(struct rpc_xprt *, u32); void xprt_complete_rqst(struct rpc_xprt *, struct rpc_rqst *, int); +void xprt_adjust_cwnd(struct rpc_rqst *, int); void 
xprt_reserve(struct rpc_task *); int xprt_prepare_transmit(struct rpc_task *); void xprt_transmit(struct rpc_task *); diff -X /home/cel/src/linux/dont-diff -Naurp 37-xprt-flush-connects/net/sunrpc/xprt.c 40-rpc-congestion/net/sunrpc/xprt.c --- 37-xprt-flush-connects/net/sunrpc/xprt.c 2005-03-07 16:39:15.812953000 -0500 +++ 40-rpc-congestion/net/sunrpc/xprt.c 2005-03-07 16:41:05.837966000 -0500 @@ -58,11 +58,41 @@ static void xprt_request_init(struct rpc_task *, struct rpc_xprt *); static inline void do_xprt_reserve(struct rpc_task *); static void xprt_connect_status(struct rpc_task *task); -static int __xprt_get_cong(struct rpc_xprt *, struct rpc_task *); static int xprt_clear_backlog(struct rpc_xprt *xprt); /* + * Van Jacobson congestion avoidance. Check if the congestion window + * overflowed. The caller will put the task to sleep if this is the case. + */ +static inline int __xprt_get_cong(struct rpc_xprt *xprt, struct rpc_task *task) +{ + struct rpc_rqst *req = task->tk_rqstp; + + if (req->rq_cong) + return 1; + dprintk("RPC: %4d xprt_cwnd_limited cong = %ld cwnd = %ld\n", + task->tk_pid, xprt->cong, xprt->cwnd); + if (xprt->ops->is_congested(xprt)) + return 0; + req->rq_cong = 1; + xprt->cong += RPC_CWNDSCALE; + return 1; +} + +/* + * Adjust the congestion window. The caller wakes up the next task + * that has been sleeping due to congestion. + */ +static inline void __xprt_put_cong(struct rpc_xprt *xprt, struct rpc_rqst *req) +{ + if (!req->rq_cong) + return; + req->rq_cong = 0; + xprt->cong -= RPC_CWNDSCALE; +} + +/* * Serialize write access to transports, in order to prevent different * requests from interfering with each other. * Also prevents transport connects from colliding with writes. 
@@ -120,7 +150,7 @@ __xprt_lock_write_next(struct rpc_xprt * if (test_and_set_bit(XPRT_LOCKED, &xprt->state)) return; - if (!xprt->nocong && RPCXPRT_CONGESTED(xprt)) + if (xprt->ops->is_congested(xprt)) goto out_unlock; task = rpc_wake_up_next(&xprt->resend); if (!task) { @@ -167,47 +197,13 @@ xprt_release_write(struct rpc_xprt *xprt } /* - * Van Jacobson congestion avoidance. Check if the congestion window - * overflowed. Put the task to sleep if this is the case. - */ -static int -__xprt_get_cong(struct rpc_xprt *xprt, struct rpc_task *task) -{ - struct rpc_rqst *req = task->tk_rqstp; - - if (req->rq_cong) - return 1; - dprintk("RPC: %4d xprt_cwnd_limited cong = %ld cwnd = %ld\n", - task->tk_pid, xprt->cong, xprt->cwnd); - if (RPCXPRT_CONGESTED(xprt)) - return 0; - req->rq_cong = 1; - xprt->cong += RPC_CWNDSCALE; - return 1; -} - -/* - * Adjust the congestion window, and wake up the next task - * that has been sleeping due to congestion - */ -static void -__xprt_put_cong(struct rpc_xprt *xprt, struct rpc_rqst *req) -{ - if (!req->rq_cong) - return; - req->rq_cong = 0; - xprt->cong -= RPC_CWNDSCALE; - __xprt_lock_write_next(xprt); -} - -/* * Adjust RPC congestion window * We use a time-smoothed congestion estimator to avoid heavy oscillation. 
*/ -static void -xprt_adjust_cwnd(struct rpc_xprt *xprt, int result) +void xprt_adjust_cwnd(struct rpc_rqst *req, int result) { unsigned long cwnd; + struct rpc_xprt *xprt = req->rq_xprt; cwnd = xprt->cwnd; if (result >= 0 && cwnd <= xprt->cong) { @@ -225,6 +221,8 @@ xprt_adjust_cwnd(struct rpc_xprt *xprt, dprintk("RPC: cong %ld, cwnd was %ld, now %ld\n", xprt->cong, xprt->cwnd, cwnd); xprt->cwnd = cwnd; + __xprt_put_cong(xprt, req); + __xprt_lock_write_next(xprt); } /* @@ -419,20 +417,6 @@ struct rpc_rqst *xprt_lookup_rqst(struct void xprt_complete_rqst(struct rpc_xprt *xprt, struct rpc_rqst *req, int copied) { struct rpc_task *task = req->rq_task; - struct rpc_clnt *clnt = task->tk_client; - - /* Adjust congestion window */ - if (!xprt->nocong) { - unsigned timer = task->tk_msg.rpc_proc->p_timer; - xprt_adjust_cwnd(xprt, copied); - __xprt_put_cong(xprt, req); - if (timer) { - if (req->rq_ntrans == 1) - rpc_update_rtt(clnt->cl_rtt, timer, - (long)jiffies - req->rq_xtime); - rpc_set_timeo(clnt->cl_rtt, timer, req->rq_ntrans - 1); - } - } #ifdef RPC_PROFILE /* Profile only reads for now */ @@ -476,8 +460,7 @@ xprt_timer(struct rpc_task *task) if (req->rq_received) goto out; - xprt_adjust_cwnd(req->rq_xprt, -ETIMEDOUT); - __xprt_put_cong(xprt, req); + xprt_adjust_cwnd(req, -ETIMEDOUT); dprintk("RPC: %4d xprt_timer (%s request)\n", task->tk_pid, req ? 
"pending" : "backlogged"); @@ -527,7 +510,6 @@ out_unlock: void xprt_transmit(struct rpc_task *task) { - struct rpc_clnt *clnt = task->tk_client; struct rpc_rqst *req = task->tk_rqstp; struct rpc_xprt *xprt = req->rq_xprt; int status; @@ -552,16 +534,9 @@ xprt_transmit(struct rpc_task *task) status = xprt->ops->send_request(task); if (!status) { dprintk("RPC: %4d xmit complete\n", task->tk_pid); - /* Set the task's receive timeout value */ + spin_lock_bh(&xprt->transport_lock); - if (!xprt->nocong) { - int timer = task->tk_msg.rpc_proc->p_timer; - task->tk_timeout = rpc_calc_rto(clnt->cl_rtt, timer); - task->tk_timeout <<= rpc_ntimeo(clnt->cl_rtt, timer) + req->rq_retries; - if (task->tk_timeout > xprt->timeout.to_maxval || task->tk_timeout == 0) - task->tk_timeout = xprt->timeout.to_maxval; - } else - task->tk_timeout = req->rq_timeout; + xprt->ops->set_receive_timeout(task); /* Don't race with disconnect */ if (!xprt_connected(xprt)) task->tk_status = -ENOTCONN; @@ -675,6 +650,7 @@ xprt_release(struct rpc_task *task) spin_lock_bh(&xprt->transport_lock); __xprt_release_write(xprt, task); __xprt_put_cong(xprt, req); + __xprt_lock_write_next(xprt); if (!list_empty(&req->rq_list)) list_del(&req->rq_list); xprt->last_used = jiffies; diff -X /home/cel/src/linux/dont-diff -Naurp 37-xprt-flush-connects/net/sunrpc/xprtsock.c 40-rpc-congestion/net/sunrpc/xprtsock.c --- 37-xprt-flush-connects/net/sunrpc/xprtsock.c 2005-03-07 16:39:15.825940000 -0500 +++ 40-rpc-congestion/net/sunrpc/xprtsock.c 2005-03-07 16:41:05.858966000 -0500 @@ -431,6 +431,56 @@ static void xs_tcp_timeout(struct rpc_xp } /** + * xs_udp_set_receive_timeout - determine time to wait for reply + * @task: task whose timeout we'll set + * + */ +static void xs_udp_set_receive_timeout(struct rpc_task *task) +{ + int timer = task->tk_msg.rpc_proc->p_timer; + struct rpc_rtt *rtt = task->tk_client->cl_rtt; + struct rpc_rqst *req = task->tk_rqstp; + struct rpc_xprt *xprt = req->rq_xprt; + + task->tk_timeout = 
rpc_calc_rto(rtt, timer); + task->tk_timeout <<= rpc_ntimeo(rtt, timer) + req->rq_retries; + if (task->tk_timeout > xprt->timeout.to_maxval || task->tk_timeout == 0) + task->tk_timeout = xprt->timeout.to_maxval; +} + +/** + * xs_tcp_set_receive_timeout - determine time to wait for reply + * @task: task whose timeout we'll set + * + */ +static void xs_tcp_set_receive_timeout(struct rpc_task *task) +{ + task->tk_timeout = task->tk_rqstp->rq_timeout; +} + +/** + * xs_udp_is_congested - determine whether UDP transport is congested + * @xprt: transport to check + * + */ +static int xs_udp_is_congested(struct rpc_xprt *xprt) +{ + return xprt->cong >= xprt->cwnd; +} + +/** + * xs_tcp_is_congested - determine whether TCP transport is congested + * @xprt: transport to check + * + * TCP handles congestion control itself, so TCP transports are + * never congested. + */ +static int xs_tcp_is_congested(struct rpc_xprt *xprt) +{ + return 0; +} + +/** + * xs_close - close a socket + * @xprt: transport * @@ -503,6 +553,7 @@ static void xs_udp_data_ready(struct soc struct sk_buff *skb; int err, repsize, copied; u32 _xid, *xp; + unsigned timer; read_lock(&sk->sk_callback_lock); dprintk("RPC: xs_udp_data_ready...\n"); @@ -550,6 +601,17 @@ static void xs_udp_data_ready(struct soc /* Something worked... 
*/ dst_confirm(skb->dst); + /* Adjust congestion window */ + timer = task->tk_msg.rpc_proc->p_timer; + xprt_adjust_cwnd(rovr, copied); + if (timer) { + struct rpc_rtt *rtt = task->tk_client->cl_rtt; + if (rovr->rq_ntrans == 1) + rpc_update_rtt(rtt, timer, + (long)jiffies - rovr->rq_xtime); + rpc_set_timeo(rtt, timer, rovr->rq_ntrans - 1); + } + xprt_complete_rqst(xprt, rovr, copied); out_unlock: @@ -1185,6 +1247,8 @@ static struct rpc_xprt_ops xs_udp_ops = .setbufsize = xs_udp_setbufsize, .connect = xs_connect, .send_request = xs_udp_send_request, + .set_receive_timeout = xs_udp_set_receive_timeout, + .is_congested = xs_udp_is_congested, .timeout = xs_udp_timeout, .close = xs_close, .destroy = xs_destroy, @@ -1194,6 +1258,8 @@ static struct rpc_xprt_ops xs_tcp_ops = .setbufsize = xs_tcp_setbufsize, .connect = xs_connect, .send_request = xs_tcp_send_request, + .set_receive_timeout = xs_tcp_set_receive_timeout, + .is_congested = xs_tcp_is_congested, .timeout = xs_tcp_timeout, .close = xs_close, .destroy = xs_destroy,