diff options
-rw-r--r-- | src/bftw.c | 23 | ||||
-rw-r--r-- | src/ioq.c | 307 |
2 files changed, 288 insertions, 42 deletions
@@ -470,21 +470,34 @@ static int bftw_state_init(struct bftw_state *state, const struct bftw_args *arg state->error = 0; - if (args->nopenfd < 1) { + if (args->nopenfd < 2) { errno = EMFILE; return -1; } - bftw_cache_init(&state->cache, args->nopenfd); - state->nthreads = args->nthreads; - if (state->nthreads > 0) { - state->ioq = ioq_create(4096, state->nthreads); + size_t nopenfd = args->nopenfd; + size_t qdepth = 4096; + size_t nthreads = args->nthreads; + +#if BFS_USE_LIBURING + // io_uring uses one fd per ring, ioq uses one ring per thread + if (nthreads >= nopenfd - 1) { + nthreads = nopenfd - 2; + } + nopenfd -= nthreads; +#endif + + bftw_cache_init(&state->cache, nopenfd); + + if (nthreads > 0) { + state->ioq = ioq_create(qdepth, nthreads); if (!state->ioq) { return -1; } } else { state->ioq = NULL; } + state->nthreads = nthreads; SLIST_INIT(&state->to_open); SLIST_INIT(&state->to_read); @@ -16,6 +16,10 @@ #include <pthread.h> #include <stdlib.h> +#if BFS_USE_LIBURING +# include <liburing.h> +#endif + /** * A monitor for an I/O queue slot. */ @@ -280,6 +284,21 @@ static struct ioq_ent *ioqq_trypop(struct ioqq *ioqq) { /** Sentinel stop command. */ static struct ioq_ent IOQ_STOP; +/** I/O queue thread-specific data. */ +struct ioq_thread { + /** The thread handle. */ + pthread_t id; + /** Pointer back to the I/O queue. */ + struct ioq *parent; + +#if BFS_USE_LIBURING + /** io_uring instance. */ + struct io_uring ring; + /** Any error that occurred initializing the ring. */ + int ring_err; +#endif +}; + struct ioq { /** The depth of the queue. */ size_t depth; @@ -299,60 +318,247 @@ struct ioq { /** The number of background threads. */ size_t nthreads; /** The background threads themselves. */ - pthread_t threads[]; + struct ioq_thread threads[]; }; -/** Background thread entry point. */ -static void *ioq_work(void *ptr) { - struct ioq *ioq = ptr; +/** Cancel a request if we need to. */ +static bool ioq_check_cancel(struct ioq *ioq, struct ioq_ent *ent) { + if (!load(&ioq->cancel, relaxed)) { + return false; + } - while (true) { - struct ioq_ent *ent = ioqq_pop(ioq->pending); - if (ent == &IOQ_STOP) { - break; + // Always close(), even if we're cancelled, just like a real EINTR + if (ent->op == IOQ_CLOSE || ent->op == IOQ_CLOSEDIR) { + return false; + } + + ent->ret = -1; + ent->error = EINTR; + ioqq_push(ioq->ready, ent); + return true; +} + +/** Handle a single request synchronously. */ +static void ioq_handle(struct ioq *ioq, struct ioq_ent *ent) { + int ret; + + switch (ent->op) { + case IOQ_CLOSE: + ret = xclose(ent->close.fd); + break; + + case IOQ_OPENDIR: + ret = bfs_opendir(ent->opendir.dir, ent->opendir.dfd, ent->opendir.path); + if (ret == 0) { + bfs_polldir(ent->opendir.dir); } + break; + + case IOQ_CLOSEDIR: + ret = bfs_closedir(ent->closedir.dir); + break; + + default: + bfs_bug("Unknown ioq_op %d", (int)ent->op); + ret = -1; + errno = ENOSYS; + break; + } + + ent->ret = ret; + ent->error = ret == 0 ? 0 : errno; + + ioqq_push(ioq->ready, ent); +} - bool cancel = load(&ioq->cancel, relaxed); +#if BFS_USE_LIBURING +/** io_uring worker state. */ +struct ioq_ring_state { + /** The I/O queue. */ + struct ioq *ioq; + /** The io_uring. */ + struct io_uring *ring; + /** The current ioq->pending slot. */ + ioq_slot *slot; + /** Number of prepped, unsubmitted SQEs. */ + size_t prepped; + /** Number of submitted, unreaped SQEs. */ + size_t submitted; + /** Whether to stop the loop. */ + bool stop; +}; + +/** Pop a request for ioq_ring_prep(). */ +static struct ioq_ent *ioq_ring_pop(struct ioq_ring_state *state) { + if (state->stop) { + return NULL; + } + + // Advance to the next slot if necessary + struct ioq *ioq = state->ioq; + if (!state->slot) { + state->slot = ioqq_read(ioq->pending); + } + + // Block if we have nothing else to do + bool block = !state->prepped && !state->submitted; + struct ioq_ent *ret = ioq_slot_pop(ioq->pending, state->slot, block); + + if (ret) { + // Got an entry, move to the next slot next time + state->slot = NULL; + } + + if (ret == &IOQ_STOP) { + state->stop = true; + ret = NULL; + } + + return ret; +} - ent->ret = -1; +/** Prep a single SQE. */ +static void ioq_prep_sqe(struct io_uring_sqe *sqe, struct ioq_ent *ent) { + switch (ent->op) { + case IOQ_CLOSE: + io_uring_prep_close(sqe, ent->close.fd); + break; + + case IOQ_OPENDIR: + io_uring_prep_openat(sqe, ent->opendir.dfd, ent->opendir.path, O_RDONLY | O_CLOEXEC | O_DIRECTORY, 0); + break; + +#if BFS_USE_UNWRAPDIR + case IOQ_CLOSEDIR: + io_uring_prep_close(sqe, bfs_unwrapdir(ent->closedir.dir)); + break; +#endif + + default: + bfs_bug("Unknown ioq_op %d", (int)ent->op); + io_uring_prep_nop(sqe); + break; + } - switch (ent->op) { - case IOQ_CLOSE: - // Always close(), even if we're cancelled, just like a real EINTR - ent->ret = xclose(ent->close.fd); + io_uring_sqe_set_data(sqe, ent); +} + +/** Prep a batch of SQEs. */ +static bool ioq_ring_prep(struct ioq_ring_state *state) { + struct ioq *ioq = state->ioq; + struct io_uring *ring = state->ring; + + while (io_uring_sq_space_left(ring)) { + struct ioq_ent *ent = ioq_ring_pop(state); + if (!ent) { break; + } + + if (ioq_check_cancel(ioq, ent)) { + continue; + } - case IOQ_OPENDIR: - if (!cancel) { - struct ioq_opendir *args = &ent->opendir; - ent->ret = bfs_opendir(args->dir, args->dfd, args->path); - if (ent->ret == 0) { - bfs_polldir(args->dir); - } +#if !BFS_USE_UNWRAPDIR + if (ent->op == IOQ_CLOSEDIR) { + ioq_handle(ioq, ent); + continue; + } +#endif + + struct io_uring_sqe *sqe = io_uring_get_sqe(ring); + ioq_prep_sqe(sqe, ent); + ++state->prepped; + } + + return state->prepped || state->submitted; +} + +/** Reap a batch of SQEs. */ +static void ioq_ring_reap(struct ioq_ring_state *state) { + struct ioq *ioq = state->ioq; + struct io_uring *ring = state->ring; + + while (state->prepped) { + int ret = io_uring_submit_and_wait(ring, 1); + if (ret > 0) { + state->prepped -= ret; + state->submitted += ret; + } + } + + while (state->submitted) { + struct io_uring_cqe *cqe; + if (io_uring_wait_cqe(ring, &cqe) < 0) { + continue; + } + + struct ioq_ent *ent = io_uring_cqe_get_data(cqe); + ent->ret = cqe->res >= 0 ? cqe->res : -1; + ent->error = cqe->res < 0 ? -cqe->res : 0; + io_uring_cqe_seen(ring, cqe); + --state->submitted; + + if (ent->op == IOQ_OPENDIR && ent->ret >= 0) { + int fd = ent->ret; + if (ioq_check_cancel(ioq, ent)) { + xclose(fd); + continue; } - break; - case IOQ_CLOSEDIR: - ent->ret = bfs_closedir(ent->closedir.dir); - break; + ent->ret = bfs_opendir(ent->opendir.dir, fd, NULL); + if (ent->ret == 0) { + // TODO: io_uring_prep_getdents() + bfs_polldir(ent->opendir.dir); + } else { + ent->error = errno; + } + } + + ioqq_push(ioq->ready, ent); + } +} + +/** io_uring worker loop. */ +static void ioq_ring_work(struct ioq_thread *thread) { + struct ioq_ring_state state = { + .ioq = thread->parent, + .ring = &thread->ring, + }; - default: - bfs_bug("Unknown ioq_op %d", (int)ent->op); - errno = ENOSYS; + while (ioq_ring_prep(&state)) { + ioq_ring_reap(&state); + } +} +#endif + +/** Synchronous syscall loop. */ +static void ioq_sync_work(struct ioq_thread *thread) { + struct ioq *ioq = thread->parent; + + while (true) { + struct ioq_ent *ent = ioqq_pop(ioq->pending); + if (ent == &IOQ_STOP) { break; } - if (cancel) { - ent->error = EINTR; - } else if (ent->ret < 0) { - ent->error = errno; - } else { - ent->error = 0; + if (!ioq_check_cancel(ioq, ent)) { + ioq_handle(ioq, ent); } + } +} - ioqq_push(ioq->ready, ent); +/** Background thread entry point. */ +static void *ioq_work(void *ptr) { + struct ioq_thread *thread = ptr; + +#if BFS_USE_LIBURING + if (thread->ring_err == 0) { + ioq_ring_work(thread); + return NULL; } +#endif + ioq_sync_work(thread); return NULL; } @@ -376,7 +582,30 @@ struct ioq *ioq_create(size_t depth, size_t nthreads) { } for (size_t i = 0; i < nthreads; ++i) { - if (thread_create(&ioq->threads[i], NULL, ioq_work, ioq) != 0) { + struct ioq_thread *thread = &ioq->threads[i]; + thread->parent = ioq; + +#if BFS_USE_LIBURING + struct ioq_thread *prev = i ? &ioq->threads[i - 1] : NULL; + if (prev && prev->ring_err) { + thread->ring_err = prev->ring_err; + } else { + // Share io-wq workers between rings + struct io_uring_params params = {0}; + if (prev) { + params.flags |= IORING_SETUP_ATTACH_WQ; + params.wq_fd = prev->ring.ring_fd; + } + + size_t entries = depth / nthreads; + if (entries < 16) { + entries = 16; + } + thread->ring_err = -io_uring_queue_init_params(entries, &thread->ring, ¶ms); + } +#endif + + if (thread_create(&thread->id, NULL, ioq_work, thread) != 0) { goto fail; } ++ioq->nthreads; @@ -496,7 +725,11 @@ void ioq_destroy(struct ioq *ioq) { ioq_cancel(ioq); for (size_t i = 0; i < ioq->nthreads; ++i) { - thread_join(ioq->threads[i], NULL); + struct ioq_thread *thread = &ioq->threads[i]; + thread_join(thread->id, NULL); +#if BFS_USE_LIBURING + io_uring_queue_exit(&thread->ring); +#endif } ioqq_destroy(ioq->ready); |