From 222ac5ba4fbab0ab880e36423d0f1338e39b02c7 Mon Sep 17 00:00:00 2001 From: Tavian Barnes Date: Tue, 4 Jul 2023 15:17:23 -0400 Subject: ioq: Implement async close() and closedir() --- src/bftw.c | 41 ++++++++------- src/ioq.c | 175 ++++++++++++++++++++++++++++++++++++------------------------- src/ioq.h | 81 ++++++++++++++++++++++++---- 3 files changed, 197 insertions(+), 100 deletions(-) (limited to 'src') diff --git a/src/bftw.c b/src/bftw.c index ec67817..64f221b 100644 --- a/src/bftw.c +++ b/src/bftw.c @@ -901,34 +901,39 @@ static int bftw_ioq_pop(struct bftw_state *state, bool block) { return -1; } - struct ioq_res *res = block ? ioq_pop(ioq) : ioq_trypop(ioq); - if (!res) { + struct ioq_ent *ent = block ? ioq_pop(ioq) : ioq_trypop(ioq); + if (!ent) { return -1; } struct bftw_cache *cache = &state->cache; - ++cache->capacity; - - struct bftw_file *file = res->ptr; - file->ioqueued = false; + struct bftw_file *file; + struct bfs_dir *dir; - if (file->parent) { - bftw_cache_unpin(cache, file->parent); - } + enum ioq_op op = ent->op; + if (op == IOQ_OPENDIR) { + file = ent->ptr; + file->ioqueued = false; - if (res->error) { - arena_free(&cache->dirs, res->dir); - } else { - bftw_file_set_dir(cache, file, res->dir); - } + ++cache->capacity; + if (file->parent) { + bftw_cache_unpin(cache, file->parent); + } - ioq_free(ioq, res); + dir = ent->opendir.dir; + if (ent->ret == 0) { + bftw_file_set_dir(cache, file, dir); + } else { + arena_free(&cache->dirs, dir); + } - if (!(state->flags & BFTW_SORT)) { - SLIST_PREPEND(&state->dirs, file); + if (!(state->flags & BFTW_SORT)) { + SLIST_PREPEND(&state->dirs, file); + } } - return 0; + ioq_free(ioq, ent); + return op; } /** Try to reserve space in the I/O queue. */ diff --git a/src/ioq.c b/src/ioq.c index 5673c77..0544044 100644 --- a/src/ioq.c +++ b/src/ioq.c @@ -16,29 +16,6 @@ #include #include -/** - * An I/O queue request. - */ -struct ioq_req { - /** Directory allocation. */ - struct bfs_dir *dir; - /** Base file descriptor for openat(). */ - int dfd; - /** Path to open, relative to dfd. */ - const char *path; - - /** Arbitrary user data. */ - void *ptr; -}; - -/** - * An I/O queue command. - */ -union ioq_cmd { - struct ioq_req req; - struct ioq_res res; -}; - /** * A monitor for an I/O queue slot. */ @@ -101,7 +78,7 @@ bfs_static_assert(IOQ_STRIDE % 2 == 1); /** Slot flag bit to indicate waiters. */ #define IOQ_BLOCKED ((uintptr_t)1) -bfs_static_assert(alignof(union ioq_cmd) > 1); +bfs_static_assert(alignof(struct ioq_ent) > 1); /** Destroy an I/O command queue. */ static void ioqq_destroy(struct ioqq *ioqq) { @@ -189,12 +166,12 @@ static void ioqq_wake(struct ioqq *ioqq, size_t i) { cond_broadcast(&monitor->cond); } -/** Push a command onto the queue. */ -static void ioqq_push(struct ioqq *ioqq, union ioq_cmd *cmd) { +/** Push an entry onto the queue. */ +static void ioqq_push(struct ioqq *ioqq, struct ioq_ent *ent) { size_t i = fetch_add(&ioqq->head, IOQ_STRIDE, relaxed); atomic uintptr_t *slot = &ioqq->slots[i & ioqq->slot_mask]; - uintptr_t addr = (uintptr_t)cmd; + uintptr_t addr = (uintptr_t)ent; bfs_assert(!(addr & IOQ_BLOCKED)); uintptr_t prev = load(slot, relaxed); @@ -212,8 +189,8 @@ static void ioqq_push(struct ioqq *ioqq, union ioq_cmd *cmd) { } } -/** Pop a command from a queue. */ -static union ioq_cmd *ioqq_pop(struct ioqq *ioqq) { +/** Pop an entry from the queue. */ +static struct ioq_ent *ioqq_pop(struct ioqq *ioqq) { size_t i = fetch_add(&ioqq->tail, IOQ_STRIDE, relaxed); atomic uintptr_t *slot = &ioqq->slots[i & ioqq->slot_mask]; @@ -232,11 +209,11 @@ static union ioq_cmd *ioqq_pop(struct ioqq *ioqq) { } prev &= ~IOQ_BLOCKED; - return (union ioq_cmd *)prev; + return (struct ioq_ent *)prev; } -/** Pop a command from a queue if one is available. */ -static union ioq_cmd *ioqq_trypop(struct ioqq *ioqq) { +/** Pop an entry from the queue if one is available. */ +static struct ioq_ent *ioqq_trypop(struct ioqq *ioqq) { size_t i = load(&ioqq->tail, relaxed); atomic uintptr_t *slot = &ioqq->slots[i & ioqq->slot_mask]; @@ -257,11 +234,11 @@ static union ioq_cmd *ioqq_trypop(struct ioqq *ioqq) { bfs_assert(j == i, "ioqq_trypop() only supports a single consumer"); (void)j; - return (union ioq_cmd *)prev; + return (struct ioq_ent *)prev; } /** Sentinel stop command. */ -static union ioq_cmd IOQ_STOP; +static struct ioq_ent IOQ_STOP; struct ioq { /** The depth of the queue. */ @@ -271,8 +248,8 @@ struct ioq { /** Cancellation flag. */ atomic bool cancel; - /** ioq_cmd command arena. */ - struct arena cmds; + /** ioq_ent arena. */ + struct arena ents; /** Pending I/O requests. */ struct ioqq *pending; @@ -290,30 +267,50 @@ static void *ioq_work(void *ptr) { struct ioq *ioq = ptr; while (true) { - union ioq_cmd *cmd = ioqq_pop(ioq->pending); - if (cmd == &IOQ_STOP) { + struct ioq_ent *ent = ioqq_pop(ioq->pending); + if (ent == &IOQ_STOP) { break; } bool cancel = load(&ioq->cancel, relaxed); - struct ioq_req req = cmd->req; - sanitize_uninit(cmd); + ent->ret = -1; + + switch (ent->op) { + case IOQ_CLOSE: + // Always close(), even if we're cancelled, just like a real EINTR + ent->ret = xclose(ent->close.fd); + break; + + case IOQ_OPENDIR: + if (!cancel) { + struct ioq_opendir *args = &ent->opendir; + ent->ret = bfs_opendir(args->dir, args->dfd, args->path); + if (ent->ret == 0) { + bfs_polldir(args->dir); + } + } + break; + + case IOQ_CLOSEDIR: + ent->ret = bfs_closedir(ent->closedir.dir); + break; - struct ioq_res *res = &cmd->res; - res->ptr = req.ptr; - res->dir = req.dir; + default: + bfs_bug("Unknown ioq_op %d", (int)ent->op); + errno = ENOSYS; + break; + } if (cancel) { - res->error = EINTR; - } else if (bfs_opendir(req.dir, req.dfd, req.path) != 0) { - res->error = errno; + ent->error = EINTR; + } else if (ent->ret < 0) { + ent->error = errno; } else { - res->error = 0; - bfs_polldir(res->dir); + ent->error = 0; } - ioqq_push(ioq->ready, cmd); + ioqq_push(ioq->ready, ent); } return NULL; @@ -326,7 +323,7 @@ struct ioq *ioq_create(size_t depth, size_t nthreads) { } ioq->depth = depth; - ARENA_INIT(&ioq->cmds, union ioq_cmd); + ARENA_INIT(&ioq->ents, struct ioq_ent); ioq->pending = ioqq_create(depth); if (!ioq->pending) { @@ -359,54 +356,88 @@ size_t ioq_capacity(const struct ioq *ioq) { return ioq->depth - ioq->size; } -int ioq_opendir(struct ioq *ioq, struct bfs_dir *dir, int dfd, const char *path, void *ptr) { +static struct ioq_ent *ioq_request(struct ioq *ioq, enum ioq_op op, void *ptr) { + if (load(&ioq->cancel, relaxed)) { + errno = EINTR; + return NULL; + } + if (ioq->size >= ioq->depth) { + errno = EAGAIN; + return NULL; + } + + struct ioq_ent *ent = arena_alloc(&ioq->ents); + if (!ent) { + return NULL; + } + + ent->op = op; + ent->ptr = ptr; + ++ioq->size; + return ent; +} + +int ioq_close(struct ioq *ioq, int fd, void *ptr) { + struct ioq_ent *ent = ioq_request(ioq, IOQ_CLOSE, ptr); + if (!ent) { return -1; } - union ioq_cmd *cmd = arena_alloc(&ioq->cmds); - if (!cmd) { + ent->close.fd = fd; + + ioqq_push(ioq->pending, ent); + return 0; +} + +int ioq_opendir(struct ioq *ioq, struct bfs_dir *dir, int dfd, const char *path, void *ptr) { + struct ioq_ent *ent = ioq_request(ioq, IOQ_OPENDIR, ptr); + if (!ent) { return -1; } - struct ioq_req *req = &cmd->req; - req->dir = dir; - req->dfd = dfd; - req->path = path; - req->ptr = ptr; + struct ioq_opendir *args = &ent->opendir; + args->dir = dir; + args->dfd = dfd; + args->path = path; - ioqq_push(ioq->pending, cmd); - ++ioq->size; + ioqq_push(ioq->pending, ent); return 0; } -struct ioq_res *ioq_pop(struct ioq *ioq) { - if (ioq->size == 0) { - return NULL; +int ioq_closedir(struct ioq *ioq, struct bfs_dir *dir, void *ptr) { + struct ioq_ent *ent = ioq_request(ioq, IOQ_CLOSEDIR, ptr); + if (!ent) { + return -1; } - union ioq_cmd *cmd = ioqq_pop(ioq->ready); - return &cmd->res; + ent->closedir.dir = dir; + + ioqq_push(ioq->pending, ent); + return 0; } -struct ioq_res *ioq_trypop(struct ioq *ioq) { +struct ioq_ent *ioq_pop(struct ioq *ioq) { if (ioq->size == 0) { return NULL; } - union ioq_cmd *cmd = ioqq_trypop(ioq->ready); - if (!cmd) { + return ioqq_pop(ioq->ready); +} + +struct ioq_ent *ioq_trypop(struct ioq *ioq) { + if (ioq->size == 0) { return NULL; } - return &cmd->res; + return ioqq_trypop(ioq->ready); } -void ioq_free(struct ioq *ioq, struct ioq_res *res) { +void ioq_free(struct ioq *ioq, struct ioq_ent *ent) { bfs_assert(ioq->size > 0); --ioq->size; - arena_free(&ioq->cmds, (union ioq_cmd *)res); + arena_free(&ioq->ents, ent); } void ioq_cancel(struct ioq *ioq) { @@ -431,7 +462,7 @@ void ioq_destroy(struct ioq *ioq) { ioqq_destroy(ioq->ready); ioqq_destroy(ioq->pending); - arena_destroy(&ioq->cmds); + arena_destroy(&ioq->ents); free(ioq); } diff --git a/src/ioq.h b/src/ioq.h index 9901293..99c18c2 100644 --- a/src/ioq.h +++ b/src/ioq.h @@ -16,16 +16,49 @@ struct ioq; /** - * An I/O queue response. + * I/O queue operations. */ -struct ioq_res { - /** The opened directory. */ - struct bfs_dir *dir; +enum ioq_op { + /** ioq_close(). */ + IOQ_CLOSE, + /** ioq_opendir(). */ + IOQ_OPENDIR, + /** ioq_closedir(). */ + IOQ_CLOSEDIR, +}; + +/** + * An I/O queue entry. + */ +struct ioq_ent { + /** The I/O operation. */ + enum ioq_op op; + + /** The return value of the operation. */ + int ret; /** The error code, if the operation failed. */ int error; /** Arbitrary user data. */ void *ptr; + + /** Operation-specific arguments. */ + union { + /** ioq_close() args. */ + struct ioq_close { + int fd; + } close; + /** ioq_opendir() args. */ + struct ioq_opendir { + struct bfs_dir *dir; + int dfd; + const char *path; + } opendir; + /** ioq_closedir() args. */ + struct ioq_closedir { + struct bfs_dir *dir; + } closedir; + }; }; /** @@ -45,6 +78,20 @@ struct ioq *ioq_create(size_t depth, size_t nthreads); */ size_t ioq_capacity(const struct ioq *ioq); +/** + * Asynchronous close(). + * + * @param ioq + * The I/O queue. + * @param fd + * The fd to close. + * @param ptr + * An arbitrary pointer to associate with the request. + * @return + * 0 on success, or -1 on failure. + */ +int ioq_close(struct ioq *ioq, int fd, void *ptr); + /** * Asynchronous bfs_opendir(). * @@ -63,6 +110,20 @@ size_t ioq_capacity(const struct ioq *ioq); */ int ioq_opendir(struct ioq *ioq, struct bfs_dir *dir, int dfd, const char *path, void *ptr); +/** + * Asynchronous bfs_closedir(). + * + * @param ioq + * The I/O queue. + * @param dir + * The directory to close. + * @param ptr + * An arbitrary pointer to associate with the request. + * @return + * 0 on success, or -1 on failure. + */ +int ioq_closedir(struct ioq *ioq, struct bfs_dir *dir, void *ptr); + /** * Pop a response from the queue. * @@ -71,7 +132,7 @@ int ioq_opendir(struct ioq *ioq, struct bfs_dir *dir, int dfd, const char *path, * @return * The next response, or NULL. */ -struct ioq_res *ioq_pop(struct ioq *ioq); +struct ioq_ent *ioq_pop(struct ioq *ioq); /** * Pop a response from the queue, without blocking. @@ -81,17 +142,17 @@ struct ioq_res *ioq_pop(struct ioq *ioq); * @return * The next response, or NULL. */ -struct ioq_res *ioq_trypop(struct ioq *ioq); +struct ioq_ent *ioq_trypop(struct ioq *ioq); /** - * Free a response. + * Free a queue entry. * * @param ioq * The I/O queue. - * @param res - * The response to free. + * @param ent + * The entry to free. */ -void ioq_free(struct ioq *ioq, struct ioq_res *res); +void ioq_free(struct ioq *ioq, struct ioq_ent *ent); /** * Cancel any pending I/O operations. -- cgit v1.2.3