Diffstat (limited to 'src/ioq.c')
-rw-r--r-- | src/ioq.c | 1076 |
1 file changed, 949 insertions, 127 deletions
@@ -1,20 +1,146 @@
 // Copyright © Tavian Barnes <tavianator@tavianator.com>
 // SPDX-License-Identifier: 0BSD
 
+/**
+ * An asynchronous I/O queue implementation.
+ *
+ * struct ioq is composed of two separate queues:
+ *
+ *     struct ioqq *pending; // Pending I/O requests
+ *     struct ioqq *ready;   // Ready I/O responses
+ *
+ * Worker threads pop requests from `pending`, execute them, and push them back
+ * to the `ready` queue.  The main thread pushes requests to `pending` and pops
+ * them from `ready`.
+ *
+ * struct ioqq is a blocking MPMC queue (though it could be SPMC/MPSC for
+ * pending/ready respectively).  It is implemented as a circular buffer:
+ *
+ *     size_t mask;            // (1 << N) - 1
+ *     [padding]
+ *     size_t head;            // Writer index
+ *     [padding]
+ *     size_t tail;            // Reader index
+ *     [padding]
+ *     ioq_slot slots[1 << N]; // Queue contents
+ *
+ * Pushes are implemented with an unconditional
+ *
+ *     fetch_add(&ioqq->head, 1)
+ *
+ * which scales better on many architectures than compare-and-swap (see [1] for
+ * details).  Pops are implemented similarly.  Since the fetch-and-adds are
+ * unconditional, non-blocking readers can get ahead of writers:
+ *
+ *     Reader                          Writer
+ *     ────────────────                ──────────────────────
+ *     head: 0 → 1
+ *     slots[0]: empty
+ *                                     tail: 0 → 1
+ *                                     slots[0]: empty → full
+ *     head: 1 → 2
+ *     slots[1]: empty!
+ *
+ * To avoid this, non-blocking reads (ioqq_pop(ioqq, false)) must mark the slots
+ * somehow so that writers can skip them:
+ *
+ *     Reader                          Writer
+ *     ───────────────────────         ───────────────────────
+ *     head: 0 → 1
+ *     slots[0]: empty → skip
+ *                                     tail: 0 → 1
+ *                                     slots[0]: skip → empty
+ *                                     tail: 1 → 2
+ *                                     slots[1]: empty → full
+ *     head: 1 → 2
+ *     slots[1]: full → empty
+ *
+ * As well, a reader might "lap" a writer (or another reader), so slots need to
+ * count how many times they should be skipped:
+ *
+ *     Reader                          Writer
+ *     ──────────────────────────      ─────────────────────────
+ *     head: 0 → 1
+ *     slots[0]: empty → skip(1)
+ *     head: 1 → 2
+ *     slots[1]: empty → skip(1)
+ *     ...
+ *     head: M → 0
+ *     slots[M]: empty → skip(1)
+ *     head: 0 → 1
+ *     slots[0]: skip(1 → 2)
+ *                                     tail: 0 → 1
+ *                                     slots[0]: skip(2 → 1)
+ *                                     tail: 1 → 2
+ *                                     slots[1]: skip(1) → empty
+ *                                     ...
+ *                                     tail: M → 0
+ *                                     slots[M]: skip(1) → empty
+ *                                     tail: 0 → 1
+ *                                     slots[0]: skip(1) → empty
+ *                                     tail: 1 → 2
+ *                                     slots[1]: empty → full
+ *     head: 1 → 2
+ *     slots[1]: full → empty
+ *
+ * As described in [1], this approach is susceptible to livelock if readers stay
+ * ahead of writers.  This is okay for us because we don't retry failed non-
+ * blocking reads.
+ *
+ * The slot representation uses tag bits to hold either a pointer or skip(N):
+ *
+ *     IOQ_SKIP (highest bit)      IOQ_BLOCKED (lowest bit)
+ *     ↓                           ↓
+ *     0   0 0 0 ... 0 0 0   0
+ *         └───────┬───────┘
+ *                 │
+ *             value bits
+ *
+ * If IOQ_SKIP is unset, the value bits hold a pointer (or zero/NULL for empty).
+ * If IOQ_SKIP is set, the value bits hold a negative skip count.  Writers can
+ * reduce the skip count by adding 1 to the value bits, and when the count hits
+ * zero, the carry will automatically clear IOQ_SKIP:
+ *
+ *     IOQ_SKIP            IOQ_BLOCKED
+ *     ↓                   ↓
+ *     1 1 1 ... 1 0 0               skip(2)
+ *     1 1 1 ... 1 1 0               skip(1)
+ *     0 0 0 ... 0 0 0               empty
+ *
+ * The IOQ_BLOCKED flag is used to track sleeping waiters, futex-style.  To wait
+ * for a slot to change, waiters call ioq_slot_wait() which sets IOQ_BLOCKED and
+ * goes to sleep.  Whenever a slot is updated, if the old value had IOQ_BLOCKED
+ * set, ioq_slot_wake() must be called to wake up that waiter.
+ *
+ * Blocking/waking uses a pool of monitors (mutex, condition variable pairs).
+ * Slots are assigned round-robin to a monitor from the pool.
+ *
+ * [1]: https://arxiv.org/abs/2201.02179
+ */
+
 #include "ioq.h"
+
 #include "alloc.h"
 #include "atomic.h"
+#include "bfs.h"
 #include "bfstd.h"
 #include "bit.h"
-#include "config.h"
 #include "diag.h"
 #include "dir.h"
+#include "stat.h"
 #include "thread.h"
-#include "sanity.h"
-#include <assert.h>
+
 #include <errno.h>
+#include <fcntl.h>
 #include <pthread.h>
+#include <stdint.h>
 #include <stdlib.h>
+#include <sys/stat.h>
+#include <unistd.h>
+
+#if BFS_WITH_LIBURING
+# include <liburing.h>
+#endif
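Editor's note: the skip-count encoding described in the new header comment can be exercised in isolation. The following standalone sketch is not part of the patch; it reuses the patch's macro names, but computes IOQ_SKIP_BIT portably instead of with UINTPTR_WIDTH, and checks that consuming the last skip clears IOQ_SKIP via the carry.

/* Standalone illustration only -- not part of src/ioq.c. */
#include <assert.h>
#include <limits.h>
#include <stdint.h>
#include <stdio.h>

#define IOQ_BLOCKED  ((uintptr_t)1)
#define IOQ_SKIP_BIT (sizeof(uintptr_t) * CHAR_BIT - 1)
#define IOQ_SKIP     ((uintptr_t)1 << IOQ_SKIP_BIT)
#define IOQ_SKIP_ONE (~IOQ_BLOCKED)

int main(void) {
    uintptr_t slot = 0; // empty

    // Two non-blocking pops of an empty slot leave skip(2) behind
    slot += IOQ_SKIP_ONE; // empty   -> skip(1)
    slot += IOQ_SKIP_ONE; // skip(1) -> skip(2)
    assert(slot & IOQ_SKIP);

    // Each push consumes one skip by adding 1 to the value bits (+2 overall)
    slot -= IOQ_SKIP_ONE; // skip(2) -> skip(1)
    assert(slot & IOQ_SKIP);
    slot -= IOQ_SKIP_ONE; // skip(1) -> empty; the carry clears IOQ_SKIP
    assert(slot == 0);

    printf("skip-count arithmetic checks out\n");
    return 0;
}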
 
 /**
  * A monitor for an I/O queue slot.
@@ -47,24 +173,17 @@ static void ioq_monitor_destroy(struct ioq_monitor *monitor) {
 /** A single entry in a command queue. */
 typedef atomic uintptr_t ioq_slot;
 
-/** Slot flag bit to indicate waiters. */
+/** Someone might be waiting on this slot. */
 #define IOQ_BLOCKED ((uintptr_t)1)
-bfs_static_assert(alignof(struct ioq_ent) > 1);
 
-/** Check if a slot has waiters. */
-static bool ioq_slot_blocked(uintptr_t value) {
-    return value & IOQ_BLOCKED;
-}
+/** Bit for IOQ_SKIP. */
+#define IOQ_SKIP_BIT (UINTPTR_WIDTH - 1)
+/** The next push(es) should skip this slot. */
+#define IOQ_SKIP ((uintptr_t)1 << IOQ_SKIP_BIT)
+/** Amount to add for an additional skip. */
+#define IOQ_SKIP_ONE (~IOQ_BLOCKED)
 
-/** Extract the pointer from a slot. */
-static struct ioq_ent *ioq_slot_ptr(uintptr_t value) {
-    return (struct ioq_ent *)(value & ~IOQ_BLOCKED);
-}
-
-/** Check if a slot is empty. */
-static bool ioq_slot_empty(uintptr_t value) {
-    return !ioq_slot_ptr(value);
-}
+static_assert(alignof(struct ioq_ent) >= (1 << 2), "struct ioq_ent is underaligned");
 
 /**
  * An MPMC queue of I/O commands.
@@ -87,19 +206,12 @@ struct ioqq {
     cache_align ioq_slot slots[];
 };
 
-// If we assign slots sequentially, threads will likely be operating on
-// consecutive slots.  If these slots are in the same cache line, that will
-// result in false sharing.  We can mitigate this by assigning slots with a
-// stride larger than a cache line e.g. 0, 9, 18, ..., 1, 10, 19, ...
-// As long as the stride is relatively prime to circular buffer length, we'll
-// still use every available slot.  Since the length is a power of two, that
-// means the stride must be odd.
-
-#define IOQ_STRIDE ((FALSE_SHARING_SIZE / sizeof(ioq_slot)) | 1)
-bfs_static_assert(IOQ_STRIDE % 2 == 1);
-
 /** Destroy an I/O command queue. */
 static void ioqq_destroy(struct ioqq *ioqq) {
+    if (!ioqq) {
+        return;
+    }
+
     for (size_t i = 0; i < ioqq->monitor_mask + 1; ++i) {
         ioq_monitor_destroy(&ioqq->monitors[i]);
     }
@@ -148,16 +260,45 @@ static struct ioqq *ioqq_create(size_t size) {
 
 /** Get the monitor associated with a slot. */
 static struct ioq_monitor *ioq_slot_monitor(struct ioqq *ioqq, ioq_slot *slot) {
-    size_t i = slot - ioqq->slots;
+    uint32_t i = slot - ioqq->slots;
+
+    // Hash the index to de-correlate waiters
+    // https://nullprogram.com/blog/2018/07/31/
+    // https://github.com/skeeto/hash-prospector/issues/19#issuecomment-1120105785
+    i ^= i >> 16;
+    i *= UINT32_C(0x21f0aaad);
+    i ^= i >> 15;
+    i *= UINT32_C(0x735a2d97);
+    i ^= i >> 15;
+
     return &ioqq->monitors[i & ioqq->monitor_mask];
 }
 
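Editor's note: ioq_slot_monitor() above mixes the slot index before masking so that neighboring slots do not all contend on the same monitor. This standalone sketch (not part of the patch) applies the same mixer constants, which come from the hash-prospector thread linked in the patch, to a hypothetical pool of 8 monitors.

/* Standalone illustration only -- not part of src/ioq.c. */
#include <stdint.h>
#include <stdio.h>

static uint32_t mix32(uint32_t i) {
    i ^= i >> 16;
    i *= UINT32_C(0x21f0aaad);
    i ^= i >> 15;
    i *= UINT32_C(0x735a2d97);
    i ^= i >> 15;
    return i;
}

int main(void) {
    uint32_t monitor_mask = 7; // assume a pool of 8 monitors

    // Consecutive slot indices map to scattered monitors instead of neighbors
    for (uint32_t slot = 0; slot < 8; ++slot) {
        printf("slot %u -> monitor %u\n", slot, mix32(slot) & monitor_mask);
    }
    return 0;
}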
 /** Atomically wait for a slot to change. */
+_noinline
 static uintptr_t ioq_slot_wait(struct ioqq *ioqq, ioq_slot *slot, uintptr_t value) {
+    uintptr_t ret;
+
+    // Try spinning a few times (with exponential backoff) before blocking
+    _nounroll
+    for (int i = 1; i < 1024; i *= 2) {
+        _nounroll
+        for (int j = 0; j < i; ++j) {
+            spin_loop();
+        }
+
+        // Check if the slot changed
+        ret = load(slot, relaxed);
+        if (ret != value) {
+            return ret;
+        }
+    }
+
+    // Nothing changed, start blocking
     struct ioq_monitor *monitor = ioq_slot_monitor(ioqq, slot);
     mutex_lock(&monitor->mutex);
 
-    uintptr_t ret = load(slot, relaxed);
+    ret = load(slot, relaxed);
     if (ret != value) {
         goto done;
     }
@@ -182,6 +323,7 @@ done:
 }
 
 /** Wake up any threads waiting on a slot. */
+_noinline
 static void ioq_slot_wake(struct ioqq *ioqq, ioq_slot *slot) {
     struct ioq_monitor *monitor = ioq_slot_monitor(ioqq, slot);
 
@@ -201,85 +343,228 @@ static void ioq_slot_wake(struct ioqq *ioqq, ioq_slot *slot) {
     cond_broadcast(&monitor->cond);
 }
 
-/** Get the next slot for writing. */
-static ioq_slot *ioqq_write(struct ioqq *ioqq) {
-    size_t i = fetch_add(&ioqq->head, IOQ_STRIDE, relaxed);
-    return &ioqq->slots[i & ioqq->slot_mask];
+/** Branch-free ((slot & IOQ_SKIP) ? skip : full) & ~IOQ_BLOCKED */
+static uintptr_t ioq_slot_blend(uintptr_t slot, uintptr_t skip, uintptr_t full) {
+    uintptr_t mask = -(slot >> IOQ_SKIP_BIT);
+    uintptr_t ret = (skip & mask) | (full & ~mask);
+    return ret & ~IOQ_BLOCKED;
 }
 
 /** Push an entry into a slot. */
-static void ioq_slot_push(struct ioqq *ioqq, ioq_slot *slot, struct ioq_ent *ent) {
-    uintptr_t addr = (uintptr_t)ent;
-    bfs_assert(!ioq_slot_blocked(addr));
+static bool ioq_slot_push(struct ioqq *ioqq, ioq_slot *slot, struct ioq_ent *ent) {
+    uintptr_t prev = load(slot, relaxed);
+
+    while (true) {
+        uintptr_t full = ioq_slot_blend(prev, 0, prev);
+        if (full) {
+            // full(ptr) → wait
+            prev = ioq_slot_wait(ioqq, slot, prev);
+            continue;
+        }
+
+        // empty → full(ptr)
+        uintptr_t next = (uintptr_t)ent >> 1;
+        // skip(1) → empty
+        // skip(n) → skip(n - 1)
+        next = ioq_slot_blend(prev, prev - IOQ_SKIP_ONE, next);
+
+        if (compare_exchange_weak(slot, &prev, next, release, relaxed)) {
+            break;
+        }
+    }
+
+    if (prev & IOQ_BLOCKED) {
+        ioq_slot_wake(ioqq, slot);
+    }
+    return !(prev & IOQ_SKIP);
+}
+
+/** (Try to) pop an entry from a slot. */
+static struct ioq_ent *ioq_slot_pop(struct ioqq *ioqq, ioq_slot *slot, bool block) {
     uintptr_t prev = load(slot, relaxed);
-    do {
-        while (!ioq_slot_empty(prev)) {
+    while (true) {
+#if __has_builtin(__builtin_prefetch)
+        // Optimistically prefetch the pointer in this slot.  If this
+        // slot is not full, this will prefetch an invalid address, but
+        // experimentally this is worth it on both Intel (Alder Lake)
+        // and AMD (Zen 2).
+        __builtin_prefetch((void *)(prev << 1), 1 /* write */);
+#endif
+
+        // empty → skip(1)
+        // skip(n) → skip(n + 1)
+        // full(ptr) → full(ptr - 1)
+        uintptr_t next = prev + IOQ_SKIP_ONE;
+        // full(ptr) → 0
+        next = ioq_slot_blend(next, next, 0);
+
+        if (block && next) {
             prev = ioq_slot_wait(ioqq, slot, prev);
+            continue;
+        }
+
+        if (compare_exchange_weak(slot, &prev, next, acquire, relaxed)) {
+            break;
         }
-    } while (!compare_exchange_weak(slot, &prev, addr, release, relaxed));
+    }
 
-    if (ioq_slot_blocked(prev)) {
+    if (prev & IOQ_BLOCKED) {
         ioq_slot_wake(ioqq, slot);
     }
+
+    // empty → 0
+    // skip(n) → 0
+    // full(ptr) → ptr
+    prev = ioq_slot_blend(prev, 0, prev);
+    return (struct ioq_ent *)(prev << 1);
 }
 
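Editor's note: the branch-free select in ioq_slot_blend() above is easy to sanity-check against its documented conditional form. The following standalone sketch (not part of the patch) compares the two over a handful of sample values; IOQ_SKIP_BIT is again computed portably rather than with UINTPTR_WIDTH.

/* Standalone illustration only -- not part of src/ioq.c. */
#include <assert.h>
#include <limits.h>
#include <stddef.h>
#include <stdint.h>

#define IOQ_BLOCKED  ((uintptr_t)1)
#define IOQ_SKIP_BIT (sizeof(uintptr_t) * CHAR_BIT - 1)
#define IOQ_SKIP     ((uintptr_t)1 << IOQ_SKIP_BIT)

// The documented conditional form
static uintptr_t blend_branchy(uintptr_t slot, uintptr_t skip, uintptr_t full) {
    return ((slot & IOQ_SKIP) ? skip : full) & ~IOQ_BLOCKED;
}

// The branch-free form used by the patch
static uintptr_t blend_branchfree(uintptr_t slot, uintptr_t skip, uintptr_t full) {
    uintptr_t mask = -(slot >> IOQ_SKIP_BIT);
    uintptr_t ret = (skip & mask) | (full & ~mask);
    return ret & ~IOQ_BLOCKED;
}

int main(void) {
    uintptr_t samples[] = { 0, 1, 0x1234, IOQ_SKIP, IOQ_SKIP | 0x42, ~(uintptr_t)0 };
    size_t n = sizeof(samples) / sizeof(samples[0]);

    for (size_t i = 0; i < n; ++i) {
        for (size_t j = 0; j < n; ++j) {
            for (size_t k = 0; k < n; ++k) {
                assert(blend_branchy(samples[i], samples[j], samples[k])
                       == blend_branchfree(samples[i], samples[j], samples[k]));
            }
        }
    }
    return 0;
}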
 /** Push an entry onto the queue. */
 static void ioqq_push(struct ioqq *ioqq, struct ioq_ent *ent) {
-    ioq_slot *slot = ioqq_write(ioqq);
-    ioq_slot_push(ioqq, slot, ent);
-}
-
-/** Get the next slot for reading. */
-static ioq_slot *ioqq_read(struct ioqq *ioqq) {
-    size_t i = fetch_add(&ioqq->tail, IOQ_STRIDE, relaxed);
-    return &ioqq->slots[i & ioqq->slot_mask];
+    while (true) {
+        size_t i = fetch_add(&ioqq->head, 1, relaxed);
+        ioq_slot *slot = &ioqq->slots[i & ioqq->slot_mask];
+        if (ioq_slot_push(ioqq, slot, ent)) {
+            break;
+        }
+    }
 }
 
-/** (Try to) pop an entry from a slot. */
-static struct ioq_ent *ioq_slot_pop(struct ioqq *ioqq, ioq_slot *slot, bool block) {
-    uintptr_t prev = load(slot, relaxed);
+/** Push a batch of entries to the queue. */
+static void ioqq_push_batch(struct ioqq *ioqq, struct ioq_ent *batch[], size_t size) {
+    size_t mask = ioqq->slot_mask;
     do {
-        while (ioq_slot_empty(prev)) {
-            if (block) {
-                prev = ioq_slot_wait(ioqq, slot, prev);
-            } else {
-                return NULL;
+        size_t i = fetch_add(&ioqq->head, size, relaxed);
+        for (size_t j = i + size; i != j; ++i) {
+            ioq_slot *slot = &ioqq->slots[i & mask];
+            if (ioq_slot_push(ioqq, slot, *batch)) {
+                ++batch;
+                --size;
             }
         }
-    } while (!compare_exchange_weak(slot, &prev, 0, acquire, relaxed));
+    } while (size > 0);
+}
 
-    if (ioq_slot_blocked(prev)) {
-        ioq_slot_wake(ioqq, slot);
+/** Pop a batch of entries from the queue. */
+static void ioqq_pop_batch(struct ioqq *ioqq, struct ioq_ent *batch[], size_t size, bool block) {
+    size_t mask = ioqq->slot_mask;
+    size_t i = fetch_add(&ioqq->tail, size, relaxed);
+    for (size_t j = i + size; i != j; ++i) {
+        ioq_slot *slot = &ioqq->slots[i & mask];
+        *batch++ = ioq_slot_pop(ioqq, slot, block);
+        block = false;
     }
+}
+
+/** Use cache-line-sized batches. */
+#define IOQ_BATCH (FALSE_SHARING_SIZE / sizeof(ioq_slot))
+
+/**
+ * A batch of I/O queue entries.
+ */
+struct ioq_batch {
+    /** The start of the batch. */
+    size_t head;
+    /** The end of the batch. */
+    size_t tail;
+    /** The array of entries. */
+    struct ioq_ent *entries[IOQ_BATCH];
+};
 
-    return ioq_slot_ptr(prev);
+/** Reset a batch. */
+static void ioq_batch_reset(struct ioq_batch *batch) {
+    batch->head = batch->tail = 0;
 }
 
-/** Pop an entry from the queue. */
-static struct ioq_ent *ioqq_pop(struct ioqq *ioqq) {
-    ioq_slot *slot = ioqq_read(ioqq);
-    return ioq_slot_pop(ioqq, slot, true);
+/** Check if a batch is empty. */
+static bool ioq_batch_empty(const struct ioq_batch *batch) {
+    return batch->head >= batch->tail;
+}
+
+/** Send a batch to a queue. */
+static void ioq_batch_flush(struct ioqq *ioqq, struct ioq_batch *batch) {
+    if (batch->tail > 0) {
+        ioqq_push_batch(ioqq, batch->entries, batch->tail);
+        ioq_batch_reset(batch);
+    }
 }
 
-/** Pop an entry from the queue if one is available. */
-static struct ioq_ent *ioqq_trypop(struct ioqq *ioqq) {
-    size_t i = load(&ioqq->tail, relaxed);
-    ioq_slot *slot = &ioqq->slots[i & ioqq->slot_mask];
+/** Push an entry to a batch, flushing if necessary. */
+static void ioq_batch_push(struct ioqq *ioqq, struct ioq_batch *batch, struct ioq_ent *ent) {
+    batch->entries[batch->tail++] = ent;
 
-    struct ioq_ent *ret = ioq_slot_pop(ioqq, slot, false);
-    if (ret) {
-        size_t j = exchange(&ioqq->tail, i + IOQ_STRIDE, relaxed);
-        bfs_assert(j == i, "Detected multiple consumers");
-        (void)j;
+    if (batch->tail >= IOQ_BATCH) {
+        ioq_batch_flush(ioqq, batch);
     }
 }
 
-    return ret;
-}
+
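Editor's note: the batching helpers above amortize the atomic index bump by claiming IOQ_BATCH contiguous slots with a single fetch_add(). The following standalone sketch (not part of the patch) shows just that idea, assuming a FALSE_SHARING_SIZE of 64 bytes.

/* Standalone illustration only -- not part of src/ioq.c. */
#include <stdatomic.h>
#include <stdio.h>

#define FALSE_SHARING_SIZE 64
#define IOQ_BATCH (FALSE_SHARING_SIZE / sizeof(atomic_uintptr_t))

static atomic_size_t head;

int main(void) {
    // One atomic RMW claims a whole cache line worth of slots...
    size_t base = atomic_fetch_add_explicit(&head, IOQ_BATCH, memory_order_relaxed);

    // ...which the caller then fills one by one, without further contention
    for (size_t i = base; i < base + IOQ_BATCH; ++i) {
        printf("filling slot %zu\n", i);
    }
    return 0;
}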
+/** Fill a batch from a queue. */
+static bool ioq_batch_fill(struct ioqq *ioqq, struct ioq_batch *batch, bool block) {
+    ioqq_pop_batch(ioqq, batch->entries, IOQ_BATCH, block);
+
+    ioq_batch_reset(batch);
+    for (size_t i = 0; i < IOQ_BATCH; ++i) {
+        struct ioq_ent *ent = batch->entries[i];
+        if (ent) {
+            batch->entries[batch->tail++] = ent;
+        }
+    }
+
+    return batch->tail > 0;
+}
+
+/** Pop an entry from a batch, filling it first if necessary. */
+static struct ioq_ent *ioq_batch_pop(struct ioqq *ioqq, struct ioq_batch *batch, bool block) {
+    if (ioq_batch_empty(batch)) {
+        // For non-blocking pops, make sure that each ioq_batch_pop()
+        // corresponds to a single (amortized) increment of ioqq->head.
+        // Otherwise, we start skipping many slots and batching ends up
+        // degrading performance.
+        if (!block && batch->head < IOQ_BATCH) {
+            ++batch->head;
+            return NULL;
+        }
+
+        if (!ioq_batch_fill(ioqq, batch, block)) {
+            return NULL;
+        }
+    }
+
+    return batch->entries[batch->head++];
 }
 
 /** Sentinel stop command. */
 static struct ioq_ent IOQ_STOP;
 
+#if BFS_WITH_LIBURING
+/**
+ * Supported io_uring operations.
+ */
+enum ioq_ring_ops {
+    IOQ_RING_OPENAT = 1 << 0,
+    IOQ_RING_CLOSE = 1 << 1,
+    IOQ_RING_STATX = 1 << 2,
+};
+#endif
+
+/** I/O queue thread-specific data. */
+struct ioq_thread {
+    /** The thread handle. */
+    pthread_t id;
+    /** Pointer back to the I/O queue. */
+    struct ioq *parent;
+
+#if BFS_WITH_LIBURING
+    /** io_uring instance. */
+    struct io_uring ring;
+    /** Any error that occurred initializing the ring. */
+    int ring_err;
+    /** Bitmask of supported io_uring operations. */
+    enum ioq_ring_ops ring_ops;
+#endif
+};
+
 struct ioq {
     /** The depth of the queue. */
     size_t depth;
@@ -290,72 +575,558 @@ struct ioq {
     /** ioq_ent arena. */
     struct arena ents;
+#if BFS_WITH_LIBURING && BFS_USE_STATX
+    /** struct statx arena. */
+    struct arena xbufs;
+#endif
 
-    /** Pending I/O requests. */
+    /** Pending I/O request queue. */
     struct ioqq *pending;
-    /** Ready I/O responses. */
+    /** Ready I/O response queue. */
     struct ioqq *ready;
 
+    /** Pending request batch. */
+    struct ioq_batch pending_batch;
+    /** Ready request batch. */
+    struct ioq_batch ready_batch;
+
     /** The number of background threads. */
     size_t nthreads;
     /** The background threads themselves. */
-    pthread_t threads[];
+    struct ioq_thread threads[];
 };
 
-/** Background thread entry point. */
-static void *ioq_work(void *ptr) {
-    struct ioq *ioq = ptr;
+/** Cancel a request if we need to. */
+static bool ioq_check_cancel(struct ioq *ioq, struct ioq_ent *ent) {
+    if (!load(&ioq->cancel, relaxed)) {
+        return false;
+    }
 
-    while (true) {
-        struct ioq_ent *ent = ioqq_pop(ioq->pending);
-        if (ent == &IOQ_STOP) {
-            break;
-        }
+    // Always close(), even if we're cancelled, just like a real EINTR
+    if (ent->op == IOQ_CLOSE || ent->op == IOQ_CLOSEDIR) {
+        return false;
+    }
 
-        bool cancel = load(&ioq->cancel, relaxed);
+    ent->result = -EINTR;
+    return true;
+}
 
-        ent->ret = -1;
+/** Dispatch a single request synchronously. */
+static void ioq_dispatch_sync(struct ioq *ioq, struct ioq_ent *ent) {
+    switch (ent->op) {
+    case IOQ_NOP:
+        if (ent->nop.type == IOQ_NOP_HEAVY) {
+            // A fast, no-op syscall
+            getppid();
+        }
+        ent->result = 0;
+        return;
 
-        switch (ent->op) {
     case IOQ_CLOSE:
-        // Always close(), even if we're cancelled, just like a real EINTR
-        ent->ret = xclose(ent->close.fd);
-        break;
+        ent->result = try(xclose(ent->close.fd));
+        return;
+
+    case IOQ_OPENDIR: {
+        struct ioq_opendir *args = &ent->opendir;
+        ent->result = try(bfs_opendir(args->dir, args->dfd, args->path, args->flags));
+        if (ent->result >= 0) {
+            bfs_polldir(args->dir);
+        }
+        return;
+    }
+
+    case IOQ_CLOSEDIR:
+        ent->result = try(bfs_closedir(ent->closedir.dir));
+        return;
+
+    case IOQ_STAT: {
+        struct ioq_stat *args = &ent->stat;
+        ent->result = try(bfs_stat(args->dfd, args->path, args->flags, args->buf));
+        return;
+    }
+    }
+
+    bfs_bug("Unknown ioq_op %d", (int)ent->op);
+    ent->result = -ENOSYS;
+}
 
-    case IOQ_OPENDIR:
-        if (!cancel) {
-            struct ioq_opendir *args = &ent->opendir;
-            ent->ret = bfs_opendir(args->dir, args->dfd, args->path);
-            if (ent->ret == 0) {
-                bfs_polldir(args->dir);
-            }
+#if BFS_WITH_LIBURING
+
+/** io_uring worker state. */
+struct ioq_ring_state {
+    /** The I/O queue. */
+    struct ioq *ioq;
+    /** The io_uring. */
+    struct io_uring *ring;
+    /** Supported io_uring operations. */
+    enum ioq_ring_ops ops;
+    /** Number of prepped, unsubmitted SQEs. */
+    size_t prepped;
+    /** Number of submitted, unreaped SQEs. */
+    size_t submitted;
+    /** Whether to stop the loop. */
+    bool stop;
+    /** A batch of ready entries. */
+    struct ioq_batch ready;
+};
+
+/** Reap a single CQE. */
+static void ioq_reap_cqe(struct ioq_ring_state *state, struct io_uring_cqe *cqe) {
+    struct ioq *ioq = state->ioq;
+
+    struct ioq_ent *ent = io_uring_cqe_get_data(cqe);
+    ent->result = cqe->res;
+
+    if (ent->result < 0) {
+        goto push;
+    }
+
+    switch (ent->op) {
+    case IOQ_OPENDIR: {
+        int fd = ent->result;
+        if (ioq_check_cancel(ioq, ent)) {
+            xclose(fd);
+            goto push;
+        }
+
+        struct ioq_opendir *args = &ent->opendir;
+        ent->result = try(bfs_opendir(args->dir, fd, NULL, args->flags));
+        if (ent->result >= 0) {
+            // TODO: io_uring_prep_getdents()
+            bfs_polldir(args->dir);
+        } else {
+            xclose(fd);
         }
+        break;
+    }
 
-    case IOQ_CLOSEDIR:
-        ent->ret = bfs_closedir(ent->closedir.dir);
+#if BFS_USE_STATX
+    case IOQ_STAT: {
+        struct ioq_stat *args = &ent->stat;
+        ent->result = try(bfs_statx_convert(args->buf, args->xbuf));
         break;
+    }
+#endif
 
     default:
-        bfs_bug("Unknown ioq_op %d", (int)ent->op);
-        errno = ENOSYS;
         break;
+    }
+
+push:
+    ioq_batch_push(ioq->ready, &state->ready, ent);
+}
+
+/** Wait for submitted requests to complete. */
+static void ioq_ring_drain(struct ioq_ring_state *state, size_t wait_nr) {
+    struct ioq *ioq = state->ioq;
+    struct io_uring *ring = state->ring;
+
+    bfs_assert(wait_nr <= state->submitted);
+
+    while (state->submitted > 0) {
+        struct io_uring_cqe *cqe;
+        if (wait_nr > 0) {
+            io_uring_wait_cqes(ring, &cqe, wait_nr, NULL, NULL);
+        }
+
+        unsigned int head;
+        size_t seen = 0;
+        io_uring_for_each_cqe (ring, head, cqe) {
+            ioq_reap_cqe(state, cqe);
+            ++seen;
+        }
+
+        io_uring_cq_advance(ring, seen);
+        state->submitted -= seen;
+
+        if (seen >= wait_nr) {
+            break;
+        }
+        wait_nr -= seen;
+    }
+
+    ioq_batch_flush(ioq->ready, &state->ready);
+}
+
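Editor's note: the io_uring worker above follows the usual prep → submit → reap cycle, just batched. This standalone sketch (not part of the patch) shows the same round trip reduced to a single no-op request; it only needs liburing (link with -luring).

/* Standalone illustration only -- not part of src/ioq.c. */
#include <liburing.h>
#include <stdio.h>

int main(void) {
    struct io_uring ring;
    if (io_uring_queue_init(8, &ring, 0) != 0) {
        return 1;
    }

    // Prep an SQE and attach our "request" pointer to it
    struct io_uring_sqe *sqe = io_uring_get_sqe(&ring);
    int token = 42;
    io_uring_prep_nop(sqe);
    io_uring_sqe_set_data(sqe, &token);

    // Submit it and wait for the completion
    io_uring_submit(&ring);

    struct io_uring_cqe *cqe;
    if (io_uring_wait_cqe(&ring, &cqe) == 0) {
        printf("token %d completed with result %d\n",
               *(int *)io_uring_cqe_get_data(cqe), cqe->res);
        io_uring_cqe_seen(&ring, cqe);
    }

    io_uring_queue_exit(&ring);
    return 0;
}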
+/** Submit prepped SQEs, and wait for some to complete. */
+static void ioq_ring_submit(struct ioq_ring_state *state) {
+    struct io_uring *ring = state->ring;
+
+    size_t unreaped = state->prepped + state->submitted;
+    size_t wait_nr = 0;
+
+    if (state->prepped == 0 && unreaped > 0) {
+        // If we have no new SQEs, wait for at least one old one to
+        // complete, to avoid livelock
+        wait_nr = 1;
+    }
+
+    if (unreaped > ring->sq.ring_entries) {
+        // Keep the completion queue below half full
+        wait_nr = unreaped - ring->sq.ring_entries;
+    }
+
+    // Submit all prepped SQEs
+    while (state->prepped > 0) {
+        int ret = io_uring_submit_and_wait(state->ring, wait_nr);
+        if (ret <= 0) {
+            continue;
         }
 
-        if (cancel) {
-            ent->error = EINTR;
-        } else if (ent->ret < 0) {
-            ent->error = errno;
+        state->submitted += ret;
+        state->prepped -= ret;
+        if (state->prepped > 0) {
+            // In the unlikely event of a short submission, any SQE
+            // links will be broken.  Wait for all SQEs to complete
+            // to preserve any ordering requirements.
+            ioq_ring_drain(state, state->submitted);
+            wait_nr = 0;
+        }
+    }
+
+    // Drain all the CQEs we waited for (and any others that are ready)
+    ioq_ring_drain(state, wait_nr);
+}
+
+/** Reserve space for a number of SQEs, submitting if necessary. */
+static void ioq_reserve_sqes(struct ioq_ring_state *state, unsigned int count) {
+    while (io_uring_sq_space_left(state->ring) < count) {
+        ioq_ring_submit(state);
+    }
+}
+
+/** Get an SQE, submitting if necessary. */
+static struct io_uring_sqe *ioq_get_sqe(struct ioq_ring_state *state) {
+    ioq_reserve_sqes(state, 1);
+    return io_uring_get_sqe(state->ring);
+}
+
+/** Dispatch a single request asynchronously. */
+static struct io_uring_sqe *ioq_dispatch_async(struct ioq_ring_state *state, struct ioq_ent *ent) {
+    enum ioq_ring_ops ops = state->ops;
+    struct io_uring_sqe *sqe = NULL;
+
+    switch (ent->op) {
+    case IOQ_NOP:
+        if (ent->nop.type == IOQ_NOP_HEAVY) {
+            sqe = ioq_get_sqe(state);
+            io_uring_prep_nop(sqe);
+        }
+        return sqe;
+
+    case IOQ_CLOSE:
+        if (ops & IOQ_RING_CLOSE) {
+            sqe = ioq_get_sqe(state);
+            io_uring_prep_close(sqe, ent->close.fd);
+        }
+        return sqe;
+
+    case IOQ_OPENDIR:
+        if (ops & IOQ_RING_OPENAT) {
+            sqe = ioq_get_sqe(state);
+            struct ioq_opendir *args = &ent->opendir;
+            int flags = O_RDONLY | O_CLOEXEC | O_DIRECTORY;
+            io_uring_prep_openat(sqe, args->dfd, args->path, flags, 0);
+        }
+        return sqe;
+
+    case IOQ_CLOSEDIR:
+#if BFS_USE_UNWRAPDIR
+        if (ops & IOQ_RING_CLOSE) {
+            sqe = ioq_get_sqe(state);
+            io_uring_prep_close(sqe, bfs_unwrapdir(ent->closedir.dir));
+        }
+#endif
+        return sqe;
+
+    case IOQ_STAT:
+#if BFS_USE_STATX
+        if (ops & IOQ_RING_STATX) {
+            sqe = ioq_get_sqe(state);
+            struct ioq_stat *args = &ent->stat;
+            int flags = bfs_statx_flags(args->flags);
+            unsigned int mask = bfs_statx_mask();
+            io_uring_prep_statx(sqe, args->dfd, args->path, flags, mask, args->xbuf);
+        }
+#endif
+        return sqe;
+    }
+
+    bfs_bug("Unknown ioq_op %d", (int)ent->op);
+    return NULL;
+}
+
+/** Check if ioq_ring_reap() has work to do. */
+static bool ioq_ring_empty(struct ioq_ring_state *state) {
+    return !state->prepped && !state->submitted && ioq_batch_empty(&state->ready);
+}
+
+/** Prep a single SQE. */
+static void ioq_prep_sqe(struct ioq_ring_state *state, struct ioq_ent *ent) {
+    struct ioq *ioq = state->ioq;
+    if (ioq_check_cancel(ioq, ent)) {
+        ioq_batch_push(ioq->ready, &state->ready, ent);
+        return;
+    }
+
+    struct io_uring_sqe *sqe = ioq_dispatch_async(state, ent);
+    if (sqe) {
+        io_uring_sqe_set_data(sqe, ent);
+        ++state->prepped;
+    } else {
+        ioq_dispatch_sync(ioq, ent);
+        ioq_batch_push(ioq->ready, &state->ready, ent);
+    }
+}
+
+/** Prep a batch of SQEs. */
+static bool ioq_ring_prep(struct ioq_ring_state *state) {
+    if (state->stop) {
+        return false;
+    }
+
+    struct ioq *ioq = state->ioq;
+
+    struct ioq_batch pending;
+    ioq_batch_reset(&pending);
+
+    while (true) {
+        bool block = ioq_ring_empty(state);
+        struct ioq_ent *ent = ioq_batch_pop(ioq->pending, &pending, block);
+        if (ent == &IOQ_STOP) {
+            ioqq_push(ioq->pending, ent);
+            state->stop = true;
+            break;
+        } else if (ent) {
+            ioq_prep_sqe(state, ent);
         } else {
-            ent->error = 0;
+            break;
+        }
+    }
+
+    bfs_assert(ioq_batch_empty(&pending));
+    return !ioq_ring_empty(state);
+}
+
+/** io_uring worker loop. */
+static int ioq_ring_work(struct ioq_thread *thread) {
+    struct io_uring *ring = &thread->ring;
+
+#ifdef IORING_SETUP_R_DISABLED
+    if (ring->flags & IORING_SETUP_R_DISABLED) {
+        if (io_uring_enable_rings(ring) != 0) {
+            return -1;
+        }
+    }
+#endif
+
+    struct ioq_ring_state state = {
+        .ioq = thread->parent,
+        .ring = ring,
+        .ops = thread->ring_ops,
+    };
+
+    while (ioq_ring_prep(&state)) {
+        ioq_ring_submit(&state);
+    }
+
+    ioq_ring_drain(&state, state.submitted);
+    return 0;
+}
+
+#endif // BFS_WITH_LIBURING
+
+/** Synchronous syscall loop. */
+static void ioq_sync_work(struct ioq_thread *thread) {
+    struct ioq *ioq = thread->parent;
+
+    struct ioq_batch pending, ready;
+    ioq_batch_reset(&pending);
+    ioq_batch_reset(&ready);
+
+    while (true) {
+        if (ioq_batch_empty(&pending)) {
+            ioq_batch_flush(ioq->ready, &ready);
+        }
+
+        struct ioq_ent *ent = ioq_batch_pop(ioq->pending, &pending, true);
+        if (ent == &IOQ_STOP) {
+            ioqq_push(ioq->pending, ent);
+            break;
+        }
+
+        if (!ioq_check_cancel(ioq, ent)) {
+            ioq_dispatch_sync(ioq, ent);
         }
+        ioq_batch_push(ioq->ready, &ready, ent);
+    }
+
+    bfs_assert(ioq_batch_empty(&pending));
+    ioq_batch_flush(ioq->ready, &ready);
+}
 
-        ioqq_push(ioq->ready, ent);
+/** Background thread entry point. */
+static void *ioq_work(void *ptr) {
+    struct ioq_thread *thread = ptr;
+
+#if BFS_WITH_LIBURING
+    if (thread->ring_err == 0) {
+        if (ioq_ring_work(thread) == 0) {
+            return NULL;
+        }
     }
+#endif
 
+    ioq_sync_work(thread);
     return NULL;
 }
 
+#if BFS_WITH_LIBURING
+/** Test whether some io_uring setup flags are supported. */
+static bool ioq_ring_probe_flags(struct io_uring_params *params, unsigned int flags) {
+    unsigned int saved = params->flags;
+    params->flags |= flags;
+
+    struct io_uring ring;
+    int ret = io_uring_queue_init_params(2, &ring, params);
+    if (ret == 0) {
+        io_uring_queue_exit(&ring);
+    }
+
+    if (ret == -EINVAL) {
+        params->flags = saved;
+        return false;
+    }
+
+    return true;
+}
+#endif
+
+/** Initialize io_uring thread state. */
+static int ioq_ring_init(struct ioq *ioq, struct ioq_thread *thread) {
+#if BFS_WITH_LIBURING
+    struct ioq_thread *prev = NULL;
+    if (thread > ioq->threads) {
+        prev = thread - 1;
+    }
+
+    if (prev && prev->ring_err) {
+        thread->ring_err = prev->ring_err;
+        return -1;
+    }
+
+    struct io_uring_params params = {0};
+
+    if (prev) {
+        // Share io-wq workers between rings
+        params.flags = prev->ring.flags | IORING_SETUP_ATTACH_WQ;
+        params.wq_fd = prev->ring.ring_fd;
+    } else {
+#ifdef IORING_SETUP_SUBMIT_ALL
+        // Don't abort submission just because an inline request fails
+        ioq_ring_probe_flags(&params, IORING_SETUP_SUBMIT_ALL);
+#endif
+
+#ifdef IORING_SETUP_R_DISABLED
+        // Don't enable the ring yet (needed for SINGLE_ISSUER)
+        if (ioq_ring_probe_flags(&params, IORING_SETUP_R_DISABLED)) {
+#  ifdef IORING_SETUP_SINGLE_ISSUER
+            // Allow optimizations assuming only one task submits SQEs
+            ioq_ring_probe_flags(&params, IORING_SETUP_SINGLE_ISSUER);
+#  endif
+#  ifdef IORING_SETUP_DEFER_TASKRUN
+            // Don't interrupt us aggressively with completion events
+            ioq_ring_probe_flags(&params, IORING_SETUP_DEFER_TASKRUN);
+#  endif
+        }
+#endif
+    }
+
+    // Use a page for each SQE ring
+    size_t entries = 4096 / sizeof(struct io_uring_sqe);
+    thread->ring_err = -io_uring_queue_init_params(entries, &thread->ring, &params);
+    if (thread->ring_err) {
+        return -1;
+    }
+
+    if (prev) {
+        // Initial setup already complete
+        thread->ring_ops = prev->ring_ops;
+        return 0;
+    }
+
+    // Check for supported operations
+    struct io_uring_probe *probe = io_uring_get_probe_ring(&thread->ring);
+    if (probe) {
+        if (io_uring_opcode_supported(probe, IORING_OP_OPENAT)) {
+            thread->ring_ops |= IOQ_RING_OPENAT;
+        }
+        if (io_uring_opcode_supported(probe, IORING_OP_CLOSE)) {
+            thread->ring_ops |= IOQ_RING_CLOSE;
+        }
+#if BFS_USE_STATX
+        if (io_uring_opcode_supported(probe, IORING_OP_STATX)) {
+            thread->ring_ops |= IOQ_RING_STATX;
+        }
+#endif
+        io_uring_free_probe(probe);
+    }
+    if (!thread->ring_ops) {
+        io_uring_queue_exit(&thread->ring);
+        thread->ring_err = ENOTSUP;
+        return -1;
+    }
+
+#if BFS_HAS_IO_URING_MAX_WORKERS
+    // Limit the number of io_uring workers
+    unsigned int values[] = {
+        ioq->nthreads, // [IO_WQ_BOUND]
+        0,             // [IO_WQ_UNBOUND]
+    };
+    io_uring_register_iowq_max_workers(&thread->ring, values);
+#endif
+
+#endif // BFS_WITH_LIBURING
+
+    return 0;
+}
+
+/** Destroy an io_uring. */
+static void ioq_ring_exit(struct ioq_thread *thread) {
+#if BFS_WITH_LIBURING
+    if (thread->ring_err == 0) {
+        io_uring_queue_exit(&thread->ring);
+    }
+#endif
+}
+
+/** Create an I/O queue thread. */
+static int ioq_thread_create(struct ioq *ioq, size_t i) {
+    struct ioq_thread *thread = &ioq->threads[i];
+    thread->parent = ioq;
+
+    ioq_ring_init(ioq, thread);
+
+    if (thread_create(&thread->id, NULL, ioq_work, thread) != 0) {
+        ioq_ring_exit(thread);
+        return -1;
+    }
+
+    char name[16];
+    if (snprintf(name, sizeof(name), "ioq-%zu", i) >= 0) {
+        thread_setname(thread->id, name);
+    }
+
+    return 0;
+}
+
+/** Join an I/O queue thread. */
+static void ioq_thread_join(struct ioq_thread *thread) {
+    thread_join(thread->id, NULL);
+    ioq_ring_exit(thread);
+}
+
 struct ioq *ioq_create(size_t depth, size_t nthreads) {
     struct ioq *ioq = ZALLOC_FLEX(struct ioq, threads, nthreads);
     if (!ioq) {
@@ -363,7 +1134,11 @@ struct ioq *ioq_create(size_t depth, size_t nthreads) {
     }
 
     ioq->depth = depth;
+
     ARENA_INIT(&ioq->ents, struct ioq_ent);
+#if BFS_WITH_LIBURING && BFS_USE_STATX
+    ARENA_INIT(&ioq->xbufs, struct statx);
+#endif
 
     ioq->pending = ioqq_create(depth);
     if (!ioq->pending) {
@@ -375,11 +1150,12 @@
         goto fail;
     }
 
+    ioq->nthreads = nthreads;
     for (size_t i = 0; i < nthreads; ++i) {
-        if (thread_create(&ioq->threads[i], NULL, ioq_work, ioq) != 0) {
+        if (ioq_thread_create(ioq, i) != 0) {
+            ioq->nthreads = i;
             goto fail;
         }
-        ++ioq->nthreads;
     }
 
     return ioq;
@@ -418,6 +1194,18 @@ static struct ioq_ent *ioq_request(struct ioq *ioq, enum ioq_op op, void *ptr) {
     return ent;
 }
 
+int ioq_nop(struct ioq *ioq, enum ioq_nop_type type, void *ptr) {
+    struct ioq_ent *ent = ioq_request(ioq, IOQ_NOP, ptr);
+    if (!ent) {
+        return -1;
+    }
+
+    ent->nop.type = type;
+
+    ioq_batch_push(ioq->pending, &ioq->pending_batch, ent);
+    return 0;
+}
+
 int ioq_close(struct ioq *ioq, int fd, void *ptr) {
     struct ioq_ent *ent = ioq_request(ioq, IOQ_CLOSE, ptr);
     if (!ent) {
@@ -426,11 +1214,11 @@
     }
 
     ent->close.fd = fd;
 
-    ioqq_push(ioq->pending, ent);
+    ioq_batch_push(ioq->pending, &ioq->pending_batch, ent);
     return 0;
 }
 
-int ioq_opendir(struct ioq *ioq, struct bfs_dir *dir, int dfd, const char *path, void *ptr) {
+int ioq_opendir(struct ioq *ioq, struct bfs_dir *dir, int dfd, const char *path, enum bfs_dir_flags flags, void *ptr) {
     struct ioq_ent *ent = ioq_request(ioq, IOQ_OPENDIR, ptr);
     if (!ent) {
         return -1;
@@ -440,8 +1228,9 @@
     args->dir = dir;
     args->dfd = dfd;
     args->path = path;
+    args->flags = flags;
 
-    ioqq_push(ioq->pending, ent);
+    ioq_batch_push(ioq->pending, &ioq->pending_batch, ent);
     return 0;
 }
 
@@ -453,38 +1242,66 @@ int ioq_closedir(struct ioq *ioq, struct bfs_dir *dir, void *ptr) {
 
     ent->closedir.dir = dir;
 
-    ioqq_push(ioq->pending, ent);
+    ioq_batch_push(ioq->pending, &ioq->pending_batch, ent);
     return 0;
 }
 
-struct ioq_ent *ioq_pop(struct ioq *ioq) {
-    if (ioq->size == 0) {
-        return NULL;
+int ioq_stat(struct ioq *ioq, int dfd, const char *path, enum bfs_stat_flags flags, struct bfs_stat *buf, void *ptr) {
+    struct ioq_ent *ent = ioq_request(ioq, IOQ_STAT, ptr);
+    if (!ent) {
+        return -1;
+    }
+
+    struct ioq_stat *args = &ent->stat;
+    args->dfd = dfd;
+    args->path = path;
+    args->flags = flags;
+    args->buf = buf;
+
+#if BFS_WITH_LIBURING && BFS_USE_STATX
+    args->xbuf = arena_alloc(&ioq->xbufs);
+    if (!args->xbuf) {
+        ioq_free(ioq, ent);
+        return -1;
     }
+#endif
 
-    return ioqq_pop(ioq->ready);
+    ioq_batch_push(ioq->pending, &ioq->pending_batch, ent);
+    return 0;
 }
 
-struct ioq_ent *ioq_trypop(struct ioq *ioq) {
+void ioq_submit(struct ioq *ioq) {
+    ioq_batch_flush(ioq->pending, &ioq->pending_batch);
+}
+
+struct ioq_ent *ioq_pop(struct ioq *ioq, bool block) {
+    // Don't forget to submit before popping
+    bfs_assert(ioq_batch_empty(&ioq->pending_batch));
+
     if (ioq->size == 0) {
         return NULL;
     }
 
-    return ioqq_trypop(ioq->ready);
+    return ioq_batch_pop(ioq->ready, &ioq->ready_batch, block);
 }
 
 void ioq_free(struct ioq *ioq, struct ioq_ent *ent) {
     bfs_assert(ioq->size > 0);
     --ioq->size;
 
+#if BFS_WITH_LIBURING && BFS_USE_STATX
+    if (ent->op == IOQ_STAT && ent->stat.xbuf) {
+        arena_free(&ioq->xbufs, ent->stat.xbuf);
+    }
+#endif
+
     arena_free(&ioq->ents, ent);
 }
 
 void ioq_cancel(struct ioq *ioq) {
     if (!exchange(&ioq->cancel, true, relaxed)) {
-        for (size_t i = 0; i < ioq->nthreads; ++i) {
-            ioqq_push(ioq->pending, &IOQ_STOP);
-        }
+        ioq_batch_push(ioq->pending, &ioq->pending_batch, &IOQ_STOP);
+        ioq_submit(ioq);
     }
 }
 
@@ -493,15 +1310,20 @@ void ioq_destroy(struct ioq *ioq) {
         return;
     }
 
-    ioq_cancel(ioq);
+    if (ioq->nthreads > 0) {
+        ioq_cancel(ioq);
+    }
 
     for (size_t i = 0; i < ioq->nthreads; ++i) {
-        thread_join(ioq->threads[i], NULL);
+        ioq_thread_join(&ioq->threads[i]);
     }
 
     ioqq_destroy(ioq->ready);
     ioqq_destroy(ioq->pending);
 
+#if BFS_WITH_LIBURING && BFS_USE_STATX
+    arena_destroy(&ioq->xbufs);
+#endif
     arena_destroy(&ioq->ents);
 
     free(ioq);
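Editor's note: a hypothetical caller of the public API touched by this patch might look like the sketch below. It is not part of the patch, relies on declarations in ioq.h that this diff only shows indirectly (ioq_create, ioq_close, ioq_submit, ioq_pop, ioq_free, ioq_destroy, and the result field of struct ioq_ent), and assumes a bfs build environment.

/* Hypothetical usage sketch only -- not part of src/ioq.c. */
#include "ioq.h"

#include <fcntl.h>
#include <stdio.h>

int main(void) {
    struct ioq *ioq = ioq_create(64, 2); // depth 64, 2 worker threads
    if (!ioq) {
        return 1;
    }

    // Queue an asynchronous close() and flush the pending batch
    int fd = open(".", O_RDONLY);
    if (fd >= 0 && ioq_close(ioq, fd, NULL) == 0) {
        ioq_submit(ioq);

        // Block until the response comes back on the ready queue
        struct ioq_ent *ent = ioq_pop(ioq, true);
        if (ent) {
            printf("close() returned %ld\n", (long)ent->result);
            ioq_free(ioq, ent);
        }
    }

    ioq_destroy(ioq);
    return 0;
}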