From e8df57b5a49a70e2daa5bb6c00b8e0e06c51306a Mon Sep 17 00:00:00 2001 From: Tavian Barnes Date: Thu, 6 Apr 2023 14:55:25 -0400 Subject: ioq: Implement an async I/O queue --- src/ioq.c | 284 +++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++ src/ioq.h | 94 ++++++++++++++++++++ src/main.c | 1 + 3 files changed, 379 insertions(+) create mode 100644 src/ioq.c create mode 100644 src/ioq.h (limited to 'src') diff --git a/src/ioq.c b/src/ioq.c new file mode 100644 index 0000000..e09c2a9 --- /dev/null +++ b/src/ioq.c @@ -0,0 +1,284 @@ +// Copyright © Tavian Barnes +// SPDX-License-Identifier: 0BSD + +#include "ioq.h" +#include "dir.h" +#include "list.h" +#include "lock.h" +#include "sanity.h" +#include +#include +#include +#include +#include + +/** + * An I/O queue request. + */ +struct ioq_req { + /** Base file descriptor for openat(). */ + int dfd; + /** Relative path to dfd. */ + const char *path; + + /** Arbitrary user data. */ + void *ptr; +}; + +/** + * An I/O queue command. + */ +struct ioq_cmd { + union { + struct ioq_req req; + struct ioq_res res; + }; + + struct ioq_cmd *next; +}; + +/** + * An MPMC queue of I/O commands. + */ +struct ioqq { + pthread_mutex_t mutex; + pthread_cond_t cond; + + bool stop; + + struct ioq_cmd *head; + struct ioq_cmd **tail; +}; + +static struct ioqq *ioqq_create(void) { + struct ioqq *ioqq = malloc(sizeof(*ioqq)); + if (!ioqq) { + goto fail; + } + + if (mutex_init(&ioqq->mutex, NULL) != 0) { + goto fail_free; + } + + if (cond_init(&ioqq->cond, NULL) != 0) { + goto fail_mutex; + } + + ioqq->stop = false; + SLIST_INIT(ioqq); + return ioqq; + +fail_mutex: + mutex_destroy(&ioqq->mutex); +fail_free: + free(ioqq); +fail: + return NULL; +} + +/** Push a command onto the queue. */ +static void ioqq_push(struct ioqq *ioqq, struct ioq_cmd *cmd) { + mutex_lock(&ioqq->mutex); + SLIST_APPEND(ioqq, cmd); + mutex_unlock(&ioqq->mutex); + cond_signal(&ioqq->cond); +} + +/** Pop a command from a queue. */ +static struct ioq_cmd *ioqq_pop(struct ioqq *ioqq) { + mutex_lock(&ioqq->mutex); + + while (!ioqq->stop && !ioqq->head) { + cond_wait(&ioqq->cond, &ioqq->mutex); + } + + struct ioq_cmd *cmd = SLIST_POP(ioqq); + mutex_unlock(&ioqq->mutex); + return cmd; +} + +/** Pop a command from a queue without blocking. */ +static struct ioq_cmd *ioqq_trypop(struct ioqq *ioqq) { + if (!mutex_trylock(&ioqq->mutex)) { + return NULL; + } + + struct ioq_cmd *cmd = SLIST_POP(ioqq); + mutex_unlock(&ioqq->mutex); + return cmd; +} + +/** Stop a queue, waking up any waiters. */ +static void ioqq_stop(struct ioqq *ioqq) { + mutex_lock(&ioqq->mutex); + ioqq->stop = true; + mutex_unlock(&ioqq->mutex); + cond_broadcast(&ioqq->cond); +} + +static void ioqq_destroy(struct ioqq *ioqq) { + if (ioqq) { + cond_destroy(&ioqq->cond); + mutex_destroy(&ioqq->mutex); + free(ioqq); + } +} + +struct ioq { + /** The depth of the queue. */ + size_t depth; + /** The current size of the queue. */ + size_t size; + + /** Pending I/O requests. */ + struct ioqq *pending; + /** Ready I/O responses. */ + struct ioqq *ready; + + /** The number of background threads. */ + size_t nthreads; + /** The background threads themselves. */ + pthread_t *threads; +}; + +/** Background thread entry point. */ +static void *ioq_work(void *ptr) { + struct ioq *ioq = ptr; + + while (true) { + struct ioq_cmd *cmd = ioqq_pop(ioq->pending); + if (!cmd) { + break; + } + + struct ioq_req req = cmd->req; + sanitize_uninit(cmd); + + struct ioq_res *res = &cmd->res; + res->dir = bfs_opendir(req.dfd, req.path); + res->error = errno; + ioqq_push(ioq->ready, cmd); + } + + return NULL; +} + +struct ioq *ioq_create(size_t depth, size_t threads) { + struct ioq *ioq = malloc(sizeof(*ioq)); + if (!ioq) { + goto fail; + } + + ioq->depth = depth; + ioq->size = 0; + ioq->pending = NULL; + ioq->ready = NULL; + ioq->nthreads = 0; + + ioq->pending = ioqq_create(); + if (!ioq->pending) { + goto fail; + } + + ioq->ready = ioqq_create(); + if (!ioq->ready) { + goto fail; + } + + ioq->threads = malloc(threads * sizeof(ioq->threads[0])); + if (!ioq->threads) { + goto fail; + } + + for (size_t i = 0; i < threads; ++i) { + errno = pthread_create(&ioq->threads[i], NULL, ioq_work, ioq); + if (errno != 0) { + goto fail; + } + ++ioq->nthreads; + } + + return ioq; + + int err; +fail: + err = errno; + ioq_destroy(ioq); + errno = err; + return NULL; +} + +int ioq_opendir(struct ioq *ioq, int dfd, const char *path, void *ptr) { + if (ioq->size >= ioq->depth) { + return -1; + } + + struct ioq_cmd *cmd = malloc(sizeof(*cmd)); + if (!cmd) { + return -1; + } + + struct ioq_req *req = &cmd->req; + req->dfd = dfd; + req->path = path; + req->ptr = ptr; + + ++ioq->size; + ioqq_push(ioq->pending, cmd); + return 0; +} + +struct ioq_res *ioq_pop(struct ioq *ioq) { + if (ioq->size == 0) { + return NULL; + } + + struct ioq_cmd *cmd = ioqq_pop(ioq->ready); + if (!cmd) { + return NULL; + } + + --ioq->size; + return &cmd->res; +} + +struct ioq_res *ioq_trypop(struct ioq *ioq) { + if (ioq->size == 0) { + return NULL; + } + + struct ioq_cmd *cmd = ioqq_trypop(ioq->ready); + if (!cmd) { + return NULL; + } + + --ioq->size; + return &cmd->res; +} + +void ioq_free(struct ioq *ioq, struct ioq_res *res) { + struct ioq_cmd *cmd = (struct ioq_cmd *)res; + free(cmd); +} + +void ioq_destroy(struct ioq *ioq) { + if (!ioq) { + return; + } + + if (ioq->pending) { + ioqq_stop(ioq->pending); + } + + for (size_t i = 0; i < ioq->nthreads; ++i) { + if (pthread_join(ioq->threads[i], NULL) != 0) { + abort(); + } + } + free(ioq->threads); + + ioqq_destroy(ioq->ready); + ioqq_destroy(ioq->pending); + + free(ioq); +} diff --git a/src/ioq.h b/src/ioq.h new file mode 100644 index 0000000..9492034 --- /dev/null +++ b/src/ioq.h @@ -0,0 +1,94 @@ +// Copyright © Tavian Barnes +// SPDX-License-Identifier: 0BSD + +/** + * Asynchronous I/O queues. + */ + +#ifndef BFS_IOQ_H +#define BFS_IOQ_H + +#include + +/** + * An queue of asynchronous I/O operations. + */ +struct ioq; + +/** + * An I/O queue response. + */ +struct ioq_res { + /** The opened directory. */ + struct bfs_dir *dir; + /** The error code, if the operation failed. */ + int error; + + /** Arbitrary user data. */ + void *ptr; +}; + +/** + * Create an I/O queue. + * + * @param depth + * The maximum depth of the queue. + * @param threads + * The maximum number of background threads. + * @return + * The new I/O queue, or NULL on failure. + */ +struct ioq *ioq_create(size_t depth, size_t threads); + +/** + * Asynchronous bfs_opendir(). + * + * @param ioq + * The I/O queue. + * @param dfd + * The base file descriptor. + * @param path + * The path to open, relative to dfd. + * @param ptr + * An arbitrary pointer to associate with the request. + * @return + * 0 on success, or -1 on failure. + */ +int ioq_opendir(struct ioq *ioq, int dfd, const char *path, void *ptr); + +/** + * Pop a response from the queue. + * + * @param ioq + * The I/O queue. + * @return + * The next response, or NULL. + */ +struct ioq_res *ioq_pop(struct ioq *ioq); + +/** + * Pop a response from the queue, without blocking. + * + * @param ioq + * The I/O queue. + * @return + * The next response, or NULL. + */ +struct ioq_res *ioq_trypop(struct ioq *ioq); + +/** + * Free a response. + * + * @param ioq + * The I/O queue. + * @param res + * The response to free. + */ +void ioq_free(struct ioq *ioq, struct ioq_res *res); + +/** + * Stop and destroy an I/O queue. + */ +void ioq_destroy(struct ioq *ioq); + +#endif // BFS_IOQ_H diff --git a/src/main.c b/src/main.c index d84f7c2..76dde86 100644 --- a/src/main.c +++ b/src/main.c @@ -31,6 +31,7 @@ * - dir.[ch] (a directory API facade) * - dstring.[ch] (a dynamic string library) * - fsade.[ch] (a facade over non-standard filesystem features) + * - ioq.[ch] (an async I/O queue) * - list.h (linked list macros) * - lock.h (mutexes, condition variables, etc.) * - mtab.[ch] (parses the system's mount table) -- cgit v1.2.3