From 5fe11b94b38bfb4d43637e05ac24da0d7d72b9ea Mon Sep 17 00:00:00 2001 From: Tavian Barnes Date: Tue, 7 Nov 2023 16:43:35 -0500 Subject: ioq: Implement a better non-blocking pop --- src/bftw.c | 2 +- src/ioq.c | 276 +++++++++++++++++++++++++++++++++++++++++-------------------- src/ioq.h | 13 +-- 3 files changed, 189 insertions(+), 102 deletions(-) diff --git a/src/bftw.c b/src/bftw.c index 5a55037..0b74cd9 100644 --- a/src/bftw.c +++ b/src/bftw.c @@ -588,7 +588,7 @@ static int bftw_ioq_pop(struct bftw_state *state, bool block) { return -1; } - struct ioq_ent *ent = block ? ioq_pop(ioq) : ioq_trypop(ioq); + struct ioq_ent *ent = ioq_pop(ioq, block); if (!ent) { return -1; } diff --git a/src/ioq.c b/src/ioq.c index 8c1bdbe..244b2cc 100644 --- a/src/ioq.c +++ b/src/ioq.c @@ -1,6 +1,127 @@ // Copyright © Tavian Barnes // SPDX-License-Identifier: 0BSD +/** + * An asynchronous I/O queue implementation. + * + * struct ioq is composed of two separate queues: + * + * struct ioqq *pending; // Pending I/O requests + * struct ioqq *ready; // Ready I/O responses + * + * Worker threads pop requests from `pending`, execute them, and push them back + * to the `ready` queue. The main thread pushes requests to `pending` and pops + * them from `ready`. + * + * struct ioqq is a blocking MPMC queue (though it could be SPMC/MPSC for + * pending/ready respectively). It is implemented as a circular buffer: + * + * size_t mask; // (1 << N) - 1 + * [padding] + * size_t head; // Writer index + * [padding] + * size_t tail; // Reader index + * [padding] + * ioq_slot slots[1 << N]; // Queue contents + * + * Pushes are implemented with an unconditional + * + * fetch_add(&ioqq->head, IOQ_STRIDE) + * + * which scales better on many architectures than compare-and-swap (see [1] for + * details). Pops are implemented similarly. We add IOQ_STRIDE rather than 1 + * so that successive queue elements are on different cache lines, but the + * exposition below uses 1 for simplicity. + * + * Since the fetch-and-adds are unconditional, non-blocking readers can get + * ahead of writers: + * + * Reader Writer + * ──────────────── ────────────────────── + * head: 0 → 1 + * slots[0]: empty + * tail: 0 → 1 + * slots[0]: empty → full + * head: 1 → 2 + * slots[1]: empty! + * + * To avoid this, non-blocking reads (ioqq_pop(ioqq, false)) must mark the slots + * somehow so that writers can skip them: + * + * Reader Writer + * ─────────────────────── ─────────────────────── + * head: 0 → 1 + * slots[0]: empty → skip + * tail: 0 → 1 + * slots[0]: skip → empty + * tail: 1 → 2 + * slots[1]: empty → full + * head: 1 → 2 + * slots[1]: full → empty + * + * As well, a reader might "lap" a writer (or another reader), so slots need to + * count how many times they should be skipped: + * + * Reader Writer + * ────────────────────────── ───────────────────────── + * head: 0 → 1 + * slots[0]: empty → skip(1) + * head: 1 → 2 + * slots[1]: empty → skip(1) + * ... + * head: M → 0 + * slots[M]: empty → skip(1) + * head: 0 → 1 + * slots[0]: skip(1 → 2) + * tail: 0 → 1 + * slots[0]: skip(2 → 1) + * tail: 1 → 2 + * slots[1]: skip(1) → empty + * ... + * tail: M → 0 + * slots[M]: skip(1) → empty + * tail: 0 → 1 + * slots[0]: skip(1) → empty + * tail: 1 → 2 + * slots[1]: empty → full + * head: 1 → 2 + * slots[1]: full → empty + * + * As described in [1], this approach is susceptible to livelock if readers stay + * ahead of writers. This is okay for us because we don't retry failed non- + * blocking reads. + * + * The slot representation uses tag bits to hold either a pointer or skip(N): + * + * IOQ_SKIP (highest bit) IOQ_BLOCKED (lowest bit) + * ↓ ↓ + * 0 0 0 ... 0 0 0 + * └──────────┬──────────┘ + * │ + * value bits + * + * If IOQ_SKIP is unset, the value bits hold a pointer (or zero/NULL for empty). + * If IOQ_SKIP is set, the value bits hold a negative skip count. Writers can + * reduce the skip count by adding 1 to the value bits, and when the count hits + * zero, the carry will automatically clear IOQ_SKIP: + * + * IOQ_SKIP IOQ_BLOCKED + * ↓ ↓ + * 1 1 1 ... 1 0 0 skip(2) + * 1 1 1 ... 1 1 0 skip(1) + * 0 0 0 ... 0 0 0 empty + * + * The IOQ_BLOCKED flag is used to track sleeping waiters, futex-style. To wait + * for a slot to change, waiters call ioq_slot_wait() which sets IOQ_BLOCKED and + * goes to sleep. Whenever a slot is updated, if the old value had IOQ_BLOCKED + * set, ioq_slot_wake() must be called to wake up that waiter. + * + * Blocking/waking uses a pool of monitors (mutex, condition variable pairs). + * Slots are assigned round-robin to a monitor from the pool. + * + * [1]: https://arxiv.org/abs/2201.02179 + */ + #include "ioq.h" #include "alloc.h" #include "atomic.h" @@ -51,24 +172,15 @@ static void ioq_monitor_destroy(struct ioq_monitor *monitor) { /** A single entry in a command queue. */ typedef atomic uintptr_t ioq_slot; -/** Slot flag bit to indicate waiters. */ +/** Someone might be waiting on this slot. */ #define IOQ_BLOCKED ((uintptr_t)1) -bfs_static_assert(alignof(struct ioq_ent) > 1); - -/** Check if a slot has waiters. */ -static bool ioq_slot_blocked(uintptr_t value) { - return value & IOQ_BLOCKED; -} - -/** Extract the pointer from a slot. */ -static struct ioq_ent *ioq_slot_ptr(uintptr_t value) { - return (struct ioq_ent *)(value & ~IOQ_BLOCKED); -} +/** The next push(es) should skip this slot. */ +#define IOQ_SKIP ((uintptr_t)1 << (UINTPTR_WIDTH - 1)) +/** Amount to add for an additional skip. */ +#define IOQ_SKIP_ONE (~IOQ_BLOCKED) -/** Check if a slot is empty. */ -static bool ioq_slot_empty(uintptr_t value) { - return !ioq_slot_ptr(value); -} +// Need room for two flag bits +bfs_static_assert(alignof(struct ioq_ent) > 2); /** * An MPMC queue of I/O commands. @@ -205,80 +317,85 @@ static void ioq_slot_wake(struct ioqq *ioqq, ioq_slot *slot) { cond_broadcast(&monitor->cond); } -/** Get the next slot for writing. */ -static ioq_slot *ioqq_write(struct ioqq *ioqq) { - size_t i = fetch_add(&ioqq->head, IOQ_STRIDE, relaxed); - return &ioqq->slots[i & ioqq->slot_mask]; -} - /** Push an entry into a slot. */ -static void ioq_slot_push(struct ioqq *ioqq, ioq_slot *slot, struct ioq_ent *ent) { - uintptr_t addr = (uintptr_t)ent; - bfs_assert(!ioq_slot_blocked(addr)); - +static bool ioq_slot_push(struct ioqq *ioqq, ioq_slot *slot, struct ioq_ent *ent) { uintptr_t prev = load(slot, relaxed); - do { - while (!ioq_slot_empty(prev)) { + while (true) { + uintptr_t next; + if (prev & IOQ_SKIP) { + // skip(1) → empty + // skip(n) → skip(n - 1) + next = (prev - IOQ_SKIP_ONE) & ~IOQ_BLOCKED; + } else if (prev > IOQ_BLOCKED) { + // full(ptr) → wait prev = ioq_slot_wait(ioqq, slot, prev); + continue; + } else { + // empty → full(ptr) + next = (uintptr_t)ent >> 1; } - } while (!compare_exchange_weak(slot, &prev, addr, release, relaxed)); - if (ioq_slot_blocked(prev)) { + if (compare_exchange_weak(slot, &prev, next, release, relaxed)) { + break; + } + } + + if (prev & IOQ_BLOCKED) { ioq_slot_wake(ioqq, slot); } + + return !(prev & IOQ_SKIP); } /** Push an entry onto the queue. */ static void ioqq_push(struct ioqq *ioqq, struct ioq_ent *ent) { - ioq_slot *slot = ioqq_write(ioqq); - ioq_slot_push(ioqq, slot, ent); -} - -/** Get the next slot for reading. */ -static ioq_slot *ioqq_read(struct ioqq *ioqq) { - size_t i = fetch_add(&ioqq->tail, IOQ_STRIDE, relaxed); - return &ioqq->slots[i & ioqq->slot_mask]; + while (true) { + size_t i = fetch_add(&ioqq->head, IOQ_STRIDE, relaxed); + ioq_slot *slot = &ioqq->slots[i & ioqq->slot_mask]; + if (ioq_slot_push(ioqq, slot, ent)) { + break; + } + } } /** (Try to) pop an entry from a slot. */ static struct ioq_ent *ioq_slot_pop(struct ioqq *ioqq, ioq_slot *slot, bool block) { uintptr_t prev = load(slot, relaxed); - do { - while (ioq_slot_empty(prev)) { - if (block) { - prev = ioq_slot_wait(ioqq, slot, prev); - } else { - return NULL; - } + while (true) { + // empty → skip(1) + // skip(n) → skip(n + 1) + // full(ptr) → full(ptr - 1) + uintptr_t next = prev + IOQ_SKIP_ONE; + // skip(n) → ~IOQ_BLOCKED + // full(ptr) → 0 + next &= (next & IOQ_SKIP) ? ~IOQ_BLOCKED : 0; + + if (block && next) { + prev = ioq_slot_wait(ioqq, slot, prev); + continue; + } + + if (compare_exchange_weak(slot, &prev, next, acquire, relaxed)) { + break; } - } while (!compare_exchange_weak(slot, &prev, 0, acquire, relaxed)); + } - if (ioq_slot_blocked(prev)) { + if (prev & IOQ_BLOCKED) { ioq_slot_wake(ioqq, slot); } - return ioq_slot_ptr(prev); + // empty → 0 + // skip(n) → 0 + // full(ptr) → ptr + prev &= (prev & IOQ_SKIP) ? 0 : ~IOQ_BLOCKED; + return (struct ioq_ent *)(prev << 1); } /** Pop an entry from the queue. */ -static struct ioq_ent *ioqq_pop(struct ioqq *ioqq) { - ioq_slot *slot = ioqq_read(ioqq); - return ioq_slot_pop(ioqq, slot, true); -} - -/** Pop an entry from the queue if one is available. */ -static struct ioq_ent *ioqq_trypop(struct ioqq *ioqq) { - size_t i = load(&ioqq->tail, relaxed); +static struct ioq_ent *ioqq_pop(struct ioqq *ioqq, bool block) { + size_t i = fetch_add(&ioqq->tail, IOQ_STRIDE, relaxed); ioq_slot *slot = &ioqq->slots[i & ioqq->slot_mask]; - - struct ioq_ent *ret = ioq_slot_pop(ioqq, slot, false); - if (ret) { - size_t j = exchange(&ioqq->tail, i + IOQ_STRIDE, relaxed); - bfs_assert(j == i, "Detected multiple consumers"); - (void)j; - } - - return ret; + return ioq_slot_pop(ioqq, slot, block); } /** Sentinel stop command. */ @@ -378,8 +495,6 @@ struct ioq_ring_state { struct ioq *ioq; /** The io_uring. */ struct io_uring *ring; - /** The current ioq->pending slot. */ - ioq_slot *slot; /** Number of prepped, unsubmitted SQEs. */ size_t prepped; /** Number of submitted, unreaped SQEs. */ @@ -394,20 +509,9 @@ static struct ioq_ent *ioq_ring_pop(struct ioq_ring_state *state) { return NULL; } - // Advance to the next slot if necessary - struct ioq *ioq = state->ioq; - if (!state->slot) { - state->slot = ioqq_read(ioq->pending); - } - // Block if we have nothing else to do bool block = !state->prepped && !state->submitted; - struct ioq_ent *ret = ioq_slot_pop(ioq->pending, state->slot, block); - - if (ret) { - // Got an entry, move to the next slot next time - state->slot = NULL; - } + struct ioq_ent *ret = ioqq_pop(state->ioq->pending, block); if (ret == &IOQ_STOP) { state->stop = true; @@ -536,7 +640,7 @@ static void ioq_sync_work(struct ioq_thread *thread) { struct ioq *ioq = thread->parent; while (true) { - struct ioq_ent *ent = ioqq_pop(ioq->pending); + struct ioq_ent *ent = ioqq_pop(ioq->pending, true); if (ent == &IOQ_STOP) { break; } @@ -687,20 +791,12 @@ int ioq_closedir(struct ioq *ioq, struct bfs_dir *dir, void *ptr) { return 0; } -struct ioq_ent *ioq_pop(struct ioq *ioq) { - if (ioq->size == 0) { - return NULL; - } - - return ioqq_pop(ioq->ready); -} - -struct ioq_ent *ioq_trypop(struct ioq *ioq) { +struct ioq_ent *ioq_pop(struct ioq *ioq, bool block) { if (ioq->size == 0) { return NULL; } - return ioqq_trypop(ioq->ready); + return ioqq_pop(ioq->ready, block); } void ioq_free(struct ioq *ioq, struct ioq_ent *ent) { diff --git a/src/ioq.h b/src/ioq.h index eab89ec..87727cb 100644 --- a/src/ioq.h +++ b/src/ioq.h @@ -8,6 +8,7 @@ #ifndef BFS_IOQ_H #define BFS_IOQ_H +#include "config.h" #include "dir.h" #include @@ -136,17 +137,7 @@ int ioq_closedir(struct ioq *ioq, struct bfs_dir *dir, void *ptr); * @return * The next response, or NULL. */ -struct ioq_ent *ioq_pop(struct ioq *ioq); - -/** - * Pop a response from the queue, without blocking. - * - * @param ioq - * The I/O queue. - * @return - * The next response, or NULL. - */ -struct ioq_ent *ioq_trypop(struct ioq *ioq); +struct ioq_ent *ioq_pop(struct ioq *ioq, bool block); /** * Free a queue entry. -- cgit v1.2.3