summaryrefslogtreecommitdiffstats
path: root/src/ioq.c
diff options
context:
space:
mode:
authorTavian Barnes <tavianator@tavianator.com>2023-06-15 14:22:00 -0400
committerTavian Barnes <tavianator@tavianator.com>2023-06-15 16:01:19 -0400
commit425956c9022fda1e98544c4b2d495e91dfde4b4f (patch)
treee105da07a88fa00e79f906e7abbdcd42757d1716 /src/ioq.c
parentb3aa51d83650bf9f2d264e53110ca248453bb2f0 (diff)
downloadbfs-425956c9022fda1e98544c4b2d495e91dfde4b4f.tar.xz
ioq: Implement a non-blocking fast path
Diffstat (limited to 'src/ioq.c')
-rw-r--r--src/ioq.c205
1 files changed, 107 insertions, 98 deletions
diff --git a/src/ioq.c b/src/ioq.c
index d3df7e0..5550c91 100644
--- a/src/ioq.c
+++ b/src/ioq.c
@@ -41,114 +41,38 @@ union ioq_cmd {
*/
struct ioq_monitor {
cache_align pthread_mutex_t mutex;
- pthread_cond_t full;
- pthread_cond_t empty;
+ pthread_cond_t cond;
};
/** Initialize an ioq_monitor. */
static int ioq_monitor_init(struct ioq_monitor *monitor) {
if (mutex_init(&monitor->mutex, NULL) != 0) {
- goto fail;
- }
-
- if (cond_init(&monitor->full, NULL) != 0) {
- goto fail_mutex;
+ return -1;
}
- if (cond_init(&monitor->empty, NULL) != 0) {
- goto fail_full;
+ if (cond_init(&monitor->cond, NULL) != 0) {
+ mutex_destroy(&monitor->mutex);
+ return -1;
}
return 0;
-
-fail_full:
- cond_destroy(&monitor->full);
-fail_mutex:
- mutex_destroy(&monitor->mutex);
-fail:
- return -1;
}
/** Destroy an ioq_monitor. */
static void ioq_monitor_destroy(struct ioq_monitor *monitor) {
- cond_destroy(&monitor->empty);
- cond_destroy(&monitor->full);
+ cond_destroy(&monitor->cond);
mutex_destroy(&monitor->mutex);
}
/**
- * A slot in an I/O queue.
- */
-struct ioq_slot {
- struct ioq_monitor *monitor;
- union ioq_cmd *cmd;
-};
-
-/** Initialize an ioq_slot. */
-static void ioq_slot_init(struct ioq_slot *slot, struct ioq_monitor *monitor) {
- slot->monitor = monitor;
- slot->cmd = NULL;
-}
-
-/** Push a command into a slot. */
-static void ioq_slot_push(struct ioq_slot *slot, union ioq_cmd *cmd) {
- struct ioq_monitor *monitor = slot->monitor;
-
- mutex_lock(&monitor->mutex);
- while (slot->cmd) {
- cond_wait(&monitor->empty, &monitor->mutex);
- }
- slot->cmd = cmd;
- mutex_unlock(&monitor->mutex);
-
- cond_broadcast(&monitor->full);
-}
-
-/** Pop a command from a slot. */
-static union ioq_cmd *ioq_slot_pop(struct ioq_slot *slot) {
- struct ioq_monitor *monitor = slot->monitor;
-
- mutex_lock(&monitor->mutex);
- while (!slot->cmd) {
- cond_wait(&monitor->full, &monitor->mutex);
- }
- union ioq_cmd *ret = slot->cmd;
- slot->cmd = NULL;
- mutex_unlock(&monitor->mutex);
-
- cond_broadcast(&monitor->empty);
-
- return ret;
-}
-
-/** Pop a command from a slot, if one exists. */
-static union ioq_cmd *ioq_slot_trypop(struct ioq_slot *slot) {
- struct ioq_monitor *monitor = slot->monitor;
-
- if (!mutex_trylock(&monitor->mutex)) {
- return NULL;
- }
-
- union ioq_cmd *ret = slot->cmd;
- slot->cmd = NULL;
-
- mutex_unlock(&monitor->mutex);
-
- if (ret) {
- cond_broadcast(&monitor->empty);
- }
- return ret;
-}
-
-/**
* An MPMC queue of I/O commands.
*/
struct ioqq {
/** Circular buffer index mask. */
- size_t mask;
+ size_t slot_mask;
- /** Number of monitors. */
- size_t nmonitors;
+ /** Monitor index mask. */
+ size_t monitor_mask;
/** Array of monitors used by the slots. */
struct ioq_monitor *monitors;
@@ -158,7 +82,7 @@ struct ioqq {
cache_align atomic size_t tail;
/** The circular buffer itself. */
- cache_align struct ioq_slot slots[];
+ cache_align atomic uintptr_t slots[];
};
// If we assign slots sequentially, threads will likely be operating on
@@ -169,12 +93,16 @@ struct ioqq {
// still use every available slot. Since the length is a power of two, that
// means the stride must be odd.
-#define IOQ_STRIDE ((FALSE_SHARING_SIZE / sizeof(struct ioq_slot)) | 1)
+#define IOQ_STRIDE ((FALSE_SHARING_SIZE / sizeof(atomic uintptr_t)) | 1)
bfs_static_assert(IOQ_STRIDE % 2 == 1);
+/** Slot flag bit to indicate waiters. */
+#define IOQ_BLOCKED ((uintptr_t)1)
+bfs_static_assert(alignof(union ioq_cmd) > 1);
+
/** Destroy an I/O command queue. */
static void ioqq_destroy(struct ioqq *ioqq) {
- for (size_t i = 0; i < ioqq->nmonitors; ++i) {
+ for (size_t i = 0; i < ioqq->monitor_mask + 1; ++i) {
ioq_monitor_destroy(&ioqq->monitors[i]);
}
free(ioqq->monitors);
@@ -191,9 +119,11 @@ static struct ioqq *ioqq_create(size_t size) {
return NULL;
}
+ ioqq->slot_mask = size - 1;
+ ioqq->monitor_mask = -1;
+
// Use a pool of monitors
size_t nmonitors = size < 64 ? size : 64;
- ioqq->nmonitors = 0;
ioqq->monitors = xmemalign(alignof(struct ioq_monitor), nmonitors * sizeof(struct ioq_monitor));
if (!ioqq->monitors) {
ioqq_destroy(ioqq);
@@ -205,38 +135,116 @@ static struct ioqq *ioqq_create(size_t size) {
ioqq_destroy(ioqq);
return NULL;
}
- ++ioqq->nmonitors;
+ ++ioqq->monitor_mask;
}
- ioqq->mask = size - 1;
-
atomic_init(&ioqq->head, 0);
atomic_init(&ioqq->tail, 0);
for (size_t i = 0; i < size; ++i) {
- ioq_slot_init(&ioqq->slots[i], &ioqq->monitors[i % nmonitors]);
+ atomic_init(&ioqq->slots[i], 0);
}
return ioqq;
}
+/** Atomically wait for a slot to change. */
+static uintptr_t ioqq_wait(struct ioqq *ioqq, size_t i, uintptr_t value) {
+ atomic uintptr_t *slot = &ioqq->slots[i & ioqq->slot_mask];
+
+ struct ioq_monitor *monitor = &ioqq->monitors[i & ioqq->monitor_mask];
+ mutex_lock(&monitor->mutex);
+
+ uintptr_t ret;
+ while ((ret = load(slot, relaxed)) == value) {
+ // To avoid missed wakeups, it is important that
+ // cond_broadcast() is not called right here
+ cond_wait(&monitor->cond, &monitor->mutex);
+ }
+
+ mutex_unlock(&monitor->mutex);
+ return ret;
+}
+
+/** Wake up any threads waiting on a slot. */
+static void ioqq_wake(struct ioqq *ioqq, size_t i) {
+ struct ioq_monitor *monitor = &ioqq->monitors[i & ioqq->monitor_mask];
+
+ // The following implementation would clearly avoid the missed wakeup
+ // issue mentioned above in ioqq_wait():
+ //
+ // mutex_lock(&monitor->mutex);
+ // cond_broadcast(&monitor->cond);
+ // mutex_unlock(&monitor->mutex);
+ //
+ // As a minor optimization, we move the broadcast outside of the lock.
+ // This optimization is correct, even though it leads to a seemingly-
+ // useless empty critical section.
+
+ mutex_lock(&monitor->mutex);
+ mutex_unlock(&monitor->mutex);
+ cond_broadcast(&monitor->cond);
+}
+
/** Push a command onto the queue. */
static void ioqq_push(struct ioqq *ioqq, union ioq_cmd *cmd) {
size_t i = fetch_add(&ioqq->head, IOQ_STRIDE, relaxed);
- ioq_slot_push(&ioqq->slots[i & ioqq->mask], cmd);
+ atomic uintptr_t *slot = &ioqq->slots[i & ioqq->slot_mask];
+
+ uintptr_t addr = (uintptr_t)cmd;
+ bfs_assert(!(addr & IOQ_BLOCKED));
+
+ uintptr_t prev = load(slot, relaxed);
+ do {
+ while (prev & ~IOQ_BLOCKED) {
+ prev = fetch_or(slot, IOQ_BLOCKED, relaxed);
+ if (prev & ~IOQ_BLOCKED) {
+ prev = ioqq_wait(ioqq, i, prev | IOQ_BLOCKED);
+ }
+ }
+ } while (!compare_exchange_weak(slot, &prev, addr, release, relaxed));
+
+ if (prev & IOQ_BLOCKED) {
+ ioqq_wake(ioqq, i);
+ }
}
/** Pop a command from a queue. */
static union ioq_cmd *ioqq_pop(struct ioqq *ioqq) {
size_t i = fetch_add(&ioqq->tail, IOQ_STRIDE, relaxed);
- return ioq_slot_pop(&ioqq->slots[i & ioqq->mask]);
+ atomic uintptr_t *slot = &ioqq->slots[i & ioqq->slot_mask];
+
+ uintptr_t prev = load(slot, relaxed);
+ do {
+ while (!(prev & ~IOQ_BLOCKED)) {
+ prev = fetch_or(slot, IOQ_BLOCKED, relaxed);
+ if (!(prev & ~IOQ_BLOCKED)) {
+ prev = ioqq_wait(ioqq, i, IOQ_BLOCKED);
+ }
+ }
+ } while (!compare_exchange_weak(slot, &prev, 0, acquire, relaxed));
+
+ if (prev & IOQ_BLOCKED) {
+ ioqq_wake(ioqq, i);
+ }
+ prev &= ~IOQ_BLOCKED;
+
+ return (union ioq_cmd *)prev;
}
/** Pop a command from a queue if one is available. */
static union ioq_cmd *ioqq_trypop(struct ioqq *ioqq) {
size_t i = load(&ioqq->tail, relaxed);
- union ioq_cmd *cmd = ioq_slot_trypop(&ioqq->slots[i & ioqq->mask]);
- if (cmd) {
+ atomic uintptr_t *slot = &ioqq->slots[i & ioqq->slot_mask];
+
+ uintptr_t prev = exchange(slot, 0, acquire);
+
+ if (prev & IOQ_BLOCKED) {
+ ioqq_wake(ioqq, i);
+ }
+ prev &= ~IOQ_BLOCKED;
+
+ if (prev) {
#ifdef NDEBUG
store(&ioqq->tail, i + IOQ_STRIDE, relaxed);
#else
@@ -244,7 +252,8 @@ static union ioq_cmd *ioqq_trypop(struct ioqq *ioqq) {
bfs_assert(j == i, "ioqq_trypop() only supports a single consumer");
#endif
}
- return cmd;
+
+ return (union ioq_cmd *)prev;
}
/** Sentinel stop command. */