diff options
author | Tavian Barnes <tavianator@tavianator.com> | 2023-06-09 16:34:41 -0400 |
---|---|---|
committer | Tavian Barnes <tavianator@tavianator.com> | 2023-06-13 11:06:47 -0400 |
commit | c023cceb3f50d92ed565ea3f085883f86de0f3f0 (patch) | |
tree | afa4b05301e7c339a78e65d4c7a0c3443f553a93 /src/bftw.c | |
parent | 0cca5b64e1355af5d2c3d935da4e110982273703 (diff) | |
download | bfs-c023cceb3f50d92ed565ea3f085883f86de0f3f0.tar.xz |
bftw: Use an I/O queue to open directories
Parallelism is controlled by the new -j flag.
Diffstat (limited to 'src/bftw.c')
-rw-r--r-- | src/bftw.c | 154 |
1 files changed, 147 insertions, 7 deletions
@@ -22,6 +22,7 @@ #include "diag.h" #include "dir.h" #include "dstring.h" +#include "ioq.h" #include "list.h" #include "mtab.h" #include "stat.h" @@ -58,6 +59,8 @@ struct bftw_file { size_t pincount; /** An open descriptor to this file, or -1. */ int fd; + /** Whether this file has a pending ioq request. */ + bool ioqueued; /** An open directory for this file, if any. */ struct bfs_dir *dir; @@ -264,6 +267,7 @@ static struct bftw_file *bftw_file_new(struct bftw_file *parent, const char *nam file->refcount = 1; file->pincount = 0; file->fd = -1; + file->ioqueued = false; file->dir = NULL; file->type = BFS_UNKNOWN; @@ -439,6 +443,8 @@ struct bftw_state { /** The cache of open directories. */ struct bftw_cache cache; + /** The async I/O queue. */ + struct ioq *ioq; /** The queue of directories to read. */ struct bftw_list dirs; /** The queue of files to visit. */ @@ -494,6 +500,25 @@ static int bftw_state_init(struct bftw_state *state, const struct bftw_args *arg bftw_cache_init(&state->cache, args->nopenfd); + size_t qdepth = args->nopenfd - 1; + if (qdepth > 1024) { + qdepth = 1024; + } + + size_t nthreads = args->nthreads; + if (nthreads > qdepth) { + nthreads = qdepth; + } + + state->ioq = NULL; + if (nthreads > 0) { + state->ioq = ioq_create(qdepth, nthreads); + if (!state->ioq) { + dstrfree(state->path); + return -1; + } + } + SLIST_INIT(&state->dirs); SLIST_INIT(&state->files); SLIST_INIT(&state->batch); @@ -846,15 +871,122 @@ static enum bftw_action bftw_call_back(struct bftw_state *state, const char *nam } } +/** Push a directory onto the queue. */ +static void bftw_push_dir(struct bftw_state *state, struct bftw_file *file) { + bfs_assert(file->type == BFS_DIR); + + struct bftw_cache *cache = &state->cache; + + if (!state->ioq) { + goto append; + } + + int dfd = AT_FDCWD; + if (file->parent) { + dfd = file->parent->fd; + if (dfd < 0) { + goto append; + } + bftw_cache_pin(cache, file->parent); + } + + if (cache->capacity == 0) { + if (bftw_cache_pop(cache) != 0) { + goto unpin; + } + } + --cache->capacity; + + if (ioq_opendir(state->ioq, dfd, file->name, file) != 0) { + ++cache->capacity; + goto unpin; + } + + file->ioqueued = true; + + if (state->flags & BFTW_SORT) { + goto append; + } else { + return; + } + +unpin: + if (file->parent) { + bftw_cache_unpin(cache, file->parent); + } +append: + SLIST_APPEND(&state->dirs, file); +} + +/** Pop a response from the I/O queue. */ +static int bftw_ioq_pop(struct bftw_state *state, bool block) { + if (!state->ioq) { + return -1; + } + + struct ioq_res *res; + if (block) { + res = ioq_pop(state->ioq); + } else { + res = ioq_trypop(state->ioq); + } + + if (!res) { + return -1; + } + + struct bftw_cache *cache = &state->cache; + ++cache->capacity; + + struct bftw_file *file = res->ptr; + file->ioqueued = false; + + if (file->parent) { + bftw_cache_unpin(cache, file->parent); + } + + if (res->dir) { + bftw_file_set_dir(cache, file, res->dir); + } + + ioq_free(state->ioq, res); + + if (!(state->flags & BFTW_SORT)) { + SLIST_PREPEND(&state->dirs, file); + } + + return 0; +} + /** Pop a directory to read from the queue. */ static bool bftw_pop_dir(struct bftw_state *state) { bfs_assert(!state->file); - if (state->files.head && state->strategy == BFTW_BFS) { + bool have_dirs = state->dirs.head; + bool have_files = state->files.head; + bool have_room = state->cache.capacity > 0; + + if (state->flags & BFTW_SORT) { + // Keep strict breadth-first order when sorting + if (state->strategy != BFTW_DFS && have_files) { + return false; + } + } else { + // Block if we have no other files/dirs to visit, or no room in the cache + bool block = !(have_dirs || have_files) || !have_room; + bftw_ioq_pop(state, block); + } + + struct bftw_file *dir = state->file = SLIST_POP(&state->dirs); + if (!dir) { return false; } - return (state->file = SLIST_POP(&state->dirs)); + while (dir->ioqueued) { + bftw_ioq_pop(state, true); + } + + return true; } /** Pop a file to visit from the queue. */ @@ -872,16 +1004,22 @@ static int bftw_opendir(struct bftw_state *state) { state->direrror = 0; - if (bftw_build_path(state, NULL) != 0) { - return -1; + struct bftw_file *file = state->file; + if (file->dir) { + state->dir = file->dir; + } else { + if (bftw_build_path(state, NULL) != 0) { + return -1; + } + state->dir = bftw_file_opendir(&state->cache, file, state->path); } - state->dir = bftw_file_opendir(&state->cache, state->file, state->path); if (state->dir) { - bftw_cache_pin(&state->cache, state->file); + bftw_cache_pin(&state->cache, file); } else { state->direrror = errno; } + return 0; } @@ -988,6 +1126,8 @@ static int bftw_state_destroy(struct bftw_state *state) { bftw_gc(state, BFTW_VISIT_NONE); } while (bftw_pop_dir(state) || bftw_pop_file(state)); + ioq_destroy(state->ioq); + bftw_cache_destroy(&state->cache); errno = state->error; @@ -1100,7 +1240,7 @@ static int bftw_visit(struct bftw_state *state, const char *name) { } bftw_save_ftwbuf(file, &state->ftwbuf); - SLIST_APPEND(&state->dirs, file); + bftw_push_dir(state, file); return 0; case BFTW_PRUNE: |