summaryrefslogtreecommitdiffstats
path: root/src/ioq.c
diff options
context:
space:
mode:
Diffstat (limited to 'src/ioq.c')
-rw-r--r--src/ioq.c124
1 files changed, 75 insertions, 49 deletions
diff --git a/src/ioq.c b/src/ioq.c
index 0544044..c66ebda 100644
--- a/src/ioq.c
+++ b/src/ioq.c
@@ -44,6 +44,28 @@ static void ioq_monitor_destroy(struct ioq_monitor *monitor) {
mutex_destroy(&monitor->mutex);
}
+/** A single entry in a command queue. */
+typedef atomic uintptr_t ioq_slot;
+
+/** Slot flag bit to indicate waiters. */
+#define IOQ_BLOCKED ((uintptr_t)1)
+bfs_static_assert(alignof(struct ioq_ent) > 1);
+
+/** Check if a slot has waiters. */
+static bool ioq_slot_blocked(uintptr_t value) {
+ return value & IOQ_BLOCKED;
+}
+
+/** Extract the pointer from a slot. */
+static struct ioq_ent *ioq_slot_ptr(uintptr_t value) {
+ return (struct ioq_ent *)(value & ~IOQ_BLOCKED);
+}
+
+/** Check if a slot is empty. */
+static bool ioq_slot_empty(uintptr_t value) {
+ return !ioq_slot_ptr(value);
+}
+
/**
* An MPMC queue of I/O commands.
*/
@@ -62,7 +84,7 @@ struct ioqq {
cache_align atomic size_t tail;
/** The circular buffer itself. */
- cache_align atomic uintptr_t slots[];
+ cache_align ioq_slot slots[];
};
// If we assign slots sequentially, threads will likely be operating on
@@ -73,13 +95,9 @@ struct ioqq {
// still use every available slot. Since the length is a power of two, that
// means the stride must be odd.
-#define IOQ_STRIDE ((FALSE_SHARING_SIZE / sizeof(atomic uintptr_t)) | 1)
+#define IOQ_STRIDE ((FALSE_SHARING_SIZE / sizeof(ioq_slot)) | 1)
bfs_static_assert(IOQ_STRIDE % 2 == 1);
-/** Slot flag bit to indicate waiters. */
-#define IOQ_BLOCKED ((uintptr_t)1)
-bfs_static_assert(alignof(struct ioq_ent) > 1);
-
/** Destroy an I/O command queue. */
static void ioqq_destroy(struct ioqq *ioqq) {
for (size_t i = 0; i < ioqq->monitor_mask + 1; ++i) {
@@ -129,9 +147,11 @@ static struct ioqq *ioqq_create(size_t size) {
}
/** Atomically wait for a slot to change. */
-static uintptr_t ioqq_wait(struct ioqq *ioqq, size_t i, uintptr_t value) {
- atomic uintptr_t *slot = &ioqq->slots[i & ioqq->slot_mask];
+static uintptr_t ioq_slot_wait(struct ioqq *ioqq, ioq_slot *slot, uintptr_t value) {
+ fetch_or(slot, IOQ_BLOCKED, relaxed);
+ value |= IOQ_BLOCKED;
+ size_t i = slot - ioqq->slots;
struct ioq_monitor *monitor = &ioqq->monitors[i & ioqq->monitor_mask];
mutex_lock(&monitor->mutex);
@@ -147,11 +167,12 @@ static uintptr_t ioqq_wait(struct ioqq *ioqq, size_t i, uintptr_t value) {
}
/** Wake up any threads waiting on a slot. */
-static void ioqq_wake(struct ioqq *ioqq, size_t i) {
+static void ioq_slot_wake(struct ioqq *ioqq, ioq_slot *slot) {
+ size_t i = slot - ioqq->slots;
struct ioq_monitor *monitor = &ioqq->monitors[i & ioqq->monitor_mask];
// The following implementation would clearly avoid the missed wakeup
- // issue mentioned above in ioqq_wait():
+ // issue mentioned above in ioq_slot_wait():
//
// mutex_lock(&monitor->mutex);
// cond_broadcast(&monitor->cond);
@@ -166,75 +187,80 @@ static void ioqq_wake(struct ioqq *ioqq, size_t i) {
cond_broadcast(&monitor->cond);
}
-/** Push an entry onto the queue. */
-static void ioqq_push(struct ioqq *ioqq, struct ioq_ent *ent) {
+/** Get the next slot for writing. */
+static ioq_slot *ioqq_write(struct ioqq *ioqq) {
size_t i = fetch_add(&ioqq->head, IOQ_STRIDE, relaxed);
- atomic uintptr_t *slot = &ioqq->slots[i & ioqq->slot_mask];
+ return &ioqq->slots[i & ioqq->slot_mask];
+}
+/** Push an entry into a slot. */
+static void ioq_slot_push(struct ioqq *ioqq, ioq_slot *slot, struct ioq_ent *ent) {
uintptr_t addr = (uintptr_t)ent;
- bfs_assert(!(addr & IOQ_BLOCKED));
+ bfs_assert(!ioq_slot_blocked(addr));
uintptr_t prev = load(slot, relaxed);
do {
- while (prev & ~IOQ_BLOCKED) {
- prev = fetch_or(slot, IOQ_BLOCKED, relaxed);
- if (prev & ~IOQ_BLOCKED) {
- prev = ioqq_wait(ioqq, i, prev | IOQ_BLOCKED);
- }
+ while (!ioq_slot_empty(prev)) {
+ prev = ioq_slot_wait(ioqq, slot, prev);
}
} while (!compare_exchange_weak(slot, &prev, addr, release, relaxed));
- if (prev & IOQ_BLOCKED) {
- ioqq_wake(ioqq, i);
+ if (ioq_slot_blocked(prev)) {
+ ioq_slot_wake(ioqq, slot);
}
}
-/** Pop an entry from the queue. */
-static struct ioq_ent *ioqq_pop(struct ioqq *ioqq) {
+/** Push an entry onto the queue. */
+static void ioqq_push(struct ioqq *ioqq, struct ioq_ent *ent) {
+ ioq_slot *slot = ioqq_write(ioqq);
+ ioq_slot_push(ioqq, slot, ent);
+}
+
+/** Get the next slot for reading. */
+static ioq_slot *ioqq_read(struct ioqq *ioqq) {
size_t i = fetch_add(&ioqq->tail, IOQ_STRIDE, relaxed);
- atomic uintptr_t *slot = &ioqq->slots[i & ioqq->slot_mask];
+ return &ioqq->slots[i & ioqq->slot_mask];
+}
+/** (Try to) pop an entry from a slot. */
+static struct ioq_ent *ioq_slot_pop(struct ioqq *ioqq, ioq_slot *slot, bool block) {
uintptr_t prev = load(slot, relaxed);
do {
- while (!(prev & ~IOQ_BLOCKED)) {
- prev = fetch_or(slot, IOQ_BLOCKED, relaxed);
- if (!(prev & ~IOQ_BLOCKED)) {
- prev = ioqq_wait(ioqq, i, IOQ_BLOCKED);
+ while (ioq_slot_empty(prev)) {
+ if (block) {
+ prev = ioq_slot_wait(ioqq, slot, prev);
+ } else {
+ return NULL;
}
}
} while (!compare_exchange_weak(slot, &prev, 0, acquire, relaxed));
- if (prev & IOQ_BLOCKED) {
- ioqq_wake(ioqq, i);
+ if (ioq_slot_blocked(prev)) {
+ ioq_slot_wake(ioqq, slot);
}
- prev &= ~IOQ_BLOCKED;
- return (struct ioq_ent *)prev;
+ return ioq_slot_ptr(prev);
+}
+
+/** Pop an entry from the queue. */
+static struct ioq_ent *ioqq_pop(struct ioqq *ioqq) {
+ ioq_slot *slot = ioqq_read(ioqq);
+ return ioq_slot_pop(ioqq, slot, true);
}
/** Pop an entry from the queue if one is available. */
static struct ioq_ent *ioqq_trypop(struct ioqq *ioqq) {
size_t i = load(&ioqq->tail, relaxed);
- atomic uintptr_t *slot = &ioqq->slots[i & ioqq->slot_mask];
+ ioq_slot *slot = &ioqq->slots[i & ioqq->slot_mask];
- uintptr_t prev = load(slot, relaxed);
- if (!(prev & ~IOQ_BLOCKED)) {
- return NULL;
- }
- if (!compare_exchange_weak(slot, &prev, 0, acquire, relaxed)) {
- return NULL;
+ struct ioq_ent *ret = ioq_slot_pop(ioqq, slot, false);
+ if (ret) {
+ size_t j = exchange(&ioqq->tail, i + IOQ_STRIDE, relaxed);
+ bfs_assert(j == i, "Detected multiple consumers");
+ (void)j;
}
- if (prev & IOQ_BLOCKED) {
- ioqq_wake(ioqq, i);
- }
- prev &= ~IOQ_BLOCKED;
-
- size_t j = exchange(&ioqq->tail, i + IOQ_STRIDE, relaxed);
- bfs_assert(j == i, "ioqq_trypop() only supports a single consumer");
- (void)j;
-
- return (struct ioq_ent *)prev;
+ return ret;
}
/** Sentinel stop command. */