summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorTavian Barnes <tavianator@tavianator.com>2023-04-06 14:55:25 -0400
committerTavian Barnes <tavianator@tavianator.com>2023-06-12 14:39:34 -0400
commite8df57b5a49a70e2daa5bb6c00b8e0e06c51306a (patch)
treec2d343355e23bb415d52b116b195d33ae434f1ed
parenta69327f7844ca88ad5b6293bd334e5cb351d9591 (diff)
downloadbfs-e8df57b5a49a70e2daa5bb6c00b8e0e06c51306a.tar.xz
ioq: Implement an async I/O queue
-rw-r--r--Makefile1
-rw-r--r--src/ioq.c284
-rw-r--r--src/ioq.h94
-rw-r--r--src/main.c1
4 files changed, 380 insertions, 0 deletions
diff --git a/Makefile b/Makefile
index 20b4d07..8fc68fd 100644
--- a/Makefile
+++ b/Makefile
@@ -229,6 +229,7 @@ LIBBFS := \
$(OBJ)/src/eval.o \
$(OBJ)/src/exec.o \
$(OBJ)/src/fsade.o \
+ $(OBJ)/src/ioq.o \
$(OBJ)/src/mtab.o \
$(OBJ)/src/opt.o \
$(OBJ)/src/parse.o \
diff --git a/src/ioq.c b/src/ioq.c
new file mode 100644
index 0000000..e09c2a9
--- /dev/null
+++ b/src/ioq.c
@@ -0,0 +1,284 @@
+// Copyright © Tavian Barnes <tavianator@tavianator.com>
+// SPDX-License-Identifier: 0BSD
+
+#include "ioq.h"
+#include "dir.h"
+#include "list.h"
+#include "lock.h"
+#include "sanity.h"
+#include <assert.h>
+#include <errno.h>
+#include <pthread.h>
+#include <stdbool.h>
+#include <stdlib.h>
+
+/**
+ * An I/O queue request.
+ */
+struct ioq_req {
+ /** Base file descriptor for openat(). */
+ int dfd;
+ /** Relative path to dfd. */
+ const char *path;
+
+ /** Arbitrary user data. */
+ void *ptr;
+};
+
+/**
+ * An I/O queue command.
+ */
+struct ioq_cmd {
+ union {
+ struct ioq_req req;
+ struct ioq_res res;
+ };
+
+ struct ioq_cmd *next;
+};
+
+/**
+ * An MPMC queue of I/O commands.
+ */
+struct ioqq {
+ pthread_mutex_t mutex;
+ pthread_cond_t cond;
+
+ bool stop;
+
+ struct ioq_cmd *head;
+ struct ioq_cmd **tail;
+};
+
+static struct ioqq *ioqq_create(void) {
+ struct ioqq *ioqq = malloc(sizeof(*ioqq));
+ if (!ioqq) {
+ goto fail;
+ }
+
+ if (mutex_init(&ioqq->mutex, NULL) != 0) {
+ goto fail_free;
+ }
+
+ if (cond_init(&ioqq->cond, NULL) != 0) {
+ goto fail_mutex;
+ }
+
+ ioqq->stop = false;
+ SLIST_INIT(ioqq);
+ return ioqq;
+
+fail_mutex:
+ mutex_destroy(&ioqq->mutex);
+fail_free:
+ free(ioqq);
+fail:
+ return NULL;
+}
+
+/** Push a command onto the queue. */
+static void ioqq_push(struct ioqq *ioqq, struct ioq_cmd *cmd) {
+ mutex_lock(&ioqq->mutex);
+ SLIST_APPEND(ioqq, cmd);
+ mutex_unlock(&ioqq->mutex);
+ cond_signal(&ioqq->cond);
+}
+
+/** Pop a command from a queue. */
+static struct ioq_cmd *ioqq_pop(struct ioqq *ioqq) {
+ mutex_lock(&ioqq->mutex);
+
+ while (!ioqq->stop && !ioqq->head) {
+ cond_wait(&ioqq->cond, &ioqq->mutex);
+ }
+
+ struct ioq_cmd *cmd = SLIST_POP(ioqq);
+ mutex_unlock(&ioqq->mutex);
+ return cmd;
+}
+
+/** Pop a command from a queue without blocking. */
+static struct ioq_cmd *ioqq_trypop(struct ioqq *ioqq) {
+ if (!mutex_trylock(&ioqq->mutex)) {
+ return NULL;
+ }
+
+ struct ioq_cmd *cmd = SLIST_POP(ioqq);
+ mutex_unlock(&ioqq->mutex);
+ return cmd;
+}
+
+/** Stop a queue, waking up any waiters. */
+static void ioqq_stop(struct ioqq *ioqq) {
+ mutex_lock(&ioqq->mutex);
+ ioqq->stop = true;
+ mutex_unlock(&ioqq->mutex);
+ cond_broadcast(&ioqq->cond);
+}
+
+static void ioqq_destroy(struct ioqq *ioqq) {
+ if (ioqq) {
+ cond_destroy(&ioqq->cond);
+ mutex_destroy(&ioqq->mutex);
+ free(ioqq);
+ }
+}
+
+struct ioq {
+ /** The depth of the queue. */
+ size_t depth;
+ /** The current size of the queue. */
+ size_t size;
+
+ /** Pending I/O requests. */
+ struct ioqq *pending;
+ /** Ready I/O responses. */
+ struct ioqq *ready;
+
+ /** The number of background threads. */
+ size_t nthreads;
+ /** The background threads themselves. */
+ pthread_t *threads;
+};
+
+/** Background thread entry point. */
+static void *ioq_work(void *ptr) {
+ struct ioq *ioq = ptr;
+
+ while (true) {
+ struct ioq_cmd *cmd = ioqq_pop(ioq->pending);
+ if (!cmd) {
+ break;
+ }
+
+ struct ioq_req req = cmd->req;
+ sanitize_uninit(cmd);
+
+ struct ioq_res *res = &cmd->res;
+ res->dir = bfs_opendir(req.dfd, req.path);
+ res->error = errno;
+ ioqq_push(ioq->ready, cmd);
+ }
+
+ return NULL;
+}
+
+struct ioq *ioq_create(size_t depth, size_t threads) {
+ struct ioq *ioq = malloc(sizeof(*ioq));
+ if (!ioq) {
+ goto fail;
+ }
+
+ ioq->depth = depth;
+ ioq->size = 0;
+ ioq->pending = NULL;
+ ioq->ready = NULL;
+ ioq->nthreads = 0;
+
+ ioq->pending = ioqq_create();
+ if (!ioq->pending) {
+ goto fail;
+ }
+
+ ioq->ready = ioqq_create();
+ if (!ioq->ready) {
+ goto fail;
+ }
+
+ ioq->threads = malloc(threads * sizeof(ioq->threads[0]));
+ if (!ioq->threads) {
+ goto fail;
+ }
+
+ for (size_t i = 0; i < threads; ++i) {
+ errno = pthread_create(&ioq->threads[i], NULL, ioq_work, ioq);
+ if (errno != 0) {
+ goto fail;
+ }
+ ++ioq->nthreads;
+ }
+
+ return ioq;
+
+ int err;
+fail:
+ err = errno;
+ ioq_destroy(ioq);
+ errno = err;
+ return NULL;
+}
+
+int ioq_opendir(struct ioq *ioq, int dfd, const char *path, void *ptr) {
+ if (ioq->size >= ioq->depth) {
+ return -1;
+ }
+
+ struct ioq_cmd *cmd = malloc(sizeof(*cmd));
+ if (!cmd) {
+ return -1;
+ }
+
+ struct ioq_req *req = &cmd->req;
+ req->dfd = dfd;
+ req->path = path;
+ req->ptr = ptr;
+
+ ++ioq->size;
+ ioqq_push(ioq->pending, cmd);
+ return 0;
+}
+
+struct ioq_res *ioq_pop(struct ioq *ioq) {
+ if (ioq->size == 0) {
+ return NULL;
+ }
+
+ struct ioq_cmd *cmd = ioqq_pop(ioq->ready);
+ if (!cmd) {
+ return NULL;
+ }
+
+ --ioq->size;
+ return &cmd->res;
+}
+
+struct ioq_res *ioq_trypop(struct ioq *ioq) {
+ if (ioq->size == 0) {
+ return NULL;
+ }
+
+ struct ioq_cmd *cmd = ioqq_trypop(ioq->ready);
+ if (!cmd) {
+ return NULL;
+ }
+
+ --ioq->size;
+ return &cmd->res;
+}
+
+void ioq_free(struct ioq *ioq, struct ioq_res *res) {
+ struct ioq_cmd *cmd = (struct ioq_cmd *)res;
+ free(cmd);
+}
+
+void ioq_destroy(struct ioq *ioq) {
+ if (!ioq) {
+ return;
+ }
+
+ if (ioq->pending) {
+ ioqq_stop(ioq->pending);
+ }
+
+ for (size_t i = 0; i < ioq->nthreads; ++i) {
+ if (pthread_join(ioq->threads[i], NULL) != 0) {
+ abort();
+ }
+ }
+ free(ioq->threads);
+
+ ioqq_destroy(ioq->ready);
+ ioqq_destroy(ioq->pending);
+
+ free(ioq);
+}
diff --git a/src/ioq.h b/src/ioq.h
new file mode 100644
index 0000000..9492034
--- /dev/null
+++ b/src/ioq.h
@@ -0,0 +1,94 @@
+// Copyright © Tavian Barnes <tavianator@tavianator.com>
+// SPDX-License-Identifier: 0BSD
+
+/**
+ * Asynchronous I/O queues.
+ */
+
+#ifndef BFS_IOQ_H
+#define BFS_IOQ_H
+
+#include <stddef.h>
+
+/**
+ * An queue of asynchronous I/O operations.
+ */
+struct ioq;
+
+/**
+ * An I/O queue response.
+ */
+struct ioq_res {
+ /** The opened directory. */
+ struct bfs_dir *dir;
+ /** The error code, if the operation failed. */
+ int error;
+
+ /** Arbitrary user data. */
+ void *ptr;
+};
+
+/**
+ * Create an I/O queue.
+ *
+ * @param depth
+ * The maximum depth of the queue.
+ * @param threads
+ * The maximum number of background threads.
+ * @return
+ * The new I/O queue, or NULL on failure.
+ */
+struct ioq *ioq_create(size_t depth, size_t threads);
+
+/**
+ * Asynchronous bfs_opendir().
+ *
+ * @param ioq
+ * The I/O queue.
+ * @param dfd
+ * The base file descriptor.
+ * @param path
+ * The path to open, relative to dfd.
+ * @param ptr
+ * An arbitrary pointer to associate with the request.
+ * @return
+ * 0 on success, or -1 on failure.
+ */
+int ioq_opendir(struct ioq *ioq, int dfd, const char *path, void *ptr);
+
+/**
+ * Pop a response from the queue.
+ *
+ * @param ioq
+ * The I/O queue.
+ * @return
+ * The next response, or NULL.
+ */
+struct ioq_res *ioq_pop(struct ioq *ioq);
+
+/**
+ * Pop a response from the queue, without blocking.
+ *
+ * @param ioq
+ * The I/O queue.
+ * @return
+ * The next response, or NULL.
+ */
+struct ioq_res *ioq_trypop(struct ioq *ioq);
+
+/**
+ * Free a response.
+ *
+ * @param ioq
+ * The I/O queue.
+ * @param res
+ * The response to free.
+ */
+void ioq_free(struct ioq *ioq, struct ioq_res *res);
+
+/**
+ * Stop and destroy an I/O queue.
+ */
+void ioq_destroy(struct ioq *ioq);
+
+#endif // BFS_IOQ_H
diff --git a/src/main.c b/src/main.c
index d84f7c2..76dde86 100644
--- a/src/main.c
+++ b/src/main.c
@@ -31,6 +31,7 @@
* - dir.[ch] (a directory API facade)
* - dstring.[ch] (a dynamic string library)
* - fsade.[ch] (a facade over non-standard filesystem features)
+ * - ioq.[ch] (an async I/O queue)
* - list.h (linked list macros)
* - lock.h (mutexes, condition variables, etc.)
* - mtab.[ch] (parses the system's mount table)