Diffstat (limited to 'src/ioq.c')
-rw-r--r--  src/ioq.c  |  48 ++++++++++++++++++++++++------------------------
1 file changed, 24 insertions(+), 24 deletions(-)
diff --git a/src/ioq.c b/src/ioq.c
index 43a1b35..6bb1ceb 100644
--- a/src/ioq.c
+++ b/src/ioq.c
@@ -135,7 +135,7 @@
#include <stdlib.h>
#include <sys/stat.h>
-#if BFS_USE_LIBURING
+#if BFS_WITH_LIBURING
# include <liburing.h>
#endif
@@ -313,9 +313,11 @@ static void ioq_slot_wake(struct ioqq *ioqq, ioq_slot *slot) {
cond_broadcast(&monitor->cond);
}
-/** Branch-free (slot & IOQ_SKIP) ? ~IOQ_BLOCKED : 0 */
-static uintptr_t ioq_skip_mask(uintptr_t slot) {
- return -(slot >> IOQ_SKIP_BIT) << 1;
+/** Branch-free ((slot & IOQ_SKIP) ? skip : full) & ~IOQ_BLOCKED */
+static uintptr_t ioq_slot_blend(uintptr_t slot, uintptr_t skip, uintptr_t full) {
+ uintptr_t mask = -(slot >> IOQ_SKIP_BIT);
+ uintptr_t ret = (skip & mask) | (full & ~mask);
+ return ret & ~IOQ_BLOCKED;
}
/** Push an entry into a slot. */
@@ -323,19 +325,18 @@ static bool ioq_slot_push(struct ioqq *ioqq, ioq_slot *slot, struct ioq_ent *ent
uintptr_t prev = load(slot, relaxed);
while (true) {
- size_t skip_mask = ioq_skip_mask(prev);
- size_t full_mask = ~skip_mask & ~IOQ_BLOCKED;
- if (prev & full_mask) {
+ uintptr_t full = ioq_slot_blend(prev, 0, prev);
+ if (full) {
// full(ptr) → wait
prev = ioq_slot_wait(ioqq, slot, prev);
continue;
}
// empty → full(ptr)
- uintptr_t next = ((uintptr_t)ent >> 1) & full_mask;
+ uintptr_t next = (uintptr_t)ent >> 1;
// skip(1) → empty
// skip(n) → skip(n - 1)
- next |= (prev - IOQ_SKIP_ONE) & skip_mask;
+ next = ioq_slot_blend(prev, prev - IOQ_SKIP_ONE, next);
if (compare_exchange_weak(slot, &prev, next, release, relaxed)) {
break;
@@ -357,9 +358,8 @@ static struct ioq_ent *ioq_slot_pop(struct ioqq *ioqq, ioq_slot *slot, bool bloc
// skip(n) → skip(n + 1)
// full(ptr) → full(ptr - 1)
uintptr_t next = prev + IOQ_SKIP_ONE;
- // skip(n) → ~IOQ_BLOCKED
// full(ptr) → 0
- next &= ioq_skip_mask(next);
+ next = ioq_slot_blend(next, next, 0);
if (block && next) {
prev = ioq_slot_wait(ioqq, slot, prev);
@@ -378,7 +378,7 @@ static struct ioq_ent *ioq_slot_pop(struct ioqq *ioqq, ioq_slot *slot, bool bloc
// empty → 0
// skip(n) → 0
// full(ptr) → ptr
- prev &= ioq_skip_mask(~prev);
+ prev = ioq_slot_blend(prev, 0, prev);
return (struct ioq_ent *)(prev << 1);
}
@@ -459,7 +459,7 @@ static void ioq_batch_push(struct ioqq *ioqq, struct ioq_batch *batch, struct io
/** Sentinel stop command. */
static struct ioq_ent IOQ_STOP;
-#if BFS_USE_LIBURING
+#if BFS_WITH_LIBURING
/**
* Supported io_uring operations.
*/
@@ -477,7 +477,7 @@ struct ioq_thread {
/** Pointer back to the I/O queue. */
struct ioq *parent;
-#if BFS_USE_LIBURING
+#if BFS_WITH_LIBURING
/** io_uring instance. */
struct io_uring ring;
/** Any error that occurred initializing the ring. */
@@ -497,7 +497,7 @@ struct ioq {
/** ioq_ent arena. */
struct arena ents;
-#if BFS_USE_LIBURING && BFS_USE_STATX
+#if BFS_WITH_LIBURING && BFS_USE_STATX
/** struct statx arena. */
struct arena xbufs;
#endif
@@ -559,7 +559,7 @@ static void ioq_dispatch_sync(struct ioq *ioq, struct ioq_ent *ent) {
ent->result = -ENOSYS;
}
-#if BFS_USE_LIBURING
+#if BFS_WITH_LIBURING
/** io_uring worker state. */
struct ioq_ring_state {
@@ -775,7 +775,7 @@ static void ioq_ring_work(struct ioq_thread *thread) {
}
}
-#endif // BFS_USE_LIBURING
+#endif // BFS_WITH_LIBURING
/** Synchronous syscall loop. */
static void ioq_sync_work(struct ioq_thread *thread) {
@@ -811,7 +811,7 @@ static void ioq_sync_work(struct ioq_thread *thread) {
static void *ioq_work(void *ptr) {
struct ioq_thread *thread = ptr;
-#if BFS_USE_LIBURING
+#if BFS_WITH_LIBURING
if (thread->ring_err == 0) {
ioq_ring_work(thread);
return NULL;
@@ -824,7 +824,7 @@ static void *ioq_work(void *ptr) {
/** Initialize io_uring thread state. */
static int ioq_ring_init(struct ioq *ioq, struct ioq_thread *thread) {
-#if BFS_USE_LIBURING
+#if BFS_WITH_LIBURING
struct ioq_thread *prev = NULL;
if (thread > ioq->threads) {
prev = thread - 1;
@@ -890,7 +890,7 @@ static int ioq_ring_init(struct ioq *ioq, struct ioq_thread *thread) {
/** Destroy an io_uring. */
static void ioq_ring_exit(struct ioq_thread *thread) {
-#if BFS_USE_LIBURING
+#if BFS_WITH_LIBURING
if (thread->ring_err == 0) {
io_uring_queue_exit(&thread->ring);
}
@@ -926,7 +926,7 @@ struct ioq *ioq_create(size_t depth, size_t nthreads) {
ioq->depth = depth;
ARENA_INIT(&ioq->ents, struct ioq_ent);
-#if BFS_USE_LIBURING && BFS_USE_STATX
+#if BFS_WITH_LIBURING && BFS_USE_STATX
ARENA_INIT(&ioq->xbufs, struct statx);
#endif
@@ -1036,7 +1036,7 @@ int ioq_stat(struct ioq *ioq, int dfd, const char *path, enum bfs_stat_flags fla
args->flags = flags;
args->buf = buf;
-#if BFS_USE_LIBURING && BFS_USE_STATX
+#if BFS_WITH_LIBURING && BFS_USE_STATX
args->xbuf = arena_alloc(&ioq->xbufs);
if (!args->xbuf) {
ioq_free(ioq, ent);
@@ -1060,7 +1060,7 @@ void ioq_free(struct ioq *ioq, struct ioq_ent *ent) {
bfs_assert(ioq->size > 0);
--ioq->size;
-#if BFS_USE_LIBURING && BFS_USE_STATX
+#if BFS_WITH_LIBURING && BFS_USE_STATX
if (ent->op == IOQ_STAT && ent->stat.xbuf) {
arena_free(&ioq->xbufs, ent->stat.xbuf);
}
@@ -1091,7 +1091,7 @@ void ioq_destroy(struct ioq *ioq) {
ioqq_destroy(ioq->ready);
ioqq_destroy(ioq->pending);
-#if BFS_USE_LIBURING && BFS_USE_STATX
+#if BFS_WITH_LIBURING && BFS_USE_STATX
arena_destroy(&ioq->xbufs);
#endif
arena_destroy(&ioq->ents);
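
For reference, here is a minimal standalone sketch of the branch-free blend idiom introduced by ioq_slot_blend(). The constants below are illustrative assumptions rather than the actual definitions in src/ioq.c; the only property the trick relies on is that IOQ_SKIP occupies the most significant bit of the slot word (so slot >> IOQ_SKIP_BIT is exactly 0 or 1) and IOQ_BLOCKED the least significant bit.

#include <assert.h>
#include <limits.h>
#include <stdint.h>
#include <stdio.h>

/* Illustrative stand-ins -- not the real src/ioq.c definitions. */
#define IOQ_BLOCKED   ((uintptr_t)1)                      /* low bit: waiters present */
#define IOQ_SKIP_BIT  (sizeof(uintptr_t) * CHAR_BIT - 1)  /* index of the skip flag */
#define IOQ_SKIP      ((uintptr_t)1 << IOQ_SKIP_BIT)      /* high bit: slot holds a skip count */

/** Branch-free ((slot & IOQ_SKIP) ? skip : full) & ~IOQ_BLOCKED */
static uintptr_t ioq_slot_blend(uintptr_t slot, uintptr_t skip, uintptr_t full) {
	/* slot >> IOQ_SKIP_BIT is 0 or 1, so negating it yields all-zeros or all-ones */
	uintptr_t mask = -(slot >> IOQ_SKIP_BIT);
	/* select skip when the mask is all-ones, full when it is all-zeros */
	uintptr_t ret = (skip & mask) | (full & ~mask);
	/* always discard the waiter flag from the result */
	return ret & ~IOQ_BLOCKED;
}

int main(void) {
	/* Skip bit clear: the "full" argument is chosen, minus the blocked bit. */
	assert(ioq_slot_blend(0x1234 | IOQ_BLOCKED, 0, 0x1234 | IOQ_BLOCKED) == 0x1234);
	/* Skip bit set: the "skip" argument is chosen instead. */
	assert(ioq_slot_blend(IOQ_SKIP | 42, IOQ_SKIP | 42, 0) == (IOQ_SKIP | 42));
	/* The push path asks "is this slot already full?" with blend(prev, 0, prev):
	   the result is nonzero only when the slot holds a (shifted) pointer. */
	assert(ioq_slot_blend(IOQ_SKIP | 42, 0, IOQ_SKIP | 42) == 0);
	puts("ok");
	return 0;
}

This is how the push and pop paths above replace the old ioq_skip_mask() arithmetic: each call site passes the value it wants in the skip case and the value it wants in the full case, and the blend picks one without branching while stripping IOQ_BLOCKED from the result.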