diff options
Diffstat (limited to 'src/ioq.c')
-rw-r--r-- | src/ioq.c | 36 |
1 files changed, 20 insertions, 16 deletions
@@ -118,16 +118,18 @@ * [1]: https://arxiv.org/abs/2201.02179 */ -#include "prelude.h" #include "ioq.h" + #include "alloc.h" #include "atomic.h" +#include "bfs.h" #include "bfstd.h" #include "bit.h" #include "diag.h" #include "dir.h" #include "stat.h" #include "thread.h" + #include <errno.h> #include <fcntl.h> #include <pthread.h> @@ -180,8 +182,7 @@ typedef atomic uintptr_t ioq_slot; /** Amount to add for an additional skip. */ #define IOQ_SKIP_ONE (~IOQ_BLOCKED) -// Need room for two flag bits -bfs_static_assert(alignof(struct ioq_ent) >= (1 << 2)); +static_assert(alignof(struct ioq_ent) >= (1 << 2), "struct ioq_ent is underaligned"); /** * An MPMC queue of I/O commands. @@ -263,7 +264,7 @@ static struct ioq_monitor *ioq_slot_monitor(struct ioqq *ioqq, ioq_slot *slot) { } /** Atomically wait for a slot to change. */ -attr(noinline) +_noinline static uintptr_t ioq_slot_wait(struct ioqq *ioqq, ioq_slot *slot, uintptr_t value) { struct ioq_monitor *monitor = ioq_slot_monitor(ioqq, slot); mutex_lock(&monitor->mutex); @@ -293,7 +294,7 @@ done: } /** Wake up any threads waiting on a slot. */ -attr(noinline) +_noinline static void ioq_slot_wake(struct ioqq *ioqq, ioq_slot *slot) { struct ioq_monitor *monitor = ioq_slot_monitor(ioqq, slot); @@ -313,9 +314,11 @@ static void ioq_slot_wake(struct ioqq *ioqq, ioq_slot *slot) { cond_broadcast(&monitor->cond); } -/** Branch-free (slot & IOQ_SKIP) ? ~IOQ_BLOCKED : 0 */ -static uintptr_t ioq_skip_mask(uintptr_t slot) { - return -(slot >> IOQ_SKIP_BIT) << 1; +/** Branch-free ((slot & IOQ_SKIP) ? skip : full) & ~IOQ_BLOCKED */ +static uintptr_t ioq_slot_blend(uintptr_t slot, uintptr_t skip, uintptr_t full) { + uintptr_t mask = -(slot >> IOQ_SKIP_BIT); + uintptr_t ret = (skip & mask) | (full & ~mask); + return ret & ~IOQ_BLOCKED; } /** Push an entry into a slot. */ @@ -323,19 +326,18 @@ static bool ioq_slot_push(struct ioqq *ioqq, ioq_slot *slot, struct ioq_ent *ent uintptr_t prev = load(slot, relaxed); while (true) { - size_t skip_mask = ioq_skip_mask(prev); - size_t full_mask = ~skip_mask & ~IOQ_BLOCKED; - if (prev & full_mask) { + uintptr_t full = ioq_slot_blend(prev, 0, prev); + if (full) { // full(ptr) → wait prev = ioq_slot_wait(ioqq, slot, prev); continue; } // empty → full(ptr) - uintptr_t next = ((uintptr_t)ent >> 1) & full_mask; + uintptr_t next = (uintptr_t)ent >> 1; // skip(1) → empty // skip(n) → skip(n - 1) - next |= (prev - IOQ_SKIP_ONE) & skip_mask; + next = ioq_slot_blend(prev, prev - IOQ_SKIP_ONE, next); if (compare_exchange_weak(slot, &prev, next, release, relaxed)) { break; @@ -357,9 +359,8 @@ static struct ioq_ent *ioq_slot_pop(struct ioqq *ioqq, ioq_slot *slot, bool bloc // skip(n) → skip(n + 1) // full(ptr) → full(ptr - 1) uintptr_t next = prev + IOQ_SKIP_ONE; - // skip(n) → ~IOQ_BLOCKED // full(ptr) → 0 - next &= ioq_skip_mask(next); + next = ioq_slot_blend(next, next, 0); if (block && next) { prev = ioq_slot_wait(ioqq, slot, prev); @@ -378,7 +379,7 @@ static struct ioq_ent *ioq_slot_pop(struct ioqq *ioqq, ioq_slot *slot, bool bloc // empty → 0 // skip(n) → 0 // full(ptr) → ptr - prev &= ioq_skip_mask(~prev); + prev = ioq_slot_blend(prev, 0, prev); return (struct ioq_ent *)(prev << 1); } @@ -877,6 +878,7 @@ static int ioq_ring_init(struct ioq *ioq, struct ioq_thread *thread) { return -1; } +#if BFS_HAS_IO_URING_MAX_WORKERS // Limit the number of io_uring workers unsigned int values[] = { ioq->nthreads, // [IO_WQ_BOUND] @@ -885,6 +887,8 @@ static int ioq_ring_init(struct ioq *ioq, struct ioq_thread *thread) { io_uring_register_iowq_max_workers(&thread->ring, values); #endif +#endif // BFS_WITH_LIBURING + return 0; } |