// Copyright © Tavian Barnes // SPDX-License-Identifier: 0BSD #include "ioq.h" #include "dir.h" #include "list.h" #include "lock.h" #include "sanity.h" #include #include #include #include #include /** * An I/O queue request. */ struct ioq_req { /** Base file descriptor for openat(). */ int dfd; /** Relative path to dfd. */ const char *path; /** Arbitrary user data. */ void *ptr; }; /** * An I/O queue command. */ struct ioq_cmd { union { struct ioq_req req; struct ioq_res res; }; struct ioq_cmd *next; }; /** * An MPMC queue of I/O commands. */ struct ioqq { pthread_mutex_t mutex; pthread_cond_t cond; bool stop; struct ioq_cmd *head; struct ioq_cmd **tail; }; static struct ioqq *ioqq_create(void) { struct ioqq *ioqq = malloc(sizeof(*ioqq)); if (!ioqq) { goto fail; } if (mutex_init(&ioqq->mutex, NULL) != 0) { goto fail_free; } if (cond_init(&ioqq->cond, NULL) != 0) { goto fail_mutex; } ioqq->stop = false; SLIST_INIT(ioqq); return ioqq; fail_mutex: mutex_destroy(&ioqq->mutex); fail_free: free(ioqq); fail: return NULL; } /** Push a command onto the queue. */ static void ioqq_push(struct ioqq *ioqq, struct ioq_cmd *cmd) { mutex_lock(&ioqq->mutex); SLIST_APPEND(ioqq, cmd); mutex_unlock(&ioqq->mutex); cond_signal(&ioqq->cond); } /** Pop a command from a queue. */ static struct ioq_cmd *ioqq_pop(struct ioqq *ioqq) { mutex_lock(&ioqq->mutex); while (!ioqq->stop && !ioqq->head) { cond_wait(&ioqq->cond, &ioqq->mutex); } struct ioq_cmd *cmd = SLIST_POP(ioqq); mutex_unlock(&ioqq->mutex); return cmd; } /** Pop a command from a queue without blocking. */ static struct ioq_cmd *ioqq_trypop(struct ioqq *ioqq) { if (!mutex_trylock(&ioqq->mutex)) { return NULL; } struct ioq_cmd *cmd = SLIST_POP(ioqq); mutex_unlock(&ioqq->mutex); return cmd; } /** Stop a queue, waking up any waiters. */ static void ioqq_stop(struct ioqq *ioqq) { mutex_lock(&ioqq->mutex); ioqq->stop = true; mutex_unlock(&ioqq->mutex); cond_broadcast(&ioqq->cond); } static void ioqq_destroy(struct ioqq *ioqq) { if (ioqq) { cond_destroy(&ioqq->cond); mutex_destroy(&ioqq->mutex); free(ioqq); } } struct ioq { /** The depth of the queue. */ size_t depth; /** The current size of the queue. */ size_t size; /** Pending I/O requests. */ struct ioqq *pending; /** Ready I/O responses. */ struct ioqq *ready; /** The number of background threads. */ size_t nthreads; /** The background threads themselves. */ pthread_t *threads; }; /** Background thread entry point. */ static void *ioq_work(void *ptr) { struct ioq *ioq = ptr; while (true) { struct ioq_cmd *cmd = ioqq_pop(ioq->pending); if (!cmd) { break; } struct ioq_req req = cmd->req; sanitize_uninit(cmd); struct ioq_res *res = &cmd->res; res->ptr = req.ptr; res->dir = bfs_opendir(req.dfd, req.path); res->error = errno; if (res->dir) { bfs_polldir(res->dir); } ioqq_push(ioq->ready, cmd); } return NULL; } struct ioq *ioq_create(size_t depth, size_t threads) { struct ioq *ioq = malloc(sizeof(*ioq)); if (!ioq) { goto fail; } ioq->depth = depth; ioq->size = 0; ioq->pending = NULL; ioq->ready = NULL; ioq->nthreads = 0; ioq->pending = ioqq_create(); if (!ioq->pending) { goto fail; } ioq->ready = ioqq_create(); if (!ioq->ready) { goto fail; } ioq->threads = malloc(threads * sizeof(ioq->threads[0])); if (!ioq->threads) { goto fail; } for (size_t i = 0; i < threads; ++i) { errno = pthread_create(&ioq->threads[i], NULL, ioq_work, ioq); if (errno != 0) { goto fail; } ++ioq->nthreads; } return ioq; int err; fail: err = errno; ioq_destroy(ioq); errno = err; return NULL; } int ioq_opendir(struct ioq *ioq, int dfd, const char *path, void *ptr) { if (ioq->size >= ioq->depth) { return -1; } struct ioq_cmd *cmd = malloc(sizeof(*cmd)); if (!cmd) { return -1; } struct ioq_req *req = &cmd->req; req->dfd = dfd; req->path = path; req->ptr = ptr; ++ioq->size; ioqq_push(ioq->pending, cmd); return 0; } struct ioq_res *ioq_pop(struct ioq *ioq) { if (ioq->size == 0) { return NULL; } struct ioq_cmd *cmd = ioqq_pop(ioq->ready); if (!cmd) { return NULL; } --ioq->size; return &cmd->res; } struct ioq_res *ioq_trypop(struct ioq *ioq) { if (ioq->size == 0) { return NULL; } struct ioq_cmd *cmd = ioqq_trypop(ioq->ready); if (!cmd) { return NULL; } --ioq->size; return &cmd->res; } void ioq_free(struct ioq *ioq, struct ioq_res *res) { struct ioq_cmd *cmd = (struct ioq_cmd *)res; free(cmd); } void ioq_destroy(struct ioq *ioq) { if (!ioq) { return; } if (ioq->pending) { ioqq_stop(ioq->pending); } for (size_t i = 0; i < ioq->nthreads; ++i) { if (pthread_join(ioq->threads[i], NULL) != 0) { abort(); } } free(ioq->threads); ioqq_destroy(ioq->ready); ioqq_destroy(ioq->pending); free(ioq); }