Subject: [PATCH] RPC: make rpc_rqst a part of rpc_task Description: The RPC client allocates rpc_rqst structures from slots in a static table. The size of this slot table determines how many RPC requests can be in flight concurrently for that rpc_clnt (NFS mount point). Previously, this static slot table was fixed in size, and contained just 16 slots for every rpc_clnt. In recent kernels, logic was added to allow this static slot table to be sized and allocated dynamically at the time an rpc_clnt is created. The slot table can now be made as large as 128 slots to permit many more concurrent RPC requests, but the memory remains allocated until each rpc_clnt is destroyed, even when there are no outstanding RPC requests. NFSv4.1 sessions provide the ability to negotiate the maximum number of concurrent requests that the transport and the server can handle. The maximum can be increased or decreased during a transport session. This patch makes RPC requests a part of the RPC task structure. This reduces memory fragmentation by keeping all the pieces of an NFS and RPC request together. For asynchronous reads and writes, this means the RPC request, RPC task, and nfs_read/write_data structures reside in the same CPU cache lines, and will move between caches together. RPC slot tables are eliminated entirely, preventing memory from being held even when it's not being used, and allowing the maximum number of outstanding RPC requests to be modulated dynamically. The call_reserve path has been simplified. The only job it has now are to initialize each rpc_rqst structure, or to queue RPC tasks on the xprt's backlog queue if the maximum number per transport has been exceeded. We no longer have a free list, and each XID can be allocated while holding the xprt_lock. Thus we can do away with the reserve_lock entirely. Test-plan: Heavy multi-threaded tests like "sio" and "iozone" on SMP systems. Watch for signficant regression in CPU utilization with oprofile and other tools. Watch for multiple RPCs with the same XID. Destructive testing. Created: Thu, 30 Sep 2004 21:08:41 -0400 Signed-off-by: Chuck Lever --- include/linux/sunrpc/sched.h | 118 +++++++++++++++++------------------ include/linux/sunrpc/xprt.h | 12 ++- net/sunrpc/clntsock.c | 17 ----- net/sunrpc/xprt.c | 95 +++++++++++----------------- 4 files changed, 109 insertions(+), 133 deletions(-) diff -X /home/cel/src/linux/dont-diff -Naurp 36-rpc-tk_auth/include/linux/sunrpc/sched.h 37-rpc_rqst/include/linux/sunrpc/sched.h --- 36-rpc-tk_auth/include/linux/sunrpc/sched.h 2004-09-30 14:38:14.658792000 -0400 +++ 37-rpc_rqst/include/linux/sunrpc/sched.h 2004-09-30 14:46:59.810292000 -0400 @@ -27,7 +27,63 @@ struct rpc_message { struct rpc_cred * rpc_cred; /* Credentials */ }; -struct rpc_wait_queue; +/* + * Task priorities. + * Note: if you change these, you must also change + * the task initialization definitions below. + */ +#define RPC_PRIORITY_LOW 0 +#define RPC_PRIORITY_NORMAL 1 +#define RPC_PRIORITY_HIGH 2 +#define RPC_NR_PRIORITY (RPC_PRIORITY_HIGH+1) + +/* + * RPC synchronization objects + */ +struct rpc_wait_queue { + spinlock_t lock; + struct list_head tasks[RPC_NR_PRIORITY]; /* task queue for each priority level */ + unsigned long cookie; /* cookie of last task serviced */ + unsigned char maxpriority; /* maximum priority (0 if queue is not a priority queue) */ + unsigned char priority; /* current priority */ + unsigned char count; /* # task groups remaining serviced so far */ + unsigned char nr; /* # tasks remaining for cookie */ +#ifdef RPC_DEBUG + const char * name; +#endif +}; + +/* + * This is the # requests to send consecutively + * from a single cookie. The aim is to improve + * performance of NFS operations such as read/write. + */ +#define RPC_BATCH_COUNT 16 + +#ifndef RPC_DEBUG +# define RPC_WAITQ_INIT(var,qname) { \ + .lock = SPIN_LOCK_UNLOCKED, \ + .tasks = { \ + [0] = LIST_HEAD_INIT(var.tasks[0]), \ + [1] = LIST_HEAD_INIT(var.tasks[1]), \ + [2] = LIST_HEAD_INIT(var.tasks[2]), \ + }, \ + } +#else +# define RPC_WAITQ_INIT(var,qname) { \ + .lock = SPIN_LOCK_UNLOCKED, \ + .tasks = { \ + [0] = LIST_HEAD_INIT(var.tasks[0]), \ + [1] = LIST_HEAD_INIT(var.tasks[1]), \ + [2] = LIST_HEAD_INIT(var.tasks[2]), \ + }, \ + .name = qname, \ + } +#endif +# define RPC_WAITQ(var,qname) struct rpc_wait_queue var = RPC_WAITQ_INIT(var,qname) + +#define RPC_IS_PRIORITY(q) ((q)->maxpriority > 0) + struct rpc_wait { struct list_head list; /* wait queue links */ struct list_head links; /* Links to related tasks */ @@ -35,6 +91,8 @@ struct rpc_wait { struct rpc_wait_queue * rpc_waitq; /* RPC wait queue we're on */ }; +#include + /* * This is the RPC task struct */ @@ -50,6 +108,7 @@ struct rpc_task { /* * RPC call state */ + struct rpc_rqst tk_rqst; /* RPC request */ struct rpc_message tk_msg; /* RPC call info */ __u8 tk_garb_retry, tk_cred_retry, @@ -171,63 +230,6 @@ typedef void (*rpc_action)(struct rpc_ } while (0) /* - * Task priorities. - * Note: if you change these, you must also change - * the task initialization definitions below. - */ -#define RPC_PRIORITY_LOW 0 -#define RPC_PRIORITY_NORMAL 1 -#define RPC_PRIORITY_HIGH 2 -#define RPC_NR_PRIORITY (RPC_PRIORITY_HIGH+1) - -/* - * RPC synchronization objects - */ -struct rpc_wait_queue { - spinlock_t lock; - struct list_head tasks[RPC_NR_PRIORITY]; /* task queue for each priority level */ - unsigned long cookie; /* cookie of last task serviced */ - unsigned char maxpriority; /* maximum priority (0 if queue is not a priority queue) */ - unsigned char priority; /* current priority */ - unsigned char count; /* # task groups remaining serviced so far */ - unsigned char nr; /* # tasks remaining for cookie */ -#ifdef RPC_DEBUG - const char * name; -#endif -}; - -/* - * This is the # requests to send consecutively - * from a single cookie. The aim is to improve - * performance of NFS operations such as read/write. - */ -#define RPC_BATCH_COUNT 16 - -#ifndef RPC_DEBUG -# define RPC_WAITQ_INIT(var,qname) { \ - .lock = SPIN_LOCK_UNLOCKED, \ - .tasks = { \ - [0] = LIST_HEAD_INIT(var.tasks[0]), \ - [1] = LIST_HEAD_INIT(var.tasks[1]), \ - [2] = LIST_HEAD_INIT(var.tasks[2]), \ - }, \ - } -#else -# define RPC_WAITQ_INIT(var,qname) { \ - .lock = SPIN_LOCK_UNLOCKED, \ - .tasks = { \ - [0] = LIST_HEAD_INIT(var.tasks[0]), \ - [1] = LIST_HEAD_INIT(var.tasks[1]), \ - [2] = LIST_HEAD_INIT(var.tasks[2]), \ - }, \ - .name = qname, \ - } -#endif -# define RPC_WAITQ(var,qname) struct rpc_wait_queue var = RPC_WAITQ_INIT(var,qname) - -#define RPC_IS_PRIORITY(q) ((q)->maxpriority > 0) - -/* * Function prototypes */ struct rpc_task *rpc_new_task(struct rpc_clnt *, rpc_action, int flags); diff -X /home/cel/src/linux/dont-diff -Naurp 36-rpc-tk_auth/include/linux/sunrpc/xprt.h 37-rpc_rqst/include/linux/sunrpc/xprt.h --- 36-rpc-tk_auth/include/linux/sunrpc/xprt.h 2004-09-30 14:44:40.189058000 -0400 +++ 37-rpc_rqst/include/linux/sunrpc/xprt.h 2004-09-30 14:46:59.813292000 -0400 @@ -12,6 +12,7 @@ #include #include #include +#include #include #include @@ -108,6 +109,8 @@ struct rpc_rqst { * This is the private part */ struct rpc_task * rq_task; /* RPC task data */ + unsigned char rq_xid_needed : 1, /* XID has been allocated */ + rq_inited : 1; /* release this request */ __u32 rq_xid; /* request XID */ int rq_cong; /* has incremented xprt->cong */ int rq_received; /* receive completed */ @@ -141,6 +144,8 @@ struct rpc_rqst { #define XPRT_COPY_XID (1 << 2) #define XPRT_COPY_DATA (1 << 3) +struct rpc_clnt; + struct rpc_xprt_ops { void (*peer_addr)(struct rpc_xprt *, struct sockaddr_in *); void (*setbufsize)(struct rpc_xprt *); @@ -191,9 +196,8 @@ struct rpc_xprt { struct rpc_wait_queue resend; /* requests waiting to resend */ struct rpc_wait_queue pending; /* requests in flight */ struct rpc_wait_queue backlog; /* waiting for slot */ - struct list_head free; /* free slots */ - struct rpc_rqst * slot; /* slot table storage */ - unsigned int max_reqs; /* total slots */ + atomic_t reqs; /* # of outstanding requests */ + unsigned int max_reqs; /* max concurrent requests allowed */ unsigned long state; /* transport state */ unsigned char shutdown : 1, /* being shut down */ autobind : 1, /* use portmapper */ @@ -229,7 +233,6 @@ struct rpc_xprt { * Send stuff */ spinlock_t xprt_lock; /* lock transport info */ - spinlock_t reserve_lock; /* lock slot table */ struct rpc_task * snd_task; /* Task blocked in send */ struct list_head recv; @@ -260,7 +263,6 @@ void xprt_receive(struct rpc_task *); int xprt_adjust_timeout(struct rpc_rqst *req); void xprt_release(struct rpc_task *); void xprt_connect(struct rpc_task *); -int xprt_clear_backlog(struct rpc_xprt *); int xprt_register(struct xprt_class *); int xprt_unregister(struct xprt_class *); diff -X /home/cel/src/linux/dont-diff -Naurp 36-rpc-tk_auth/net/sunrpc/clntsock.c 37-rpc_rqst/net/sunrpc/clntsock.c --- 36-rpc-tk_auth/net/sunrpc/clntsock.c 2004-09-30 14:44:40.193051000 -0400 +++ 37-rpc_rqst/net/sunrpc/clntsock.c 2004-09-30 14:46:59.816293000 -0400 @@ -466,7 +466,6 @@ static void xprt_sock_close(struct rpc_x static void xprt_sock_destroy(struct rpc_xprt *xprt) { xprt->ops->close(xprt); - kfree(xprt->slot); module_put(THIS_MODULE); } @@ -1265,18 +1264,11 @@ extern unsigned int xprt_tcp_slot_table_ */ static int xprt_sock_setup_udp(struct rpc_xprt *xprt, struct sockaddr_in *addr, void *args) { - size_t slot_table_size; struct rpc_timeout *to = (struct rpc_timeout *) args; dprintk("RPC: setting up udp-ipv4 transport...\n"); xprt->max_reqs = xprt_udp_slot_table_entries; - slot_table_size = xprt->max_reqs * sizeof(xprt->slot[0]); - xprt->slot = kmalloc(slot_table_size, GFP_KERNEL); - if (xprt->slot == NULL) - return -ENOMEM; - __module_get(THIS_MODULE); - memset(xprt->slot, 0, slot_table_size); memcpy(&xprt->addr, addr, sizeof(struct sockaddr_in)); xprt->prot = IPPROTO_UDP; @@ -1294,6 +1286,7 @@ static int xprt_sock_setup_udp(struct rp else xprt_set_timeout(&xprt->timeout, 5, 5 * HZ); + __module_get(THIS_MODULE); return 0; } @@ -1306,18 +1299,11 @@ static int xprt_sock_setup_udp(struct rp */ static int xprt_sock_setup_tcp(struct rpc_xprt *xprt, struct sockaddr_in *addr, void *args) { - size_t slot_table_size; struct rpc_timeout *to = (struct rpc_timeout *) args; dprintk("RPC: setting up tcp-ipv4 transport...\n"); xprt->max_reqs = xprt_tcp_slot_table_entries; - slot_table_size = xprt->max_reqs * sizeof(xprt->slot[0]); - xprt->slot = kmalloc(slot_table_size, GFP_KERNEL); - if (xprt->slot == NULL) - return -ENOMEM; - __module_get(THIS_MODULE); - memset(xprt->slot, 0, slot_table_size); memcpy(&xprt->addr, addr, sizeof(struct sockaddr_in)); xprt->prot = IPPROTO_TCP; @@ -1335,6 +1321,7 @@ static int xprt_sock_setup_tcp(struct rp else xprt_set_timeout(&xprt->timeout, 5, 60 * HZ); + __module_get(THIS_MODULE); return 0; } diff -X /home/cel/src/linux/dont-diff -Naurp 36-rpc-tk_auth/net/sunrpc/xprt.c 37-rpc_rqst/net/sunrpc/xprt.c --- 36-rpc-tk_auth/net/sunrpc/xprt.c 2004-09-30 14:44:40.222025000 -0400 +++ 37-rpc_rqst/net/sunrpc/xprt.c 2004-09-30 14:46:59.819296000 -0400 @@ -6,9 +6,8 @@ * * The interface works like this: * - * - When a process places a call, it allocates a request slot if - * one is available. Otherwise, it sleeps on the backlog queue - * (xprt_reserve). + * - When a process places a call, it reserves a request if one is + * available. Otherwise, it sleeps on the backlog queue (xprt_reserve). * - Next, the caller puts together the RPC message, stuffs it into * the request struct, and calls xprt_transmit(). * - xprt_transmit sends the message and installs the caller on the @@ -22,7 +21,7 @@ * timeout values (minor timeout) or wakes up the caller with a status * of -ETIMEDOUT. * - When the caller receives a notification from RPC that a reply arrived, - * it should release the RPC slot, and process the reply. + * it should release the RPC request, and process the reply. * If the call timed out, it may choose to retry the operation by * adjusting the initial timeout value, and simply calling rpc_call * again. @@ -55,7 +54,7 @@ /* * Local functions */ -static void xprt_request_init(struct rpc_task *, struct rpc_xprt *); +static inline void xprt_request_init(struct rpc_task *, struct rpc_xprt *); static inline void do_xprt_reserve(struct rpc_task *); static void xprt_connect_status(struct rpc_task *); static inline int __xprt_get_cong(struct rpc_xprt *, struct rpc_task *); @@ -118,6 +117,19 @@ int xprt_unregister(struct xprt_class *t EXPORT_SYMBOL(xprt_unregister); /* + * Allocate a 'unique' XID + */ +static inline u32 xprt_alloc_xid(struct rpc_xprt *xprt) +{ + return xprt->xid++; +} + +static inline void xprt_init_xid(struct rpc_xprt *xprt) +{ + get_random_bytes(&xprt->xid, sizeof(xprt->xid)); +} + +/* * Serialize write access to tranports, in order to prevent different * requests from interfering with each other. * Also prevents transport connects from colliding with writes. @@ -560,6 +572,12 @@ xprt_prepare_transmit(struct rpc_task *t err = -ENOTCONN; goto out_unlock; } + + if (req->rq_xid_needed) { + req->rq_xid = xprt_alloc_xid(xprt); + req->rq_xid_needed = 0; + } + out_unlock: spin_unlock_bh(&xprt->xprt_lock); return err; @@ -624,9 +642,6 @@ xprt_transmit(struct rpc_task *task) xprt_release_write(xprt, task); } -/* - * Reserve an RPC call slot. - */ static inline void do_xprt_reserve(struct rpc_task *task) { @@ -635,14 +650,17 @@ do_xprt_reserve(struct rpc_task *task) task->tk_status = 0; if (task->tk_rqstp) return; - if (!list_empty(&xprt->free)) { - struct rpc_rqst *req = list_entry(xprt->free.next, struct rpc_rqst, rq_list); - list_del_init(&req->rq_list); - task->tk_rqstp = req; + + if (atomic_read(&xprt->reqs) < xprt->max_reqs) { + atomic_inc(&xprt->reqs); + task->tk_rqstp = &task->tk_rqst; xprt_request_init(task, xprt); return; } - dprintk("RPC: waiting for request slot\n"); + + dprintk("RPC: exceeded max requests (%u)\n", + xprt->max_reqs); + task->tk_status = -EAGAIN; task->tk_timeout = 0; rpc_sleep_on(&xprt->backlog, task, NULL, NULL); @@ -655,47 +673,33 @@ xprt_reserve(struct rpc_task *task) task->tk_status = -EIO; if (!xprt->shutdown) { - spin_lock(&xprt->reserve_lock); do_xprt_reserve(task); - spin_unlock(&xprt->reserve_lock); if (task->tk_rqstp) del_timer_sync(&xprt->timer); } } /* - * Allocate a 'unique' XID - */ -static inline u32 xprt_alloc_xid(struct rpc_xprt *xprt) -{ - return xprt->xid++; -} - -static inline void xprt_init_xid(struct rpc_xprt *xprt) -{ - get_random_bytes(&xprt->xid, sizeof(xprt->xid)); -} - -/* * Initialize RPC request */ -static void +static inline void xprt_request_init(struct rpc_task *task, struct rpc_xprt *xprt) { struct rpc_rqst *req = task->tk_rqstp; + INIT_LIST_HEAD(&req->rq_list); req->rq_timeout = xprt->timeout.to_initval; req->rq_task = task; req->rq_xprt = xprt; + req->rq_xid_needed = 1; req->rq_buffer = NULL; req->rq_bufsize = 0; - req->rq_xid = xprt_alloc_xid(xprt); - dprintk("RPC: %4d reserved req %p xid %08x\n", task->tk_pid, - req, htonl(req->rq_xid)); + dprintk("RPC: %4d request reserved on xprt %p, xprt->reqs=%u\n", + task->tk_pid, xprt, atomic_read(&xprt->reqs)); } /* - * Release an RPC call slot + * Release an RPC request */ void xprt_release(struct rpc_task *task) @@ -720,10 +724,8 @@ xprt_release(struct rpc_task *task) dprintk("RPC: %4d release request %p\n", task->tk_pid, req); - spin_lock(&xprt->reserve_lock); - list_add(&req->rq_list, &xprt->free); - xprt_clear_backlog(xprt); - spin_unlock(&xprt->reserve_lock); + atomic_dec(&xprt->reqs); + rpc_wake_up_next(&xprt->backlog); } /* @@ -747,7 +749,6 @@ xprt_setup(int proto, struct sockaddr_in { int result; struct rpc_xprt *xprt; - struct rpc_rqst *req; struct xprt_class *t; /* @@ -783,9 +784,6 @@ found: * Generic initialization */ spin_lock_init(&xprt->xprt_lock); - spin_lock_init(&xprt->reserve_lock); - - INIT_LIST_HEAD(&xprt->free); INIT_LIST_HEAD(&xprt->recv); INIT_WORK(&xprt->task_cleanup, xprt_autoclose, xprt); init_timer(&xprt->timer); @@ -798,13 +796,9 @@ found: rpc_init_wait_queue(&xprt->resend, "xprt_resend"); rpc_init_priority_wait_queue(&xprt->backlog, "xprt_backlog"); - /* initialize free list */ - for (req = &xprt->slot[xprt->max_reqs-1]; req >= &xprt->slot[0]; req--) - list_add(&req->rq_list, &xprt->free); - xprt_init_xid(xprt); - dprintk("RPC: created transport %p with %u slots\n", xprt, + dprintk("RPC: created transport %p with %u reqs\n", xprt, xprt->max_reqs); return xprt; @@ -839,15 +833,6 @@ xprt_shutdown(struct rpc_xprt *xprt) } /* - * Clear the xprt backlog queue - */ -int -xprt_clear_backlog(struct rpc_xprt *xprt) { - rpc_wake_up_next(&xprt->backlog); - return 1; -} - -/* * Destroy an RPC transport, killing off all requests. */ int