diff options
Diffstat (limited to 'src/ioq.c')
-rw-r--r-- | src/ioq.c | 239 |
1 files changed, 145 insertions, 94 deletions
@@ -645,30 +645,161 @@ struct ioq_ring_state { struct ioq_batch ready; }; +/** Reap a single CQE. */ +static void ioq_reap_cqe(struct ioq_ring_state *state, struct io_uring_cqe *cqe) { + struct ioq *ioq = state->ioq; + + struct ioq_ent *ent = io_uring_cqe_get_data(cqe); + ent->result = cqe->res; + + if (ent->result < 0) { + goto push; + } + + switch (ent->op) { + case IOQ_OPENDIR: { + int fd = ent->result; + if (ioq_check_cancel(ioq, ent)) { + xclose(fd); + goto push; + } + + struct ioq_opendir *args = &ent->opendir; + ent->result = try(bfs_opendir(args->dir, fd, NULL, args->flags)); + if (ent->result >= 0) { + // TODO: io_uring_prep_getdents() + bfs_polldir(args->dir); + } else { + xclose(fd); + } + + break; + } + +#if BFS_USE_STATX + case IOQ_STAT: { + struct ioq_stat *args = &ent->stat; + ent->result = try(bfs_statx_convert(args->buf, args->xbuf)); + break; + } +#endif + + default: + break; + } + +push: + ioq_batch_push(ioq->ready, &state->ready, ent); +} + +/** Wait for submitted requests to complete. */ +static void ioq_ring_drain(struct ioq_ring_state *state, size_t wait_nr) { + struct ioq *ioq = state->ioq; + struct io_uring *ring = state->ring; + + bfs_assert(wait_nr <= state->submitted); + + while (state->submitted > 0) { + struct io_uring_cqe *cqe; + if (wait_nr > 0) { + io_uring_wait_cqes(ring, &cqe, wait_nr, NULL, NULL); + } + + unsigned int head; + size_t seen = 0; + io_uring_for_each_cqe (ring, head, cqe) { + ioq_reap_cqe(state, cqe); + ++seen; + } + + io_uring_cq_advance(ring, seen); + state->submitted -= seen; + + if (seen >= wait_nr) { + break; + } + wait_nr -= seen; + } + + ioq_batch_flush(ioq->ready, &state->ready); +} + +/** Submit prepped SQEs, and wait for some to complete. */ +static void ioq_ring_submit(struct ioq_ring_state *state) { + struct io_uring *ring = state->ring; + + size_t unreaped = state->prepped + state->submitted; + size_t wait_nr = 0; + + if (state->prepped == 0 && unreaped > 0) { + // If we have no new SQEs, wait for at least one old one to + // complete, to avoid livelock + wait_nr = 1; + } + + if (unreaped > ring->sq.ring_entries) { + // Keep the completion queue below half full + wait_nr = unreaped - ring->sq.ring_entries; + } + + // Submit all prepped SQEs + while (state->prepped > 0) { + int ret = io_uring_submit_and_wait(state->ring, wait_nr); + if (ret <= 0) { + continue; + } + + state->submitted += ret; + state->prepped -= ret; + if (state->prepped > 0) { + // In the unlikely event of a short submission, any SQE + // links will be broken. Wait for all SQEs to complete + // to preserve any ordering requirements. + ioq_ring_drain(state, state->submitted); + wait_nr = 0; + } + } + + // Drain all the CQEs we waited for (and any others that are ready) + ioq_ring_drain(state, wait_nr); +} + +/** Reserve space for a number of SQEs, submitting if necessary. */ +static void ioq_reserve_sqes(struct ioq_ring_state *state, unsigned int count) { + while (io_uring_sq_space_left(state->ring) < count) { + ioq_ring_submit(state); + } +} + +/** Get an SQE, submitting if necessary. */ +static struct io_uring_sqe *ioq_get_sqe(struct ioq_ring_state *state) { + ioq_reserve_sqes(state, 1); + return io_uring_get_sqe(state->ring); +} + /** Dispatch a single request asynchronously. */ static struct io_uring_sqe *ioq_dispatch_async(struct ioq_ring_state *state, struct ioq_ent *ent) { - struct io_uring *ring = state->ring; enum ioq_ring_ops ops = state->ops; struct io_uring_sqe *sqe = NULL; switch (ent->op) { case IOQ_NOP: if (ent->nop.type == IOQ_NOP_HEAVY) { - sqe = io_uring_get_sqe(ring); + sqe = ioq_get_sqe(state); io_uring_prep_nop(sqe); } return sqe; case IOQ_CLOSE: if (ops & IOQ_RING_CLOSE) { - sqe = io_uring_get_sqe(ring); + sqe = ioq_get_sqe(state); io_uring_prep_close(sqe, ent->close.fd); } return sqe; case IOQ_OPENDIR: if (ops & IOQ_RING_OPENAT) { - sqe = io_uring_get_sqe(ring); + sqe = ioq_get_sqe(state); struct ioq_opendir *args = &ent->opendir; int flags = O_RDONLY | O_CLOEXEC | O_DIRECTORY; io_uring_prep_openat(sqe, args->dfd, args->path, flags, 0); @@ -678,7 +809,7 @@ static struct io_uring_sqe *ioq_dispatch_async(struct ioq_ring_state *state, str case IOQ_CLOSEDIR: #if BFS_USE_UNWRAPDIR if (ops & IOQ_RING_CLOSE) { - sqe = io_uring_get_sqe(ring); + sqe = ioq_get_sqe(state); io_uring_prep_close(sqe, bfs_unwrapdir(ent->closedir.dir)); } #endif @@ -687,7 +818,7 @@ static struct io_uring_sqe *ioq_dispatch_async(struct ioq_ring_state *state, str case IOQ_STAT: #if BFS_USE_STATX if (ops & IOQ_RING_STATX) { - sqe = io_uring_get_sqe(ring); + sqe = ioq_get_sqe(state); struct ioq_stat *args = &ent->stat; int flags = bfs_statx_flags(args->flags); unsigned int mask = STATX_BASIC_STATS | STATX_BTIME; @@ -731,110 +862,28 @@ static bool ioq_ring_prep(struct ioq_ring_state *state) { } struct ioq *ioq = state->ioq; - struct io_uring *ring = state->ring; struct ioq_batch pending; ioq_batch_reset(&pending); while (true) { - if (ioq_batch_empty(&pending)) { - if (io_uring_sq_space_left(ring) < IOQ_BATCH) { - break; - } - } - bool block = ioq_ring_empty(state); struct ioq_ent *ent = ioq_batch_pop(ioq->pending, &pending, block); - if (!ent) { - break; - } else if (ent == &IOQ_STOP) { + if (ent == &IOQ_STOP) { ioqq_push(ioq->pending, ent); state->stop = true; break; + } else if (ent) { + ioq_prep_sqe(state, ent); + } else { + break; } - - ioq_prep_sqe(state, ent); } bfs_assert(ioq_batch_empty(&pending)); return !ioq_ring_empty(state); } -/** Reap a single CQE. */ -static void ioq_reap_cqe(struct ioq_ring_state *state, struct io_uring_cqe *cqe) { - struct ioq *ioq = state->ioq; - struct io_uring *ring = state->ring; - - struct ioq_ent *ent = io_uring_cqe_get_data(cqe); - ent->result = cqe->res; - io_uring_cqe_seen(ring, cqe); - --state->submitted; - - if (ent->result < 0) { - goto push; - } - - switch (ent->op) { - case IOQ_OPENDIR: { - int fd = ent->result; - if (ioq_check_cancel(ioq, ent)) { - xclose(fd); - goto push; - } - - struct ioq_opendir *args = &ent->opendir; - ent->result = try(bfs_opendir(args->dir, fd, NULL, args->flags)); - if (ent->result >= 0) { - // TODO: io_uring_prep_getdents() - bfs_polldir(args->dir); - } else { - xclose(fd); - } - - break; - } - -#if BFS_USE_STATX - case IOQ_STAT: { - struct ioq_stat *args = &ent->stat; - ent->result = try(bfs_statx_convert(args->buf, args->xbuf)); - break; - } -#endif - - default: - break; - } - -push: - ioq_batch_push(ioq->ready, &state->ready, ent); -} - -/** Reap a batch of CQEs. */ -static void ioq_ring_reap(struct ioq_ring_state *state) { - struct ioq *ioq = state->ioq; - struct io_uring *ring = state->ring; - - while (state->prepped) { - int ret = io_uring_submit_and_wait(ring, 1); - if (ret > 0) { - state->prepped -= ret; - state->submitted += ret; - } - } - - while (state->submitted) { - struct io_uring_cqe *cqe; - if (io_uring_wait_cqe(ring, &cqe) < 0) { - continue; - } - - ioq_reap_cqe(state, cqe); - } - - ioq_batch_flush(ioq->ready, &state->ready); -} - /** io_uring worker loop. */ static void ioq_ring_work(struct ioq_thread *thread) { struct ioq_ring_state state = { @@ -844,8 +893,10 @@ static void ioq_ring_work(struct ioq_thread *thread) { }; while (ioq_ring_prep(&state)) { - ioq_ring_reap(&state); + ioq_ring_submit(&state); } + + ioq_ring_drain(&state, state.submitted); } #endif // BFS_WITH_LIBURING |