diff options
Diffstat (limited to 'src/ioq.c')
-rw-r--r-- | src/ioq.c | 604 |
1 files changed, 417 insertions, 187 deletions
@@ -118,24 +118,27 @@ * [1]: https://arxiv.org/abs/2201.02179 */ -#include "prelude.h" #include "ioq.h" + #include "alloc.h" #include "atomic.h" +#include "bfs.h" #include "bfstd.h" #include "bit.h" #include "diag.h" #include "dir.h" #include "stat.h" #include "thread.h" + #include <errno.h> #include <fcntl.h> #include <pthread.h> #include <stdint.h> #include <stdlib.h> #include <sys/stat.h> +#include <unistd.h> -#if BFS_USE_LIBURING +#if BFS_WITH_LIBURING # include <liburing.h> #endif @@ -180,8 +183,7 @@ typedef atomic uintptr_t ioq_slot; /** Amount to add for an additional skip. */ #define IOQ_SKIP_ONE (~IOQ_BLOCKED) -// Need room for two flag bits -bfs_static_assert(alignof(struct ioq_ent) >= (1 << 2)); +static_assert(alignof(struct ioq_ent) >= (1 << 2), "struct ioq_ent is underaligned"); /** * An MPMC queue of I/O commands. @@ -201,7 +203,7 @@ struct ioqq { cache_align atomic size_t tail; /** The circular buffer itself. */ - cache_align ioq_slot slots[]; + cache_align ioq_slot slots[]; // _counted_by(slot_mask + 1) }; /** Destroy an I/O command queue. */ @@ -258,17 +260,45 @@ static struct ioqq *ioqq_create(size_t size) { /** Get the monitor associated with a slot. */ static struct ioq_monitor *ioq_slot_monitor(struct ioqq *ioqq, ioq_slot *slot) { - size_t i = slot - ioqq->slots; + uint32_t i = slot - ioqq->slots; + + // Hash the index to de-correlate waiters + // https://nullprogram.com/blog/2018/07/31/ + // https://github.com/skeeto/hash-prospector/issues/19#issuecomment-1120105785 + i ^= i >> 16; + i *= UINT32_C(0x21f0aaad); + i ^= i >> 15; + i *= UINT32_C(0x735a2d97); + i ^= i >> 15; + return &ioqq->monitors[i & ioqq->monitor_mask]; } /** Atomically wait for a slot to change. */ -attr(noinline) +_noinline static uintptr_t ioq_slot_wait(struct ioqq *ioqq, ioq_slot *slot, uintptr_t value) { + uintptr_t ret; + + // Try spinning a few times (with exponential backoff) before blocking + _nounroll + for (int i = 1; i < 1024; i *= 2) { + _nounroll + for (int j = 0; j < i; ++j) { + spin_loop(); + } + + // Check if the slot changed + ret = load(slot, relaxed); + if (ret != value) { + return ret; + } + } + + // Nothing changed, start blocking struct ioq_monitor *monitor = ioq_slot_monitor(ioqq, slot); mutex_lock(&monitor->mutex); - uintptr_t ret = load(slot, relaxed); + ret = load(slot, relaxed); if (ret != value) { goto done; } @@ -293,7 +323,7 @@ done: } /** Wake up any threads waiting on a slot. */ -attr(noinline) +_noinline static void ioq_slot_wake(struct ioqq *ioqq, ioq_slot *slot) { struct ioq_monitor *monitor = ioq_slot_monitor(ioqq, slot); @@ -313,9 +343,11 @@ static void ioq_slot_wake(struct ioqq *ioqq, ioq_slot *slot) { cond_broadcast(&monitor->cond); } -/** Branch-free (slot & IOQ_SKIP) ? ~IOQ_BLOCKED : 0 */ -static uintptr_t ioq_skip_mask(uintptr_t slot) { - return -(slot >> IOQ_SKIP_BIT) << 1; +/** Branch-free ((slot & IOQ_SKIP) ? skip : full) & ~IOQ_BLOCKED */ +static uintptr_t ioq_slot_blend(uintptr_t slot, uintptr_t skip, uintptr_t full) { + uintptr_t mask = -(slot >> IOQ_SKIP_BIT); + uintptr_t ret = (skip & mask) | (full & ~mask); + return ret & ~IOQ_BLOCKED; } /** Push an entry into a slot. */ @@ -323,19 +355,18 @@ static bool ioq_slot_push(struct ioqq *ioqq, ioq_slot *slot, struct ioq_ent *ent uintptr_t prev = load(slot, relaxed); while (true) { - size_t skip_mask = ioq_skip_mask(prev); - size_t full_mask = ~skip_mask & ~IOQ_BLOCKED; - if (prev & full_mask) { + uintptr_t full = ioq_slot_blend(prev, 0, prev); + if (full) { // full(ptr) → wait prev = ioq_slot_wait(ioqq, slot, prev); continue; } // empty → full(ptr) - uintptr_t next = ((uintptr_t)ent >> 1) & full_mask; + uintptr_t next = (uintptr_t)ent >> 1; // skip(1) → empty // skip(n) → skip(n - 1) - next |= (prev - IOQ_SKIP_ONE) & skip_mask; + next = ioq_slot_blend(prev, prev - IOQ_SKIP_ONE, next); if (compare_exchange_weak(slot, &prev, next, release, relaxed)) { break; @@ -353,13 +384,20 @@ static bool ioq_slot_push(struct ioqq *ioqq, ioq_slot *slot, struct ioq_ent *ent static struct ioq_ent *ioq_slot_pop(struct ioqq *ioqq, ioq_slot *slot, bool block) { uintptr_t prev = load(slot, relaxed); while (true) { +#if __has_builtin(__builtin_prefetch) + // Optimistically prefetch the pointer in this slot. If this + // slot is not full, this will prefetch an invalid address, but + // experimentally this is worth it on both Intel (Alder Lake) + // and AMD (Zen 2). + __builtin_prefetch((void *)(prev << 1), 1 /* write */); +#endif + // empty → skip(1) // skip(n) → skip(n + 1) // full(ptr) → full(ptr - 1) uintptr_t next = prev + IOQ_SKIP_ONE; - // skip(n) → ~IOQ_BLOCKED // full(ptr) → 0 - next &= ioq_skip_mask(next); + next = ioq_slot_blend(next, next, 0); if (block && next) { prev = ioq_slot_wait(ioqq, slot, prev); @@ -378,7 +416,7 @@ static struct ioq_ent *ioq_slot_pop(struct ioqq *ioqq, ioq_slot *slot, bool bloc // empty → 0 // skip(n) → 0 // full(ptr) → ptr - prev &= ioq_skip_mask(~prev); + prev = ioq_slot_blend(prev, 0, prev); return (struct ioq_ent *)(prev << 1); } @@ -408,13 +446,6 @@ static void ioqq_push_batch(struct ioqq *ioqq, struct ioq_ent *batch[], size_t s } while (size > 0); } -/** Pop an entry from the queue. */ -static struct ioq_ent *ioqq_pop(struct ioqq *ioqq, bool block) { - size_t i = fetch_add(&ioqq->tail, 1, relaxed); - ioq_slot *slot = &ioqq->slots[i & ioqq->slot_mask]; - return ioq_slot_pop(ioqq, slot, block); -} - /** Pop a batch of entries from the queue. */ static void ioqq_pop_batch(struct ioqq *ioqq, struct ioq_ent *batch[], size_t size, bool block) { size_t mask = ioqq->slot_mask; @@ -430,36 +461,83 @@ static void ioqq_pop_batch(struct ioqq *ioqq, struct ioq_ent *batch[], size_t si #define IOQ_BATCH (FALSE_SHARING_SIZE / sizeof(ioq_slot)) /** - * A batch of entries to send all at once. + * A batch of I/O queue entries. */ struct ioq_batch { - /** The current batch size. */ - size_t size; + /** The start of the batch. */ + size_t head; + /** The end of the batch. */ + size_t tail; /** The array of entries. */ struct ioq_ent *entries[IOQ_BATCH]; }; -/** Send the batch to a queue. */ +/** Reset a batch. */ +static void ioq_batch_reset(struct ioq_batch *batch) { + batch->head = batch->tail = 0; +} + +/** Check if a batch is empty. */ +static bool ioq_batch_empty(const struct ioq_batch *batch) { + return batch->head >= batch->tail; +} + +/** Send a batch to a queue. */ static void ioq_batch_flush(struct ioqq *ioqq, struct ioq_batch *batch) { - if (batch->size > 0) { - ioqq_push_batch(ioqq, batch->entries, batch->size); - batch->size = 0; + if (batch->tail > 0) { + ioqq_push_batch(ioqq, batch->entries, batch->tail); + ioq_batch_reset(batch); } } -/** An an entry to a batch, flushing if necessary. */ +/** Push an entry to a batch, flushing if necessary. */ static void ioq_batch_push(struct ioqq *ioqq, struct ioq_batch *batch, struct ioq_ent *ent) { - if (batch->size >= IOQ_BATCH) { + batch->entries[batch->tail++] = ent; + + if (batch->tail >= IOQ_BATCH) { ioq_batch_flush(ioqq, batch); } +} + +/** Fill a batch from a queue. */ +static bool ioq_batch_fill(struct ioqq *ioqq, struct ioq_batch *batch, bool block) { + ioqq_pop_batch(ioqq, batch->entries, IOQ_BATCH, block); + + ioq_batch_reset(batch); + for (size_t i = 0; i < IOQ_BATCH; ++i) { + struct ioq_ent *ent = batch->entries[i]; + if (ent) { + batch->entries[batch->tail++] = ent; + } + } + + return batch->tail > 0; +} + +/** Pop an entry from a batch, filling it first if necessary. */ +static struct ioq_ent *ioq_batch_pop(struct ioqq *ioqq, struct ioq_batch *batch, bool block) { + if (ioq_batch_empty(batch)) { + // For non-blocking pops, make sure that each ioq_batch_pop() + // corresponds to a single (amortized) increment of ioqq->head. + // Otherwise, we start skipping many slots and batching ends up + // degrading performance. + if (!block && batch->head < IOQ_BATCH) { + ++batch->head; + return NULL; + } + + if (!ioq_batch_fill(ioqq, batch, block)) { + return NULL; + } + } - batch->entries[batch->size++] = ent; + return batch->entries[batch->head++]; } /** Sentinel stop command. */ static struct ioq_ent IOQ_STOP; -#if BFS_USE_LIBURING +#if BFS_WITH_LIBURING /** * Supported io_uring operations. */ @@ -477,7 +555,7 @@ struct ioq_thread { /** Pointer back to the I/O queue. */ struct ioq *parent; -#if BFS_USE_LIBURING +#if BFS_WITH_LIBURING /** io_uring instance. */ struct io_uring ring; /** Any error that occurred initializing the ring. */ @@ -497,20 +575,25 @@ struct ioq { /** ioq_ent arena. */ struct arena ents; -#if BFS_USE_LIBURING && BFS_USE_STATX +#if BFS_WITH_LIBURING && BFS_USE_STATX /** struct statx arena. */ struct arena xbufs; #endif - /** Pending I/O requests. */ + /** Pending I/O request queue. */ struct ioqq *pending; - /** Ready I/O responses. */ + /** Ready I/O response queue. */ struct ioqq *ready; + /** Pending request batch. */ + struct ioq_batch pending_batch; + /** Ready request batch. */ + struct ioq_batch ready_batch; + /** The number of background threads. */ size_t nthreads; /** The background threads themselves. */ - struct ioq_thread threads[]; + struct ioq_thread threads[] _counted_by(nthreads); }; /** Cancel a request if we need to. */ @@ -531,6 +614,14 @@ static bool ioq_check_cancel(struct ioq *ioq, struct ioq_ent *ent) { /** Dispatch a single request synchronously. */ static void ioq_dispatch_sync(struct ioq *ioq, struct ioq_ent *ent) { switch (ent->op) { + case IOQ_NOP: + if (ent->nop.type == IOQ_NOP_HEAVY) { + // A fast, no-op syscall + getppid(); + } + ent->result = 0; + return; + case IOQ_CLOSE: ent->result = try(xclose(ent->close.fd)); return; @@ -559,7 +650,7 @@ static void ioq_dispatch_sync(struct ioq *ioq, struct ioq_ent *ent) { ent->result = -ENOSYS; } -#if BFS_USE_LIBURING +#if BFS_WITH_LIBURING /** io_uring worker state. */ struct ioq_ring_state { @@ -579,23 +670,161 @@ struct ioq_ring_state { struct ioq_batch ready; }; +/** Reap a single CQE. */ +static void ioq_reap_cqe(struct ioq_ring_state *state, struct io_uring_cqe *cqe) { + struct ioq *ioq = state->ioq; + + struct ioq_ent *ent = io_uring_cqe_get_data(cqe); + ent->result = cqe->res; + + if (ent->result < 0) { + goto push; + } + + switch (ent->op) { + case IOQ_OPENDIR: { + int fd = ent->result; + if (ioq_check_cancel(ioq, ent)) { + xclose(fd); + goto push; + } + + struct ioq_opendir *args = &ent->opendir; + ent->result = try(bfs_opendir(args->dir, fd, NULL, args->flags)); + if (ent->result >= 0) { + // TODO: io_uring_prep_getdents() + bfs_polldir(args->dir); + } else { + xclose(fd); + } + + break; + } + +#if BFS_USE_STATX + case IOQ_STAT: { + struct ioq_stat *args = &ent->stat; + ent->result = try(bfs_statx_convert(args->buf, args->xbuf)); + break; + } +#endif + + default: + break; + } + +push: + ioq_batch_push(ioq->ready, &state->ready, ent); +} + +/** Wait for submitted requests to complete. */ +static void ioq_ring_drain(struct ioq_ring_state *state, size_t wait_nr) { + struct ioq *ioq = state->ioq; + struct io_uring *ring = state->ring; + + bfs_assert(wait_nr <= state->submitted); + + while (state->submitted > 0) { + struct io_uring_cqe *cqe; + if (wait_nr > 0) { + io_uring_wait_cqes(ring, &cqe, wait_nr, NULL, NULL); + } + + unsigned int head; + size_t seen = 0; + io_uring_for_each_cqe (ring, head, cqe) { + ioq_reap_cqe(state, cqe); + ++seen; + } + + io_uring_cq_advance(ring, seen); + state->submitted -= seen; + + if (seen >= wait_nr) { + break; + } + wait_nr -= seen; + } + + ioq_batch_flush(ioq->ready, &state->ready); +} + +/** Submit prepped SQEs, and wait for some to complete. */ +static void ioq_ring_submit(struct ioq_ring_state *state) { + struct io_uring *ring = state->ring; + + size_t unreaped = state->prepped + state->submitted; + size_t wait_nr = 0; + + if (state->prepped == 0 && unreaped > 0) { + // If we have no new SQEs, wait for at least one old one to + // complete, to avoid livelock + wait_nr = 1; + } + + if (unreaped > ring->sq.ring_entries) { + // Keep the completion queue below half full + wait_nr = unreaped - ring->sq.ring_entries; + } + + // Submit all prepped SQEs + while (state->prepped > 0) { + int ret = io_uring_submit_and_wait(state->ring, wait_nr); + if (ret <= 0) { + continue; + } + + state->submitted += ret; + state->prepped -= ret; + if (state->prepped > 0) { + // In the unlikely event of a short submission, any SQE + // links will be broken. Wait for all SQEs to complete + // to preserve any ordering requirements. + ioq_ring_drain(state, state->submitted); + wait_nr = 0; + } + } + + // Drain all the CQEs we waited for (and any others that are ready) + ioq_ring_drain(state, wait_nr); +} + +/** Reserve space for a number of SQEs, submitting if necessary. */ +static void ioq_reserve_sqes(struct ioq_ring_state *state, unsigned int count) { + while (io_uring_sq_space_left(state->ring) < count) { + ioq_ring_submit(state); + } +} + +/** Get an SQE, submitting if necessary. */ +static struct io_uring_sqe *ioq_get_sqe(struct ioq_ring_state *state) { + ioq_reserve_sqes(state, 1); + return io_uring_get_sqe(state->ring); +} + /** Dispatch a single request asynchronously. */ static struct io_uring_sqe *ioq_dispatch_async(struct ioq_ring_state *state, struct ioq_ent *ent) { - struct io_uring *ring = state->ring; enum ioq_ring_ops ops = state->ops; struct io_uring_sqe *sqe = NULL; switch (ent->op) { + case IOQ_NOP: + if (ent->nop.type == IOQ_NOP_HEAVY) { + sqe = ioq_get_sqe(state); + io_uring_prep_nop(sqe); + } + return sqe; + case IOQ_CLOSE: if (ops & IOQ_RING_CLOSE) { - sqe = io_uring_get_sqe(ring); + sqe = ioq_get_sqe(state); io_uring_prep_close(sqe, ent->close.fd); } return sqe; case IOQ_OPENDIR: if (ops & IOQ_RING_OPENAT) { - sqe = io_uring_get_sqe(ring); + sqe = ioq_get_sqe(state); struct ioq_opendir *args = &ent->opendir; int flags = O_RDONLY | O_CLOEXEC | O_DIRECTORY; io_uring_prep_openat(sqe, args->dfd, args->path, flags, 0); @@ -605,7 +834,7 @@ static struct io_uring_sqe *ioq_dispatch_async(struct ioq_ring_state *state, str case IOQ_CLOSEDIR: #if BFS_USE_UNWRAPDIR if (ops & IOQ_RING_CLOSE) { - sqe = io_uring_get_sqe(ring); + sqe = ioq_get_sqe(state); io_uring_prep_close(sqe, bfs_unwrapdir(ent->closedir.dir)); } #endif @@ -614,10 +843,10 @@ static struct io_uring_sqe *ioq_dispatch_async(struct ioq_ring_state *state, str case IOQ_STAT: #if BFS_USE_STATX if (ops & IOQ_RING_STATX) { - sqe = io_uring_get_sqe(ring); + sqe = ioq_get_sqe(state); struct ioq_stat *args = &ent->stat; int flags = bfs_statx_flags(args->flags); - unsigned int mask = STATX_BASIC_STATS | STATX_BTIME; + unsigned int mask = bfs_statx_mask(); io_uring_prep_statx(sqe, args->dfd, args->path, flags, mask, args->xbuf); } #endif @@ -630,7 +859,7 @@ static struct io_uring_sqe *ioq_dispatch_async(struct ioq_ring_state *state, str /** Check if ioq_ring_reap() has work to do. */ static bool ioq_ring_empty(struct ioq_ring_state *state) { - return !state->prepped && !state->submitted && !state->ready.size; + return !state->prepped && !state->submitted && ioq_batch_empty(&state->ready); } /** Prep a single SQE. */ @@ -658,163 +887,94 @@ static bool ioq_ring_prep(struct ioq_ring_state *state) { } struct ioq *ioq = state->ioq; - struct io_uring *ring = state->ring; - struct ioq_ent *pending[IOQ_BATCH]; - - while (io_uring_sq_space_left(ring) >= IOQ_BATCH) { - bool block = ioq_ring_empty(state); - ioqq_pop_batch(ioq->pending, pending, IOQ_BATCH, block); - - bool any = false; - for (size_t i = 0; i < IOQ_BATCH; ++i) { - struct ioq_ent *ent = pending[i]; - if (ent == &IOQ_STOP) { - ioqq_push(ioq->pending, &IOQ_STOP); - state->stop = true; - goto done; - } else if (ent) { - ioq_prep_sqe(state, ent); - any = true; - } - } - - if (!any) { - break; - } - } - -done: - return !ioq_ring_empty(state); -} - -/** Reap a single CQE. */ -static void ioq_reap_cqe(struct ioq_ring_state *state, struct io_uring_cqe *cqe) { - struct ioq *ioq = state->ioq; - struct io_uring *ring = state->ring; - - struct ioq_ent *ent = io_uring_cqe_get_data(cqe); - ent->result = cqe->res; - io_uring_cqe_seen(ring, cqe); - --state->submitted; - - if (ent->result < 0) { - goto push; - } - switch (ent->op) { - case IOQ_OPENDIR: { - int fd = ent->result; - if (ioq_check_cancel(ioq, ent)) { - xclose(fd); - goto push; - } - - struct ioq_opendir *args = &ent->opendir; - ent->result = try(bfs_opendir(args->dir, fd, NULL, args->flags)); - if (ent->result >= 0) { - // TODO: io_uring_prep_getdents() - bfs_polldir(args->dir); - } else { - xclose(fd); - } + struct ioq_batch pending; + ioq_batch_reset(&pending); + while (true) { + bool block = ioq_ring_empty(state); + struct ioq_ent *ent = ioq_batch_pop(ioq->pending, &pending, block); + if (ent == &IOQ_STOP) { + ioqq_push(ioq->pending, ent); + state->stop = true; break; - } - -#if BFS_USE_STATX - case IOQ_STAT: { - struct ioq_stat *args = &ent->stat; - ent->result = try(bfs_statx_convert(args->buf, args->xbuf)); + } else if (ent) { + ioq_prep_sqe(state, ent); + } else { break; } -#endif - - default: - break; } -push: - ioq_batch_push(ioq->ready, &state->ready, ent); + bfs_assert(ioq_batch_empty(&pending)); + return !ioq_ring_empty(state); } -/** Reap a batch of CQEs. */ -static void ioq_ring_reap(struct ioq_ring_state *state) { - struct ioq *ioq = state->ioq; - struct io_uring *ring = state->ring; +/** io_uring worker loop. */ +static int ioq_ring_work(struct ioq_thread *thread) { + struct io_uring *ring = &thread->ring; - while (state->prepped) { - int ret = io_uring_submit_and_wait(ring, 1); - if (ret > 0) { - state->prepped -= ret; - state->submitted += ret; +#ifdef IORING_SETUP_R_DISABLED + if (ring->flags & IORING_SETUP_R_DISABLED) { + if (io_uring_enable_rings(ring) != 0) { + return -1; } } +#endif - while (state->submitted) { - struct io_uring_cqe *cqe; - if (io_uring_wait_cqe(ring, &cqe) < 0) { - continue; - } - - ioq_reap_cqe(state, cqe); - } - - ioq_batch_flush(ioq->ready, &state->ready); -} - -/** io_uring worker loop. */ -static void ioq_ring_work(struct ioq_thread *thread) { struct ioq_ring_state state = { .ioq = thread->parent, - .ring = &thread->ring, + .ring = ring, .ops = thread->ring_ops, }; while (ioq_ring_prep(&state)) { - ioq_ring_reap(&state); + ioq_ring_submit(&state); } + + ioq_ring_drain(&state, state.submitted); + return 0; } -#endif // BFS_USE_LIBURING +#endif // BFS_WITH_LIBURING /** Synchronous syscall loop. */ static void ioq_sync_work(struct ioq_thread *thread) { struct ioq *ioq = thread->parent; - bool stop = false; - while (!stop) { - struct ioq_ent *pending[IOQ_BATCH]; - ioqq_pop_batch(ioq->pending, pending, IOQ_BATCH, true); - - struct ioq_batch ready; - ready.size = 0; - - for (size_t i = 0; i < IOQ_BATCH; ++i) { - struct ioq_ent *ent = pending[i]; - if (ent == &IOQ_STOP) { - ioqq_push(ioq->pending, &IOQ_STOP); - stop = true; - break; - } else if (ent) { - if (!ioq_check_cancel(ioq, ent)) { - ioq_dispatch_sync(ioq, ent); - } - ioq_batch_push(ioq->ready, &ready, ent); - } + struct ioq_batch pending, ready; + ioq_batch_reset(&pending); + ioq_batch_reset(&ready); + + while (true) { + if (ioq_batch_empty(&pending)) { + ioq_batch_flush(ioq->ready, &ready); } - ioq_batch_flush(ioq->ready, &ready); + struct ioq_ent *ent = ioq_batch_pop(ioq->pending, &pending, true); + if (ent == &IOQ_STOP) { + ioqq_push(ioq->pending, ent); + break; + } + + if (!ioq_check_cancel(ioq, ent)) { + ioq_dispatch_sync(ioq, ent); + } + ioq_batch_push(ioq->ready, &ready, ent); } + + bfs_assert(ioq_batch_empty(&pending)); + ioq_batch_flush(ioq->ready, &ready); } /** Background thread entry point. */ static void *ioq_work(void *ptr) { struct ioq_thread *thread = ptr; -#if BFS_USE_LIBURING +#if BFS_WITH_LIBURING if (thread->ring_err == 0) { - ioq_ring_work(thread); - return NULL; + if (ioq_ring_work(thread) == 0) { + return NULL; + } } #endif @@ -822,9 +982,30 @@ static void *ioq_work(void *ptr) { return NULL; } +#if BFS_WITH_LIBURING +/** Test whether some io_uring setup flags are supported. */ +static bool ioq_ring_probe_flags(struct io_uring_params *params, unsigned int flags) { + unsigned int saved = params->flags; + params->flags |= flags; + + struct io_uring ring; + int ret = io_uring_queue_init_params(2, &ring, params); + if (ret == 0) { + io_uring_queue_exit(&ring); + } + + if (ret == -EINVAL) { + params->flags = saved; + return false; + } + + return true; +} +#endif + /** Initialize io_uring thread state. */ static int ioq_ring_init(struct ioq *ioq, struct ioq_thread *thread) { -#if BFS_USE_LIBURING +#if BFS_WITH_LIBURING struct ioq_thread *prev = NULL; if (thread > ioq->threads) { prev = thread - 1; @@ -835,11 +1016,31 @@ static int ioq_ring_init(struct ioq *ioq, struct ioq_thread *thread) { return -1; } - // Share io-wq workers between rings struct io_uring_params params = {0}; + if (prev) { - params.flags |= IORING_SETUP_ATTACH_WQ; + // Share io-wq workers between rings + params.flags = prev->ring.flags | IORING_SETUP_ATTACH_WQ; params.wq_fd = prev->ring.ring_fd; + } else { +#ifdef IORING_SETUP_SUBMIT_ALL + // Don't abort submission just because an inline request fails + ioq_ring_probe_flags(¶ms, IORING_SETUP_SUBMIT_ALL); +#endif + +#ifdef IORING_SETUP_R_DISABLED + // Don't enable the ring yet (needed for SINGLE_ISSUER) + if (ioq_ring_probe_flags(¶ms, IORING_SETUP_R_DISABLED)) { +# ifdef IORING_SETUP_SINGLE_ISSUER + // Allow optimizations assuming only one task submits SQEs + ioq_ring_probe_flags(¶ms, IORING_SETUP_SINGLE_ISSUER); +# endif +# ifdef IORING_SETUP_DEFER_TASKRUN + // Don't interrupt us aggressively with completion events + ioq_ring_probe_flags(¶ms, IORING_SETUP_DEFER_TASKRUN); +# endif + } +#endif } // Use a page for each SQE ring @@ -877,6 +1078,7 @@ static int ioq_ring_init(struct ioq *ioq, struct ioq_thread *thread) { return -1; } +#if BFS_HAS_IO_URING_MAX_WORKERS // Limit the number of io_uring workers unsigned int values[] = { ioq->nthreads, // [IO_WQ_BOUND] @@ -885,12 +1087,14 @@ static int ioq_ring_init(struct ioq *ioq, struct ioq_thread *thread) { io_uring_register_iowq_max_workers(&thread->ring, values); #endif +#endif // BFS_WITH_LIBURING + return 0; } /** Destroy an io_uring. */ static void ioq_ring_exit(struct ioq_thread *thread) { -#if BFS_USE_LIBURING +#if BFS_WITH_LIBURING if (thread->ring_err == 0) { io_uring_queue_exit(&thread->ring); } @@ -898,7 +1102,8 @@ static void ioq_ring_exit(struct ioq_thread *thread) { } /** Create an I/O queue thread. */ -static int ioq_thread_create(struct ioq *ioq, struct ioq_thread *thread) { +static int ioq_thread_create(struct ioq *ioq, size_t i) { + struct ioq_thread *thread = &ioq->threads[i]; thread->parent = ioq; ioq_ring_init(ioq, thread); @@ -908,6 +1113,11 @@ static int ioq_thread_create(struct ioq *ioq, struct ioq_thread *thread) { return -1; } + char name[16]; + if (snprintf(name, sizeof(name), "ioq-%zu", i) >= 0) { + thread_setname(thread->id, name); + } + return 0; } @@ -926,7 +1136,7 @@ struct ioq *ioq_create(size_t depth, size_t nthreads) { ioq->depth = depth; ARENA_INIT(&ioq->ents, struct ioq_ent); -#if BFS_USE_LIBURING && BFS_USE_STATX +#if BFS_WITH_LIBURING && BFS_USE_STATX ARENA_INIT(&ioq->xbufs, struct statx); #endif @@ -942,7 +1152,7 @@ struct ioq *ioq_create(size_t depth, size_t nthreads) { ioq->nthreads = nthreads; for (size_t i = 0; i < nthreads; ++i) { - if (ioq_thread_create(ioq, &ioq->threads[i]) != 0) { + if (ioq_thread_create(ioq, i) != 0) { ioq->nthreads = i; goto fail; } @@ -984,6 +1194,18 @@ static struct ioq_ent *ioq_request(struct ioq *ioq, enum ioq_op op, void *ptr) { return ent; } +int ioq_nop(struct ioq *ioq, enum ioq_nop_type type, void *ptr) { + struct ioq_ent *ent = ioq_request(ioq, IOQ_NOP, ptr); + if (!ent) { + return -1; + } + + ent->nop.type = type; + + ioq_batch_push(ioq->pending, &ioq->pending_batch, ent); + return 0; +} + int ioq_close(struct ioq *ioq, int fd, void *ptr) { struct ioq_ent *ent = ioq_request(ioq, IOQ_CLOSE, ptr); if (!ent) { @@ -992,7 +1214,7 @@ int ioq_close(struct ioq *ioq, int fd, void *ptr) { ent->close.fd = fd; - ioqq_push(ioq->pending, ent); + ioq_batch_push(ioq->pending, &ioq->pending_batch, ent); return 0; } @@ -1008,7 +1230,7 @@ int ioq_opendir(struct ioq *ioq, struct bfs_dir *dir, int dfd, const char *path, args->path = path; args->flags = flags; - ioqq_push(ioq->pending, ent); + ioq_batch_push(ioq->pending, &ioq->pending_batch, ent); return 0; } @@ -1020,7 +1242,7 @@ int ioq_closedir(struct ioq *ioq, struct bfs_dir *dir, void *ptr) { ent->closedir.dir = dir; - ioqq_push(ioq->pending, ent); + ioq_batch_push(ioq->pending, &ioq->pending_batch, ent); return 0; } @@ -1036,7 +1258,7 @@ int ioq_stat(struct ioq *ioq, int dfd, const char *path, enum bfs_stat_flags fla args->flags = flags; args->buf = buf; -#if BFS_USE_LIBURING && BFS_USE_STATX +#if BFS_WITH_LIBURING && BFS_USE_STATX args->xbuf = arena_alloc(&ioq->xbufs); if (!args->xbuf) { ioq_free(ioq, ent); @@ -1044,23 +1266,30 @@ int ioq_stat(struct ioq *ioq, int dfd, const char *path, enum bfs_stat_flags fla } #endif - ioqq_push(ioq->pending, ent); + ioq_batch_push(ioq->pending, &ioq->pending_batch, ent); return 0; } +void ioq_submit(struct ioq *ioq) { + ioq_batch_flush(ioq->pending, &ioq->pending_batch); +} + struct ioq_ent *ioq_pop(struct ioq *ioq, bool block) { + // Don't forget to submit before popping + bfs_assert(ioq_batch_empty(&ioq->pending_batch)); + if (ioq->size == 0) { return NULL; } - return ioqq_pop(ioq->ready, block); + return ioq_batch_pop(ioq->ready, &ioq->ready_batch, block); } void ioq_free(struct ioq *ioq, struct ioq_ent *ent) { bfs_assert(ioq->size > 0); --ioq->size; -#if BFS_USE_LIBURING && BFS_USE_STATX +#if BFS_WITH_LIBURING && BFS_USE_STATX if (ent->op == IOQ_STAT && ent->stat.xbuf) { arena_free(&ioq->xbufs, ent->stat.xbuf); } @@ -1071,7 +1300,8 @@ void ioq_free(struct ioq *ioq, struct ioq_ent *ent) { void ioq_cancel(struct ioq *ioq) { if (!exchange(&ioq->cancel, true, relaxed)) { - ioqq_push(ioq->pending, &IOQ_STOP); + ioq_batch_push(ioq->pending, &ioq->pending_batch, &IOQ_STOP); + ioq_submit(ioq); } } @@ -1091,7 +1321,7 @@ void ioq_destroy(struct ioq *ioq) { ioqq_destroy(ioq->ready); ioqq_destroy(ioq->pending); -#if BFS_USE_LIBURING && BFS_USE_STATX +#if BFS_WITH_LIBURING && BFS_USE_STATX arena_destroy(&ioq->xbufs); #endif arena_destroy(&ioq->ents); |