summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--src/ioq.c279
1 files changed, 201 insertions, 78 deletions
diff --git a/src/ioq.c b/src/ioq.c
index 74d2e09..d3df7e0 100644
--- a/src/ioq.c
+++ b/src/ioq.c
@@ -2,14 +2,17 @@
// SPDX-License-Identifier: 0BSD
#include "ioq.h"
+#include "atomic.h"
+#include "bfstd.h"
+#include "bit.h"
+#include "config.h"
+#include "diag.h"
#include "dir.h"
-#include "list.h"
#include "lock.h"
#include "sanity.h"
#include <assert.h>
#include <errno.h>
#include <pthread.h>
-#include <stdbool.h>
#include <stdlib.h>
/**
@@ -28,102 +31,225 @@ struct ioq_req {
/**
* An I/O queue command.
*/
-struct ioq_cmd {
- union {
- struct ioq_req req;
- struct ioq_res res;
- };
-
- struct ioq_cmd *next;
+union ioq_cmd {
+ struct ioq_req req;
+ struct ioq_res res;
};
/**
- * An MPMC queue of I/O commands.
+ * A monitor for an I/O queue slot.
*/
-struct ioqq {
- pthread_mutex_t mutex;
- pthread_cond_t cond;
-
- bool stop;
-
- struct ioq_cmd *head;
- struct ioq_cmd **tail;
+struct ioq_monitor {
+ cache_align pthread_mutex_t mutex;
+ pthread_cond_t full;
+ pthread_cond_t empty;
};
-static struct ioqq *ioqq_create(void) {
- struct ioqq *ioqq = malloc(sizeof(*ioqq));
- if (!ioqq) {
+/** Initialize an ioq_monitor. */
+static int ioq_monitor_init(struct ioq_monitor *monitor) {
+ if (mutex_init(&monitor->mutex, NULL) != 0) {
goto fail;
}
- if (mutex_init(&ioqq->mutex, NULL) != 0) {
- goto fail_free;
+ if (cond_init(&monitor->full, NULL) != 0) {
+ goto fail_mutex;
}
- if (cond_init(&ioqq->cond, NULL) != 0) {
- goto fail_mutex;
+ if (cond_init(&monitor->empty, NULL) != 0) {
+ goto fail_full;
}
- ioqq->stop = false;
- SLIST_INIT(ioqq);
- return ioqq;
+ return 0;
+fail_full:
+ cond_destroy(&monitor->full);
fail_mutex:
- mutex_destroy(&ioqq->mutex);
-fail_free:
- free(ioqq);
+ mutex_destroy(&monitor->mutex);
fail:
- return NULL;
+ return -1;
}
-/** Push a command onto the queue. */
-static void ioqq_push(struct ioqq *ioqq, struct ioq_cmd *cmd) {
- mutex_lock(&ioqq->mutex);
- SLIST_APPEND(ioqq, cmd);
- mutex_unlock(&ioqq->mutex);
- cond_signal(&ioqq->cond);
+/** Destroy an ioq_monitor. */
+static void ioq_monitor_destroy(struct ioq_monitor *monitor) {
+ cond_destroy(&monitor->empty);
+ cond_destroy(&monitor->full);
+ mutex_destroy(&monitor->mutex);
}
-/** Pop a command from a queue. */
-static struct ioq_cmd *ioqq_pop(struct ioqq *ioqq) {
- mutex_lock(&ioqq->mutex);
+/**
+ * A slot in an I/O queue.
+ */
+struct ioq_slot {
+ struct ioq_monitor *monitor;
+ union ioq_cmd *cmd;
+};
+
+/** Initialize an ioq_slot. */
+static void ioq_slot_init(struct ioq_slot *slot, struct ioq_monitor *monitor) {
+ slot->monitor = monitor;
+ slot->cmd = NULL;
+}
- while (!ioqq->stop && !ioqq->head) {
- cond_wait(&ioqq->cond, &ioqq->mutex);
+/** Push a command into a slot. */
+static void ioq_slot_push(struct ioq_slot *slot, union ioq_cmd *cmd) {
+ struct ioq_monitor *monitor = slot->monitor;
+
+ mutex_lock(&monitor->mutex);
+ while (slot->cmd) {
+ cond_wait(&monitor->empty, &monitor->mutex);
}
+ slot->cmd = cmd;
+ mutex_unlock(&monitor->mutex);
- struct ioq_cmd *cmd = SLIST_POP(ioqq);
- mutex_unlock(&ioqq->mutex);
- return cmd;
+ cond_broadcast(&monitor->full);
}
-/** Pop a command from a queue without blocking. */
-static struct ioq_cmd *ioqq_trypop(struct ioqq *ioqq) {
- if (!mutex_trylock(&ioqq->mutex)) {
- return NULL;
+/** Pop a command from a slot. */
+static union ioq_cmd *ioq_slot_pop(struct ioq_slot *slot) {
+ struct ioq_monitor *monitor = slot->monitor;
+
+ mutex_lock(&monitor->mutex);
+ while (!slot->cmd) {
+ cond_wait(&monitor->full, &monitor->mutex);
}
+ union ioq_cmd *ret = slot->cmd;
+ slot->cmd = NULL;
+ mutex_unlock(&monitor->mutex);
- struct ioq_cmd *cmd = SLIST_POP(ioqq);
- mutex_unlock(&ioqq->mutex);
- return cmd;
+ cond_broadcast(&monitor->empty);
+
+ return ret;
}
-/** Stop a queue, waking up any waiters. */
-static void ioqq_stop(struct ioqq *ioqq) {
- mutex_lock(&ioqq->mutex);
- ioqq->stop = true;
- mutex_unlock(&ioqq->mutex);
- cond_broadcast(&ioqq->cond);
+/** Pop a command from a slot, if one exists. */
+static union ioq_cmd *ioq_slot_trypop(struct ioq_slot *slot) {
+ struct ioq_monitor *monitor = slot->monitor;
+
+ if (!mutex_trylock(&monitor->mutex)) {
+ return NULL;
+ }
+
+ union ioq_cmd *ret = slot->cmd;
+ slot->cmd = NULL;
+
+ mutex_unlock(&monitor->mutex);
+
+ if (ret) {
+ cond_broadcast(&monitor->empty);
+ }
+ return ret;
}
+/**
+ * An MPMC queue of I/O commands.
+ */
+struct ioqq {
+ /** Circular buffer index mask. */
+ size_t mask;
+
+ /** Number of monitors. */
+ size_t nmonitors;
+ /** Array of monitors used by the slots. */
+ struct ioq_monitor *monitors;
+
+ /** Index of next writer. */
+ cache_align atomic size_t head;
+ /** Index of next reader. */
+ cache_align atomic size_t tail;
+
+ /** The circular buffer itself. */
+ cache_align struct ioq_slot slots[];
+};
+
+// If we assign slots sequentially, threads will likely be operating on
+// consecutive slots. If these slots are in the same cache line, that will
+// result in false sharing. We can mitigate this by assigning slots with a
+// stride larger than a cache line e.g. 0, 9, 18, ..., 1, 10, 19, ...
+// As long as the stride is relatively prime to circular buffer length, we'll
+// still use every available slot. Since the length is a power of two, that
+// means the stride must be odd.
+
+#define IOQ_STRIDE ((FALSE_SHARING_SIZE / sizeof(struct ioq_slot)) | 1)
+bfs_static_assert(IOQ_STRIDE % 2 == 1);
+
+/** Destroy an I/O command queue. */
static void ioqq_destroy(struct ioqq *ioqq) {
- if (ioqq) {
- cond_destroy(&ioqq->cond);
- mutex_destroy(&ioqq->mutex);
- free(ioqq);
+ for (size_t i = 0; i < ioqq->nmonitors; ++i) {
+ ioq_monitor_destroy(&ioqq->monitors[i]);
+ }
+ free(ioqq->monitors);
+ free(ioqq);
+}
+
+/** Create an I/O command queue. */
+static struct ioqq *ioqq_create(size_t size) {
+ // Circular buffer size must be a power of two
+ size = bit_ceil(size);
+
+ struct ioqq *ioqq = xmemalign(alignof(struct ioqq), flex_sizeof(struct ioqq, slots, size));
+ if (!ioqq) {
+ return NULL;
+ }
+
+ // Use a pool of monitors
+ size_t nmonitors = size < 64 ? size : 64;
+ ioqq->nmonitors = 0;
+ ioqq->monitors = xmemalign(alignof(struct ioq_monitor), nmonitors * sizeof(struct ioq_monitor));
+ if (!ioqq->monitors) {
+ ioqq_destroy(ioqq);
+ return NULL;
+ }
+
+ for (size_t i = 0; i < nmonitors; ++i) {
+ if (ioq_monitor_init(&ioqq->monitors[i]) != 0) {
+ ioqq_destroy(ioqq);
+ return NULL;
+ }
+ ++ioqq->nmonitors;
+ }
+
+ ioqq->mask = size - 1;
+
+ atomic_init(&ioqq->head, 0);
+ atomic_init(&ioqq->tail, 0);
+
+ for (size_t i = 0; i < size; ++i) {
+ ioq_slot_init(&ioqq->slots[i], &ioqq->monitors[i % nmonitors]);
+ }
+
+ return ioqq;
+}
+
+/** Push a command onto the queue. */
+static void ioqq_push(struct ioqq *ioqq, union ioq_cmd *cmd) {
+ size_t i = fetch_add(&ioqq->head, IOQ_STRIDE, relaxed);
+ ioq_slot_push(&ioqq->slots[i & ioqq->mask], cmd);
+}
+
+/** Pop a command from a queue. */
+static union ioq_cmd *ioqq_pop(struct ioqq *ioqq) {
+ size_t i = fetch_add(&ioqq->tail, IOQ_STRIDE, relaxed);
+ return ioq_slot_pop(&ioqq->slots[i & ioqq->mask]);
+}
+
+/** Pop a command from a queue if one is available. */
+static union ioq_cmd *ioqq_trypop(struct ioqq *ioqq) {
+ size_t i = load(&ioqq->tail, relaxed);
+ union ioq_cmd *cmd = ioq_slot_trypop(&ioqq->slots[i & ioqq->mask]);
+ if (cmd) {
+#ifdef NDEBUG
+ store(&ioqq->tail, i + IOQ_STRIDE, relaxed);
+#else
+ size_t j = fetch_add(&ioqq->tail, IOQ_STRIDE, relaxed);
+ bfs_assert(j == i, "ioqq_trypop() only supports a single consumer");
+#endif
}
+ return cmd;
}
+/** Sentinel stop command. */
+static union ioq_cmd IOQ_STOP;
+
struct ioq {
/** The depth of the queue. */
size_t depth;
@@ -146,8 +272,8 @@ static void *ioq_work(void *ptr) {
struct ioq *ioq = ptr;
while (true) {
- struct ioq_cmd *cmd = ioqq_pop(ioq->pending);
- if (!cmd) {
+ union ioq_cmd *cmd = ioqq_pop(ioq->pending);
+ if (cmd == &IOQ_STOP) {
break;
}
@@ -176,16 +302,17 @@ struct ioq *ioq_create(size_t depth, size_t threads) {
ioq->depth = depth;
ioq->size = 0;
+
ioq->pending = NULL;
ioq->ready = NULL;
ioq->nthreads = 0;
- ioq->pending = ioqq_create();
+ ioq->pending = ioqq_create(depth);
if (!ioq->pending) {
goto fail;
}
- ioq->ready = ioqq_create();
+ ioq->ready = ioqq_create(depth);
if (!ioq->ready) {
goto fail;
}
@@ -218,7 +345,7 @@ int ioq_opendir(struct ioq *ioq, int dfd, const char *path, void *ptr) {
return -1;
}
- struct ioq_cmd *cmd = malloc(sizeof(*cmd));
+ union ioq_cmd *cmd = malloc(sizeof(*cmd));
if (!cmd) {
return -1;
}
@@ -228,8 +355,8 @@ int ioq_opendir(struct ioq *ioq, int dfd, const char *path, void *ptr) {
req->path = path;
req->ptr = ptr;
- ++ioq->size;
ioqq_push(ioq->pending, cmd);
+ ++ioq->size;
return 0;
}
@@ -238,11 +365,7 @@ struct ioq_res *ioq_pop(struct ioq *ioq) {
return NULL;
}
- struct ioq_cmd *cmd = ioqq_pop(ioq->ready);
- if (!cmd) {
- return NULL;
- }
-
+ union ioq_cmd *cmd = ioqq_pop(ioq->ready);
--ioq->size;
return &cmd->res;
}
@@ -252,7 +375,7 @@ struct ioq_res *ioq_trypop(struct ioq *ioq) {
return NULL;
}
- struct ioq_cmd *cmd = ioqq_trypop(ioq->ready);
+ union ioq_cmd *cmd = ioqq_trypop(ioq->ready);
if (!cmd) {
return NULL;
}
@@ -262,7 +385,7 @@ struct ioq_res *ioq_trypop(struct ioq *ioq) {
}
void ioq_free(struct ioq *ioq, struct ioq_res *res) {
- struct ioq_cmd *cmd = (struct ioq_cmd *)res;
+ union ioq_cmd *cmd = (union ioq_cmd *)res;
free(cmd);
}
@@ -271,8 +394,8 @@ void ioq_destroy(struct ioq *ioq) {
return;
}
- if (ioq->pending) {
- ioqq_stop(ioq->pending);
+ for (size_t i = 0; i < ioq->nthreads; ++i) {
+ ioqq_push(ioq->pending, &IOQ_STOP);
}
for (size_t i = 0; i < ioq->nthreads; ++i) {