diff options
-rw-r--r-- | src/ioq.c | 85 |
1 files changed, 67 insertions, 18 deletions
@@ -458,6 +458,17 @@ static void ioq_batch_push(struct ioqq *ioqq, struct ioq_batch *batch, struct io /** Sentinel stop command. */ static struct ioq_ent IOQ_STOP; +#if BFS_USE_LIBURING +/** + * Supported io_uring operations. + */ +enum ioq_ring_ops { + IOQ_RING_OPENAT = 1 << 0, + IOQ_RING_CLOSE = 1 << 1, + IOQ_RING_STATX = 1 << 2, +}; +#endif + /** I/O queue thread-specific data. */ struct ioq_thread { /** The thread handle. */ @@ -470,6 +481,8 @@ struct ioq_thread { struct io_uring ring; /** Any error that occurred initializing the ring. */ int ring_err; + /** Bitmask of supported io_uring operations. */ + enum ioq_ring_ops ring_ops; #endif }; @@ -553,6 +566,8 @@ struct ioq_ring_state { struct ioq *ioq; /** The io_uring. */ struct io_uring *ring; + /** Supported io_uring operations. */ + enum ioq_ring_ops ops; /** Number of prepped, unsubmitted SQEs. */ size_t prepped; /** Number of submitted, unreaped SQEs. */ @@ -564,40 +579,48 @@ struct ioq_ring_state { }; /** Dispatch a single request asynchronously. */ -static struct io_uring_sqe *ioq_dispatch_async(struct io_uring *ring, struct ioq_ent *ent) { +static struct io_uring_sqe *ioq_dispatch_async(struct ioq_ring_state *state, struct ioq_ent *ent) { + struct io_uring *ring = state->ring; + enum ioq_ring_ops ops = state->ops; struct io_uring_sqe *sqe = NULL; switch (ent->op) { - case IOQ_CLOSE: + case IOQ_CLOSE: + if (ops & IOQ_RING_CLOSE) { sqe = io_uring_get_sqe(ring); io_uring_prep_close(sqe, ent->close.fd); - return sqe; + } + return sqe; - case IOQ_OPENDIR: { + case IOQ_OPENDIR: + if (ops & IOQ_RING_OPENAT) { sqe = io_uring_get_sqe(ring); struct ioq_opendir *args = &ent->opendir; int flags = O_RDONLY | O_CLOEXEC | O_DIRECTORY; io_uring_prep_openat(sqe, args->dfd, args->path, flags, 0); - return sqe; } + return sqe; - case IOQ_CLOSEDIR: + case IOQ_CLOSEDIR: #if BFS_USE_UNWRAPDIR + if (ops & IOQ_RING_CLOSE) { sqe = io_uring_get_sqe(ring); io_uring_prep_close(sqe, bfs_unwrapdir(ent->closedir.dir)); + } #endif - return sqe; + return sqe; - case IOQ_STAT: { + case IOQ_STAT: #if BFS_USE_STATX + if (ops & IOQ_RING_STATX) { sqe = io_uring_get_sqe(ring); struct ioq_stat *args = &ent->stat; int flags = bfs_statx_flags(args->flags); unsigned int mask = STATX_BASIC_STATS | STATX_BTIME; io_uring_prep_statx(sqe, args->dfd, args->path, flags, mask, args->xbuf); -#endif - return sqe; } +#endif + return sqe; } bfs_bug("Unknown ioq_op %d", (int)ent->op); @@ -617,7 +640,7 @@ static void ioq_prep_sqe(struct ioq_ring_state *state, struct ioq_ent *ent) { return; } - struct io_uring_sqe *sqe = ioq_dispatch_async(state->ring, ent); + struct io_uring_sqe *sqe = ioq_dispatch_async(state, ent); if (sqe) { io_uring_sqe_set_data(sqe, ent); ++state->prepped; @@ -743,6 +766,7 @@ static void ioq_ring_work(struct ioq_thread *thread) { struct ioq_ring_state state = { .ioq = thread->parent, .ring = &thread->ring, + .ops = thread->ring_ops, }; while (ioq_ring_prep(&state)) { @@ -824,14 +848,39 @@ static int ioq_ring_init(struct ioq *ioq, struct ioq_thread *thread) { return -1; } - if (!prev) { - // Limit the number of io_uring workers - unsigned int values[] = { - ioq->nthreads, // [IO_WQ_BOUND] - 0, // [IO_WQ_UNBOUND] - }; - io_uring_register_iowq_max_workers(&thread->ring, values); + if (prev) { + // Initial setup already complete + return 0; + } + + // Check for supported operations + struct io_uring_probe *probe = io_uring_get_probe_ring(&thread->ring); + if (probe) { + if (io_uring_opcode_supported(probe, IORING_OP_OPENAT)) { + thread->ring_ops |= IOQ_RING_OPENAT; + } + if (io_uring_opcode_supported(probe, IORING_OP_CLOSE)) { + thread->ring_ops |= IOQ_RING_CLOSE; + } +#if BFS_USE_STATX + if (io_uring_opcode_supported(probe, IORING_OP_STATX)) { + thread->ring_ops |= IOQ_RING_STATX; + } +#endif + io_uring_free_probe(probe); } + if (!thread->ring_ops) { + io_uring_queue_exit(&thread->ring); + thread->ring_err = ENOTSUP; + return -1; + } + + // Limit the number of io_uring workers + unsigned int values[] = { + ioq->nthreads, // [IO_WQ_BOUND] + 0, // [IO_WQ_UNBOUND] + }; + io_uring_register_iowq_max_workers(&thread->ring, values); #endif return 0; |