summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--src/ioq.c110
1 files changed, 73 insertions, 37 deletions
diff --git a/src/ioq.c b/src/ioq.c
index 0126f5c..3c26814 100644
--- a/src/ioq.c
+++ b/src/ioq.c
@@ -712,6 +712,75 @@ static void *ioq_work(void *ptr) {
return NULL;
}
+/** Initialize io_uring thread state. */
+static int ioq_ring_init(struct ioq *ioq, struct ioq_thread *thread) {
+#if BFS_USE_LIBURING
+ struct ioq_thread *prev = NULL;
+ if (thread > ioq->threads) {
+ prev = thread - 1;
+ }
+
+ if (prev && prev->ring_err) {
+ thread->ring_err = prev->ring_err;
+ return -1;
+ }
+
+ // Share io-wq workers between rings
+ struct io_uring_params params = {0};
+ if (prev) {
+ params.flags |= IORING_SETUP_ATTACH_WQ;
+ params.wq_fd = prev->ring.ring_fd;
+ }
+
+ // Use a page for each SQE ring
+ size_t entries = 4096 / sizeof(struct io_uring_sqe);
+ thread->ring_err = -io_uring_queue_init_params(entries, &thread->ring, &params);
+ if (thread->ring_err) {
+ return -1;
+ }
+
+ if (!prev) {
+ // Limit the number of io_uring workers
+ unsigned int values[] = {
+ [IO_WQ_BOUND] = ioq->nthreads,
+ [IO_WQ_UNBOUND] = 0,
+ };
+ io_uring_register_iowq_max_workers(&thread->ring, values);
+ }
+#endif
+
+ return 0;
+}
+
+/** Destroy an io_uring. */
+static void ioq_ring_exit(struct ioq_thread *thread) {
+#if BFS_USE_LIBURING
+ if (thread->ring_err == 0) {
+ io_uring_queue_exit(&thread->ring);
+ }
+#endif
+}
+
+/** Create an I/O queue thread. */
+static int ioq_thread_create(struct ioq *ioq, struct ioq_thread *thread) {
+ thread->parent = ioq;
+
+ ioq_ring_init(ioq, thread);
+
+ if (thread_create(&thread->id, NULL, ioq_work, thread) != 0) {
+ ioq_ring_exit(thread);
+ return -1;
+ }
+
+ return 0;
+}
+
+/** Join an I/O queue thread. */
+static void ioq_thread_join(struct ioq_thread *thread) {
+ thread_join(thread->id, NULL);
+ ioq_ring_exit(thread);
+}
+
struct ioq *ioq_create(size_t depth, size_t nthreads) {
struct ioq *ioq = ZALLOC_FLEX(struct ioq, threads, nthreads);
if (!ioq) {
@@ -736,41 +805,12 @@ struct ioq *ioq_create(size_t depth, size_t nthreads) {
goto fail;
}
+ ioq->nthreads = nthreads;
for (size_t i = 0; i < nthreads; ++i) {
- struct ioq_thread *thread = &ioq->threads[i];
- thread->parent = ioq;
-
-#if BFS_USE_LIBURING
- struct ioq_thread *prev = i ? &ioq->threads[i - 1] : NULL;
- if (prev && prev->ring_err) {
- thread->ring_err = prev->ring_err;
- } else {
- // Share io-wq workers between rings
- struct io_uring_params params = {0};
- if (prev) {
- params.flags |= IORING_SETUP_ATTACH_WQ;
- params.wq_fd = prev->ring.ring_fd;
- }
-
- // Use a page for each SQE ring
- size_t entries = 4096 / sizeof(struct io_uring_sqe);
- thread->ring_err = -io_uring_queue_init_params(entries, &thread->ring, &params);
-
- if (!prev && thread->ring_err == 0) {
- // Limit the number of io_uring workers
- unsigned int values[] = {
- [IO_WQ_BOUND] = nthreads,
- [IO_WQ_UNBOUND] = 0,
- };
- io_uring_register_iowq_max_workers(&thread->ring, values);
- }
- }
-#endif
-
- if (thread_create(&thread->id, NULL, ioq_work, thread) != 0) {
+ if (ioq_thread_create(ioq, &ioq->threads[i]) != 0) {
+ ioq->nthreads = i;
goto fail;
}
- ++ioq->nthreads;
}
return ioq;
@@ -908,11 +948,7 @@ void ioq_destroy(struct ioq *ioq) {
ioq_cancel(ioq);
for (size_t i = 0; i < ioq->nthreads; ++i) {
- struct ioq_thread *thread = &ioq->threads[i];
- thread_join(thread->id, NULL);
-#if BFS_USE_LIBURING
- io_uring_queue_exit(&thread->ring);
-#endif
+ ioq_thread_join(&ioq->threads[i]);
}
ioqq_destroy(ioq->ready);