summaryrefslogtreecommitdiffstats
path: root/src/ioq.c
diff options
context:
space:
mode:
Diffstat (limited to 'src/ioq.c')
-rw-r--r--src/ioq.c36
1 files changed, 20 insertions, 16 deletions
diff --git a/src/ioq.c b/src/ioq.c
index 4057a5e..be5b758 100644
--- a/src/ioq.c
+++ b/src/ioq.c
@@ -118,16 +118,18 @@
* [1]: https://arxiv.org/abs/2201.02179
*/
-#include "prelude.h"
#include "ioq.h"
+
#include "alloc.h"
#include "atomic.h"
+#include "bfs.h"
#include "bfstd.h"
#include "bit.h"
#include "diag.h"
#include "dir.h"
#include "stat.h"
#include "thread.h"
+
#include <errno.h>
#include <fcntl.h>
#include <pthread.h>
@@ -180,8 +182,7 @@ typedef atomic uintptr_t ioq_slot;
/** Amount to add for an additional skip. */
#define IOQ_SKIP_ONE (~IOQ_BLOCKED)
-// Need room for two flag bits
-bfs_static_assert(alignof(struct ioq_ent) >= (1 << 2));
+static_assert(alignof(struct ioq_ent) >= (1 << 2), "struct ioq_ent is underaligned");
/**
* An MPMC queue of I/O commands.
@@ -263,7 +264,7 @@ static struct ioq_monitor *ioq_slot_monitor(struct ioqq *ioqq, ioq_slot *slot) {
}
/** Atomically wait for a slot to change. */
-attr(noinline)
+_noinline
static uintptr_t ioq_slot_wait(struct ioqq *ioqq, ioq_slot *slot, uintptr_t value) {
struct ioq_monitor *monitor = ioq_slot_monitor(ioqq, slot);
mutex_lock(&monitor->mutex);
@@ -293,7 +294,7 @@ done:
}
/** Wake up any threads waiting on a slot. */
-attr(noinline)
+_noinline
static void ioq_slot_wake(struct ioqq *ioqq, ioq_slot *slot) {
struct ioq_monitor *monitor = ioq_slot_monitor(ioqq, slot);
@@ -313,9 +314,11 @@ static void ioq_slot_wake(struct ioqq *ioqq, ioq_slot *slot) {
cond_broadcast(&monitor->cond);
}
-/** Branch-free (slot & IOQ_SKIP) ? ~IOQ_BLOCKED : 0 */
-static uintptr_t ioq_skip_mask(uintptr_t slot) {
- return -(slot >> IOQ_SKIP_BIT) << 1;
+/** Branch-free ((slot & IOQ_SKIP) ? skip : full) & ~IOQ_BLOCKED */
+static uintptr_t ioq_slot_blend(uintptr_t slot, uintptr_t skip, uintptr_t full) {
+ uintptr_t mask = -(slot >> IOQ_SKIP_BIT);
+ uintptr_t ret = (skip & mask) | (full & ~mask);
+ return ret & ~IOQ_BLOCKED;
}
/** Push an entry into a slot. */
@@ -323,19 +326,18 @@ static bool ioq_slot_push(struct ioqq *ioqq, ioq_slot *slot, struct ioq_ent *ent
uintptr_t prev = load(slot, relaxed);
while (true) {
- size_t skip_mask = ioq_skip_mask(prev);
- size_t full_mask = ~skip_mask & ~IOQ_BLOCKED;
- if (prev & full_mask) {
+ uintptr_t full = ioq_slot_blend(prev, 0, prev);
+ if (full) {
// full(ptr) → wait
prev = ioq_slot_wait(ioqq, slot, prev);
continue;
}
// empty → full(ptr)
- uintptr_t next = ((uintptr_t)ent >> 1) & full_mask;
+ uintptr_t next = (uintptr_t)ent >> 1;
// skip(1) → empty
// skip(n) → skip(n - 1)
- next |= (prev - IOQ_SKIP_ONE) & skip_mask;
+ next = ioq_slot_blend(prev, prev - IOQ_SKIP_ONE, next);
if (compare_exchange_weak(slot, &prev, next, release, relaxed)) {
break;
@@ -357,9 +359,8 @@ static struct ioq_ent *ioq_slot_pop(struct ioqq *ioqq, ioq_slot *slot, bool bloc
// skip(n) → skip(n + 1)
// full(ptr) → full(ptr - 1)
uintptr_t next = prev + IOQ_SKIP_ONE;
- // skip(n) → ~IOQ_BLOCKED
// full(ptr) → 0
- next &= ioq_skip_mask(next);
+ next = ioq_slot_blend(next, next, 0);
if (block && next) {
prev = ioq_slot_wait(ioqq, slot, prev);
@@ -378,7 +379,7 @@ static struct ioq_ent *ioq_slot_pop(struct ioqq *ioqq, ioq_slot *slot, bool bloc
// empty → 0
// skip(n) → 0
// full(ptr) → ptr
- prev &= ioq_skip_mask(~prev);
+ prev = ioq_slot_blend(prev, 0, prev);
return (struct ioq_ent *)(prev << 1);
}
@@ -877,6 +878,7 @@ static int ioq_ring_init(struct ioq *ioq, struct ioq_thread *thread) {
return -1;
}
+#if BFS_HAS_IO_URING_MAX_WORKERS
// Limit the number of io_uring workers
unsigned int values[] = {
ioq->nthreads, // [IO_WQ_BOUND]
@@ -885,6 +887,8 @@ static int ioq_ring_init(struct ioq *ioq, struct ioq_thread *thread) {
io_uring_register_iowq_max_workers(&thread->ring, values);
#endif
+#endif // BFS_WITH_LIBURING
+
return 0;
}