summaryrefslogtreecommitdiffstats
path: root/src/bftw.c
diff options
context:
space:
mode:
authorTavian Barnes <tavianator@tavianator.com>2023-06-09 16:34:41 -0400
committerTavian Barnes <tavianator@tavianator.com>2023-06-13 11:06:47 -0400
commitc023cceb3f50d92ed565ea3f085883f86de0f3f0 (patch)
treeafa4b05301e7c339a78e65d4c7a0c3443f553a93 /src/bftw.c
parent0cca5b64e1355af5d2c3d935da4e110982273703 (diff)
downloadbfs-c023cceb3f50d92ed565ea3f085883f86de0f3f0.tar.xz
bftw: Use an I/O queue to open directories
Parallelism is controlled by the new -j flag.
Diffstat (limited to 'src/bftw.c')
-rw-r--r--src/bftw.c154
1 files changed, 147 insertions, 7 deletions
diff --git a/src/bftw.c b/src/bftw.c
index 4a49457..e711963 100644
--- a/src/bftw.c
+++ b/src/bftw.c
@@ -22,6 +22,7 @@
#include "diag.h"
#include "dir.h"
#include "dstring.h"
+#include "ioq.h"
#include "list.h"
#include "mtab.h"
#include "stat.h"
@@ -58,6 +59,8 @@ struct bftw_file {
size_t pincount;
/** An open descriptor to this file, or -1. */
int fd;
+ /** Whether this file has a pending ioq request. */
+ bool ioqueued;
/** An open directory for this file, if any. */
struct bfs_dir *dir;
@@ -264,6 +267,7 @@ static struct bftw_file *bftw_file_new(struct bftw_file *parent, const char *nam
file->refcount = 1;
file->pincount = 0;
file->fd = -1;
+ file->ioqueued = false;
file->dir = NULL;
file->type = BFS_UNKNOWN;
@@ -439,6 +443,8 @@ struct bftw_state {
/** The cache of open directories. */
struct bftw_cache cache;
+ /** The async I/O queue. */
+ struct ioq *ioq;
/** The queue of directories to read. */
struct bftw_list dirs;
/** The queue of files to visit. */
@@ -494,6 +500,25 @@ static int bftw_state_init(struct bftw_state *state, const struct bftw_args *arg
bftw_cache_init(&state->cache, args->nopenfd);
+ size_t qdepth = args->nopenfd - 1;
+ if (qdepth > 1024) {
+ qdepth = 1024;
+ }
+
+ size_t nthreads = args->nthreads;
+ if (nthreads > qdepth) {
+ nthreads = qdepth;
+ }
+
+ state->ioq = NULL;
+ if (nthreads > 0) {
+ state->ioq = ioq_create(qdepth, nthreads);
+ if (!state->ioq) {
+ dstrfree(state->path);
+ return -1;
+ }
+ }
+
SLIST_INIT(&state->dirs);
SLIST_INIT(&state->files);
SLIST_INIT(&state->batch);
@@ -846,15 +871,122 @@ static enum bftw_action bftw_call_back(struct bftw_state *state, const char *nam
}
}
+/** Push a directory onto the queue. */
+static void bftw_push_dir(struct bftw_state *state, struct bftw_file *file) {
+ bfs_assert(file->type == BFS_DIR);
+
+ struct bftw_cache *cache = &state->cache;
+
+ if (!state->ioq) {
+ goto append;
+ }
+
+ int dfd = AT_FDCWD;
+ if (file->parent) {
+ dfd = file->parent->fd;
+ if (dfd < 0) {
+ goto append;
+ }
+ bftw_cache_pin(cache, file->parent);
+ }
+
+ if (cache->capacity == 0) {
+ if (bftw_cache_pop(cache) != 0) {
+ goto unpin;
+ }
+ }
+ --cache->capacity;
+
+ if (ioq_opendir(state->ioq, dfd, file->name, file) != 0) {
+ ++cache->capacity;
+ goto unpin;
+ }
+
+ file->ioqueued = true;
+
+ if (state->flags & BFTW_SORT) {
+ goto append;
+ } else {
+ return;
+ }
+
+unpin:
+ if (file->parent) {
+ bftw_cache_unpin(cache, file->parent);
+ }
+append:
+ SLIST_APPEND(&state->dirs, file);
+}
+
+/** Pop a response from the I/O queue. */
+static int bftw_ioq_pop(struct bftw_state *state, bool block) {
+ if (!state->ioq) {
+ return -1;
+ }
+
+ struct ioq_res *res;
+ if (block) {
+ res = ioq_pop(state->ioq);
+ } else {
+ res = ioq_trypop(state->ioq);
+ }
+
+ if (!res) {
+ return -1;
+ }
+
+ struct bftw_cache *cache = &state->cache;
+ ++cache->capacity;
+
+ struct bftw_file *file = res->ptr;
+ file->ioqueued = false;
+
+ if (file->parent) {
+ bftw_cache_unpin(cache, file->parent);
+ }
+
+ if (res->dir) {
+ bftw_file_set_dir(cache, file, res->dir);
+ }
+
+ ioq_free(state->ioq, res);
+
+ if (!(state->flags & BFTW_SORT)) {
+ SLIST_PREPEND(&state->dirs, file);
+ }
+
+ return 0;
+}
+
/** Pop a directory to read from the queue. */
static bool bftw_pop_dir(struct bftw_state *state) {
bfs_assert(!state->file);
- if (state->files.head && state->strategy == BFTW_BFS) {
+ bool have_dirs = state->dirs.head;
+ bool have_files = state->files.head;
+ bool have_room = state->cache.capacity > 0;
+
+ if (state->flags & BFTW_SORT) {
+ // Keep strict breadth-first order when sorting
+ if (state->strategy != BFTW_DFS && have_files) {
+ return false;
+ }
+ } else {
+ // Block if we have no other files/dirs to visit, or no room in the cache
+ bool block = !(have_dirs || have_files) || !have_room;
+ bftw_ioq_pop(state, block);
+ }
+
+ struct bftw_file *dir = state->file = SLIST_POP(&state->dirs);
+ if (!dir) {
return false;
}
- return (state->file = SLIST_POP(&state->dirs));
+ while (dir->ioqueued) {
+ bftw_ioq_pop(state, true);
+ }
+
+ return true;
}
/** Pop a file to visit from the queue. */
@@ -872,16 +1004,22 @@ static int bftw_opendir(struct bftw_state *state) {
state->direrror = 0;
- if (bftw_build_path(state, NULL) != 0) {
- return -1;
+ struct bftw_file *file = state->file;
+ if (file->dir) {
+ state->dir = file->dir;
+ } else {
+ if (bftw_build_path(state, NULL) != 0) {
+ return -1;
+ }
+ state->dir = bftw_file_opendir(&state->cache, file, state->path);
}
- state->dir = bftw_file_opendir(&state->cache, state->file, state->path);
if (state->dir) {
- bftw_cache_pin(&state->cache, state->file);
+ bftw_cache_pin(&state->cache, file);
} else {
state->direrror = errno;
}
+
return 0;
}
@@ -988,6 +1126,8 @@ static int bftw_state_destroy(struct bftw_state *state) {
bftw_gc(state, BFTW_VISIT_NONE);
} while (bftw_pop_dir(state) || bftw_pop_file(state));
+ ioq_destroy(state->ioq);
+
bftw_cache_destroy(&state->cache);
errno = state->error;
@@ -1100,7 +1240,7 @@ static int bftw_visit(struct bftw_state *state, const char *name) {
}
bftw_save_ftwbuf(file, &state->ftwbuf);
- SLIST_APPEND(&state->dirs, file);
+ bftw_push_dir(state, file);
return 0;
case BFTW_PRUNE: