summaryrefslogtreecommitdiffstats
path: root/src
diff options
context:
space:
mode:
Diffstat (limited to 'src')
-rw-r--r--src/bftw.c2
-rw-r--r--src/ioq.c276
-rw-r--r--src/ioq.h13
3 files changed, 189 insertions, 102 deletions
diff --git a/src/bftw.c b/src/bftw.c
index 5a55037..0b74cd9 100644
--- a/src/bftw.c
+++ b/src/bftw.c
@@ -588,7 +588,7 @@ static int bftw_ioq_pop(struct bftw_state *state, bool block) {
return -1;
}
- struct ioq_ent *ent = block ? ioq_pop(ioq) : ioq_trypop(ioq);
+ struct ioq_ent *ent = ioq_pop(ioq, block);
if (!ent) {
return -1;
}
diff --git a/src/ioq.c b/src/ioq.c
index 8c1bdbe..244b2cc 100644
--- a/src/ioq.c
+++ b/src/ioq.c
@@ -1,6 +1,127 @@
// Copyright © Tavian Barnes <tavianator@tavianator.com>
// SPDX-License-Identifier: 0BSD
+/**
+ * An asynchronous I/O queue implementation.
+ *
+ * struct ioq is composed of two separate queues:
+ *
+ * struct ioqq *pending; // Pending I/O requests
+ * struct ioqq *ready; // Ready I/O responses
+ *
+ * Worker threads pop requests from `pending`, execute them, and push them back
+ * to the `ready` queue. The main thread pushes requests to `pending` and pops
+ * them from `ready`.
+ *
+ * struct ioqq is a blocking MPMC queue (though it could be SPMC/MPSC for
+ * pending/ready respectively). It is implemented as a circular buffer:
+ *
+ * size_t mask; // (1 << N) - 1
+ * [padding]
+ * size_t head; // Writer index
+ * [padding]
+ * size_t tail; // Reader index
+ * [padding]
+ * ioq_slot slots[1 << N]; // Queue contents
+ *
+ * Pushes are implemented with an unconditional
+ *
+ * fetch_add(&ioqq->head, IOQ_STRIDE)
+ *
+ * which scales better on many architectures than compare-and-swap (see [1] for
+ * details). Pops are implemented similarly. We add IOQ_STRIDE rather than 1
+ * so that successive queue elements are on different cache lines, but the
+ * exposition below uses 1 for simplicity.
+ *
+ * Since the fetch-and-adds are unconditional, non-blocking readers can get
+ * ahead of writers:
+ *
+ * Reader Writer
+ * ──────────────── ──────────────────────
+ * head: 0 → 1
+ * slots[0]: empty
+ * tail: 0 → 1
+ * slots[0]: empty → full
+ * head: 1 → 2
+ * slots[1]: empty!
+ *
+ * To avoid this, non-blocking reads (ioqq_pop(ioqq, false)) must mark the slots
+ * somehow so that writers can skip them:
+ *
+ * Reader Writer
+ * ─────────────────────── ───────────────────────
+ * head: 0 → 1
+ * slots[0]: empty → skip
+ * tail: 0 → 1
+ * slots[0]: skip → empty
+ * tail: 1 → 2
+ * slots[1]: empty → full
+ * head: 1 → 2
+ * slots[1]: full → empty
+ *
+ * As well, a reader might "lap" a writer (or another reader), so slots need to
+ * count how many times they should be skipped:
+ *
+ * Reader Writer
+ * ────────────────────────── ─────────────────────────
+ * head: 0 → 1
+ * slots[0]: empty → skip(1)
+ * head: 1 → 2
+ * slots[1]: empty → skip(1)
+ * ...
+ * head: M → 0
+ * slots[M]: empty → skip(1)
+ * head: 0 → 1
+ * slots[0]: skip(1 → 2)
+ * tail: 0 → 1
+ * slots[0]: skip(2 → 1)
+ * tail: 1 → 2
+ * slots[1]: skip(1) → empty
+ * ...
+ * tail: M → 0
+ * slots[M]: skip(1) → empty
+ * tail: 0 → 1
+ * slots[0]: skip(1) → empty
+ * tail: 1 → 2
+ * slots[1]: empty → full
+ * head: 1 → 2
+ * slots[1]: full → empty
+ *
+ * As described in [1], this approach is susceptible to livelock if readers stay
+ * ahead of writers. This is okay for us because we don't retry failed non-
+ * blocking reads.
+ *
+ * The slot representation uses tag bits to hold either a pointer or skip(N):
+ *
+ * IOQ_SKIP (highest bit) IOQ_BLOCKED (lowest bit)
+ * ↓ ↓
+ * 0 0 0 ... 0 0 0
+ * └──────────┬──────────┘
+ * │
+ * value bits
+ *
+ * If IOQ_SKIP is unset, the value bits hold a pointer (or zero/NULL for empty).
+ * If IOQ_SKIP is set, the value bits hold a negative skip count. Writers can
+ * reduce the skip count by adding 1 to the value bits, and when the count hits
+ * zero, the carry will automatically clear IOQ_SKIP:
+ *
+ * IOQ_SKIP IOQ_BLOCKED
+ * ↓ ↓
+ * 1 1 1 ... 1 0 0 skip(2)
+ * 1 1 1 ... 1 1 0 skip(1)
+ * 0 0 0 ... 0 0 0 empty
+ *
+ * The IOQ_BLOCKED flag is used to track sleeping waiters, futex-style. To wait
+ * for a slot to change, waiters call ioq_slot_wait() which sets IOQ_BLOCKED and
+ * goes to sleep. Whenever a slot is updated, if the old value had IOQ_BLOCKED
+ * set, ioq_slot_wake() must be called to wake up that waiter.
+ *
+ * Blocking/waking uses a pool of monitors (mutex, condition variable pairs).
+ * Slots are assigned round-robin to a monitor from the pool.
+ *
+ * [1]: https://arxiv.org/abs/2201.02179
+ */
+
#include "ioq.h"
#include "alloc.h"
#include "atomic.h"
@@ -51,24 +172,15 @@ static void ioq_monitor_destroy(struct ioq_monitor *monitor) {
/** A single entry in a command queue. */
typedef atomic uintptr_t ioq_slot;
-/** Slot flag bit to indicate waiters. */
+/** Someone might be waiting on this slot. */
#define IOQ_BLOCKED ((uintptr_t)1)
-bfs_static_assert(alignof(struct ioq_ent) > 1);
-
-/** Check if a slot has waiters. */
-static bool ioq_slot_blocked(uintptr_t value) {
- return value & IOQ_BLOCKED;
-}
-
-/** Extract the pointer from a slot. */
-static struct ioq_ent *ioq_slot_ptr(uintptr_t value) {
- return (struct ioq_ent *)(value & ~IOQ_BLOCKED);
-}
+/** The next push(es) should skip this slot. */
+#define IOQ_SKIP ((uintptr_t)1 << (UINTPTR_WIDTH - 1))
+/** Amount to add for an additional skip. */
+#define IOQ_SKIP_ONE (~IOQ_BLOCKED)
-/** Check if a slot is empty. */
-static bool ioq_slot_empty(uintptr_t value) {
- return !ioq_slot_ptr(value);
-}
+// Need room for two flag bits
+bfs_static_assert(alignof(struct ioq_ent) > 2);
/**
* An MPMC queue of I/O commands.
@@ -205,80 +317,85 @@ static void ioq_slot_wake(struct ioqq *ioqq, ioq_slot *slot) {
cond_broadcast(&monitor->cond);
}
-/** Get the next slot for writing. */
-static ioq_slot *ioqq_write(struct ioqq *ioqq) {
- size_t i = fetch_add(&ioqq->head, IOQ_STRIDE, relaxed);
- return &ioqq->slots[i & ioqq->slot_mask];
-}
-
/** Push an entry into a slot. */
-static void ioq_slot_push(struct ioqq *ioqq, ioq_slot *slot, struct ioq_ent *ent) {
- uintptr_t addr = (uintptr_t)ent;
- bfs_assert(!ioq_slot_blocked(addr));
-
+static bool ioq_slot_push(struct ioqq *ioqq, ioq_slot *slot, struct ioq_ent *ent) {
uintptr_t prev = load(slot, relaxed);
- do {
- while (!ioq_slot_empty(prev)) {
+ while (true) {
+ uintptr_t next;
+ if (prev & IOQ_SKIP) {
+ // skip(1) → empty
+ // skip(n) → skip(n - 1)
+ next = (prev - IOQ_SKIP_ONE) & ~IOQ_BLOCKED;
+ } else if (prev > IOQ_BLOCKED) {
+ // full(ptr) → wait
prev = ioq_slot_wait(ioqq, slot, prev);
+ continue;
+ } else {
+ // empty → full(ptr)
+ next = (uintptr_t)ent >> 1;
}
- } while (!compare_exchange_weak(slot, &prev, addr, release, relaxed));
- if (ioq_slot_blocked(prev)) {
+ if (compare_exchange_weak(slot, &prev, next, release, relaxed)) {
+ break;
+ }
+ }
+
+ if (prev & IOQ_BLOCKED) {
ioq_slot_wake(ioqq, slot);
}
+
+ return !(prev & IOQ_SKIP);
}
/** Push an entry onto the queue. */
static void ioqq_push(struct ioqq *ioqq, struct ioq_ent *ent) {
- ioq_slot *slot = ioqq_write(ioqq);
- ioq_slot_push(ioqq, slot, ent);
-}
-
-/** Get the next slot for reading. */
-static ioq_slot *ioqq_read(struct ioqq *ioqq) {
- size_t i = fetch_add(&ioqq->tail, IOQ_STRIDE, relaxed);
- return &ioqq->slots[i & ioqq->slot_mask];
+ while (true) {
+ size_t i = fetch_add(&ioqq->head, IOQ_STRIDE, relaxed);
+ ioq_slot *slot = &ioqq->slots[i & ioqq->slot_mask];
+ if (ioq_slot_push(ioqq, slot, ent)) {
+ break;
+ }
+ }
}
/** (Try to) pop an entry from a slot. */
static struct ioq_ent *ioq_slot_pop(struct ioqq *ioqq, ioq_slot *slot, bool block) {
uintptr_t prev = load(slot, relaxed);
- do {
- while (ioq_slot_empty(prev)) {
- if (block) {
- prev = ioq_slot_wait(ioqq, slot, prev);
- } else {
- return NULL;
- }
+ while (true) {
+ // empty → skip(1)
+ // skip(n) → skip(n + 1)
+ // full(ptr) → full(ptr - 1)
+ uintptr_t next = prev + IOQ_SKIP_ONE;
+ // skip(n) → ~IOQ_BLOCKED
+ // full(ptr) → 0
+ next &= (next & IOQ_SKIP) ? ~IOQ_BLOCKED : 0;
+
+ if (block && next) {
+ prev = ioq_slot_wait(ioqq, slot, prev);
+ continue;
+ }
+
+ if (compare_exchange_weak(slot, &prev, next, acquire, relaxed)) {
+ break;
}
- } while (!compare_exchange_weak(slot, &prev, 0, acquire, relaxed));
+ }
- if (ioq_slot_blocked(prev)) {
+ if (prev & IOQ_BLOCKED) {
ioq_slot_wake(ioqq, slot);
}
- return ioq_slot_ptr(prev);
+ // empty → 0
+ // skip(n) → 0
+ // full(ptr) → ptr
+ prev &= (prev & IOQ_SKIP) ? 0 : ~IOQ_BLOCKED;
+ return (struct ioq_ent *)(prev << 1);
}
/** Pop an entry from the queue. */
-static struct ioq_ent *ioqq_pop(struct ioqq *ioqq) {
- ioq_slot *slot = ioqq_read(ioqq);
- return ioq_slot_pop(ioqq, slot, true);
-}
-
-/** Pop an entry from the queue if one is available. */
-static struct ioq_ent *ioqq_trypop(struct ioqq *ioqq) {
- size_t i = load(&ioqq->tail, relaxed);
+static struct ioq_ent *ioqq_pop(struct ioqq *ioqq, bool block) {
+ size_t i = fetch_add(&ioqq->tail, IOQ_STRIDE, relaxed);
ioq_slot *slot = &ioqq->slots[i & ioqq->slot_mask];
-
- struct ioq_ent *ret = ioq_slot_pop(ioqq, slot, false);
- if (ret) {
- size_t j = exchange(&ioqq->tail, i + IOQ_STRIDE, relaxed);
- bfs_assert(j == i, "Detected multiple consumers");
- (void)j;
- }
-
- return ret;
+ return ioq_slot_pop(ioqq, slot, block);
}
/** Sentinel stop command. */
@@ -378,8 +495,6 @@ struct ioq_ring_state {
struct ioq *ioq;
/** The io_uring. */
struct io_uring *ring;
- /** The current ioq->pending slot. */
- ioq_slot *slot;
/** Number of prepped, unsubmitted SQEs. */
size_t prepped;
/** Number of submitted, unreaped SQEs. */
@@ -394,20 +509,9 @@ static struct ioq_ent *ioq_ring_pop(struct ioq_ring_state *state) {
return NULL;
}
- // Advance to the next slot if necessary
- struct ioq *ioq = state->ioq;
- if (!state->slot) {
- state->slot = ioqq_read(ioq->pending);
- }
-
// Block if we have nothing else to do
bool block = !state->prepped && !state->submitted;
- struct ioq_ent *ret = ioq_slot_pop(ioq->pending, state->slot, block);
-
- if (ret) {
- // Got an entry, move to the next slot next time
- state->slot = NULL;
- }
+ struct ioq_ent *ret = ioqq_pop(state->ioq->pending, block);
if (ret == &IOQ_STOP) {
state->stop = true;
@@ -536,7 +640,7 @@ static void ioq_sync_work(struct ioq_thread *thread) {
struct ioq *ioq = thread->parent;
while (true) {
- struct ioq_ent *ent = ioqq_pop(ioq->pending);
+ struct ioq_ent *ent = ioqq_pop(ioq->pending, true);
if (ent == &IOQ_STOP) {
break;
}
@@ -687,20 +791,12 @@ int ioq_closedir(struct ioq *ioq, struct bfs_dir *dir, void *ptr) {
return 0;
}
-struct ioq_ent *ioq_pop(struct ioq *ioq) {
- if (ioq->size == 0) {
- return NULL;
- }
-
- return ioqq_pop(ioq->ready);
-}
-
-struct ioq_ent *ioq_trypop(struct ioq *ioq) {
+struct ioq_ent *ioq_pop(struct ioq *ioq, bool block) {
if (ioq->size == 0) {
return NULL;
}
- return ioqq_trypop(ioq->ready);
+ return ioqq_pop(ioq->ready, block);
}
void ioq_free(struct ioq *ioq, struct ioq_ent *ent) {
diff --git a/src/ioq.h b/src/ioq.h
index eab89ec..87727cb 100644
--- a/src/ioq.h
+++ b/src/ioq.h
@@ -8,6 +8,7 @@
#ifndef BFS_IOQ_H
#define BFS_IOQ_H
+#include "config.h"
#include "dir.h"
#include <stddef.h>
@@ -136,17 +137,7 @@ int ioq_closedir(struct ioq *ioq, struct bfs_dir *dir, void *ptr);
* @return
* The next response, or NULL.
*/
-struct ioq_ent *ioq_pop(struct ioq *ioq);
-
-/**
- * Pop a response from the queue, without blocking.
- *
- * @param ioq
- * The I/O queue.
- * @return
- * The next response, or NULL.
- */
-struct ioq_ent *ioq_trypop(struct ioq *ioq);
+struct ioq_ent *ioq_pop(struct ioq *ioq, bool block);
/**
* Free a queue entry.