Diffstat (limited to 'src/ioq.c')
-rw-r--r--  src/ioq.c | 604
1 file changed, 417 insertions(+), 187 deletions(-)
diff --git a/src/ioq.c b/src/ioq.c
index 43a1b35..57eb4a5 100644
--- a/src/ioq.c
+++ b/src/ioq.c
@@ -118,24 +118,27 @@
* [1]: https://arxiv.org/abs/2201.02179
*/
-#include "prelude.h"
#include "ioq.h"
+
#include "alloc.h"
#include "atomic.h"
+#include "bfs.h"
#include "bfstd.h"
#include "bit.h"
#include "diag.h"
#include "dir.h"
#include "stat.h"
#include "thread.h"
+
#include <errno.h>
#include <fcntl.h>
#include <pthread.h>
#include <stdint.h>
#include <stdlib.h>
#include <sys/stat.h>
+#include <unistd.h>
-#if BFS_USE_LIBURING
+#if BFS_WITH_LIBURING
# include <liburing.h>
#endif
@@ -180,8 +183,7 @@ typedef atomic uintptr_t ioq_slot;
/** Amount to add for an additional skip. */
#define IOQ_SKIP_ONE (~IOQ_BLOCKED)
-// Need room for two flag bits
-bfs_static_assert(alignof(struct ioq_ent) >= (1 << 2));
+static_assert(alignof(struct ioq_ent) >= (1 << 2), "struct ioq_ent is underaligned");
/**
* An MPMC queue of I/O commands.
@@ -201,7 +203,7 @@ struct ioqq {
cache_align atomic size_t tail;
/** The circular buffer itself. */
- cache_align ioq_slot slots[];
+ cache_align ioq_slot slots[]; // _counted_by(slot_mask + 1)
};
/** Destroy an I/O command queue. */
@@ -258,17 +260,45 @@ static struct ioqq *ioqq_create(size_t size) {
/** Get the monitor associated with a slot. */
static struct ioq_monitor *ioq_slot_monitor(struct ioqq *ioqq, ioq_slot *slot) {
- size_t i = slot - ioqq->slots;
+ uint32_t i = slot - ioqq->slots;
+
+ // Hash the index to de-correlate waiters
+ // https://nullprogram.com/blog/2018/07/31/
+ // https://github.com/skeeto/hash-prospector/issues/19#issuecomment-1120105785
+ i ^= i >> 16;
+ i *= UINT32_C(0x21f0aaad);
+ i ^= i >> 15;
+ i *= UINT32_C(0x735a2d97);
+ i ^= i >> 15;
+
return &ioqq->monitors[i & ioqq->monitor_mask];
}
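
For reference, a standalone sketch of the same 32-bit mixer (the constants are copied from the hunk above; the monitor count of 64 is an assumption made up for this example, not something taken from the diff). It shows why adjacent slot indices end up on unrelated monitors:

#include <stdint.h>
#include <stdio.h>

static uint32_t mix32(uint32_t i) {
	// Same xorshift/multiply rounds as ioq_slot_monitor() above
	i ^= i >> 16;
	i *= UINT32_C(0x21f0aaad);
	i ^= i >> 15;
	i *= UINT32_C(0x735a2d97);
	i ^= i >> 15;
	return i;
}

int main(void) {
	// Hypothetical monitor count for the example (any power of two)
	const uint32_t monitor_mask = 64 - 1;

	// Neighbouring slot indices land on unrelated monitors, so waiters
	// on adjacent slots don't all pile onto the same mutex/condvar
	for (uint32_t slot = 0; slot < 8; ++slot) {
		printf("slot %u -> monitor %2u\n",
		       (unsigned)slot, (unsigned)(mix32(slot) & monitor_mask));
	}
	return 0;
}
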
/** Atomically wait for a slot to change. */
-attr(noinline)
+_noinline
static uintptr_t ioq_slot_wait(struct ioqq *ioqq, ioq_slot *slot, uintptr_t value) {
+ uintptr_t ret;
+
+ // Try spinning a few times (with exponential backoff) before blocking
+ _nounroll
+ for (int i = 1; i < 1024; i *= 2) {
+ _nounroll
+ for (int j = 0; j < i; ++j) {
+ spin_loop();
+ }
+
+ // Check if the slot changed
+ ret = load(slot, relaxed);
+ if (ret != value) {
+ return ret;
+ }
+ }
+
+ // Nothing changed, start blocking
struct ioq_monitor *monitor = ioq_slot_monitor(ioqq, slot);
mutex_lock(&monitor->mutex);
- uintptr_t ret = load(slot, relaxed);
+ ret = load(slot, relaxed);
if (ret != value) {
goto done;
}
@@ -293,7 +323,7 @@ done:
}
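
The spin-then-block shape above can be modelled in isolation. This toy replaces the per-slot monitor, mutex_lock(), and cond_broadcast() wrappers with a single global pthread mutex/condvar pair and two plain threads; it is an illustration of the pattern, not the bfs code:

#include <pthread.h>
#include <stdatomic.h>
#include <stdio.h>

static _Atomic unsigned long slot;
static pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER;
static pthread_cond_t cond = PTHREAD_COND_INITIALIZER;

static unsigned long slot_wait(unsigned long value) {
	unsigned long ret;

	// Spin with exponential backoff (1 + 2 + ... + 512 pauses) first
	for (int i = 1; i < 1024; i *= 2) {
		for (int j = 0; j < i; ++j) {
			// a real spin_loop() would issue a PAUSE/YIELD hint here
		}
		ret = atomic_load_explicit(&slot, memory_order_relaxed);
		if (ret != value) {
			return ret;
		}
	}

	// Nothing changed, block on the condition variable
	pthread_mutex_lock(&mutex);
	while ((ret = atomic_load_explicit(&slot, memory_order_relaxed)) == value) {
		pthread_cond_wait(&cond, &mutex);
	}
	pthread_mutex_unlock(&mutex);
	return ret;
}

static void *setter(void *arg) {
	(void)arg;
	// Store the new value, then take/drop the mutex before broadcasting
	// so a waiter cannot check the slot and miss the wakeup
	atomic_store_explicit(&slot, 42, memory_order_relaxed);
	pthread_mutex_lock(&mutex);
	pthread_mutex_unlock(&mutex);
	pthread_cond_broadcast(&cond);
	return NULL;
}

int main(void) {
	pthread_t thread;
	if (pthread_create(&thread, NULL, setter, NULL) != 0) {
		return 1;
	}
	printf("slot changed to %lu\n", slot_wait(0));
	pthread_join(thread, NULL);
	return 0;
}
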
/** Wake up any threads waiting on a slot. */
-attr(noinline)
+_noinline
static void ioq_slot_wake(struct ioqq *ioqq, ioq_slot *slot) {
struct ioq_monitor *monitor = ioq_slot_monitor(ioqq, slot);
@@ -313,9 +343,11 @@ static void ioq_slot_wake(struct ioqq *ioqq, ioq_slot *slot) {
cond_broadcast(&monitor->cond);
}
-/** Branch-free (slot & IOQ_SKIP) ? ~IOQ_BLOCKED : 0 */
-static uintptr_t ioq_skip_mask(uintptr_t slot) {
- return -(slot >> IOQ_SKIP_BIT) << 1;
+/** Branch-free ((slot & IOQ_SKIP) ? skip : full) & ~IOQ_BLOCKED */
+static uintptr_t ioq_slot_blend(uintptr_t slot, uintptr_t skip, uintptr_t full) {
+ uintptr_t mask = -(slot >> IOQ_SKIP_BIT);
+ uintptr_t ret = (skip & mask) | (full & ~mask);
+ return ret & ~IOQ_BLOCKED;
}
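
A self-contained model of that branch-free select, checked against the ternary it replaces. The bit positions here (BLOCKED in bit 0, the skip flag in the top bit) are assumptions for the example; the real constants are defined earlier in ioq.c and are not part of this hunk:

#include <assert.h>
#include <stddef.h>
#include <stdint.h>

#define BLOCKED  ((uintptr_t)1)
#define SKIP_BIT (sizeof(uintptr_t) * 8 - 1)
#define SKIP     ((uintptr_t)1 << SKIP_BIT)

static uintptr_t blend(uintptr_t slot, uintptr_t skip, uintptr_t full) {
	// -(0 or 1) on an unsigned type is all-zeros or all-ones, so the
	// mask selects between the two inputs without a branch
	uintptr_t mask = -(slot >> SKIP_BIT);
	uintptr_t ret = (skip & mask) | (full & ~mask);
	return ret & ~BLOCKED;
}

int main(void) {
	uintptr_t values[] = { 0, BLOCKED, 0x1234, SKIP | 5, SKIP | BLOCKED };
	for (size_t i = 0; i < sizeof(values) / sizeof(values[0]); ++i) {
		uintptr_t slot = values[i];
		uintptr_t want = ((slot & SKIP) ? 0xAAAA : 0x5555) & ~BLOCKED;
		assert(blend(slot, 0xAAAA, 0x5555) == want);
	}
	return 0;
}
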
/** Push an entry into a slot. */
@@ -323,19 +355,18 @@ static bool ioq_slot_push(struct ioqq *ioqq, ioq_slot *slot, struct ioq_ent *ent
uintptr_t prev = load(slot, relaxed);
while (true) {
- size_t skip_mask = ioq_skip_mask(prev);
- size_t full_mask = ~skip_mask & ~IOQ_BLOCKED;
- if (prev & full_mask) {
+ uintptr_t full = ioq_slot_blend(prev, 0, prev);
+ if (full) {
// full(ptr) → wait
prev = ioq_slot_wait(ioqq, slot, prev);
continue;
}
// empty → full(ptr)
- uintptr_t next = ((uintptr_t)ent >> 1) & full_mask;
+ uintptr_t next = (uintptr_t)ent >> 1;
// skip(1) → empty
// skip(n) → skip(n - 1)
- next |= (prev - IOQ_SKIP_ONE) & skip_mask;
+ next = ioq_slot_blend(prev, prev - IOQ_SKIP_ONE, next);
if (compare_exchange_weak(slot, &prev, next, release, relaxed)) {
break;
@@ -353,13 +384,20 @@ static bool ioq_slot_push(struct ioqq *ioqq, ioq_slot *slot, struct ioq_ent *ent
static struct ioq_ent *ioq_slot_pop(struct ioqq *ioqq, ioq_slot *slot, bool block) {
uintptr_t prev = load(slot, relaxed);
while (true) {
+#if __has_builtin(__builtin_prefetch)
+ // Optimistically prefetch the pointer in this slot. If this
+ // slot is not full, this will prefetch an invalid address, but
+ // experimentally this is worth it on both Intel (Alder Lake)
+ // and AMD (Zen 2).
+ __builtin_prefetch((void *)(prev << 1), 1 /* write */);
+#endif
+
// empty → skip(1)
// skip(n) → skip(n + 1)
// full(ptr) → full(ptr - 1)
uintptr_t next = prev + IOQ_SKIP_ONE;
- // skip(n) → ~IOQ_BLOCKED
// full(ptr) → 0
- next &= ioq_skip_mask(next);
+ next = ioq_slot_blend(next, next, 0);
if (block && next) {
prev = ioq_slot_wait(ioqq, slot, prev);
@@ -378,7 +416,7 @@ static struct ioq_ent *ioq_slot_pop(struct ioqq *ioqq, ioq_slot *slot, bool bloc
// empty → 0
// skip(n) → 0
// full(ptr) → ptr
- prev &= ioq_skip_mask(~prev);
+ prev = ioq_slot_blend(prev, 0, prev);
return (struct ioq_ent *)(prev << 1);
}
@@ -408,13 +446,6 @@ static void ioqq_push_batch(struct ioqq *ioqq, struct ioq_ent *batch[], size_t s
} while (size > 0);
}
-/** Pop an entry from the queue. */
-static struct ioq_ent *ioqq_pop(struct ioqq *ioqq, bool block) {
- size_t i = fetch_add(&ioqq->tail, 1, relaxed);
- ioq_slot *slot = &ioqq->slots[i & ioqq->slot_mask];
- return ioq_slot_pop(ioqq, slot, block);
-}
-
/** Pop a batch of entries from the queue. */
static void ioqq_pop_batch(struct ioqq *ioqq, struct ioq_ent *batch[], size_t size, bool block) {
size_t mask = ioqq->slot_mask;
@@ -430,36 +461,83 @@ static void ioqq_pop_batch(struct ioqq *ioqq, struct ioq_ent *batch[], size_t si
#define IOQ_BATCH (FALSE_SHARING_SIZE / sizeof(ioq_slot))
/**
- * A batch of entries to send all at once.
+ * A batch of I/O queue entries.
*/
struct ioq_batch {
- /** The current batch size. */
- size_t size;
+ /** The start of the batch. */
+ size_t head;
+ /** The end of the batch. */
+ size_t tail;
/** The array of entries. */
struct ioq_ent *entries[IOQ_BATCH];
};
-/** Send the batch to a queue. */
+/** Reset a batch. */
+static void ioq_batch_reset(struct ioq_batch *batch) {
+ batch->head = batch->tail = 0;
+}
+
+/** Check if a batch is empty. */
+static bool ioq_batch_empty(const struct ioq_batch *batch) {
+ return batch->head >= batch->tail;
+}
+
+/** Send a batch to a queue. */
static void ioq_batch_flush(struct ioqq *ioqq, struct ioq_batch *batch) {
- if (batch->size > 0) {
- ioqq_push_batch(ioqq, batch->entries, batch->size);
- batch->size = 0;
+ if (batch->tail > 0) {
+ ioqq_push_batch(ioqq, batch->entries, batch->tail);
+ ioq_batch_reset(batch);
}
}
-/** An an entry to a batch, flushing if necessary. */
+/** Push an entry to a batch, flushing if necessary. */
static void ioq_batch_push(struct ioqq *ioqq, struct ioq_batch *batch, struct ioq_ent *ent) {
- if (batch->size >= IOQ_BATCH) {
+ batch->entries[batch->tail++] = ent;
+
+ if (batch->tail >= IOQ_BATCH) {
ioq_batch_flush(ioqq, batch);
}
+}
+
+/** Fill a batch from a queue. */
+static bool ioq_batch_fill(struct ioqq *ioqq, struct ioq_batch *batch, bool block) {
+ ioqq_pop_batch(ioqq, batch->entries, IOQ_BATCH, block);
+
+ ioq_batch_reset(batch);
+ for (size_t i = 0; i < IOQ_BATCH; ++i) {
+ struct ioq_ent *ent = batch->entries[i];
+ if (ent) {
+ batch->entries[batch->tail++] = ent;
+ }
+ }
+
+ return batch->tail > 0;
+}
+
+/** Pop an entry from a batch, filling it first if necessary. */
+static struct ioq_ent *ioq_batch_pop(struct ioqq *ioqq, struct ioq_batch *batch, bool block) {
+ if (ioq_batch_empty(batch)) {
+ // For non-blocking pops, make sure that each ioq_batch_pop()
+ // corresponds to a single (amortized) increment of ioqq->head.
+ // Otherwise, we start skipping many slots and batching ends up
+ // degrading performance.
+ if (!block && batch->head < IOQ_BATCH) {
+ ++batch->head;
+ return NULL;
+ }
+
+ if (!ioq_batch_fill(ioqq, batch, block)) {
+ return NULL;
+ }
+ }
- batch->entries[batch->size++] = ent;
+ return batch->entries[batch->head++];
}
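
The head/tail bookkeeping itself is simple enough to model with the MPMC queue replaced by a plain array; this toy (not bfs code) only demonstrates the invariants: reset makes head == tail, pushes append at the tail, pops consume from the head, and the batch is empty once head catches up with tail:

#include <assert.h>
#include <stddef.h>

#define BATCH 8

struct batch {
	size_t head, tail;
	int entries[BATCH];
};

static void batch_reset(struct batch *b) {
	b->head = b->tail = 0;
}

static int batch_empty(const struct batch *b) {
	return b->head >= b->tail;
}

int main(void) {
	struct batch b;
	batch_reset(&b);
	assert(batch_empty(&b));

	// "Fill": the producer side appends at the tail
	for (int i = 0; i < 5; ++i) {
		b.entries[b.tail++] = i;
	}

	// "Pop": the consumer side reads from the head until drained
	int sum = 0;
	while (!batch_empty(&b)) {
		sum += b.entries[b.head++];
	}
	assert(sum == 0 + 1 + 2 + 3 + 4);
	assert(batch_empty(&b));

	return 0;
}
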
/** Sentinel stop command. */
static struct ioq_ent IOQ_STOP;
-#if BFS_USE_LIBURING
+#if BFS_WITH_LIBURING
/**
* Supported io_uring operations.
*/
@@ -477,7 +555,7 @@ struct ioq_thread {
/** Pointer back to the I/O queue. */
struct ioq *parent;
-#if BFS_USE_LIBURING
+#if BFS_WITH_LIBURING
/** io_uring instance. */
struct io_uring ring;
/** Any error that occurred initializing the ring. */
@@ -497,20 +575,25 @@ struct ioq {
/** ioq_ent arena. */
struct arena ents;
-#if BFS_USE_LIBURING && BFS_USE_STATX
+#if BFS_WITH_LIBURING && BFS_USE_STATX
/** struct statx arena. */
struct arena xbufs;
#endif
- /** Pending I/O requests. */
+ /** Pending I/O request queue. */
struct ioqq *pending;
- /** Ready I/O responses. */
+ /** Ready I/O response queue. */
struct ioqq *ready;
+ /** Pending request batch. */
+ struct ioq_batch pending_batch;
+ /** Ready request batch. */
+ struct ioq_batch ready_batch;
+
/** The number of background threads. */
size_t nthreads;
/** The background threads themselves. */
- struct ioq_thread threads[];
+ struct ioq_thread threads[] _counted_by(nthreads);
};
/** Cancel a request if we need to. */
@@ -531,6 +614,14 @@ static bool ioq_check_cancel(struct ioq *ioq, struct ioq_ent *ent) {
/** Dispatch a single request synchronously. */
static void ioq_dispatch_sync(struct ioq *ioq, struct ioq_ent *ent) {
switch (ent->op) {
+ case IOQ_NOP:
+ if (ent->nop.type == IOQ_NOP_HEAVY) {
+ // A fast, no-op syscall
+ getppid();
+ }
+ ent->result = 0;
+ return;
+
case IOQ_CLOSE:
ent->result = try(xclose(ent->close.fd));
return;
@@ -559,7 +650,7 @@ static void ioq_dispatch_sync(struct ioq *ioq, struct ioq_ent *ent) {
ent->result = -ENOSYS;
}
-#if BFS_USE_LIBURING
+#if BFS_WITH_LIBURING
/** io_uring worker state. */
struct ioq_ring_state {
@@ -579,23 +670,161 @@ struct ioq_ring_state {
struct ioq_batch ready;
};
+/** Reap a single CQE. */
+static void ioq_reap_cqe(struct ioq_ring_state *state, struct io_uring_cqe *cqe) {
+ struct ioq *ioq = state->ioq;
+
+ struct ioq_ent *ent = io_uring_cqe_get_data(cqe);
+ ent->result = cqe->res;
+
+ if (ent->result < 0) {
+ goto push;
+ }
+
+ switch (ent->op) {
+ case IOQ_OPENDIR: {
+ int fd = ent->result;
+ if (ioq_check_cancel(ioq, ent)) {
+ xclose(fd);
+ goto push;
+ }
+
+ struct ioq_opendir *args = &ent->opendir;
+ ent->result = try(bfs_opendir(args->dir, fd, NULL, args->flags));
+ if (ent->result >= 0) {
+ // TODO: io_uring_prep_getdents()
+ bfs_polldir(args->dir);
+ } else {
+ xclose(fd);
+ }
+
+ break;
+ }
+
+#if BFS_USE_STATX
+ case IOQ_STAT: {
+ struct ioq_stat *args = &ent->stat;
+ ent->result = try(bfs_statx_convert(args->buf, args->xbuf));
+ break;
+ }
+#endif
+
+ default:
+ break;
+ }
+
+push:
+ ioq_batch_push(ioq->ready, &state->ready, ent);
+}
+
+/** Wait for submitted requests to complete. */
+static void ioq_ring_drain(struct ioq_ring_state *state, size_t wait_nr) {
+ struct ioq *ioq = state->ioq;
+ struct io_uring *ring = state->ring;
+
+ bfs_assert(wait_nr <= state->submitted);
+
+ while (state->submitted > 0) {
+ struct io_uring_cqe *cqe;
+ if (wait_nr > 0) {
+ io_uring_wait_cqes(ring, &cqe, wait_nr, NULL, NULL);
+ }
+
+ unsigned int head;
+ size_t seen = 0;
+ io_uring_for_each_cqe (ring, head, cqe) {
+ ioq_reap_cqe(state, cqe);
+ ++seen;
+ }
+
+ io_uring_cq_advance(ring, seen);
+ state->submitted -= seen;
+
+ if (seen >= wait_nr) {
+ break;
+ }
+ wait_nr -= seen;
+ }
+
+ ioq_batch_flush(ioq->ready, &state->ready);
+}
+
+/** Submit prepped SQEs, and wait for some to complete. */
+static void ioq_ring_submit(struct ioq_ring_state *state) {
+ struct io_uring *ring = state->ring;
+
+ size_t unreaped = state->prepped + state->submitted;
+ size_t wait_nr = 0;
+
+ if (state->prepped == 0 && unreaped > 0) {
+ // If we have no new SQEs, wait for at least one old one to
+ // complete, to avoid livelock
+ wait_nr = 1;
+ }
+
+ if (unreaped > ring->sq.ring_entries) {
+ // Keep the completion queue below half full
+ wait_nr = unreaped - ring->sq.ring_entries;
+ }
+
+ // Submit all prepped SQEs
+ while (state->prepped > 0) {
+ int ret = io_uring_submit_and_wait(state->ring, wait_nr);
+ if (ret <= 0) {
+ continue;
+ }
+
+ state->submitted += ret;
+ state->prepped -= ret;
+ if (state->prepped > 0) {
+ // In the unlikely event of a short submission, any SQE
+ // links will be broken. Wait for all SQEs to complete
+ // to preserve any ordering requirements.
+ ioq_ring_drain(state, state->submitted);
+ wait_nr = 0;
+ }
+ }
+
+ // Drain all the CQEs we waited for (and any others that are ready)
+ ioq_ring_drain(state, wait_nr);
+}
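
For readers less familiar with liburing, a minimal standalone program with the same shape as ioq_ring_submit()/ioq_ring_drain(): prep some no-op SQEs, submit them with io_uring_submit_and_wait(), then reap whatever completions are ready in one pass with io_uring_for_each_cqe() and a single io_uring_cq_advance(). Error handling is stripped down; this is a sketch, not the bfs logic:

#include <liburing.h>
#include <stdint.h>
#include <stdio.h>

int main(void) {
	struct io_uring ring;
	if (io_uring_queue_init(8, &ring, 0) != 0) {
		return 1;
	}

	// Prep a few no-op SQEs without submitting yet
	for (uintptr_t i = 0; i < 4; ++i) {
		struct io_uring_sqe *sqe = io_uring_get_sqe(&ring);
		if (!sqe) {
			break;
		}
		io_uring_prep_nop(sqe);
		io_uring_sqe_set_data(sqe, (void *)i);
	}

	// Submit everything and wait for at least one completion
	if (io_uring_submit_and_wait(&ring, 1) < 0) {
		io_uring_queue_exit(&ring);
		return 1;
	}

	// Drain the ready completions, advancing the CQ ring once at the end
	unsigned int head;
	unsigned int seen = 0;
	struct io_uring_cqe *cqe;
	io_uring_for_each_cqe(&ring, head, cqe) {
		printf("nop %u: res %d\n",
		       (unsigned)(uintptr_t)io_uring_cqe_get_data(cqe), cqe->res);
		++seen;
	}
	io_uring_cq_advance(&ring, seen);

	io_uring_queue_exit(&ring);
	return 0;
}
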
+
+/** Reserve space for a number of SQEs, submitting if necessary. */
+static void ioq_reserve_sqes(struct ioq_ring_state *state, unsigned int count) {
+ while (io_uring_sq_space_left(state->ring) < count) {
+ ioq_ring_submit(state);
+ }
+}
+
+/** Get an SQE, submitting if necessary. */
+static struct io_uring_sqe *ioq_get_sqe(struct ioq_ring_state *state) {
+ ioq_reserve_sqes(state, 1);
+ return io_uring_get_sqe(state->ring);
+}
+
/** Dispatch a single request asynchronously. */
static struct io_uring_sqe *ioq_dispatch_async(struct ioq_ring_state *state, struct ioq_ent *ent) {
- struct io_uring *ring = state->ring;
enum ioq_ring_ops ops = state->ops;
struct io_uring_sqe *sqe = NULL;
switch (ent->op) {
+ case IOQ_NOP:
+ if (ent->nop.type == IOQ_NOP_HEAVY) {
+ sqe = ioq_get_sqe(state);
+ io_uring_prep_nop(sqe);
+ }
+ return sqe;
+
case IOQ_CLOSE:
if (ops & IOQ_RING_CLOSE) {
- sqe = io_uring_get_sqe(ring);
+ sqe = ioq_get_sqe(state);
io_uring_prep_close(sqe, ent->close.fd);
}
return sqe;
case IOQ_OPENDIR:
if (ops & IOQ_RING_OPENAT) {
- sqe = io_uring_get_sqe(ring);
+ sqe = ioq_get_sqe(state);
struct ioq_opendir *args = &ent->opendir;
int flags = O_RDONLY | O_CLOEXEC | O_DIRECTORY;
io_uring_prep_openat(sqe, args->dfd, args->path, flags, 0);
@@ -605,7 +834,7 @@ static struct io_uring_sqe *ioq_dispatch_async(struct ioq_ring_state *state, str
case IOQ_CLOSEDIR:
#if BFS_USE_UNWRAPDIR
if (ops & IOQ_RING_CLOSE) {
- sqe = io_uring_get_sqe(ring);
+ sqe = ioq_get_sqe(state);
io_uring_prep_close(sqe, bfs_unwrapdir(ent->closedir.dir));
}
#endif
@@ -614,10 +843,10 @@ static struct io_uring_sqe *ioq_dispatch_async(struct ioq_ring_state *state, str
case IOQ_STAT:
#if BFS_USE_STATX
if (ops & IOQ_RING_STATX) {
- sqe = io_uring_get_sqe(ring);
+ sqe = ioq_get_sqe(state);
struct ioq_stat *args = &ent->stat;
int flags = bfs_statx_flags(args->flags);
- unsigned int mask = STATX_BASIC_STATS | STATX_BTIME;
+ unsigned int mask = bfs_statx_mask();
io_uring_prep_statx(sqe, args->dfd, args->path, flags, mask, args->xbuf);
}
#endif
@@ -630,7 +859,7 @@ static struct io_uring_sqe *ioq_dispatch_async(struct ioq_ring_state *state, str
/** Check if ioq_ring_reap() has work to do. */
static bool ioq_ring_empty(struct ioq_ring_state *state) {
- return !state->prepped && !state->submitted && !state->ready.size;
+ return !state->prepped && !state->submitted && ioq_batch_empty(&state->ready);
}
/** Prep a single SQE. */
@@ -658,163 +887,94 @@ static bool ioq_ring_prep(struct ioq_ring_state *state) {
}
struct ioq *ioq = state->ioq;
- struct io_uring *ring = state->ring;
- struct ioq_ent *pending[IOQ_BATCH];
-
- while (io_uring_sq_space_left(ring) >= IOQ_BATCH) {
- bool block = ioq_ring_empty(state);
- ioqq_pop_batch(ioq->pending, pending, IOQ_BATCH, block);
-
- bool any = false;
- for (size_t i = 0; i < IOQ_BATCH; ++i) {
- struct ioq_ent *ent = pending[i];
- if (ent == &IOQ_STOP) {
- ioqq_push(ioq->pending, &IOQ_STOP);
- state->stop = true;
- goto done;
- } else if (ent) {
- ioq_prep_sqe(state, ent);
- any = true;
- }
- }
-
- if (!any) {
- break;
- }
- }
-
-done:
- return !ioq_ring_empty(state);
-}
-
-/** Reap a single CQE. */
-static void ioq_reap_cqe(struct ioq_ring_state *state, struct io_uring_cqe *cqe) {
- struct ioq *ioq = state->ioq;
- struct io_uring *ring = state->ring;
-
- struct ioq_ent *ent = io_uring_cqe_get_data(cqe);
- ent->result = cqe->res;
- io_uring_cqe_seen(ring, cqe);
- --state->submitted;
-
- if (ent->result < 0) {
- goto push;
- }
- switch (ent->op) {
- case IOQ_OPENDIR: {
- int fd = ent->result;
- if (ioq_check_cancel(ioq, ent)) {
- xclose(fd);
- goto push;
- }
-
- struct ioq_opendir *args = &ent->opendir;
- ent->result = try(bfs_opendir(args->dir, fd, NULL, args->flags));
- if (ent->result >= 0) {
- // TODO: io_uring_prep_getdents()
- bfs_polldir(args->dir);
- } else {
- xclose(fd);
- }
+ struct ioq_batch pending;
+ ioq_batch_reset(&pending);
+ while (true) {
+ bool block = ioq_ring_empty(state);
+ struct ioq_ent *ent = ioq_batch_pop(ioq->pending, &pending, block);
+ if (ent == &IOQ_STOP) {
+ ioqq_push(ioq->pending, ent);
+ state->stop = true;
break;
- }
-
-#if BFS_USE_STATX
- case IOQ_STAT: {
- struct ioq_stat *args = &ent->stat;
- ent->result = try(bfs_statx_convert(args->buf, args->xbuf));
+ } else if (ent) {
+ ioq_prep_sqe(state, ent);
+ } else {
break;
}
-#endif
-
- default:
- break;
}
-push:
- ioq_batch_push(ioq->ready, &state->ready, ent);
+ bfs_assert(ioq_batch_empty(&pending));
+ return !ioq_ring_empty(state);
}
-/** Reap a batch of CQEs. */
-static void ioq_ring_reap(struct ioq_ring_state *state) {
- struct ioq *ioq = state->ioq;
- struct io_uring *ring = state->ring;
+/** io_uring worker loop. */
+static int ioq_ring_work(struct ioq_thread *thread) {
+ struct io_uring *ring = &thread->ring;
- while (state->prepped) {
- int ret = io_uring_submit_and_wait(ring, 1);
- if (ret > 0) {
- state->prepped -= ret;
- state->submitted += ret;
+#ifdef IORING_SETUP_R_DISABLED
+ if (ring->flags & IORING_SETUP_R_DISABLED) {
+ if (io_uring_enable_rings(ring) != 0) {
+ return -1;
}
}
+#endif
- while (state->submitted) {
- struct io_uring_cqe *cqe;
- if (io_uring_wait_cqe(ring, &cqe) < 0) {
- continue;
- }
-
- ioq_reap_cqe(state, cqe);
- }
-
- ioq_batch_flush(ioq->ready, &state->ready);
-}
-
-/** io_uring worker loop. */
-static void ioq_ring_work(struct ioq_thread *thread) {
struct ioq_ring_state state = {
.ioq = thread->parent,
- .ring = &thread->ring,
+ .ring = ring,
.ops = thread->ring_ops,
};
while (ioq_ring_prep(&state)) {
- ioq_ring_reap(&state);
+ ioq_ring_submit(&state);
}
+
+ ioq_ring_drain(&state, state.submitted);
+ return 0;
}
-#endif // BFS_USE_LIBURING
+#endif // BFS_WITH_LIBURING
/** Synchronous syscall loop. */
static void ioq_sync_work(struct ioq_thread *thread) {
struct ioq *ioq = thread->parent;
- bool stop = false;
- while (!stop) {
- struct ioq_ent *pending[IOQ_BATCH];
- ioqq_pop_batch(ioq->pending, pending, IOQ_BATCH, true);
-
- struct ioq_batch ready;
- ready.size = 0;
-
- for (size_t i = 0; i < IOQ_BATCH; ++i) {
- struct ioq_ent *ent = pending[i];
- if (ent == &IOQ_STOP) {
- ioqq_push(ioq->pending, &IOQ_STOP);
- stop = true;
- break;
- } else if (ent) {
- if (!ioq_check_cancel(ioq, ent)) {
- ioq_dispatch_sync(ioq, ent);
- }
- ioq_batch_push(ioq->ready, &ready, ent);
- }
+ struct ioq_batch pending, ready;
+ ioq_batch_reset(&pending);
+ ioq_batch_reset(&ready);
+
+ while (true) {
+ if (ioq_batch_empty(&pending)) {
+ ioq_batch_flush(ioq->ready, &ready);
}
- ioq_batch_flush(ioq->ready, &ready);
+ struct ioq_ent *ent = ioq_batch_pop(ioq->pending, &pending, true);
+ if (ent == &IOQ_STOP) {
+ ioqq_push(ioq->pending, ent);
+ break;
+ }
+
+ if (!ioq_check_cancel(ioq, ent)) {
+ ioq_dispatch_sync(ioq, ent);
+ }
+ ioq_batch_push(ioq->ready, &ready, ent);
}
+
+ bfs_assert(ioq_batch_empty(&pending));
+ ioq_batch_flush(ioq->ready, &ready);
}
/** Background thread entry point. */
static void *ioq_work(void *ptr) {
struct ioq_thread *thread = ptr;
-#if BFS_USE_LIBURING
+#if BFS_WITH_LIBURING
if (thread->ring_err == 0) {
- ioq_ring_work(thread);
- return NULL;
+ if (ioq_ring_work(thread) == 0) {
+ return NULL;
+ }
}
#endif
@@ -822,9 +982,30 @@ static void *ioq_work(void *ptr) {
return NULL;
}
+#if BFS_WITH_LIBURING
+/** Test whether some io_uring setup flags are supported. */
+static bool ioq_ring_probe_flags(struct io_uring_params *params, unsigned int flags) {
+ unsigned int saved = params->flags;
+ params->flags |= flags;
+
+ struct io_uring ring;
+ int ret = io_uring_queue_init_params(2, &ring, params);
+ if (ret == 0) {
+ io_uring_queue_exit(&ring);
+ }
+
+ if (ret == -EINVAL) {
+ params->flags = saved;
+ return false;
+ }
+
+ return true;
+}
+#endif
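
The probe trick (set up a tiny throwaway ring and treat -EINVAL as "flag unsupported") also works on its own. A hypothetical standalone check for IORING_SETUP_SUBMIT_ALL, guarded by #ifdef just like the code below since older headers may not define the flag:

#include <liburing.h>
#include <errno.h>
#include <stdio.h>

int main(void) {
#ifdef IORING_SETUP_SUBMIT_ALL
	struct io_uring_params params = {0};
	params.flags = IORING_SETUP_SUBMIT_ALL;

	// Try to create a minimal ring with the flag set
	struct io_uring ring;
	int ret = io_uring_queue_init_params(2, &ring, &params);
	if (ret == 0) {
		io_uring_queue_exit(&ring);
		puts("IORING_SETUP_SUBMIT_ALL: supported");
	} else if (ret == -EINVAL) {
		// The kernel rejected the flag
		puts("IORING_SETUP_SUBMIT_ALL: not supported");
	} else {
		printf("io_uring_queue_init_params(): %d\n", ret);
	}
#else
	puts("IORING_SETUP_SUBMIT_ALL: not defined in these headers");
#endif
	return 0;
}
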
+
/** Initialize io_uring thread state. */
static int ioq_ring_init(struct ioq *ioq, struct ioq_thread *thread) {
-#if BFS_USE_LIBURING
+#if BFS_WITH_LIBURING
struct ioq_thread *prev = NULL;
if (thread > ioq->threads) {
prev = thread - 1;
@@ -835,11 +1016,31 @@ static int ioq_ring_init(struct ioq *ioq, struct ioq_thread *thread) {
return -1;
}
- // Share io-wq workers between rings
struct io_uring_params params = {0};
+
if (prev) {
- params.flags |= IORING_SETUP_ATTACH_WQ;
+ // Share io-wq workers between rings
+ params.flags = prev->ring.flags | IORING_SETUP_ATTACH_WQ;
params.wq_fd = prev->ring.ring_fd;
+ } else {
+#ifdef IORING_SETUP_SUBMIT_ALL
+ // Don't abort submission just because an inline request fails
+ ioq_ring_probe_flags(&params, IORING_SETUP_SUBMIT_ALL);
+#endif
+
+#ifdef IORING_SETUP_R_DISABLED
+ // Don't enable the ring yet (needed for SINGLE_ISSUER)
+ if (ioq_ring_probe_flags(&params, IORING_SETUP_R_DISABLED)) {
+# ifdef IORING_SETUP_SINGLE_ISSUER
+ // Allow optimizations assuming only one task submits SQEs
+ ioq_ring_probe_flags(&params, IORING_SETUP_SINGLE_ISSUER);
+# endif
+# ifdef IORING_SETUP_DEFER_TASKRUN
+ // Don't interrupt us aggressively with completion events
+ ioq_ring_probe_flags(&params, IORING_SETUP_DEFER_TASKRUN);
+# endif
+ }
+#endif
}
// Use a page for each SQE ring
@@ -877,6 +1078,7 @@ static int ioq_ring_init(struct ioq *ioq, struct ioq_thread *thread) {
return -1;
}
+#if BFS_HAS_IO_URING_MAX_WORKERS
// Limit the number of io_uring workers
unsigned int values[] = {
ioq->nthreads, // [IO_WQ_BOUND]
@@ -885,12 +1087,14 @@ static int ioq_ring_init(struct ioq *ioq, struct ioq_thread *thread) {
io_uring_register_iowq_max_workers(&thread->ring, values);
#endif
+#endif // BFS_WITH_LIBURING
+
return 0;
}
/** Destroy an io_uring. */
static void ioq_ring_exit(struct ioq_thread *thread) {
-#if BFS_USE_LIBURING
+#if BFS_WITH_LIBURING
if (thread->ring_err == 0) {
io_uring_queue_exit(&thread->ring);
}
@@ -898,7 +1102,8 @@ static void ioq_ring_exit(struct ioq_thread *thread) {
}
/** Create an I/O queue thread. */
-static int ioq_thread_create(struct ioq *ioq, struct ioq_thread *thread) {
+static int ioq_thread_create(struct ioq *ioq, size_t i) {
+ struct ioq_thread *thread = &ioq->threads[i];
thread->parent = ioq;
ioq_ring_init(ioq, thread);
@@ -908,6 +1113,11 @@ static int ioq_thread_create(struct ioq *ioq, struct ioq_thread *thread) {
return -1;
}
+ char name[16];
+ if (snprintf(name, sizeof(name), "ioq-%zu", i) >= 0) {
+ thread_setname(thread->id, name);
+ }
+
return 0;
}
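
thread_setname() is a bfs wrapper that is not part of this diff. A rough standalone analogue on Linux, assuming the GNU pthread_setname_np() extension (thread names are capped at 16 bytes including the terminator, which is why the buffer above is char name[16]):

#define _GNU_SOURCE
#include <pthread.h>
#include <stdio.h>

static void *worker(void *arg) {
	(void)arg;
	return NULL;
}

int main(void) {
	pthread_t thread;
	if (pthread_create(&thread, NULL, worker, NULL) != 0) {
		return 1;
	}

	// Mirror the naming scheme used above for thread index 0
	char name[16];
	if (snprintf(name, sizeof(name), "ioq-%zu", (size_t)0) >= 0) {
		pthread_setname_np(thread, name);
	}

	pthread_join(thread, NULL);
	return 0;
}
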
@@ -926,7 +1136,7 @@ struct ioq *ioq_create(size_t depth, size_t nthreads) {
ioq->depth = depth;
ARENA_INIT(&ioq->ents, struct ioq_ent);
-#if BFS_USE_LIBURING && BFS_USE_STATX
+#if BFS_WITH_LIBURING && BFS_USE_STATX
ARENA_INIT(&ioq->xbufs, struct statx);
#endif
@@ -942,7 +1152,7 @@ struct ioq *ioq_create(size_t depth, size_t nthreads) {
ioq->nthreads = nthreads;
for (size_t i = 0; i < nthreads; ++i) {
- if (ioq_thread_create(ioq, &ioq->threads[i]) != 0) {
+ if (ioq_thread_create(ioq, i) != 0) {
ioq->nthreads = i;
goto fail;
}
@@ -984,6 +1194,18 @@ static struct ioq_ent *ioq_request(struct ioq *ioq, enum ioq_op op, void *ptr) {
return ent;
}
+int ioq_nop(struct ioq *ioq, enum ioq_nop_type type, void *ptr) {
+ struct ioq_ent *ent = ioq_request(ioq, IOQ_NOP, ptr);
+ if (!ent) {
+ return -1;
+ }
+
+ ent->nop.type = type;
+
+ ioq_batch_push(ioq->pending, &ioq->pending_batch, ent);
+ return 0;
+}
+
int ioq_close(struct ioq *ioq, int fd, void *ptr) {
struct ioq_ent *ent = ioq_request(ioq, IOQ_CLOSE, ptr);
if (!ent) {
@@ -992,7 +1214,7 @@ int ioq_close(struct ioq *ioq, int fd, void *ptr) {
ent->close.fd = fd;
- ioqq_push(ioq->pending, ent);
+ ioq_batch_push(ioq->pending, &ioq->pending_batch, ent);
return 0;
}
@@ -1008,7 +1230,7 @@ int ioq_opendir(struct ioq *ioq, struct bfs_dir *dir, int dfd, const char *path,
args->path = path;
args->flags = flags;
- ioqq_push(ioq->pending, ent);
+ ioq_batch_push(ioq->pending, &ioq->pending_batch, ent);
return 0;
}
@@ -1020,7 +1242,7 @@ int ioq_closedir(struct ioq *ioq, struct bfs_dir *dir, void *ptr) {
ent->closedir.dir = dir;
- ioqq_push(ioq->pending, ent);
+ ioq_batch_push(ioq->pending, &ioq->pending_batch, ent);
return 0;
}
@@ -1036,7 +1258,7 @@ int ioq_stat(struct ioq *ioq, int dfd, const char *path, enum bfs_stat_flags fla
args->flags = flags;
args->buf = buf;
-#if BFS_USE_LIBURING && BFS_USE_STATX
+#if BFS_WITH_LIBURING && BFS_USE_STATX
args->xbuf = arena_alloc(&ioq->xbufs);
if (!args->xbuf) {
ioq_free(ioq, ent);
@@ -1044,23 +1266,30 @@ int ioq_stat(struct ioq *ioq, int dfd, const char *path, enum bfs_stat_flags fla
}
#endif
- ioqq_push(ioq->pending, ent);
+ ioq_batch_push(ioq->pending, &ioq->pending_batch, ent);
return 0;
}
+void ioq_submit(struct ioq *ioq) {
+ ioq_batch_flush(ioq->pending, &ioq->pending_batch);
+}
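
A hypothetical caller of the batched API, built inside the bfs tree and assuming only the signatures visible in this file (ioq_create(), ioq_nop(), ioq_submit(), ioq_pop(), ioq_free(), ioq_destroy()). It illustrates the new contract enforced by the assertion below: requests sit in the pending batch until ioq_submit() flushes them, and only then should the caller start popping:

#include "ioq.h"

#include <stdbool.h>
#include <stddef.h>

static int example(void) {
	struct ioq *ioq = ioq_create(64, 4);
	if (!ioq) {
		return -1;
	}

	// Queue a few requests; they accumulate in ioq->pending_batch ...
	for (int i = 0; i < 16; ++i) {
		if (ioq_nop(ioq, IOQ_NOP_HEAVY, NULL) != 0) {
			break;
		}
	}

	// ... until the batch is explicitly flushed to the worker threads
	ioq_submit(ioq);

	// Reap completions with blocking pops; ioq_pop() returns NULL once
	// every outstanding request has been freed
	struct ioq_ent *ent;
	while ((ent = ioq_pop(ioq, true))) {
		ioq_free(ioq, ent);
	}

	ioq_destroy(ioq);
	return 0;
}
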
+
struct ioq_ent *ioq_pop(struct ioq *ioq, bool block) {
+ // Don't forget to submit before popping
+ bfs_assert(ioq_batch_empty(&ioq->pending_batch));
+
if (ioq->size == 0) {
return NULL;
}
- return ioqq_pop(ioq->ready, block);
+ return ioq_batch_pop(ioq->ready, &ioq->ready_batch, block);
}
void ioq_free(struct ioq *ioq, struct ioq_ent *ent) {
bfs_assert(ioq->size > 0);
--ioq->size;
-#if BFS_USE_LIBURING && BFS_USE_STATX
+#if BFS_WITH_LIBURING && BFS_USE_STATX
if (ent->op == IOQ_STAT && ent->stat.xbuf) {
arena_free(&ioq->xbufs, ent->stat.xbuf);
}
@@ -1071,7 +1300,8 @@ void ioq_free(struct ioq *ioq, struct ioq_ent *ent) {
void ioq_cancel(struct ioq *ioq) {
if (!exchange(&ioq->cancel, true, relaxed)) {
- ioqq_push(ioq->pending, &IOQ_STOP);
+ ioq_batch_push(ioq->pending, &ioq->pending_batch, &IOQ_STOP);
+ ioq_submit(ioq);
}
}
@@ -1091,7 +1321,7 @@ void ioq_destroy(struct ioq *ioq) {
ioqq_destroy(ioq->ready);
ioqq_destroy(ioq->pending);
-#if BFS_USE_LIBURING && BFS_USE_STATX
+#if BFS_WITH_LIBURING && BFS_USE_STATX
arena_destroy(&ioq->xbufs);
#endif
arena_destroy(&ioq->ents);