diff options
Diffstat (limited to 'src/ioq.c')
-rw-r--r-- | src/ioq.c | 124 |
1 files changed, 75 insertions, 49 deletions
@@ -44,6 +44,28 @@ static void ioq_monitor_destroy(struct ioq_monitor *monitor) { mutex_destroy(&monitor->mutex); } +/** A single entry in a command queue. */ +typedef atomic uintptr_t ioq_slot; + +/** Slot flag bit to indicate waiters. */ +#define IOQ_BLOCKED ((uintptr_t)1) +bfs_static_assert(alignof(struct ioq_ent) > 1); + +/** Check if a slot has waiters. */ +static bool ioq_slot_blocked(uintptr_t value) { + return value & IOQ_BLOCKED; +} + +/** Extract the pointer from a slot. */ +static struct ioq_ent *ioq_slot_ptr(uintptr_t value) { + return (struct ioq_ent *)(value & ~IOQ_BLOCKED); +} + +/** Check if a slot is empty. */ +static bool ioq_slot_empty(uintptr_t value) { + return !ioq_slot_ptr(value); +} + /** * An MPMC queue of I/O commands. */ @@ -62,7 +84,7 @@ struct ioqq { cache_align atomic size_t tail; /** The circular buffer itself. */ - cache_align atomic uintptr_t slots[]; + cache_align ioq_slot slots[]; }; // If we assign slots sequentially, threads will likely be operating on @@ -73,13 +95,9 @@ struct ioqq { // still use every available slot. Since the length is a power of two, that // means the stride must be odd. -#define IOQ_STRIDE ((FALSE_SHARING_SIZE / sizeof(atomic uintptr_t)) | 1) +#define IOQ_STRIDE ((FALSE_SHARING_SIZE / sizeof(ioq_slot)) | 1) bfs_static_assert(IOQ_STRIDE % 2 == 1); -/** Slot flag bit to indicate waiters. */ -#define IOQ_BLOCKED ((uintptr_t)1) -bfs_static_assert(alignof(struct ioq_ent) > 1); - /** Destroy an I/O command queue. */ static void ioqq_destroy(struct ioqq *ioqq) { for (size_t i = 0; i < ioqq->monitor_mask + 1; ++i) { @@ -129,9 +147,11 @@ static struct ioqq *ioqq_create(size_t size) { } /** Atomically wait for a slot to change. */ -static uintptr_t ioqq_wait(struct ioqq *ioqq, size_t i, uintptr_t value) { - atomic uintptr_t *slot = &ioqq->slots[i & ioqq->slot_mask]; +static uintptr_t ioq_slot_wait(struct ioqq *ioqq, ioq_slot *slot, uintptr_t value) { + fetch_or(slot, IOQ_BLOCKED, relaxed); + value |= IOQ_BLOCKED; + size_t i = slot - ioqq->slots; struct ioq_monitor *monitor = &ioqq->monitors[i & ioqq->monitor_mask]; mutex_lock(&monitor->mutex); @@ -147,11 +167,12 @@ static uintptr_t ioqq_wait(struct ioqq *ioqq, size_t i, uintptr_t value) { } /** Wake up any threads waiting on a slot. */ -static void ioqq_wake(struct ioqq *ioqq, size_t i) { +static void ioq_slot_wake(struct ioqq *ioqq, ioq_slot *slot) { + size_t i = slot - ioqq->slots; struct ioq_monitor *monitor = &ioqq->monitors[i & ioqq->monitor_mask]; // The following implementation would clearly avoid the missed wakeup - // issue mentioned above in ioqq_wait(): + // issue mentioned above in ioq_slot_wait(): // // mutex_lock(&monitor->mutex); // cond_broadcast(&monitor->cond); @@ -166,75 +187,80 @@ static void ioqq_wake(struct ioqq *ioqq, size_t i) { cond_broadcast(&monitor->cond); } -/** Push an entry onto the queue. */ -static void ioqq_push(struct ioqq *ioqq, struct ioq_ent *ent) { +/** Get the next slot for writing. */ +static ioq_slot *ioqq_write(struct ioqq *ioqq) { size_t i = fetch_add(&ioqq->head, IOQ_STRIDE, relaxed); - atomic uintptr_t *slot = &ioqq->slots[i & ioqq->slot_mask]; + return &ioqq->slots[i & ioqq->slot_mask]; +} +/** Push an entry into a slot. */ +static void ioq_slot_push(struct ioqq *ioqq, ioq_slot *slot, struct ioq_ent *ent) { uintptr_t addr = (uintptr_t)ent; - bfs_assert(!(addr & IOQ_BLOCKED)); + bfs_assert(!ioq_slot_blocked(addr)); uintptr_t prev = load(slot, relaxed); do { - while (prev & ~IOQ_BLOCKED) { - prev = fetch_or(slot, IOQ_BLOCKED, relaxed); - if (prev & ~IOQ_BLOCKED) { - prev = ioqq_wait(ioqq, i, prev | IOQ_BLOCKED); - } + while (!ioq_slot_empty(prev)) { + prev = ioq_slot_wait(ioqq, slot, prev); } } while (!compare_exchange_weak(slot, &prev, addr, release, relaxed)); - if (prev & IOQ_BLOCKED) { - ioqq_wake(ioqq, i); + if (ioq_slot_blocked(prev)) { + ioq_slot_wake(ioqq, slot); } } -/** Pop an entry from the queue. */ -static struct ioq_ent *ioqq_pop(struct ioqq *ioqq) { +/** Push an entry onto the queue. */ +static void ioqq_push(struct ioqq *ioqq, struct ioq_ent *ent) { + ioq_slot *slot = ioqq_write(ioqq); + ioq_slot_push(ioqq, slot, ent); +} + +/** Get the next slot for reading. */ +static ioq_slot *ioqq_read(struct ioqq *ioqq) { size_t i = fetch_add(&ioqq->tail, IOQ_STRIDE, relaxed); - atomic uintptr_t *slot = &ioqq->slots[i & ioqq->slot_mask]; + return &ioqq->slots[i & ioqq->slot_mask]; +} +/** (Try to) pop an entry from a slot. */ +static struct ioq_ent *ioq_slot_pop(struct ioqq *ioqq, ioq_slot *slot, bool block) { uintptr_t prev = load(slot, relaxed); do { - while (!(prev & ~IOQ_BLOCKED)) { - prev = fetch_or(slot, IOQ_BLOCKED, relaxed); - if (!(prev & ~IOQ_BLOCKED)) { - prev = ioqq_wait(ioqq, i, IOQ_BLOCKED); + while (ioq_slot_empty(prev)) { + if (block) { + prev = ioq_slot_wait(ioqq, slot, prev); + } else { + return NULL; } } } while (!compare_exchange_weak(slot, &prev, 0, acquire, relaxed)); - if (prev & IOQ_BLOCKED) { - ioqq_wake(ioqq, i); + if (ioq_slot_blocked(prev)) { + ioq_slot_wake(ioqq, slot); } - prev &= ~IOQ_BLOCKED; - return (struct ioq_ent *)prev; + return ioq_slot_ptr(prev); +} + +/** Pop an entry from the queue. */ +static struct ioq_ent *ioqq_pop(struct ioqq *ioqq) { + ioq_slot *slot = ioqq_read(ioqq); + return ioq_slot_pop(ioqq, slot, true); } /** Pop an entry from the queue if one is available. */ static struct ioq_ent *ioqq_trypop(struct ioqq *ioqq) { size_t i = load(&ioqq->tail, relaxed); - atomic uintptr_t *slot = &ioqq->slots[i & ioqq->slot_mask]; + ioq_slot *slot = &ioqq->slots[i & ioqq->slot_mask]; - uintptr_t prev = load(slot, relaxed); - if (!(prev & ~IOQ_BLOCKED)) { - return NULL; - } - if (!compare_exchange_weak(slot, &prev, 0, acquire, relaxed)) { - return NULL; + struct ioq_ent *ret = ioq_slot_pop(ioqq, slot, false); + if (ret) { + size_t j = exchange(&ioqq->tail, i + IOQ_STRIDE, relaxed); + bfs_assert(j == i, "Detected multiple consumers"); + (void)j; } - if (prev & IOQ_BLOCKED) { - ioqq_wake(ioqq, i); - } - prev &= ~IOQ_BLOCKED; - - size_t j = exchange(&ioqq->tail, i + IOQ_STRIDE, relaxed); - bfs_assert(j == i, "ioqq_trypop() only supports a single consumer"); - (void)j; - - return (struct ioq_ent *)prev; + return ret; } /** Sentinel stop command. */ |