summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--src/bftw.c41
-rw-r--r--src/ioq.c175
-rw-r--r--src/ioq.h81
3 files changed, 197 insertions, 100 deletions
diff --git a/src/bftw.c b/src/bftw.c
index ec67817..64f221b 100644
--- a/src/bftw.c
+++ b/src/bftw.c
@@ -901,34 +901,39 @@ static int bftw_ioq_pop(struct bftw_state *state, bool block) {
return -1;
}
- struct ioq_res *res = block ? ioq_pop(ioq) : ioq_trypop(ioq);
- if (!res) {
+ struct ioq_ent *ent = block ? ioq_pop(ioq) : ioq_trypop(ioq);
+ if (!ent) {
return -1;
}
struct bftw_cache *cache = &state->cache;
- ++cache->capacity;
-
- struct bftw_file *file = res->ptr;
- file->ioqueued = false;
+ struct bftw_file *file;
+ struct bfs_dir *dir;
- if (file->parent) {
- bftw_cache_unpin(cache, file->parent);
- }
+ enum ioq_op op = ent->op;
+ if (op == IOQ_OPENDIR) {
+ file = ent->ptr;
+ file->ioqueued = false;
- if (res->error) {
- arena_free(&cache->dirs, res->dir);
- } else {
- bftw_file_set_dir(cache, file, res->dir);
- }
+ ++cache->capacity;
+ if (file->parent) {
+ bftw_cache_unpin(cache, file->parent);
+ }
- ioq_free(ioq, res);
+ dir = ent->opendir.dir;
+ if (ent->ret == 0) {
+ bftw_file_set_dir(cache, file, dir);
+ } else {
+ arena_free(&cache->dirs, dir);
+ }
- if (!(state->flags & BFTW_SORT)) {
- SLIST_PREPEND(&state->dirs, file);
+ if (!(state->flags & BFTW_SORT)) {
+ SLIST_PREPEND(&state->dirs, file);
+ }
}
- return 0;
+ ioq_free(ioq, ent);
+ return op;
}
/** Try to reserve space in the I/O queue. */
diff --git a/src/ioq.c b/src/ioq.c
index 5673c77..0544044 100644
--- a/src/ioq.c
+++ b/src/ioq.c
@@ -17,29 +17,6 @@
#include <stdlib.h>
/**
- * An I/O queue request.
- */
-struct ioq_req {
- /** Directory allocation. */
- struct bfs_dir *dir;
- /** Base file descriptor for openat(). */
- int dfd;
- /** Path to open, relative to dfd. */
- const char *path;
-
- /** Arbitrary user data. */
- void *ptr;
-};
-
-/**
- * An I/O queue command.
- */
-union ioq_cmd {
- struct ioq_req req;
- struct ioq_res res;
-};
-
-/**
* A monitor for an I/O queue slot.
*/
struct ioq_monitor {
@@ -101,7 +78,7 @@ bfs_static_assert(IOQ_STRIDE % 2 == 1);
/** Slot flag bit to indicate waiters. */
#define IOQ_BLOCKED ((uintptr_t)1)
-bfs_static_assert(alignof(union ioq_cmd) > 1);
+bfs_static_assert(alignof(struct ioq_ent) > 1);
/** Destroy an I/O command queue. */
static void ioqq_destroy(struct ioqq *ioqq) {
@@ -189,12 +166,12 @@ static void ioqq_wake(struct ioqq *ioqq, size_t i) {
cond_broadcast(&monitor->cond);
}
-/** Push a command onto the queue. */
-static void ioqq_push(struct ioqq *ioqq, union ioq_cmd *cmd) {
+/** Push an entry onto the queue. */
+static void ioqq_push(struct ioqq *ioqq, struct ioq_ent *ent) {
size_t i = fetch_add(&ioqq->head, IOQ_STRIDE, relaxed);
atomic uintptr_t *slot = &ioqq->slots[i & ioqq->slot_mask];
- uintptr_t addr = (uintptr_t)cmd;
+ uintptr_t addr = (uintptr_t)ent;
bfs_assert(!(addr & IOQ_BLOCKED));
uintptr_t prev = load(slot, relaxed);
@@ -212,8 +189,8 @@ static void ioqq_push(struct ioqq *ioqq, union ioq_cmd *cmd) {
}
}
-/** Pop a command from a queue. */
-static union ioq_cmd *ioqq_pop(struct ioqq *ioqq) {
+/** Pop an entry from the queue. */
+static struct ioq_ent *ioqq_pop(struct ioqq *ioqq) {
size_t i = fetch_add(&ioqq->tail, IOQ_STRIDE, relaxed);
atomic uintptr_t *slot = &ioqq->slots[i & ioqq->slot_mask];
@@ -232,11 +209,11 @@ static union ioq_cmd *ioqq_pop(struct ioqq *ioqq) {
}
prev &= ~IOQ_BLOCKED;
- return (union ioq_cmd *)prev;
+ return (struct ioq_ent *)prev;
}
-/** Pop a command from a queue if one is available. */
-static union ioq_cmd *ioqq_trypop(struct ioqq *ioqq) {
+/** Pop an entry from the queue if one is available. */
+static struct ioq_ent *ioqq_trypop(struct ioqq *ioqq) {
size_t i = load(&ioqq->tail, relaxed);
atomic uintptr_t *slot = &ioqq->slots[i & ioqq->slot_mask];
@@ -257,11 +234,11 @@ static union ioq_cmd *ioqq_trypop(struct ioqq *ioqq) {
bfs_assert(j == i, "ioqq_trypop() only supports a single consumer");
(void)j;
- return (union ioq_cmd *)prev;
+ return (struct ioq_ent *)prev;
}
/** Sentinel stop command. */
-static union ioq_cmd IOQ_STOP;
+static struct ioq_ent IOQ_STOP;
struct ioq {
/** The depth of the queue. */
@@ -271,8 +248,8 @@ struct ioq {
/** Cancellation flag. */
atomic bool cancel;
- /** ioq_cmd command arena. */
- struct arena cmds;
+ /** ioq_ent arena. */
+ struct arena ents;
/** Pending I/O requests. */
struct ioqq *pending;
@@ -290,30 +267,50 @@ static void *ioq_work(void *ptr) {
struct ioq *ioq = ptr;
while (true) {
- union ioq_cmd *cmd = ioqq_pop(ioq->pending);
- if (cmd == &IOQ_STOP) {
+ struct ioq_ent *ent = ioqq_pop(ioq->pending);
+ if (ent == &IOQ_STOP) {
break;
}
bool cancel = load(&ioq->cancel, relaxed);
- struct ioq_req req = cmd->req;
- sanitize_uninit(cmd);
+ ent->ret = -1;
+
+ switch (ent->op) {
+ case IOQ_CLOSE:
+ // Always close(), even if we're cancelled, just like a real EINTR
+ ent->ret = xclose(ent->close.fd);
+ break;
+
+ case IOQ_OPENDIR:
+ if (!cancel) {
+ struct ioq_opendir *args = &ent->opendir;
+ ent->ret = bfs_opendir(args->dir, args->dfd, args->path);
+ if (ent->ret == 0) {
+ bfs_polldir(args->dir);
+ }
+ }
+ break;
+
+ case IOQ_CLOSEDIR:
+ ent->ret = bfs_closedir(ent->closedir.dir);
+ break;
- struct ioq_res *res = &cmd->res;
- res->ptr = req.ptr;
- res->dir = req.dir;
+ default:
+ bfs_bug("Unknown ioq_op %d", (int)ent->op);
+ errno = ENOSYS;
+ break;
+ }
if (cancel) {
- res->error = EINTR;
- } else if (bfs_opendir(req.dir, req.dfd, req.path) != 0) {
- res->error = errno;
+ ent->error = EINTR;
+ } else if (ent->ret < 0) {
+ ent->error = errno;
} else {
- res->error = 0;
- bfs_polldir(res->dir);
+ ent->error = 0;
}
- ioqq_push(ioq->ready, cmd);
+ ioqq_push(ioq->ready, ent);
}
return NULL;
@@ -326,7 +323,7 @@ struct ioq *ioq_create(size_t depth, size_t nthreads) {
}
ioq->depth = depth;
- ARENA_INIT(&ioq->cmds, union ioq_cmd);
+ ARENA_INIT(&ioq->ents, struct ioq_ent);
ioq->pending = ioqq_create(depth);
if (!ioq->pending) {
@@ -359,54 +356,88 @@ size_t ioq_capacity(const struct ioq *ioq) {
return ioq->depth - ioq->size;
}
-int ioq_opendir(struct ioq *ioq, struct bfs_dir *dir, int dfd, const char *path, void *ptr) {
+static struct ioq_ent *ioq_request(struct ioq *ioq, enum ioq_op op, void *ptr) {
+ if (load(&ioq->cancel, relaxed)) {
+ errno = EINTR;
+ return NULL;
+ }
+
if (ioq->size >= ioq->depth) {
+ errno = EAGAIN;
+ return NULL;
+ }
+
+ struct ioq_ent *ent = arena_alloc(&ioq->ents);
+ if (!ent) {
+ return NULL;
+ }
+
+ ent->op = op;
+ ent->ptr = ptr;
+ ++ioq->size;
+ return ent;
+}
+
+int ioq_close(struct ioq *ioq, int fd, void *ptr) {
+ struct ioq_ent *ent = ioq_request(ioq, IOQ_CLOSE, ptr);
+ if (!ent) {
return -1;
}
- union ioq_cmd *cmd = arena_alloc(&ioq->cmds);
- if (!cmd) {
+ ent->close.fd = fd;
+
+ ioqq_push(ioq->pending, ent);
+ return 0;
+}
+
+int ioq_opendir(struct ioq *ioq, struct bfs_dir *dir, int dfd, const char *path, void *ptr) {
+ struct ioq_ent *ent = ioq_request(ioq, IOQ_OPENDIR, ptr);
+ if (!ent) {
return -1;
}
- struct ioq_req *req = &cmd->req;
- req->dir = dir;
- req->dfd = dfd;
- req->path = path;
- req->ptr = ptr;
+ struct ioq_opendir *args = &ent->opendir;
+ args->dir = dir;
+ args->dfd = dfd;
+ args->path = path;
- ioqq_push(ioq->pending, cmd);
- ++ioq->size;
+ ioqq_push(ioq->pending, ent);
return 0;
}
-struct ioq_res *ioq_pop(struct ioq *ioq) {
- if (ioq->size == 0) {
- return NULL;
+int ioq_closedir(struct ioq *ioq, struct bfs_dir *dir, void *ptr) {
+ struct ioq_ent *ent = ioq_request(ioq, IOQ_CLOSEDIR, ptr);
+ if (!ent) {
+ return -1;
}
- union ioq_cmd *cmd = ioqq_pop(ioq->ready);
- return &cmd->res;
+ ent->closedir.dir = dir;
+
+ ioqq_push(ioq->pending, ent);
+ return 0;
}
-struct ioq_res *ioq_trypop(struct ioq *ioq) {
+struct ioq_ent *ioq_pop(struct ioq *ioq) {
if (ioq->size == 0) {
return NULL;
}
- union ioq_cmd *cmd = ioqq_trypop(ioq->ready);
- if (!cmd) {
+ return ioqq_pop(ioq->ready);
+}
+
+struct ioq_ent *ioq_trypop(struct ioq *ioq) {
+ if (ioq->size == 0) {
return NULL;
}
- return &cmd->res;
+ return ioqq_trypop(ioq->ready);
}
-void ioq_free(struct ioq *ioq, struct ioq_res *res) {
+void ioq_free(struct ioq *ioq, struct ioq_ent *ent) {
bfs_assert(ioq->size > 0);
--ioq->size;
- arena_free(&ioq->cmds, (union ioq_cmd *)res);
+ arena_free(&ioq->ents, ent);
}
void ioq_cancel(struct ioq *ioq) {
@@ -431,7 +462,7 @@ void ioq_destroy(struct ioq *ioq) {
ioqq_destroy(ioq->ready);
ioqq_destroy(ioq->pending);
- arena_destroy(&ioq->cmds);
+ arena_destroy(&ioq->ents);
free(ioq);
}
diff --git a/src/ioq.h b/src/ioq.h
index 9901293..99c18c2 100644
--- a/src/ioq.h
+++ b/src/ioq.h
@@ -16,16 +16,49 @@
struct ioq;
/**
- * An I/O queue response.
+ * I/O queue operations.
*/
-struct ioq_res {
- /** The opened directory. */
- struct bfs_dir *dir;
+enum ioq_op {
+ /** ioq_close(). */
+ IOQ_CLOSE,
+ /** ioq_opendir(). */
+ IOQ_OPENDIR,
+ /** ioq_closedir(). */
+ IOQ_CLOSEDIR,
+};
+
+/**
+ * An I/O queue entry.
+ */
+struct ioq_ent {
+ /** The I/O operation. */
+ enum ioq_op op;
+
+ /** The return value of the operation. */
+ int ret;
/** The error code, if the operation failed. */
int error;
/** Arbitrary user data. */
void *ptr;
+
+ /** Operation-specific arguments. */
+ union {
+ /** ioq_close() args. */
+ struct ioq_close {
+ int fd;
+ } close;
+ /** ioq_opendir() args. */
+ struct ioq_opendir {
+ struct bfs_dir *dir;
+ int dfd;
+ const char *path;
+ } opendir;
+ /** ioq_closedir() args. */
+ struct ioq_closedir {
+ struct bfs_dir *dir;
+ } closedir;
+ };
};
/**
@@ -46,6 +79,20 @@ struct ioq *ioq_create(size_t depth, size_t nthreads);
size_t ioq_capacity(const struct ioq *ioq);
/**
+ * Asynchronous close().
+ *
+ * @param ioq
+ * The I/O queue.
+ * @param fd
+ * The fd to close.
+ * @param ptr
+ * An arbitrary pointer to associate with the request.
+ * @return
+ * 0 on success, or -1 on failure.
+ */
+int ioq_close(struct ioq *ioq, int fd, void *ptr);
+
+/**
* Asynchronous bfs_opendir().
*
* @param ioq
@@ -64,6 +111,20 @@ size_t ioq_capacity(const struct ioq *ioq);
int ioq_opendir(struct ioq *ioq, struct bfs_dir *dir, int dfd, const char *path, void *ptr);
/**
+ * Asynchronous bfs_closedir().
+ *
+ * @param ioq
+ * The I/O queue.
+ * @param dir
+ * The directory to close.
+ * @param ptr
+ * An arbitrary pointer to associate with the request.
+ * @return
+ * 0 on success, or -1 on failure.
+ */
+int ioq_closedir(struct ioq *ioq, struct bfs_dir *dir, void *ptr);
+
+/**
* Pop a response from the queue.
*
* @param ioq
@@ -71,7 +132,7 @@ int ioq_opendir(struct ioq *ioq, struct bfs_dir *dir, int dfd, const char *path,
* @return
* The next response, or NULL.
*/
-struct ioq_res *ioq_pop(struct ioq *ioq);
+struct ioq_ent *ioq_pop(struct ioq *ioq);
/**
* Pop a response from the queue, without blocking.
@@ -81,17 +142,17 @@ struct ioq_res *ioq_pop(struct ioq *ioq);
* @return
* The next response, or NULL.
*/
-struct ioq_res *ioq_trypop(struct ioq *ioq);
+struct ioq_ent *ioq_trypop(struct ioq *ioq);
/**
- * Free a response.
+ * Free a queue entry.
*
* @param ioq
* The I/O queue.
- * @param res
- * The response to free.
+ * @param ent
+ * The entry to free.
*/
-void ioq_free(struct ioq *ioq, struct ioq_res *res);
+void ioq_free(struct ioq *ioq, struct ioq_ent *ent);
/**
* Cancel any pending I/O operations.