aio: issue syslets This patch uses the scheduler's syslets infrastructure to avoid blocking sys_io_submit() while it is performing an operation. It is a simple approach to illustrate the change; it is not complete or without flaws. sys_io_submit() no longer allocates a kiocb for each user-provided iocb. It calls the specified system call directly. If it blocks then a cachemiss thread returns to the caller of sys_io_submit() which indicates the number of iocbs that were processed, including the one which just blocked. This doesn't seem optimal at first glance because it's likely that each iocb will block, so we'll be performing more system calls than we used to. But it might not be a significant overhead. It's also not clear if all users will know to keep trying their sys_io_submit() calls. aio-stress.c, for one, doesn't. The point of this is to remove struct kiocb from the kernel. System call handlers will no longer need specific code to be called from fs/aio.c. This is only the beginning. This patch orphans quite a lot of code in fs/aio.c that is not touched by this patch. Signed-off-by: Zach Brown diff -r 87bdf1ab548f arch/x86_64/kernel/entry.S --- a/arch/x86_64/kernel/entry.S Tue May 22 15:29:42 2007 -0700 +++ b/arch/x86_64/kernel/entry.S Wed May 23 14:15:31 2007 -0700 @@ -418,6 +418,10 @@ END(\label) */ PTREGSCALL stub_async_thread, sys_async_thread, %rdx PTREGSCALL stub_async_exec, sys_async_exec, %rdx + /* + * sys_io_submit() does the same, but with three arguments. + */ + PTREGSCALL stub_io_submit, sys_io_submit, %r8 ENTRY(ptregscall_common) popq %r11 diff -r 87bdf1ab548f fs/aio.c --- a/fs/aio.c Tue May 22 15:29:42 2007 -0700 +++ b/fs/aio.c Wed May 23 14:15:31 2007 -0700 @@ -16,6 +16,7 @@ #include #include #include +#include #define DEBUG 0 @@ -920,50 +921,18 @@ EXPORT_SYMBOL(kick_iocb); * Returns true if this is the last user of the request. The * only other user of the request can be the cancellation code. 
*/ -int fastcall aio_complete(struct kiocb *iocb, long res, long res2) -{ - struct kioctx *ctx = iocb->ki_ctx; +static void aio_append_io_event(struct kioctx *ctx, long res, + void __user *user_iocb, u64 user_data) +{ struct aio_ring_info *info; struct aio_ring *ring; struct io_event *event; unsigned long flags; unsigned long tail; - int ret; - - /* - * Special case handling for sync iocbs: - * - events go directly into the iocb for fast handling - * - the sync task with the iocb in its stack holds the single iocb - * ref, no other paths have a way to get another ref - * - the sync task helpfully left a reference to itself in the iocb - */ - if (is_sync_kiocb(iocb)) { - BUG_ON(iocb->ki_users != 1); - iocb->ki_user_data = res; - iocb->ki_users = 0; - wake_up_process(iocb->ki_obj.tsk); - return 1; - } info = &ctx->ring_info; - /* add a completion event to the ring buffer. - * must be done holding ctx->ctx_lock to prevent - * other code from messing with the tail - * pointer since we might be called from irq - * context. - */ spin_lock_irqsave(&ctx->ctx_lock, flags); - - if (iocb->ki_run_list.prev && !list_empty(&iocb->ki_run_list)) - list_del_init(&iocb->ki_run_list); - - /* - * cancelled requests don't get events, userland was given one - * when the event got cancelled. 
- */ - if (kiocbIsCancelled(iocb)) - goto put_rq; ring = kmap_atomic(info->ring_pages[0], KM_IRQ1); @@ -972,14 +941,10 @@ int fastcall aio_complete(struct kiocb * if (++tail >= info->nr) tail = 0; - event->obj = (u64)(unsigned long)iocb->ki_obj.user; - event->data = iocb->ki_user_data; + event->obj = (u64)(unsigned long)user_iocb; + event->data = user_data; event->res = res; - event->res2 = res2; - - dprintk("aio_complete: %p[%lu]: %p: %p %Lx %lx %lx\n", - ctx, tail, iocb, iocb->ki_obj.user, iocb->ki_user_data, - res, res2); + event->res2 = 0; /* after flagging the request as done, we * must never even look at it again @@ -992,16 +957,10 @@ int fastcall aio_complete(struct kiocb * put_aio_ring_event(event, KM_IRQ0); kunmap_atomic(ring, KM_IRQ1); - pr_debug("added to ring %p at [%lu]\n", iocb, tail); -put_rq: - /* everything turned out well, dispose of the aiocb. */ - ret = __aio_put_req(ctx, iocb); - if (waitqueue_active(&ctx->wait)) wake_up(&ctx->wait); spin_unlock_irqrestore(&ctx->ctx_lock, flags); - return ret; } /* aio_read_evt @@ -1521,13 +1480,98 @@ static int aio_wake_function(wait_queue_ return 1; } -int fastcall io_submit_one(struct kioctx *ctx, struct iocb __user *user_iocb, - struct iocb *iocb) -{ - struct kiocb *req; - struct file *file; - ssize_t ret; - +static int io_call_syscall(struct iocb *iocb) +{ + int ret; + + switch(iocb->aio_lio_opcode) { + case IOCB_CMD_PREAD: + ret = sys_call_table[__NR_pread64](iocb->aio_fildes, + iocb->aio_buf, iocb->aio_nbytes, + iocb->aio_offset, 0, 0); + break; + case IOCB_CMD_PWRITE: + ret = sys_call_table[__NR_pwrite64](iocb->aio_fildes, + iocb->aio_buf, iocb->aio_nbytes, + iocb->aio_offset, 0, 0); + break; + default: + /* io_prepare_iocb should stop us from getting here */ + ret = -ENOSYS; + } + + return ret; +} + +static int io_call_syslet(struct kioctx *ctx, struct iocb *iocb, + void __user *user_iocb) +{ + struct task_struct *t = current; + struct async_head *ah = t->ah; + struct async_thread *at = &t->__at; + int 
ret; + + /* + * We'll also need to have a context that the user can cancel which + * this totally ignores for now. + */ + + if (unlikely(!ah)) { + async_head_init(t); + ah = t->ah; + } + + /* + * For now, just go sync if we can't make sure that a thread is + * waiting. + */ + t->async_ready = NULL; + if (unlikely(list_empty(&ah->ready_async_threads))) + async_refill_cachemiss_pool(ah, t, NULL); + if (likely(!list_empty(&ah->ready_async_threads))) + t->async_ready = at; + + /* + * Call the syscall and generate a completion event with its + * return code. + */ + ret = io_call_syscall(iocb); + aio_append_io_event(ctx, ret, user_iocb, iocb->aio_data); + + /* + * If we're still executing in the submitting task because our call + * didn't block then we return to the caller to try the next iocb. + * We're still using the submitting task's ctx ref so we don't + * need to do anything with it here. + */ + if (t->ah) { + t->async_ready = NULL; + return 0; + } + + /* + * If we did block then we were given the submitting task's io + * context ref. First drop it. + */ + put_ioctx(ctx); + + /* + * From now on we're a waiting thread that some future io submission + * can hand userspace state over to. We need to be careful to return + * to userspace with the return code that the new io_submit that + * we're working for wanted us to return. + */ + async_cachemiss_loop(at, ah, t); + return task_ret_reg(t); +} + +/* + * Returns a negative error if the iocb is invalid. + * + * XXX it should be putting the id of the iocb to userspace. 
+ */ +static int io_prepare_iocb(struct iocb *iocb) +{ /* enforce forwards compatibility on users */ if (unlikely(iocb->aio_reserved1 || iocb->aio_reserved2 || iocb->aio_reserved3)) { @@ -1545,53 +1589,15 @@ int fastcall io_submit_one(struct kioctx return -EINVAL; } - file = fget(iocb->aio_fildes); - if (unlikely(!file)) - return -EBADF; - - req = aio_get_req(ctx); /* returns with 2 references to req */ - if (unlikely(!req)) { - fput(file); - return -EAGAIN; - } - - req->ki_filp = file; - ret = put_user(req->ki_key, &user_iocb->aio_key); - if (unlikely(ret)) { - dprintk("EFAULT: aio_key\n"); - goto out_put_req; - } - - req->ki_obj.user = user_iocb; - req->ki_user_data = iocb->aio_data; - req->ki_pos = iocb->aio_offset; - - req->ki_buf = (char __user *)(unsigned long)iocb->aio_buf; - req->ki_left = req->ki_nbytes = iocb->aio_nbytes; - req->ki_opcode = iocb->aio_lio_opcode; - init_waitqueue_func_entry(&req->ki_wait, aio_wake_function); - INIT_LIST_HEAD(&req->ki_wait.task_list); - - ret = aio_setup_iocb(req); - - if (ret) - goto out_put_req; - - spin_lock_irq(&ctx->ctx_lock); - aio_run_iocb(req); - if (!list_empty(&ctx->run_list)) { - /* drain the run list */ - while (__aio_run_iocbs(ctx)) - ; - } - spin_unlock_irq(&ctx->ctx_lock); - aio_put_req(req); /* drop extra ref to req */ + switch (iocb->aio_lio_opcode) { + case IOCB_CMD_PREAD: + case IOCB_CMD_PWRITE: + break; + default: + return -EINVAL; + } + return 0; - -out_put_req: - aio_put_req(req); /* drop extra ref to req */ - aio_put_req(req); /* drop i/o ref to req */ - return ret; } /* sys_io_submit: @@ -1610,6 +1616,7 @@ asmlinkage long sys_io_submit(aio_contex struct iocb __user * __user *iocbpp) { struct kioctx *ctx; + struct task_struct *t = current; long ret = 0; int i; @@ -1619,17 +1626,19 @@ asmlinkage long sys_io_submit(aio_contex if (unlikely(!access_ok(VERIFY_READ, iocbpp, (nr*sizeof(*iocbpp))))) return -EFAULT; + /* + * This context reference is handled very carefully. 
If none of + * the iocb handlers block then we drop it at the bottom of this + * function. Otherwise it is handed off to the thread who blocked + * and they drop it after generating a completion event. + */ ctx = lookup_ioctx(ctx_id); if (unlikely(!ctx)) { pr_debug("EINVAL: io_submit: invalid context id\n"); return -EINVAL; } - /* - * AKPM: should this return a partial result if some of the IOs were - * successfully submitted? - */ - for (i=0; i