diff options
author | Linus Torvalds <torvalds@linux-foundation.org> | 2020-08-16 10:55:12 -0700 |
---|---|---|
committer | Linus Torvalds <torvalds@linux-foundation.org> | 2020-08-16 10:55:12 -0700 |
commit | 2cc3c4b3c2e9c99e90aaf19cd801ff2c160f283c (patch) | |
tree | bdaf69e9d6c92ac87f0f1e340c99b0155f337ba1 /fs | |
parent | 6f6aea7e966cda5a817d091e938c2d9b52209893 (diff) | |
parent | f91daf565b0e272a33bd3fcd19eaebd331c5cffd (diff) | |
download | linux-rt-2cc3c4b3c2e9c99e90aaf19cd801ff2c160f283c.tar.gz |
Merge tag 'io_uring-5.9-2020-08-15' of git://git.kernel.dk/linux-block
Pull io_uring fixes from Jens Axboe:
"A few differerent things in here.
Seems like syzbot got some more io_uring bits wired up, and we got a
handful of reports and the associated fixes are in here.
General fixes too, and a lot of them marked for stable.
Lastly, a bit of fallout from the async buffered reads, where we now
more easily trigger short reads. Some applications don't really like
that, so the io_read() code now handles short reads internally, and
got a cleanup along the way so that it's now easier to read (and
documented). We're now passing tests that failed before"
* tag 'io_uring-5.9-2020-08-15' of git://git.kernel.dk/linux-block:
io_uring: short circuit -EAGAIN for blocking read attempt
io_uring: sanitize double poll handling
io_uring: internally retry short reads
io_uring: retain iov_iter state over io_read/io_write calls
task_work: only grab task signal lock when needed
io_uring: enable lookup of links holding inflight files
io_uring: fail poll arm on queue proc failure
io_uring: hold 'ctx' reference around task_work queue + execute
fs: RWF_NOWAIT should imply IOCB_NOIO
io_uring: defer file table grabbing request cleanup for locked requests
io_uring: add missing REQ_F_COMP_LOCKED for nested requests
io_uring: fix recursive completion locking on oveflow flush
io_uring: use TWA_SIGNAL for task_work uncondtionally
io_uring: account locked memory before potential error case
io_uring: set ctx sq/cq entry count earlier
io_uring: Fix NULL pointer dereference in loop_rw_iter()
io_uring: add comments on how the async buffered read retry works
io_uring: io_async_buf_func() need not test page bit
Diffstat (limited to 'fs')
-rw-r--r-- | fs/io_uring.c | 539 |
1 files changed, 386 insertions, 153 deletions
diff --git a/fs/io_uring.c b/fs/io_uring.c index 2a3af95be4ca..dc506b75659c 100644 --- a/fs/io_uring.c +++ b/fs/io_uring.c @@ -508,9 +508,9 @@ struct io_async_msghdr { struct io_async_rw { struct iovec fast_iov[UIO_FASTIOV]; - struct iovec *iov; - ssize_t nr_segs; - ssize_t size; + const struct iovec *free_iovec; + struct iov_iter iter; + size_t bytes_done; struct wait_page_queue wpq; }; @@ -898,6 +898,7 @@ static void io_put_req(struct io_kiocb *req); static void io_double_put_req(struct io_kiocb *req); static void __io_double_put_req(struct io_kiocb *req); static struct io_kiocb *io_prep_linked_timeout(struct io_kiocb *req); +static void __io_queue_linked_timeout(struct io_kiocb *req); static void io_queue_linked_timeout(struct io_kiocb *req); static int __io_sqe_files_update(struct io_ring_ctx *ctx, struct io_uring_files_update *ip, @@ -914,9 +915,9 @@ static void io_file_put_work(struct work_struct *work); static ssize_t io_import_iovec(int rw, struct io_kiocb *req, struct iovec **iovec, struct iov_iter *iter, bool needs_lock); -static int io_setup_async_rw(struct io_kiocb *req, ssize_t io_size, - struct iovec *iovec, struct iovec *fast_iov, - struct iov_iter *iter); +static int io_setup_async_rw(struct io_kiocb *req, const struct iovec *iovec, + const struct iovec *fast_iov, + struct iov_iter *iter, bool force); static struct kmem_cache *req_cachep; @@ -1107,10 +1108,16 @@ static void __io_commit_cqring(struct io_ring_ctx *ctx) } } -static void io_req_clean_work(struct io_kiocb *req) +/* + * Returns true if we need to defer file table putting. This can only happen + * from the error path with REQ_F_COMP_LOCKED set. + */ +static bool io_req_clean_work(struct io_kiocb *req) { if (!(req->flags & REQ_F_WORK_INITIALIZED)) - return; + return false; + + req->flags &= ~REQ_F_WORK_INITIALIZED; if (req->work.mm) { mmdrop(req->work.mm); @@ -1123,6 +1130,9 @@ static void io_req_clean_work(struct io_kiocb *req) if (req->work.fs) { struct fs_struct *fs = req->work.fs; + if (req->flags & REQ_F_COMP_LOCKED) + return true; + spin_lock(&req->work.fs->lock); if (--fs->users) fs = NULL; @@ -1131,7 +1141,8 @@ static void io_req_clean_work(struct io_kiocb *req) free_fs_struct(fs); req->work.fs = NULL; } - req->flags &= ~REQ_F_WORK_INITIALIZED; + + return false; } static void io_prep_async_work(struct io_kiocb *req) @@ -1179,7 +1190,7 @@ static void io_prep_async_link(struct io_kiocb *req) io_prep_async_work(cur); } -static void __io_queue_async_work(struct io_kiocb *req) +static struct io_kiocb *__io_queue_async_work(struct io_kiocb *req) { struct io_ring_ctx *ctx = req->ctx; struct io_kiocb *link = io_prep_linked_timeout(req); @@ -1187,16 +1198,19 @@ static void __io_queue_async_work(struct io_kiocb *req) trace_io_uring_queue_async_work(ctx, io_wq_is_hashed(&req->work), req, &req->work, req->flags); io_wq_enqueue(ctx->io_wq, &req->work); - - if (link) - io_queue_linked_timeout(link); + return link; } static void io_queue_async_work(struct io_kiocb *req) { + struct io_kiocb *link; + /* init ->work of the whole link before punting */ io_prep_async_link(req); - __io_queue_async_work(req); + link = __io_queue_async_work(req); + + if (link) + io_queue_linked_timeout(link); } static void io_kill_timeout(struct io_kiocb *req) @@ -1229,12 +1243,19 @@ static void __io_queue_deferred(struct io_ring_ctx *ctx) do { struct io_defer_entry *de = list_first_entry(&ctx->defer_list, struct io_defer_entry, list); + struct io_kiocb *link; if (req_need_defer(de->req, de->seq)) break; list_del_init(&de->list); /* punt-init is done before queueing for defer */ - __io_queue_async_work(de->req); + link = __io_queue_async_work(de->req); + if (link) { + __io_queue_linked_timeout(link); + /* drop submission reference */ + link->flags |= REQ_F_COMP_LOCKED; + io_put_req(link); + } kfree(de); } while (!list_empty(&ctx->defer_list)); } @@ -1533,7 +1554,7 @@ static inline void io_put_file(struct io_kiocb *req, struct file *file, fput(file); } -static void io_dismantle_req(struct io_kiocb *req) +static bool io_dismantle_req(struct io_kiocb *req) { io_clean_op(req); @@ -1541,7 +1562,6 @@ static void io_dismantle_req(struct io_kiocb *req) kfree(req->io); if (req->file) io_put_file(req, req->file, (req->flags & REQ_F_FIXED_FILE)); - io_req_clean_work(req); if (req->flags & REQ_F_INFLIGHT) { struct io_ring_ctx *ctx = req->ctx; @@ -1553,15 +1573,15 @@ static void io_dismantle_req(struct io_kiocb *req) wake_up(&ctx->inflight_wait); spin_unlock_irqrestore(&ctx->inflight_lock, flags); } + + return io_req_clean_work(req); } -static void __io_free_req(struct io_kiocb *req) +static void __io_free_req_finish(struct io_kiocb *req) { - struct io_ring_ctx *ctx; + struct io_ring_ctx *ctx = req->ctx; - io_dismantle_req(req); __io_put_req_task(req); - ctx = req->ctx; if (likely(!io_is_fallback_req(req))) kmem_cache_free(req_cachep, req); else @@ -1569,6 +1589,39 @@ static void __io_free_req(struct io_kiocb *req) percpu_ref_put(&ctx->refs); } +static void io_req_task_file_table_put(struct callback_head *cb) +{ + struct io_kiocb *req = container_of(cb, struct io_kiocb, task_work); + struct fs_struct *fs = req->work.fs; + + spin_lock(&req->work.fs->lock); + if (--fs->users) + fs = NULL; + spin_unlock(&req->work.fs->lock); + if (fs) + free_fs_struct(fs); + req->work.fs = NULL; + __io_free_req_finish(req); +} + +static void __io_free_req(struct io_kiocb *req) +{ + if (!io_dismantle_req(req)) { + __io_free_req_finish(req); + } else { + int ret; + + init_task_work(&req->task_work, io_req_task_file_table_put); + ret = task_work_add(req->task, &req->task_work, TWA_RESUME); + if (unlikely(ret)) { + struct task_struct *tsk; + + tsk = io_wq_get_task(req->ctx->io_wq); + task_work_add(tsk, &req->task_work, 0); + } + } +} + static bool io_link_cancel_timeout(struct io_kiocb *req) { struct io_ring_ctx *ctx = req->ctx; @@ -1598,6 +1651,7 @@ static bool __io_kill_linked_timeout(struct io_kiocb *req) return false; list_del_init(&link->link_list); + link->flags |= REQ_F_COMP_LOCKED; wake_ev = io_link_cancel_timeout(link); req->flags &= ~REQ_F_LINK_TIMEOUT; return wake_ev; @@ -1656,6 +1710,7 @@ static void __io_fail_links(struct io_kiocb *req) trace_io_uring_fail_link(req, link); io_cqring_fill_event(link, -ECANCELED); + link->flags |= REQ_F_COMP_LOCKED; __io_double_put_req(link); req->flags &= ~REQ_F_LINK_TIMEOUT; } @@ -1710,22 +1765,22 @@ static int io_req_task_work_add(struct io_kiocb *req, struct callback_head *cb) { struct task_struct *tsk = req->task; struct io_ring_ctx *ctx = req->ctx; - int ret, notify = TWA_RESUME; + int ret, notify; /* - * SQPOLL kernel thread doesn't need notification, just a wakeup. - * If we're not using an eventfd, then TWA_RESUME is always fine, - * as we won't have dependencies between request completions for - * other kernel wait conditions. + * SQPOLL kernel thread doesn't need notification, just a wakeup. For + * all other cases, use TWA_SIGNAL unconditionally to ensure we're + * processing task_work. There's no reliable way to tell if TWA_RESUME + * will do the job. */ - if (ctx->flags & IORING_SETUP_SQPOLL) - notify = 0; - else if (ctx->cq_ev_fd) + notify = 0; + if (!(ctx->flags & IORING_SETUP_SQPOLL)) notify = TWA_SIGNAL; ret = task_work_add(tsk, cb, notify); if (!ret) wake_up_process(tsk); + return ret; } @@ -1766,8 +1821,10 @@ static void __io_req_task_submit(struct io_kiocb *req) static void io_req_task_submit(struct callback_head *cb) { struct io_kiocb *req = container_of(cb, struct io_kiocb, task_work); + struct io_ring_ctx *ctx = req->ctx; __io_req_task_submit(req); + percpu_ref_put(&ctx->refs); } static void io_req_task_queue(struct io_kiocb *req) @@ -1775,6 +1832,7 @@ static void io_req_task_queue(struct io_kiocb *req) int ret; init_task_work(&req->task_work, io_req_task_submit); + percpu_ref_get(&req->ctx->refs); ret = io_req_task_work_add(req, &req->task_work); if (unlikely(ret)) { @@ -1855,7 +1913,7 @@ static void io_req_free_batch(struct req_batch *rb, struct io_kiocb *req) req->flags &= ~REQ_F_TASK_PINNED; } - io_dismantle_req(req); + WARN_ON_ONCE(io_dismantle_req(req)); rb->reqs[rb->to_free++] = req; if (unlikely(rb->to_free == ARRAY_SIZE(rb->reqs))) __io_req_free_batch_flush(req->ctx, rb); @@ -2241,7 +2299,7 @@ static bool io_resubmit_prep(struct io_kiocb *req, int error) ret = io_import_iovec(rw, req, &iovec, &iter, false); if (ret < 0) goto end_req; - ret = io_setup_async_rw(req, ret, iovec, inline_vecs, &iter); + ret = io_setup_async_rw(req, iovec, inline_vecs, &iter, false); if (!ret) return true; kfree(iovec); @@ -2263,6 +2321,8 @@ static void io_rw_resubmit(struct callback_head *cb) refcount_inc(&req->refs); io_queue_async_work(req); } + + percpu_ref_put(&ctx->refs); } #endif @@ -2275,6 +2335,8 @@ static bool io_rw_reissue(struct io_kiocb *req, long res) return false; init_task_work(&req->task_work, io_rw_resubmit); + percpu_ref_get(&req->ctx->refs); + ret = io_req_task_work_add(req, &req->task_work); if (!ret) return true; @@ -2527,6 +2589,14 @@ static void kiocb_done(struct kiocb *kiocb, ssize_t ret, { struct io_kiocb *req = container_of(kiocb, struct io_kiocb, rw.kiocb); + /* add previously done IO, if any */ + if (req->io && req->io->rw.bytes_done > 0) { + if (ret < 0) + ret = req->io->rw.bytes_done; + else + ret += req->io->rw.bytes_done; + } + if (req->flags & REQ_F_CUR_POS) req->file->f_pos = kiocb->ki_pos; if (ret >= 0 && kiocb->ki_complete == io_complete_rw) @@ -2758,6 +2828,13 @@ static ssize_t io_import_iovec(int rw, struct io_kiocb *req, ssize_t ret; u8 opcode; + if (req->io) { + struct io_async_rw *iorw = &req->io->rw; + + *iovec = NULL; + return iov_iter_count(&iorw->iter); + } + opcode = req->opcode; if (opcode == IORING_OP_READ_FIXED || opcode == IORING_OP_WRITE_FIXED) { *iovec = NULL; @@ -2783,14 +2860,6 @@ static ssize_t io_import_iovec(int rw, struct io_kiocb *req, return ret < 0 ? ret : sqe_len; } - if (req->io) { - struct io_async_rw *iorw = &req->io->rw; - - iov_iter_init(iter, rw, iorw->iov, iorw->nr_segs, iorw->size); - *iovec = NULL; - return iorw->size; - } - if (req->flags & REQ_F_BUFFER_SELECT) { ret = io_iov_buffer_select(req, *iovec, needs_lock); if (!ret) { @@ -2868,21 +2937,30 @@ static ssize_t loop_rw_iter(int rw, struct file *file, struct kiocb *kiocb, return ret; } -static void io_req_map_rw(struct io_kiocb *req, ssize_t io_size, - struct iovec *iovec, struct iovec *fast_iov, - struct iov_iter *iter) +static void io_req_map_rw(struct io_kiocb *req, const struct iovec *iovec, + const struct iovec *fast_iov, struct iov_iter *iter) { struct io_async_rw *rw = &req->io->rw; - rw->nr_segs = iter->nr_segs; - rw->size = io_size; + memcpy(&rw->iter, iter, sizeof(*iter)); + rw->free_iovec = NULL; + rw->bytes_done = 0; + /* can only be fixed buffers, no need to do anything */ + if (iter->type == ITER_BVEC) + return; if (!iovec) { - rw->iov = rw->fast_iov; - if (rw->iov != fast_iov) - memcpy(rw->iov, fast_iov, + unsigned iov_off = 0; + + rw->iter.iov = rw->fast_iov; + if (iter->iov != fast_iov) { + iov_off = iter->iov - fast_iov; + rw->iter.iov += iov_off; + } + if (rw->fast_iov != fast_iov) + memcpy(rw->fast_iov + iov_off, fast_iov + iov_off, sizeof(struct iovec) * iter->nr_segs); } else { - rw->iov = iovec; + rw->free_iovec = iovec; req->flags |= REQ_F_NEED_CLEANUP; } } @@ -2901,17 +2979,17 @@ static int io_alloc_async_ctx(struct io_kiocb *req) return __io_alloc_async_ctx(req); } -static int io_setup_async_rw(struct io_kiocb *req, ssize_t io_size, - struct iovec *iovec, struct iovec *fast_iov, - struct iov_iter *iter) +static int io_setup_async_rw(struct io_kiocb *req, const struct iovec *iovec, + const struct iovec *fast_iov, + struct iov_iter *iter, bool force) { - if (!io_op_defs[req->opcode].async_ctx) + if (!force && !io_op_defs[req->opcode].async_ctx) return 0; if (!req->io) { if (__io_alloc_async_ctx(req)) return -ENOMEM; - io_req_map_rw(req, io_size, iovec, fast_iov, iter); + io_req_map_rw(req, iovec, fast_iov, iter); } return 0; } @@ -2919,18 +2997,19 @@ static int io_setup_async_rw(struct io_kiocb *req, ssize_t io_size, static inline int io_rw_prep_async(struct io_kiocb *req, int rw, bool force_nonblock) { - struct io_async_ctx *io = req->io; - struct iov_iter iter; + struct io_async_rw *iorw = &req->io->rw; ssize_t ret; - io->rw.iov = io->rw.fast_iov; + iorw->iter.iov = iorw->fast_iov; + /* reset ->io around the iovec import, we don't want to use it */ req->io = NULL; - ret = io_import_iovec(rw, req, &io->rw.iov, &iter, !force_nonblock); - req->io = io; + ret = io_import_iovec(rw, req, (struct iovec **) &iorw->iter.iov, + &iorw->iter, !force_nonblock); + req->io = container_of(iorw, struct io_async_ctx, rw); if (unlikely(ret < 0)) return ret; - io_req_map_rw(req, ret, io->rw.iov, io->rw.fast_iov, &iter); + io_req_map_rw(req, iorw->iter.iov, iorw->fast_iov, &iorw->iter); return 0; } @@ -2952,6 +3031,16 @@ static int io_read_prep(struct io_kiocb *req, const struct io_uring_sqe *sqe, return io_rw_prep_async(req, READ, force_nonblock); } +/* + * This is our waitqueue callback handler, registered through lock_page_async() + * when we initially tried to do the IO with the iocb armed our waitqueue. + * This gets called when the page is unlocked, and we generally expect that to + * happen when the page IO is completed and the page is now uptodate. This will + * queue a task_work based retry of the operation, attempting to copy the data + * again. If the latter fails because the page was NOT uptodate, then we will + * do a thread based blocking retry of the operation. That's the unexpected + * slow path. + */ static int io_async_buf_func(struct wait_queue_entry *wait, unsigned mode, int sync, void *arg) { @@ -2965,13 +3054,11 @@ static int io_async_buf_func(struct wait_queue_entry *wait, unsigned mode, if (!wake_page_match(wpq, key)) return 0; - /* Stop waking things up if the page is locked again */ - if (test_bit(key->bit_nr, &key->page->flags)) - return -1; - list_del_init(&wait->entry); init_task_work(&req->task_work, io_req_task_submit); + percpu_ref_get(&req->ctx->refs); + /* submit ref gets dropped, acquire a new one */ refcount_inc(&req->refs); ret = io_req_task_work_add(req, &req->task_work); @@ -3008,7 +3095,18 @@ static inline int kiocb_wait_page_queue_init(struct kiocb *kiocb, return -EOPNOTSUPP; } - +/* + * This controls whether a given IO request should be armed for async page + * based retry. If we return false here, the request is handed to the async + * worker threads for retry. If we're doing buffered reads on a regular file, + * we prepare a private wait_page_queue entry and retry the operation. This + * will either succeed because the page is now uptodate and unlocked, or it + * will register a callback when the page is unlocked at IO completion. Through + * that callback, io_uring uses task_work to setup a retry of the operation. + * That retry will attempt the buffered read again. The retry will generally + * succeed, or in rare cases where it fails, we then fall back to using the + * async worker threads for a blocking retry. + */ static bool io_rw_should_retry(struct io_kiocb *req) { struct kiocb *kiocb = &req->rw.kiocb; @@ -3018,8 +3116,8 @@ static bool io_rw_should_retry(struct io_kiocb *req) if (req->flags & REQ_F_NOWAIT) return false; - /* already tried, or we're doing O_DIRECT */ - if (kiocb->ki_flags & (IOCB_DIRECT | IOCB_WAITQ)) + /* Only for buffered IO */ + if (kiocb->ki_flags & IOCB_DIRECT) return false; /* * just use poll if we can, and don't attempt if the fs doesn't @@ -3028,13 +3126,6 @@ static bool io_rw_should_retry(struct io_kiocb *req) if (file_can_poll(req->file) || !(req->file->f_mode & FMODE_BUF_RASYNC)) return false; - /* - * If request type doesn't require req->io to defer in general, - * we need to allocate it here - */ - if (!req->io && __io_alloc_async_ctx(req)) - return false; - ret = kiocb_wait_page_queue_init(kiocb, &req->io->rw.wpq, io_async_buf_func, req); if (!ret) { @@ -3049,7 +3140,10 @@ static int io_iter_do_read(struct io_kiocb *req, struct iov_iter *iter) { if (req->file->f_op->read_iter) return call_read_iter(req->file, &req->rw.kiocb, iter); - return loop_rw_iter(READ, req->file, &req->rw.kiocb, iter); + else if (req->file->f_op->read) + return loop_rw_iter(READ, req->file, &req->rw.kiocb, iter); + else + return -EINVAL; } static int io_read(struct io_kiocb *req, bool force_nonblock, @@ -3057,16 +3151,19 @@ static int io_read(struct io_kiocb *req, bool force_nonblock, { struct iovec inline_vecs[UIO_FASTIOV], *iovec = inline_vecs; struct kiocb *kiocb = &req->rw.kiocb; - struct iov_iter iter; - size_t iov_count; + struct iov_iter __iter, *iter = &__iter; ssize_t io_size, ret, ret2; - unsigned long nr_segs; + size_t iov_count; + + if (req->io) + iter = &req->io->rw.iter; - ret = io_import_iovec(READ, req, &iovec, &iter, !force_nonblock); + ret = io_import_iovec(READ, req, &iovec, iter, !force_nonblock); if (ret < 0) return ret; io_size = ret; req->result = io_size; + ret = 0; /* Ensure we clear previously set non-block flag */ if (!force_nonblock) @@ -3076,40 +3173,70 @@ static int io_read(struct io_kiocb *req, bool force_nonblock, if (force_nonblock && !io_file_supports_async(req->file, READ)) goto copy_iov; - iov_count = iov_iter_count(&iter); - nr_segs = iter.nr_segs; + iov_count = iov_iter_count(iter); ret = rw_verify_area(READ, req->file, &kiocb->ki_pos, iov_count); if (unlikely(ret)) goto out_free; - ret2 = io_iter_do_read(req, &iter); + ret = io_iter_do_read(req, iter); - /* Catch -EAGAIN return for forced non-blocking submission */ - if (!force_nonblock || (ret2 != -EAGAIN && ret2 != -EIO)) { - kiocb_done(kiocb, ret2, cs); - } else { - iter.count = iov_count; - iter.nr_segs = nr_segs; -copy_iov: - ret = io_setup_async_rw(req, io_size, iovec, inline_vecs, - &iter); + if (!ret) { + goto done; + } else if (ret == -EIOCBQUEUED) { + ret = 0; + goto out_free; + } else if (ret == -EAGAIN) { + if (!force_nonblock) + goto done; + ret = io_setup_async_rw(req, iovec, inline_vecs, iter, false); if (ret) goto out_free; - /* it's copied and will be cleaned with ->io */ - iovec = NULL; - /* if we can retry, do so with the callbacks armed */ - if (io_rw_should_retry(req)) { - ret2 = io_iter_do_read(req, &iter); - if (ret2 == -EIOCBQUEUED) { - goto out_free; - } else if (ret2 != -EAGAIN) { - kiocb_done(kiocb, ret2, cs); - goto out_free; - } - } + return -EAGAIN; + } else if (ret < 0) { + goto out_free; + } + + /* read it all, or we did blocking attempt. no retry. */ + if (!iov_iter_count(iter) || !force_nonblock || + (req->file->f_flags & O_NONBLOCK)) + goto done; + + io_size -= ret; +copy_iov: + ret2 = io_setup_async_rw(req, iovec, inline_vecs, iter, true); + if (ret2) { + ret = ret2; + goto out_free; + } + /* it's copied and will be cleaned with ->io */ + iovec = NULL; + /* now use our persistent iterator, if we aren't already */ + iter = &req->io->rw.iter; +retry: + req->io->rw.bytes_done += ret; + /* if we can retry, do so with the callbacks armed */ + if (!io_rw_should_retry(req)) { kiocb->ki_flags &= ~IOCB_WAITQ; return -EAGAIN; } + + /* + * Now retry read with the IOCB_WAITQ parts set in the iocb. If we + * get -EIOCBQUEUED, then we'll get a notification when the desired + * page gets unlocked. We can also get a partial read here, and if we + * do, then just retry at the new offset. + */ + ret = io_iter_do_read(req, iter); + if (ret == -EIOCBQUEUED) { + ret = 0; + goto out_free; + } else if (ret > 0 && ret < io_size) { + /* we got some bytes, but not all. retry. */ + goto retry; + } +done: + kiocb_done(kiocb, ret, cs); + ret = 0; out_free: if (iovec) kfree(iovec); @@ -3139,12 +3266,14 @@ static int io_write(struct io_kiocb *req, bool force_nonblock, { struct iovec inline_vecs[UIO_FASTIOV], *iovec = inline_vecs; struct kiocb *kiocb = &req->rw.kiocb; - struct iov_iter iter; + struct iov_iter __iter, *iter = &__iter; size_t iov_count; ssize_t ret, ret2, io_size; - unsigned long nr_segs; - ret = io_import_iovec(WRITE, req, &iovec, &iter, !force_nonblock); + if (req->io) + iter = &req->io->rw.iter; + + ret = io_import_iovec(WRITE, req, &iovec, iter, !force_nonblock); if (ret < 0) return ret; io_size = ret; @@ -3163,8 +3292,7 @@ static int io_write(struct io_kiocb *req, bool force_nonblock, (req->flags & REQ_F_ISREG)) goto copy_iov; - iov_count = iov_iter_count(&iter); - nr_segs = iter.nr_segs; + iov_count = iov_iter_count(iter); ret = rw_verify_area(WRITE, req->file, &kiocb->ki_pos, iov_count); if (unlikely(ret)) goto out_free; @@ -3185,9 +3313,11 @@ static int io_write(struct io_kiocb *req, bool force_nonblock, kiocb->ki_flags |= IOCB_WRITE; if (req->file->f_op->write_iter) - ret2 = call_write_iter(req->file, kiocb, &iter); + ret2 = call_write_iter(req->file, kiocb, iter); + else if (req->file->f_op->write) + ret2 = loop_rw_iter(WRITE, req->file, kiocb, iter); else - ret2 = loop_rw_iter(WRITE, req->file, kiocb, &iter); + ret2 = -EINVAL; /* * Raw bdev writes will return -EOPNOTSUPP for IOCB_NOWAIT. Just @@ -3198,16 +3328,10 @@ static int io_write(struct io_kiocb *req, bool force_nonblock, if (!force_nonblock || ret2 != -EAGAIN) { kiocb_done(kiocb, ret2, cs); } else { - iter.count = iov_count; - iter.nr_segs = nr_segs; copy_iov: - ret = io_setup_async_rw(req, io_size, iovec, inline_vecs, - &iter); - if (ret) - goto out_free; - /* it's copied and will be cleaned with ->io */ - iovec = NULL; - return -EAGAIN; + ret = io_setup_async_rw(req, iovec, inline_vecs, iter, false); + if (!ret) + return -EAGAIN; } out_free: if (iovec) @@ -4488,6 +4612,8 @@ static int __io_async_wake(struct io_kiocb *req, struct io_poll_iocb *poll, req->result = mask; init_task_work(&req->task_work, func); + percpu_ref_get(&req->ctx->refs); + /* * If this fails, then the task is exiting. When a task exits, the * work gets canceled, so just cancel this request as well instead @@ -4526,9 +4652,24 @@ static bool io_poll_rewait(struct io_kiocb *req, struct io_poll_iocb *poll) return false; } -static void io_poll_remove_double(struct io_kiocb *req, void *data) +static struct io_poll_iocb *io_poll_get_double(struct io_kiocb *req) { - struct io_poll_iocb *poll = data; + /* pure poll stashes this in ->io, poll driven retry elsewhere */ + if (req->opcode == IORING_OP_POLL_ADD) + return (struct io_poll_iocb *) req->io; + return req->apoll->double_poll; +} + +static struct io_poll_iocb *io_poll_get_single(struct io_kiocb *req) +{ + if (req->opcode == IORING_OP_POLL_ADD) + return &req->poll; + return &req->apoll->poll; +} + +static void io_poll_remove_double(struct io_kiocb *req) +{ + struct io_poll_iocb *poll = io_poll_get_double(req); lockdep_assert_held(&req->ctx->completion_lock); @@ -4548,7 +4689,7 @@ static void io_poll_complete(struct io_kiocb *req, __poll_t mask, int error) { struct io_ring_ctx *ctx = req->ctx; - io_poll_remove_double(req, req->io); + io_poll_remove_double(req); req->poll.done = true; io_cqring_fill_event(req, error ? error : mangle_poll(mask)); io_commit_cqring(ctx); @@ -4575,18 +4716,20 @@ static void io_poll_task_handler(struct io_kiocb *req, struct io_kiocb **nxt) static void io_poll_task_func(struct callback_head *cb) { struct io_kiocb *req = container_of(cb, struct io_kiocb, task_work); + struct io_ring_ctx *ctx = req->ctx; struct io_kiocb *nxt = NULL; io_poll_task_handler(req, &nxt); if (nxt) __io_req_task_submit(nxt); + percpu_ref_put(&ctx->refs); } static int io_poll_double_wake(struct wait_queue_entry *wait, unsigned mode, int sync, void *key) { struct io_kiocb *req = wait->private; - struct io_poll_iocb *poll = req->apoll->double_poll; + struct io_poll_iocb *poll = io_poll_get_single(req); __poll_t mask = key_to_poll(key); /* for instances that support it check for an event match first: */ @@ -4600,6 +4743,8 @@ static int io_poll_double_wake(struct wait_queue_entry *wait, unsigned mode, done = list_empty(&poll->wait.entry); if (!done) list_del_init(&poll->wait.entry); + /* make sure double remove sees this as being gone */ + wait->private = NULL; spin_unlock(&poll->head->lock); if (!done) __io_async_wake(req, poll, mask, io_poll_task_func); @@ -4675,6 +4820,7 @@ static void io_async_task_func(struct callback_head *cb) if (io_poll_rewait(req, &apoll->poll)) { spin_unlock_irq(&ctx->completion_lock); + percpu_ref_put(&ctx->refs); return; } @@ -4682,7 +4828,7 @@ static void io_async_task_func(struct callback_head *cb) if (hash_hashed(&req->hash_node)) hash_del(&req->hash_node); - io_poll_remove_double(req, apoll->double_poll); + io_poll_remove_double(req); spin_unlock_irq(&ctx->completion_lock); if (!READ_ONCE(apoll->poll.canceled)) @@ -4690,6 +4836,7 @@ static void io_async_task_func(struct callback_head *cb) else __io_req_task_cancel(req, -ECANCELED); + percpu_ref_put(&ctx->refs); kfree(apoll->double_poll); kfree(apoll); } @@ -4791,8 +4938,8 @@ static bool io_arm_poll_handler(struct io_kiocb *req) ret = __io_arm_poll_handler(req, &apoll->poll, &ipt, mask, io_async_wake); - if (ret) { - io_poll_remove_double(req, apoll->double_poll); + if (ret || ipt.error) { + io_poll_remove_double(req); spin_unlock_irq(&ctx->completion_lock); kfree(apoll->double_poll); kfree(apoll); @@ -4824,14 +4971,13 @@ static bool io_poll_remove_one(struct io_kiocb *req) { bool do_complete; + io_poll_remove_double(req); + if (req->opcode == IORING_OP_POLL_ADD) { - io_poll_remove_double(req, req->io); do_complete = __io_poll_remove_one(req, &req->poll); } else { struct async_poll *apoll = req->apoll; - io_poll_remove_double(req, apoll->double_poll); - /* non-poll requests have submit ref still */ do_complete = __io_poll_remove_one(req, &apoll->poll); if (do_complete) { @@ -4845,6 +4991,7 @@ static bool io_poll_remove_one(struct io_kiocb *req) io_cqring_fill_event(req, -ECANCELED); io_commit_cqring(req->ctx); req->flags |= REQ_F_COMP_LOCKED; + req_set_fail_links(req); io_put_req(req); } @@ -5017,6 +5164,23 @@ static enum hrtimer_restart io_timeout_fn(struct hrtimer *timer) return HRTIMER_NORESTART; } +static int __io_timeout_cancel(struct io_kiocb *req) +{ + int ret; + + list_del_init(&req->timeout.list); + + ret = hrtimer_try_to_cancel(&req->io->timeout.timer); + if (ret == -1) + return -EALREADY; + + req_set_fail_links(req); + req->flags |= REQ_F_COMP_LOCKED; + io_cqring_fill_event(req, -ECANCELED); + io_put_req(req); + return 0; +} + static int io_timeout_cancel(struct io_ring_ctx *ctx, __u64 user_data) { struct io_kiocb *req; @@ -5024,7 +5188,6 @@ static int io_timeout_cancel(struct io_ring_ctx *ctx, __u64 user_data) list_for_each_entry(req, &ctx->timeout_list, timeout.list) { if (user_data == req->user_data) { - list_del_init(&req->timeout.list); ret = 0; break; } @@ -5033,14 +5196,7 @@ static int io_timeout_cancel(struct io_ring_ctx *ctx, __u64 user_data) if (ret == -ENOENT) return ret; - ret = hrtimer_try_to_cancel(&req->io->timeout.timer); - if (ret == -1) - return -EALREADY; - - req_set_fail_links(req); - io_cqring_fill_event(req, -ECANCELED); - io_put_req(req); - return 0; + return __io_timeout_cancel(req); } static int io_timeout_remove_prep(struct io_kiocb *req, @@ -5481,8 +5637,8 @@ static void __io_clean_op(struct io_kiocb *req) case IORING_OP_WRITEV: case IORING_OP_WRITE_FIXED: case IORING_OP_WRITE: - if (io->rw.iov != io->rw.fast_iov) - kfree(io->rw.iov); + if (io->rw.free_iovec) + kfree(io->rw.free_iovec); break; case IORING_OP_RECVMSG: case IORING_OP_SENDMSG: @@ -5917,15 +6073,12 @@ static enum hrtimer_restart io_link_timeout_fn(struct hrtimer *timer) return HRTIMER_NORESTART; } -static void io_queue_linked_timeout(struct io_kiocb *req) +static void __io_queue_linked_timeout(struct io_kiocb *req) { - struct io_ring_ctx *ctx = req->ctx; - /* * If the list is now empty, then our linked request finished before * we got a chance to setup the timer */ - spin_lock_irq(&ctx->completion_lock); if (!list_empty(&req->link_list)) { struct io_timeout_data *data = &req->io->timeout; @@ -5933,6 +6086,14 @@ static void io_queue_linked_timeout(struct io_kiocb *req) hrtimer_start(&data->timer, timespec64_to_ktime(data->ts), data->mode); } +} + +static void io_queue_linked_timeout(struct io_kiocb *req) +{ + struct io_ring_ctx *ctx = req->ctx; + + spin_lock_irq(&ctx->completion_lock); + __io_queue_linked_timeout(req); spin_unlock_irq(&ctx->completion_lock); /* drop submission reference */ @@ -7837,6 +7998,71 @@ static bool io_wq_files_match(struct io_wq_work *work, void *data) return work->files == files; } +/* + * Returns true if 'preq' is the link parent of 'req' + */ +static bool io_match_link(struct io_kiocb *preq, struct io_kiocb *req) +{ + struct io_kiocb *link; + + if (!(preq->flags & REQ_F_LINK_HEAD)) + return false; + + list_for_each_entry(link, &preq->link_list, link_list) { + if (link == req) + return true; + } + + return false; +} + +/* + * We're looking to cancel 'req' because it's holding on to our files, but + * 'req' could be a link to another request. See if it is, and cancel that + * parent request if so. + */ +static bool io_poll_remove_link(struct io_ring_ctx *ctx, struct io_kiocb *req) +{ + struct hlist_node *tmp; + struct io_kiocb *preq; + bool found = false; + int i; + + spin_lock_irq(&ctx->completion_lock); + for (i = 0; i < (1U << ctx->cancel_hash_bits); i++) { + struct hlist_head *list; + + list = &ctx->cancel_hash[i]; + hlist_for_each_entry_safe(preq, tmp, list, hash_node) { + found = io_match_link(preq, req); + if (found) { + io_poll_remove_one(preq); + break; + } + } + } + spin_unlock_irq(&ctx->completion_lock); + return found; +} + +static bool io_timeout_remove_link(struct io_ring_ctx *ctx, + struct io_kiocb *req) +{ + struct io_kiocb *preq; + bool found = false; + + spin_lock_irq(&ctx->completion_lock); + list_for_each_entry(preq, &ctx->timeout_list, timeout.list) { + found = io_match_link(preq, req); + if (found) { + __io_timeout_cancel(preq); + break; + } + } + spin_unlock_irq(&ctx->completion_lock); + return found; +} + static void io_uring_cancel_files(struct io_ring_ctx *ctx, struct files_struct *files) { @@ -7891,6 +8117,9 @@ static void io_uring_cancel_files(struct io_ring_ctx *ctx, } } else { io_wq_cancel_work(ctx->io_wq, &cancel_req->work); + /* could be a link, check and remove if it is */ + if (!io_poll_remove_link(ctx, cancel_req)) + io_timeout_remove_link(ctx, cancel_req); io_put_req(cancel_req); } @@ -8171,6 +8400,10 @@ static int io_allocate_scq_urings(struct io_ring_ctx *ctx, struct io_rings *rings; size_t size, sq_array_offset; + /* make sure these are sane, as we already accounted them */ + ctx->sq_entries = p->sq_entries; + ctx->cq_entries = p->cq_entries; + size = rings_size(p->sq_entries, p->cq_entries, &sq_array_offset); if (size == SIZE_MAX) return -EOVERFLOW; @@ -8187,8 +8420,6 @@ static int io_allocate_scq_urings(struct io_ring_ctx *ctx, rings->cq_ring_entries = p->cq_entries; ctx->sq_mask = rings->sq_ring_mask; ctx->cq_mask = rings->cq_ring_mask; - ctx->sq_entries = rings->sq_ring_entries; - ctx->cq_entries = rings->cq_ring_entries; size = array_size(sizeof(struct io_uring_sqe), p->sq_entries); if (size == SIZE_MAX) { @@ -8317,6 +8548,16 @@ static int io_uring_create(unsigned entries, struct io_uring_params *p, ctx->user = user; ctx->creds = get_current_cred(); + /* + * Account memory _before_ installing the file descriptor. Once + * the descriptor is installed, it can get closed at any time. Also + * do this before hitting the general error path, as ring freeing + * will un-account as well. + */ + io_account_mem(ctx, ring_pages(p->sq_entries, p->cq_entries), + ACCT_LOCKED); + ctx->limit_mem = limit_mem; + ret = io_allocate_scq_urings(ctx, p); if (ret) goto err; @@ -8354,14 +8595,6 @@ static int io_uring_create(unsigned entries, struct io_uring_params *p, } /* - * Account memory _before_ installing the file descriptor. Once - * the descriptor is installed, it can get closed at any time. - */ - io_account_mem(ctx, ring_pages(p->sq_entries, p->cq_entries), - ACCT_LOCKED); - ctx->limit_mem = limit_mem; - - /* * Install ring fd as the very last thing, so we don't risk someone * having closed it before we finish setup */ |