Diffstat (limited to 'src/ioq.c')
-rw-r--r-- | src/ioq.c | 59
1 file changed, 30 insertions, 29 deletions
@@ -118,16 +118,18 @@
  * [1]: https://arxiv.org/abs/2201.02179
  */

-#include "prelude.h"
 #include "ioq.h"
+
 #include "alloc.h"
 #include "atomic.h"
+#include "bfs.h"
 #include "bfstd.h"
 #include "bit.h"
 #include "diag.h"
 #include "dir.h"
 #include "stat.h"
 #include "thread.h"
+
 #include <errno.h>
 #include <fcntl.h>
 #include <pthread.h>
@@ -135,7 +137,7 @@
 #include <stdlib.h>
 #include <sys/stat.h>

-#if BFS_USE_LIBURING
+#if BFS_WITH_LIBURING
 # include <liburing.h>
 #endif

@@ -180,8 +182,7 @@ typedef atomic uintptr_t ioq_slot;
 /** Amount to add for an additional skip. */
 #define IOQ_SKIP_ONE (~IOQ_BLOCKED)

-// Need room for two flag bits
-bfs_static_assert(alignof(struct ioq_ent) >= (1 << 2));
+static_assert(alignof(struct ioq_ent) >= (1 << 2), "struct ioq_ent is underaligned");

 /**
  * An MPMC queue of I/O commands.
@@ -263,7 +264,7 @@ static struct ioq_monitor *ioq_slot_monitor(struct ioqq *ioqq, ioq_slot *slot) {
 }

 /** Atomically wait for a slot to change. */
-attr(noinline)
+_noinline
 static uintptr_t ioq_slot_wait(struct ioqq *ioqq, ioq_slot *slot, uintptr_t value) {
 	struct ioq_monitor *monitor = ioq_slot_monitor(ioqq, slot);
 	mutex_lock(&monitor->mutex);
@@ -293,7 +294,7 @@ done:
 }

 /** Wake up any threads waiting on a slot. */
-attr(noinline)
+_noinline
 static void ioq_slot_wake(struct ioqq *ioqq, ioq_slot *slot) {
 	struct ioq_monitor *monitor = ioq_slot_monitor(ioqq, slot);

@@ -313,9 +314,11 @@ static void ioq_slot_wake(struct ioqq *ioqq, ioq_slot *slot) {
 	cond_broadcast(&monitor->cond);
 }

-/** Branch-free (slot & IOQ_SKIP) ? ~IOQ_BLOCKED : 0 */
-static uintptr_t ioq_skip_mask(uintptr_t slot) {
-	return -(slot >> IOQ_SKIP_BIT) << 1;
+/** Branch-free ((slot & IOQ_SKIP) ? skip : full) & ~IOQ_BLOCKED */
+static uintptr_t ioq_slot_blend(uintptr_t slot, uintptr_t skip, uintptr_t full) {
+	uintptr_t mask = -(slot >> IOQ_SKIP_BIT);
+	uintptr_t ret = (skip & mask) | (full & ~mask);
+	return ret & ~IOQ_BLOCKED;
 }

 /** Push an entry into a slot. */
@@ -323,19 +326,18 @@ static bool ioq_slot_push(struct ioqq *ioqq, ioq_slot *slot, struct ioq_ent *ent
 	uintptr_t prev = load(slot, relaxed);
 	while (true) {
-		size_t skip_mask = ioq_skip_mask(prev);
-		size_t full_mask = ~skip_mask & ~IOQ_BLOCKED;
-		if (prev & full_mask) {
+		uintptr_t full = ioq_slot_blend(prev, 0, prev);
+		if (full) {
 			// full(ptr) → wait
 			prev = ioq_slot_wait(ioqq, slot, prev);
 			continue;
 		}

 		// empty → full(ptr)
-		uintptr_t next = ((uintptr_t)ent >> 1) & full_mask;
+		uintptr_t next = (uintptr_t)ent >> 1;
 		// skip(1) → empty
 		// skip(n) → skip(n - 1)
-		next |= (prev - IOQ_SKIP_ONE) & skip_mask;
+		next = ioq_slot_blend(prev, prev - IOQ_SKIP_ONE, next);

 		if (compare_exchange_weak(slot, &prev, next, release, relaxed)) {
 			break;
 		}
@@ -357,9 +359,8 @@ static struct ioq_ent *ioq_slot_pop(struct ioqq *ioqq, ioq_slot *slot, bool bloc
 		// skip(n) → skip(n + 1)
 		// full(ptr) → full(ptr - 1)
 		uintptr_t next = prev + IOQ_SKIP_ONE;
-		// skip(n) → ~IOQ_BLOCKED
 		// full(ptr) → 0
-		next &= ioq_skip_mask(next);
+		next = ioq_slot_blend(next, next, 0);

 		if (block && next) {
 			prev = ioq_slot_wait(ioqq, slot, prev);
@@ -378,7 +379,7 @@ static struct ioq_ent *ioq_slot_pop(struct ioqq *ioqq, ioq_slot *slot, bool bloc
 	// empty → 0
 	// skip(n) → 0
 	// full(ptr) → ptr
-	prev &= ioq_skip_mask(~prev);
+	prev = ioq_slot_blend(prev, 0, prev);
 	return (struct ioq_ent *)(prev << 1);
 }

@@ -459,7 +460,7 @@ static void ioq_batch_push(struct ioqq *ioqq, struct ioq_batch *batch, struct io
 /** Sentinel stop command. */
 static struct ioq_ent IOQ_STOP;

-#if BFS_USE_LIBURING
+#if BFS_WITH_LIBURING
 /**
  * Supported io_uring operations.
  */
@@ -477,7 +478,7 @@ struct ioq_thread {
 	/** Pointer back to the I/O queue. */
 	struct ioq *parent;

-#if BFS_USE_LIBURING
+#if BFS_WITH_LIBURING
 	/** io_uring instance. */
 	struct io_uring ring;
 	/** Any error that occurred initializing the ring. */
@@ -497,7 +498,7 @@ struct ioq {
 	/** ioq_ent arena. */
 	struct arena ents;

-#if BFS_USE_LIBURING && BFS_USE_STATX
+#if BFS_WITH_LIBURING && BFS_USE_STATX
 	/** struct statx arena. */
 	struct arena xbufs;
 #endif
@@ -559,7 +560,7 @@ static void ioq_dispatch_sync(struct ioq *ioq, struct ioq_ent *ent) {
 	ent->result = -ENOSYS;
 }

-#if BFS_USE_LIBURING
+#if BFS_WITH_LIBURING

 /** io_uring worker state. */
 struct ioq_ring_state {
@@ -775,7 +776,7 @@ static void ioq_ring_work(struct ioq_thread *thread) {
 	}
 }

-#endif // BFS_USE_LIBURING
+#endif // BFS_WITH_LIBURING

 /** Synchronous syscall loop. */
 static void ioq_sync_work(struct ioq_thread *thread) {
@@ -811,7 +812,7 @@ static void ioq_sync_work(struct ioq_thread *thread) {
 static void *ioq_work(void *ptr) {
 	struct ioq_thread *thread = ptr;

-#if BFS_USE_LIBURING
+#if BFS_WITH_LIBURING
 	if (thread->ring_err == 0) {
 		ioq_ring_work(thread);
 		return NULL;
@@ -824,7 +825,7 @@ static void *ioq_work(void *ptr) {

 /** Initialize io_uring thread state. */
 static int ioq_ring_init(struct ioq *ioq, struct ioq_thread *thread) {
-#if BFS_USE_LIBURING
+#if BFS_WITH_LIBURING
 	struct ioq_thread *prev = NULL;
 	if (thread > ioq->threads) {
 		prev = thread - 1;
@@ -890,7 +891,7 @@ static int ioq_ring_init(struct ioq *ioq, struct ioq_thread *thread) {

 /** Destroy an io_uring. */
 static void ioq_ring_exit(struct ioq_thread *thread) {
-#if BFS_USE_LIBURING
+#if BFS_WITH_LIBURING
 	if (thread->ring_err == 0) {
 		io_uring_queue_exit(&thread->ring);
 	}
@@ -926,7 +927,7 @@ struct ioq *ioq_create(size_t depth, size_t nthreads) {
 	ioq->depth = depth;
 	ARENA_INIT(&ioq->ents, struct ioq_ent);

-#if BFS_USE_LIBURING && BFS_USE_STATX
+#if BFS_WITH_LIBURING && BFS_USE_STATX
 	ARENA_INIT(&ioq->xbufs, struct statx);
 #endif

@@ -1036,7 +1037,7 @@ int ioq_stat(struct ioq *ioq, int dfd, const char *path, enum bfs_stat_flags fla
 	args->flags = flags;
 	args->buf = buf;

-#if BFS_USE_LIBURING && BFS_USE_STATX
+#if BFS_WITH_LIBURING && BFS_USE_STATX
 	args->xbuf = arena_alloc(&ioq->xbufs);
 	if (!args->xbuf) {
 		ioq_free(ioq, ent);
@@ -1060,7 +1061,7 @@ void ioq_free(struct ioq *ioq, struct ioq_ent *ent) {
 	bfs_assert(ioq->size > 0);
 	--ioq->size;

-#if BFS_USE_LIBURING && BFS_USE_STATX
+#if BFS_WITH_LIBURING && BFS_USE_STATX
 	if (ent->op == IOQ_STAT && ent->stat.xbuf) {
 		arena_free(&ioq->xbufs, ent->stat.xbuf);
 	}
@@ -1091,7 +1092,7 @@ void ioq_destroy(struct ioq *ioq) {
 	ioqq_destroy(ioq->ready);
 	ioqq_destroy(ioq->pending);

-#if BFS_USE_LIBURING && BFS_USE_STATX
+#if BFS_WITH_LIBURING && BFS_USE_STATX
 	arena_destroy(&ioq->xbufs);
 #endif
 	arena_destroy(&ioq->ents);
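The heart of this change is ioq_slot_blend(), which folds the old ioq_skip_mask() into a branch-free select: -(slot >> IOQ_SKIP_BIT) broadcasts the skip bit into an all-ones or all-zeros mask, the mask picks between the skip and full operands, and the final & ~IOQ_BLOCKED clears the wait flag. The standalone sketch below checks that identity against the obvious ternary; the flag layout it uses (lowest bit for blocked waiters, highest bit for skip, mirroring the diff's IOQ_SKIP_BIT and IOQ_BLOCKED) is an assumption chosen for illustration, not copied from src/ioq.c.

// Standalone sketch: verify the branch-free blend against the naive ternary.
// The flag layout below (low bit = blocked, high bit = skip) is assumed for
// illustration and is not taken from src/ioq.c.
#include <assert.h>
#include <limits.h>
#include <stdint.h>
#include <stdio.h>

#define SKIP_BIT (sizeof(uintptr_t) * CHAR_BIT - 1)  // assumed: highest bit marks skip(n)
#define SKIP     ((uintptr_t)1 << SKIP_BIT)
#define BLOCKED  ((uintptr_t)1)                      // assumed: lowest bit marks waiters

// Branch-free ((slot & SKIP) ? skip : full) & ~BLOCKED, as in ioq_slot_blend()
static uintptr_t blend(uintptr_t slot, uintptr_t skip, uintptr_t full) {
	uintptr_t mask = -(slot >> SKIP_BIT);        // all ones iff the skip bit is set
	uintptr_t ret = (skip & mask) | (full & ~mask);
	return ret & ~BLOCKED;
}

// Reference implementation with an explicit branch
static uintptr_t blend_ref(uintptr_t slot, uintptr_t skip, uintptr_t full) {
	return ((slot & SKIP) ? skip : full) & ~BLOCKED;
}

int main(void) {
	uintptr_t slots[] = {0, BLOCKED, SKIP, SKIP | 42, 0x1234, SKIP | BLOCKED};
	for (size_t i = 0; i < sizeof(slots) / sizeof(slots[0]); ++i) {
		uintptr_t slot = slots[i];
		// Mirror how the diff's ioq_slot_push()/ioq_slot_pop() use the blend
		assert(blend(slot, 0, slot) == blend_ref(slot, 0, slot));
		assert(blend(slot, slot, 0) == blend_ref(slot, slot, 0));
	}
	printf("branch-free blend matches the ternary\n");
	return 0;
}

Both ioq_slot_push() and ioq_slot_pop() call the blend on values read inside a compare_exchange_weak() retry loop, so keeping the select branchless presumably avoids a data-dependent branch on that hot path, which is consistent with the "Branch-free" wording in the function comment.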