summaryrefslogtreecommitdiffstats
path: root/src/ioq.c
diff options
context:
space:
mode:
authorTavian Barnes <tavianator@tavianator.com>2023-07-04 15:17:23 -0400
committerTavian Barnes <tavianator@tavianator.com>2023-07-10 14:53:46 -0400
commit222ac5ba4fbab0ab880e36423d0f1338e39b02c7 (patch)
treee2f4610625ac5fbe58eba9ff121bce745b3c940f /src/ioq.c
parent35d357793c35ed0263d57a439323027c80a4a5b7 (diff)
downloadbfs-222ac5ba4fbab0ab880e36423d0f1338e39b02c7.tar.xz
ioq: Implement async close() and closedir()
Diffstat (limited to 'src/ioq.c')
-rw-r--r--src/ioq.c175
1 files changed, 103 insertions, 72 deletions
diff --git a/src/ioq.c b/src/ioq.c
index 5673c77..0544044 100644
--- a/src/ioq.c
+++ b/src/ioq.c
@@ -17,29 +17,6 @@
#include <stdlib.h>
/**
- * An I/O queue request.
- */
-struct ioq_req {
- /** Directory allocation. */
- struct bfs_dir *dir;
- /** Base file descriptor for openat(). */
- int dfd;
- /** Path to open, relative to dfd. */
- const char *path;
-
- /** Arbitrary user data. */
- void *ptr;
-};
-
-/**
- * An I/O queue command.
- */
-union ioq_cmd {
- struct ioq_req req;
- struct ioq_res res;
-};
-
-/**
* A monitor for an I/O queue slot.
*/
struct ioq_monitor {
@@ -101,7 +78,7 @@ bfs_static_assert(IOQ_STRIDE % 2 == 1);
/** Slot flag bit to indicate waiters. */
#define IOQ_BLOCKED ((uintptr_t)1)
-bfs_static_assert(alignof(union ioq_cmd) > 1);
+bfs_static_assert(alignof(struct ioq_ent) > 1);
/** Destroy an I/O command queue. */
static void ioqq_destroy(struct ioqq *ioqq) {
@@ -189,12 +166,12 @@ static void ioqq_wake(struct ioqq *ioqq, size_t i) {
cond_broadcast(&monitor->cond);
}
-/** Push a command onto the queue. */
-static void ioqq_push(struct ioqq *ioqq, union ioq_cmd *cmd) {
+/** Push an entry onto the queue. */
+static void ioqq_push(struct ioqq *ioqq, struct ioq_ent *ent) {
size_t i = fetch_add(&ioqq->head, IOQ_STRIDE, relaxed);
atomic uintptr_t *slot = &ioqq->slots[i & ioqq->slot_mask];
- uintptr_t addr = (uintptr_t)cmd;
+ uintptr_t addr = (uintptr_t)ent;
bfs_assert(!(addr & IOQ_BLOCKED));
uintptr_t prev = load(slot, relaxed);
@@ -212,8 +189,8 @@ static void ioqq_push(struct ioqq *ioqq, union ioq_cmd *cmd) {
}
}
-/** Pop a command from a queue. */
-static union ioq_cmd *ioqq_pop(struct ioqq *ioqq) {
+/** Pop an entry from the queue. */
+static struct ioq_ent *ioqq_pop(struct ioqq *ioqq) {
size_t i = fetch_add(&ioqq->tail, IOQ_STRIDE, relaxed);
atomic uintptr_t *slot = &ioqq->slots[i & ioqq->slot_mask];
@@ -232,11 +209,11 @@ static union ioq_cmd *ioqq_pop(struct ioqq *ioqq) {
}
prev &= ~IOQ_BLOCKED;
- return (union ioq_cmd *)prev;
+ return (struct ioq_ent *)prev;
}
-/** Pop a command from a queue if one is available. */
-static union ioq_cmd *ioqq_trypop(struct ioqq *ioqq) {
+/** Pop an entry from the queue if one is available. */
+static struct ioq_ent *ioqq_trypop(struct ioqq *ioqq) {
size_t i = load(&ioqq->tail, relaxed);
atomic uintptr_t *slot = &ioqq->slots[i & ioqq->slot_mask];
@@ -257,11 +234,11 @@ static union ioq_cmd *ioqq_trypop(struct ioqq *ioqq) {
bfs_assert(j == i, "ioqq_trypop() only supports a single consumer");
(void)j;
- return (union ioq_cmd *)prev;
+ return (struct ioq_ent *)prev;
}
/** Sentinel stop command. */
-static union ioq_cmd IOQ_STOP;
+static struct ioq_ent IOQ_STOP;
struct ioq {
/** The depth of the queue. */
@@ -271,8 +248,8 @@ struct ioq {
/** Cancellation flag. */
atomic bool cancel;
- /** ioq_cmd command arena. */
- struct arena cmds;
+ /** ioq_ent arena. */
+ struct arena ents;
/** Pending I/O requests. */
struct ioqq *pending;
@@ -290,30 +267,50 @@ static void *ioq_work(void *ptr) {
struct ioq *ioq = ptr;
while (true) {
- union ioq_cmd *cmd = ioqq_pop(ioq->pending);
- if (cmd == &IOQ_STOP) {
+ struct ioq_ent *ent = ioqq_pop(ioq->pending);
+ if (ent == &IOQ_STOP) {
break;
}
bool cancel = load(&ioq->cancel, relaxed);
- struct ioq_req req = cmd->req;
- sanitize_uninit(cmd);
+ ent->ret = -1;
+
+ switch (ent->op) {
+ case IOQ_CLOSE:
+ // Always close(), even if we're cancelled, just like a real EINTR
+ ent->ret = xclose(ent->close.fd);
+ break;
+
+ case IOQ_OPENDIR:
+ if (!cancel) {
+ struct ioq_opendir *args = &ent->opendir;
+ ent->ret = bfs_opendir(args->dir, args->dfd, args->path);
+ if (ent->ret == 0) {
+ bfs_polldir(args->dir);
+ }
+ }
+ break;
+
+ case IOQ_CLOSEDIR:
+ ent->ret = bfs_closedir(ent->closedir.dir);
+ break;
- struct ioq_res *res = &cmd->res;
- res->ptr = req.ptr;
- res->dir = req.dir;
+ default:
+ bfs_bug("Unknown ioq_op %d", (int)ent->op);
+ errno = ENOSYS;
+ break;
+ }
if (cancel) {
- res->error = EINTR;
- } else if (bfs_opendir(req.dir, req.dfd, req.path) != 0) {
- res->error = errno;
+ ent->error = EINTR;
+ } else if (ent->ret < 0) {
+ ent->error = errno;
} else {
- res->error = 0;
- bfs_polldir(res->dir);
+ ent->error = 0;
}
- ioqq_push(ioq->ready, cmd);
+ ioqq_push(ioq->ready, ent);
}
return NULL;
@@ -326,7 +323,7 @@ struct ioq *ioq_create(size_t depth, size_t nthreads) {
}
ioq->depth = depth;
- ARENA_INIT(&ioq->cmds, union ioq_cmd);
+ ARENA_INIT(&ioq->ents, struct ioq_ent);
ioq->pending = ioqq_create(depth);
if (!ioq->pending) {
@@ -359,54 +356,88 @@ size_t ioq_capacity(const struct ioq *ioq) {
return ioq->depth - ioq->size;
}
-int ioq_opendir(struct ioq *ioq, struct bfs_dir *dir, int dfd, const char *path, void *ptr) {
+static struct ioq_ent *ioq_request(struct ioq *ioq, enum ioq_op op, void *ptr) {
+ if (load(&ioq->cancel, relaxed)) {
+ errno = EINTR;
+ return NULL;
+ }
+
if (ioq->size >= ioq->depth) {
+ errno = EAGAIN;
+ return NULL;
+ }
+
+ struct ioq_ent *ent = arena_alloc(&ioq->ents);
+ if (!ent) {
+ return NULL;
+ }
+
+ ent->op = op;
+ ent->ptr = ptr;
+ ++ioq->size;
+ return ent;
+}
+
+int ioq_close(struct ioq *ioq, int fd, void *ptr) {
+ struct ioq_ent *ent = ioq_request(ioq, IOQ_CLOSE, ptr);
+ if (!ent) {
return -1;
}
- union ioq_cmd *cmd = arena_alloc(&ioq->cmds);
- if (!cmd) {
+ ent->close.fd = fd;
+
+ ioqq_push(ioq->pending, ent);
+ return 0;
+}
+
+int ioq_opendir(struct ioq *ioq, struct bfs_dir *dir, int dfd, const char *path, void *ptr) {
+ struct ioq_ent *ent = ioq_request(ioq, IOQ_OPENDIR, ptr);
+ if (!ent) {
return -1;
}
- struct ioq_req *req = &cmd->req;
- req->dir = dir;
- req->dfd = dfd;
- req->path = path;
- req->ptr = ptr;
+ struct ioq_opendir *args = &ent->opendir;
+ args->dir = dir;
+ args->dfd = dfd;
+ args->path = path;
- ioqq_push(ioq->pending, cmd);
- ++ioq->size;
+ ioqq_push(ioq->pending, ent);
return 0;
}
-struct ioq_res *ioq_pop(struct ioq *ioq) {
- if (ioq->size == 0) {
- return NULL;
+int ioq_closedir(struct ioq *ioq, struct bfs_dir *dir, void *ptr) {
+ struct ioq_ent *ent = ioq_request(ioq, IOQ_CLOSEDIR, ptr);
+ if (!ent) {
+ return -1;
}
- union ioq_cmd *cmd = ioqq_pop(ioq->ready);
- return &cmd->res;
+ ent->closedir.dir = dir;
+
+ ioqq_push(ioq->pending, ent);
+ return 0;
}
-struct ioq_res *ioq_trypop(struct ioq *ioq) {
+struct ioq_ent *ioq_pop(struct ioq *ioq) {
if (ioq->size == 0) {
return NULL;
}
- union ioq_cmd *cmd = ioqq_trypop(ioq->ready);
- if (!cmd) {
+ return ioqq_pop(ioq->ready);
+}
+
+struct ioq_ent *ioq_trypop(struct ioq *ioq) {
+ if (ioq->size == 0) {
return NULL;
}
- return &cmd->res;
+ return ioqq_trypop(ioq->ready);
}
-void ioq_free(struct ioq *ioq, struct ioq_res *res) {
+void ioq_free(struct ioq *ioq, struct ioq_ent *ent) {
bfs_assert(ioq->size > 0);
--ioq->size;
- arena_free(&ioq->cmds, (union ioq_cmd *)res);
+ arena_free(&ioq->ents, ent);
}
void ioq_cancel(struct ioq *ioq) {
@@ -431,7 +462,7 @@ void ioq_destroy(struct ioq *ioq) {
ioqq_destroy(ioq->ready);
ioqq_destroy(ioq->pending);
- arena_destroy(&ioq->cmds);
+ arena_destroy(&ioq->ents);
free(ioq);
}