From 222ac5ba4fbab0ab880e36423d0f1338e39b02c7 Mon Sep 17 00:00:00 2001 From: Tavian Barnes Date: Tue, 4 Jul 2023 15:17:23 -0400 Subject: ioq: Implement async close() and closedir() --- src/ioq.c | 175 ++++++++++++++++++++++++++++++++++++-------------------------- 1 file changed, 103 insertions(+), 72 deletions(-) (limited to 'src/ioq.c') diff --git a/src/ioq.c b/src/ioq.c index 5673c77..0544044 100644 --- a/src/ioq.c +++ b/src/ioq.c @@ -16,29 +16,6 @@ #include #include -/** - * An I/O queue request. - */ -struct ioq_req { - /** Directory allocation. */ - struct bfs_dir *dir; - /** Base file descriptor for openat(). */ - int dfd; - /** Path to open, relative to dfd. */ - const char *path; - - /** Arbitrary user data. */ - void *ptr; -}; - -/** - * An I/O queue command. - */ -union ioq_cmd { - struct ioq_req req; - struct ioq_res res; -}; - /** * A monitor for an I/O queue slot. */ @@ -101,7 +78,7 @@ bfs_static_assert(IOQ_STRIDE % 2 == 1); /** Slot flag bit to indicate waiters. */ #define IOQ_BLOCKED ((uintptr_t)1) -bfs_static_assert(alignof(union ioq_cmd) > 1); +bfs_static_assert(alignof(struct ioq_ent) > 1); /** Destroy an I/O command queue. */ static void ioqq_destroy(struct ioqq *ioqq) { @@ -189,12 +166,12 @@ static void ioqq_wake(struct ioqq *ioqq, size_t i) { cond_broadcast(&monitor->cond); } -/** Push a command onto the queue. */ -static void ioqq_push(struct ioqq *ioqq, union ioq_cmd *cmd) { +/** Push an entry onto the queue. */ +static void ioqq_push(struct ioqq *ioqq, struct ioq_ent *ent) { size_t i = fetch_add(&ioqq->head, IOQ_STRIDE, relaxed); atomic uintptr_t *slot = &ioqq->slots[i & ioqq->slot_mask]; - uintptr_t addr = (uintptr_t)cmd; + uintptr_t addr = (uintptr_t)ent; bfs_assert(!(addr & IOQ_BLOCKED)); uintptr_t prev = load(slot, relaxed); @@ -212,8 +189,8 @@ static void ioqq_push(struct ioqq *ioqq, union ioq_cmd *cmd) { } } -/** Pop a command from a queue. */ -static union ioq_cmd *ioqq_pop(struct ioqq *ioqq) { +/** Pop an entry from the queue. */ +static struct ioq_ent *ioqq_pop(struct ioqq *ioqq) { size_t i = fetch_add(&ioqq->tail, IOQ_STRIDE, relaxed); atomic uintptr_t *slot = &ioqq->slots[i & ioqq->slot_mask]; @@ -232,11 +209,11 @@ static union ioq_cmd *ioqq_pop(struct ioqq *ioqq) { } prev &= ~IOQ_BLOCKED; - return (union ioq_cmd *)prev; + return (struct ioq_ent *)prev; } -/** Pop a command from a queue if one is available. */ -static union ioq_cmd *ioqq_trypop(struct ioqq *ioqq) { +/** Pop an entry from the queue if one is available. */ +static struct ioq_ent *ioqq_trypop(struct ioqq *ioqq) { size_t i = load(&ioqq->tail, relaxed); atomic uintptr_t *slot = &ioqq->slots[i & ioqq->slot_mask]; @@ -257,11 +234,11 @@ static union ioq_cmd *ioqq_trypop(struct ioqq *ioqq) { bfs_assert(j == i, "ioqq_trypop() only supports a single consumer"); (void)j; - return (union ioq_cmd *)prev; + return (struct ioq_ent *)prev; } /** Sentinel stop command. */ -static union ioq_cmd IOQ_STOP; +static struct ioq_ent IOQ_STOP; struct ioq { /** The depth of the queue. */ @@ -271,8 +248,8 @@ struct ioq { /** Cancellation flag. */ atomic bool cancel; - /** ioq_cmd command arena. */ - struct arena cmds; + /** ioq_ent arena. */ + struct arena ents; /** Pending I/O requests. */ struct ioqq *pending; @@ -290,30 +267,50 @@ static void *ioq_work(void *ptr) { struct ioq *ioq = ptr; while (true) { - union ioq_cmd *cmd = ioqq_pop(ioq->pending); - if (cmd == &IOQ_STOP) { + struct ioq_ent *ent = ioqq_pop(ioq->pending); + if (ent == &IOQ_STOP) { break; } bool cancel = load(&ioq->cancel, relaxed); - struct ioq_req req = cmd->req; - sanitize_uninit(cmd); + ent->ret = -1; + + switch (ent->op) { + case IOQ_CLOSE: + // Always close(), even if we're cancelled, just like a real EINTR + ent->ret = xclose(ent->close.fd); + break; + + case IOQ_OPENDIR: + if (!cancel) { + struct ioq_opendir *args = &ent->opendir; + ent->ret = bfs_opendir(args->dir, args->dfd, args->path); + if (ent->ret == 0) { + bfs_polldir(args->dir); + } + } + break; + + case IOQ_CLOSEDIR: + ent->ret = bfs_closedir(ent->closedir.dir); + break; - struct ioq_res *res = &cmd->res; - res->ptr = req.ptr; - res->dir = req.dir; + default: + bfs_bug("Unknown ioq_op %d", (int)ent->op); + errno = ENOSYS; + break; + } if (cancel) { - res->error = EINTR; - } else if (bfs_opendir(req.dir, req.dfd, req.path) != 0) { - res->error = errno; + ent->error = EINTR; + } else if (ent->ret < 0) { + ent->error = errno; } else { - res->error = 0; - bfs_polldir(res->dir); + ent->error = 0; } - ioqq_push(ioq->ready, cmd); + ioqq_push(ioq->ready, ent); } return NULL; @@ -326,7 +323,7 @@ struct ioq *ioq_create(size_t depth, size_t nthreads) { } ioq->depth = depth; - ARENA_INIT(&ioq->cmds, union ioq_cmd); + ARENA_INIT(&ioq->ents, struct ioq_ent); ioq->pending = ioqq_create(depth); if (!ioq->pending) { @@ -359,54 +356,88 @@ size_t ioq_capacity(const struct ioq *ioq) { return ioq->depth - ioq->size; } -int ioq_opendir(struct ioq *ioq, struct bfs_dir *dir, int dfd, const char *path, void *ptr) { +static struct ioq_ent *ioq_request(struct ioq *ioq, enum ioq_op op, void *ptr) { + if (load(&ioq->cancel, relaxed)) { + errno = EINTR; + return NULL; + } + if (ioq->size >= ioq->depth) { + errno = EAGAIN; + return NULL; + } + + struct ioq_ent *ent = arena_alloc(&ioq->ents); + if (!ent) { + return NULL; + } + + ent->op = op; + ent->ptr = ptr; + ++ioq->size; + return ent; +} + +int ioq_close(struct ioq *ioq, int fd, void *ptr) { + struct ioq_ent *ent = ioq_request(ioq, IOQ_CLOSE, ptr); + if (!ent) { return -1; } - union ioq_cmd *cmd = arena_alloc(&ioq->cmds); - if (!cmd) { + ent->close.fd = fd; + + ioqq_push(ioq->pending, ent); + return 0; +} + +int ioq_opendir(struct ioq *ioq, struct bfs_dir *dir, int dfd, const char *path, void *ptr) { + struct ioq_ent *ent = ioq_request(ioq, IOQ_OPENDIR, ptr); + if (!ent) { return -1; } - struct ioq_req *req = &cmd->req; - req->dir = dir; - req->dfd = dfd; - req->path = path; - req->ptr = ptr; + struct ioq_opendir *args = &ent->opendir; + args->dir = dir; + args->dfd = dfd; + args->path = path; - ioqq_push(ioq->pending, cmd); - ++ioq->size; + ioqq_push(ioq->pending, ent); return 0; } -struct ioq_res *ioq_pop(struct ioq *ioq) { - if (ioq->size == 0) { - return NULL; +int ioq_closedir(struct ioq *ioq, struct bfs_dir *dir, void *ptr) { + struct ioq_ent *ent = ioq_request(ioq, IOQ_CLOSEDIR, ptr); + if (!ent) { + return -1; } - union ioq_cmd *cmd = ioqq_pop(ioq->ready); - return &cmd->res; + ent->closedir.dir = dir; + + ioqq_push(ioq->pending, ent); + return 0; } -struct ioq_res *ioq_trypop(struct ioq *ioq) { +struct ioq_ent *ioq_pop(struct ioq *ioq) { if (ioq->size == 0) { return NULL; } - union ioq_cmd *cmd = ioqq_trypop(ioq->ready); - if (!cmd) { + return ioqq_pop(ioq->ready); +} + +struct ioq_ent *ioq_trypop(struct ioq *ioq) { + if (ioq->size == 0) { return NULL; } - return &cmd->res; + return ioqq_trypop(ioq->ready); } -void ioq_free(struct ioq *ioq, struct ioq_res *res) { +void ioq_free(struct ioq *ioq, struct ioq_ent *ent) { bfs_assert(ioq->size > 0); --ioq->size; - arena_free(&ioq->cmds, (union ioq_cmd *)res); + arena_free(&ioq->ents, ent); } void ioq_cancel(struct ioq *ioq) { @@ -431,7 +462,7 @@ void ioq_destroy(struct ioq *ioq) { ioqq_destroy(ioq->ready); ioqq_destroy(ioq->pending); - arena_destroy(&ioq->cmds); + arena_destroy(&ioq->ents); free(ioq); } -- cgit v1.2.3