diff options
-rw-r--r-- | src/ioq.c | 110 |
1 files changed, 73 insertions, 37 deletions
@@ -712,6 +712,75 @@ static void *ioq_work(void *ptr) { return NULL; } +/** Initialize io_uring thread state. */ +static int ioq_ring_init(struct ioq *ioq, struct ioq_thread *thread) { +#if BFS_USE_LIBURING + struct ioq_thread *prev = NULL; + if (thread > ioq->threads) { + prev = thread - 1; + } + + if (prev && prev->ring_err) { + thread->ring_err = prev->ring_err; + return -1; + } + + // Share io-wq workers between rings + struct io_uring_params params = {0}; + if (prev) { + params.flags |= IORING_SETUP_ATTACH_WQ; + params.wq_fd = prev->ring.ring_fd; + } + + // Use a page for each SQE ring + size_t entries = 4096 / sizeof(struct io_uring_sqe); + thread->ring_err = -io_uring_queue_init_params(entries, &thread->ring, ¶ms); + if (thread->ring_err) { + return -1; + } + + if (!prev) { + // Limit the number of io_uring workers + unsigned int values[] = { + [IO_WQ_BOUND] = ioq->nthreads, + [IO_WQ_UNBOUND] = 0, + }; + io_uring_register_iowq_max_workers(&thread->ring, values); + } +#endif + + return 0; +} + +/** Destroy an io_uring. */ +static void ioq_ring_exit(struct ioq_thread *thread) { +#if BFS_USE_LIBURING + if (thread->ring_err == 0) { + io_uring_queue_exit(&thread->ring); + } +#endif +} + +/** Create an I/O queue thread. */ +static int ioq_thread_create(struct ioq *ioq, struct ioq_thread *thread) { + thread->parent = ioq; + + ioq_ring_init(ioq, thread); + + if (thread_create(&thread->id, NULL, ioq_work, thread) != 0) { + ioq_ring_exit(thread); + return -1; + } + + return 0; +} + +/** Join an I/O queue thread. */ +static void ioq_thread_join(struct ioq_thread *thread) { + thread_join(thread->id, NULL); + ioq_ring_exit(thread); +} + struct ioq *ioq_create(size_t depth, size_t nthreads) { struct ioq *ioq = ZALLOC_FLEX(struct ioq, threads, nthreads); if (!ioq) { @@ -736,41 +805,12 @@ struct ioq *ioq_create(size_t depth, size_t nthreads) { goto fail; } + ioq->nthreads = nthreads; for (size_t i = 0; i < nthreads; ++i) { - struct ioq_thread *thread = &ioq->threads[i]; - thread->parent = ioq; - -#if BFS_USE_LIBURING - struct ioq_thread *prev = i ? &ioq->threads[i - 1] : NULL; - if (prev && prev->ring_err) { - thread->ring_err = prev->ring_err; - } else { - // Share io-wq workers between rings - struct io_uring_params params = {0}; - if (prev) { - params.flags |= IORING_SETUP_ATTACH_WQ; - params.wq_fd = prev->ring.ring_fd; - } - - // Use a page for each SQE ring - size_t entries = 4096 / sizeof(struct io_uring_sqe); - thread->ring_err = -io_uring_queue_init_params(entries, &thread->ring, ¶ms); - - if (!prev && thread->ring_err == 0) { - // Limit the number of io_uring workers - unsigned int values[] = { - [IO_WQ_BOUND] = nthreads, - [IO_WQ_UNBOUND] = 0, - }; - io_uring_register_iowq_max_workers(&thread->ring, values); - } - } -#endif - - if (thread_create(&thread->id, NULL, ioq_work, thread) != 0) { + if (ioq_thread_create(ioq, &ioq->threads[i]) != 0) { + ioq->nthreads = i; goto fail; } - ++ioq->nthreads; } return ioq; @@ -908,11 +948,7 @@ void ioq_destroy(struct ioq *ioq) { ioq_cancel(ioq); for (size_t i = 0; i < ioq->nthreads; ++i) { - struct ioq_thread *thread = &ioq->threads[i]; - thread_join(thread->id, NULL); -#if BFS_USE_LIBURING - io_uring_queue_exit(&thread->ring); -#endif + ioq_thread_join(&ioq->threads[i]); } ioqq_destroy(ioq->ready); |