From c023cceb3f50d92ed565ea3f085883f86de0f3f0 Mon Sep 17 00:00:00 2001 From: Tavian Barnes Date: Fri, 9 Jun 2023 16:34:41 -0400 Subject: bftw: Use an I/O queue to open directories Parallelism is controlled by the new -j flag. --- docs/bfs.1 | 8 +++- src/bftw.c | 154 +++++++++++++++++++++++++++++++++++++++++++++++++++++++++--- src/bftw.h | 2 + src/ctx.c | 1 + src/ctx.h | 2 + src/eval.c | 22 +++++++++ src/parse.c | 26 +++++++++- 7 files changed, 206 insertions(+), 9 deletions(-) diff --git a/docs/bfs.1 b/docs/bfs.1 index 53a9831..bc82457 100644 --- a/docs/bfs.1 +++ b/docs/bfs.1 @@ -171,12 +171,18 @@ consumes too much memory. .TP .I eds Exponential deepening search. -A compromise between breadth- and depth-first search, which searches exponentially increasing depth ranges (e.g 0-1, 1-2, 2-4, 4-8, etc.). +A compromise between breadth- and depth-first search, which searches exponentially increasing depth ranges (e.g. 0-1, 1-2, 2-4, 4-8, etc.). Provides many of the benefits of breadth-first search with depth-first's reduced memory consumption. Typically far faster than .B \-S .IR ids . .RE +.TP +\fB\-j\fIN\fR +Search with +.I N +threads in parallel (default: number of CPUs, up to +.IR 8 ). .SH OPERATORS .TP \fB( \fIexpression \fB)\fR diff --git a/src/bftw.c b/src/bftw.c index 4a49457..e711963 100644 --- a/src/bftw.c +++ b/src/bftw.c @@ -22,6 +22,7 @@ #include "diag.h" #include "dir.h" #include "dstring.h" +#include "ioq.h" #include "list.h" #include "mtab.h" #include "stat.h" @@ -58,6 +59,8 @@ struct bftw_file { size_t pincount; /** An open descriptor to this file, or -1. */ int fd; + /** Whether this file has a pending ioq request. */ + bool ioqueued; /** An open directory for this file, if any. */ struct bfs_dir *dir; @@ -264,6 +267,7 @@ static struct bftw_file *bftw_file_new(struct bftw_file *parent, const char *nam file->refcount = 1; file->pincount = 0; file->fd = -1; + file->ioqueued = false; file->dir = NULL; file->type = BFS_UNKNOWN; @@ -439,6 +443,8 @@ struct bftw_state { /** The cache of open directories. */ struct bftw_cache cache; + /** The async I/O queue. */ + struct ioq *ioq; /** The queue of directories to read. */ struct bftw_list dirs; /** The queue of files to visit. */ @@ -494,6 +500,25 @@ static int bftw_state_init(struct bftw_state *state, const struct bftw_args *arg bftw_cache_init(&state->cache, args->nopenfd); + size_t qdepth = args->nopenfd - 1; + if (qdepth > 1024) { + qdepth = 1024; + } + + size_t nthreads = args->nthreads; + if (nthreads > qdepth) { + nthreads = qdepth; + } + + state->ioq = NULL; + if (nthreads > 0) { + state->ioq = ioq_create(qdepth, nthreads); + if (!state->ioq) { + dstrfree(state->path); + return -1; + } + } + SLIST_INIT(&state->dirs); SLIST_INIT(&state->files); SLIST_INIT(&state->batch); @@ -846,15 +871,122 @@ static enum bftw_action bftw_call_back(struct bftw_state *state, const char *nam } } +/** Push a directory onto the queue. */ +static void bftw_push_dir(struct bftw_state *state, struct bftw_file *file) { + bfs_assert(file->type == BFS_DIR); + + struct bftw_cache *cache = &state->cache; + + if (!state->ioq) { + goto append; + } + + int dfd = AT_FDCWD; + if (file->parent) { + dfd = file->parent->fd; + if (dfd < 0) { + goto append; + } + bftw_cache_pin(cache, file->parent); + } + + if (cache->capacity == 0) { + if (bftw_cache_pop(cache) != 0) { + goto unpin; + } + } + --cache->capacity; + + if (ioq_opendir(state->ioq, dfd, file->name, file) != 0) { + ++cache->capacity; + goto unpin; + } + + file->ioqueued = true; + + if (state->flags & BFTW_SORT) { + goto append; + } else { + return; + } + +unpin: + if (file->parent) { + bftw_cache_unpin(cache, file->parent); + } +append: + SLIST_APPEND(&state->dirs, file); +} + +/** Pop a response from the I/O queue. */ +static int bftw_ioq_pop(struct bftw_state *state, bool block) { + if (!state->ioq) { + return -1; + } + + struct ioq_res *res; + if (block) { + res = ioq_pop(state->ioq); + } else { + res = ioq_trypop(state->ioq); + } + + if (!res) { + return -1; + } + + struct bftw_cache *cache = &state->cache; + ++cache->capacity; + + struct bftw_file *file = res->ptr; + file->ioqueued = false; + + if (file->parent) { + bftw_cache_unpin(cache, file->parent); + } + + if (res->dir) { + bftw_file_set_dir(cache, file, res->dir); + } + + ioq_free(state->ioq, res); + + if (!(state->flags & BFTW_SORT)) { + SLIST_PREPEND(&state->dirs, file); + } + + return 0; +} + /** Pop a directory to read from the queue. */ static bool bftw_pop_dir(struct bftw_state *state) { bfs_assert(!state->file); - if (state->files.head && state->strategy == BFTW_BFS) { + bool have_dirs = state->dirs.head; + bool have_files = state->files.head; + bool have_room = state->cache.capacity > 0; + + if (state->flags & BFTW_SORT) { + // Keep strict breadth-first order when sorting + if (state->strategy != BFTW_DFS && have_files) { + return false; + } + } else { + // Block if we have no other files/dirs to visit, or no room in the cache + bool block = !(have_dirs || have_files) || !have_room; + bftw_ioq_pop(state, block); + } + + struct bftw_file *dir = state->file = SLIST_POP(&state->dirs); + if (!dir) { return false; } - return (state->file = SLIST_POP(&state->dirs)); + while (dir->ioqueued) { + bftw_ioq_pop(state, true); + } + + return true; } /** Pop a file to visit from the queue. */ @@ -872,16 +1004,22 @@ static int bftw_opendir(struct bftw_state *state) { state->direrror = 0; - if (bftw_build_path(state, NULL) != 0) { - return -1; + struct bftw_file *file = state->file; + if (file->dir) { + state->dir = file->dir; + } else { + if (bftw_build_path(state, NULL) != 0) { + return -1; + } + state->dir = bftw_file_opendir(&state->cache, file, state->path); } - state->dir = bftw_file_opendir(&state->cache, state->file, state->path); if (state->dir) { - bftw_cache_pin(&state->cache, state->file); + bftw_cache_pin(&state->cache, file); } else { state->direrror = errno; } + return 0; } @@ -988,6 +1126,8 @@ static int bftw_state_destroy(struct bftw_state *state) { bftw_gc(state, BFTW_VISIT_NONE); } while (bftw_pop_dir(state) || bftw_pop_file(state)); + ioq_destroy(state->ioq); + bftw_cache_destroy(&state->cache); errno = state->error; @@ -1100,7 +1240,7 @@ static int bftw_visit(struct bftw_state *state, const char *name) { } bftw_save_ftwbuf(file, &state->ftwbuf); - SLIST_APPEND(&state->dirs, file); + bftw_push_dir(state, file); return 0; case BFTW_PRUNE: diff --git a/src/bftw.h b/src/bftw.h index 77697ed..940532c 100644 --- a/src/bftw.h +++ b/src/bftw.h @@ -186,6 +186,8 @@ struct bftw_args { void *ptr; /** The maximum number of file descriptors to keep open. */ int nopenfd; + /** The maximum number of threads to use. */ + int nthreads; /** Flags that control bftw() behaviour. */ enum bftw_flags flags; /** The search strategy to use. */ diff --git a/src/ctx.c b/src/ctx.c index c4b2fb2..e8ce0e8 100644 --- a/src/ctx.c +++ b/src/ctx.c @@ -56,6 +56,7 @@ struct bfs_ctx *bfs_ctx_new(void) { ctx->maxdepth = INT_MAX; ctx->flags = BFTW_RECOVER; ctx->strategy = BFTW_BFS; + ctx->threads = 0; ctx->optlevel = 3; ctx->debug = 0; ctx->ignore_races = false; diff --git a/src/ctx.h b/src/ctx.h index 0dc9f08..2b8e8cb 100644 --- a/src/ctx.h +++ b/src/ctx.h @@ -68,6 +68,8 @@ struct bfs_ctx { /** bftw() search strategy. */ enum bftw_strategy strategy; + /** Threads (-j). */ + int threads; /** Optimization level (-O). */ int optlevel; /** Debugging flags (-D). */ diff --git a/src/eval.c b/src/eval.c index e2c19a9..b9bce6c 100644 --- a/src/eval.c +++ b/src/eval.c @@ -1505,6 +1505,19 @@ static int infer_fdlimit(const struct bfs_ctx *ctx, int limit) { return ret; } +static int infer_nproc(void) { + long nproc = sysconf(_SC_NPROCESSORS_ONLN); + + if (nproc < 0) { + nproc = 0; + } else if (nproc > 8) { + // Not much speedup after 8 threads + nproc = 8; + } + + return nproc; +} + /** * Dump the bftw() flags for -D search. */ @@ -1593,12 +1606,20 @@ int bfs_eval(const struct bfs_ctx *ctx) { int fdlimit = raise_fdlimit(ctx); fdlimit = infer_fdlimit(ctx, fdlimit); + int nthreads; + if (ctx->threads > 0) { + nthreads = ctx->threads - 1; + } else { + nthreads = infer_nproc(); + } + struct bftw_args bftw_args = { .paths = ctx->paths, .npaths = darray_length(ctx->paths), .callback = eval_callback, .ptr = &args, .nopenfd = fdlimit, + .nthreads = nthreads, .flags = ctx->flags, .strategy = ctx->strategy, .mtab = bfs_ctx_mtab(ctx), @@ -1618,6 +1639,7 @@ int bfs_eval(const struct bfs_ctx *ctx) { fprintf(stderr, "\t.callback = eval_callback,\n"); fprintf(stderr, "\t.ptr = &args,\n"); fprintf(stderr, "\t.nopenfd = %d,\n", bftw_args.nopenfd); + fprintf(stderr, "\t.nthreads = %d,\n", bftw_args.nthreads); fprintf(stderr, "\t.flags = "); dump_bftw_flags(bftw_args.flags); fprintf(stderr, ",\n\t.strategy = %s,\n", dump_bftw_strategy(bftw_args.strategy)); diff --git a/src/parse.c b/src/parse.c index 59a1e7d..96def14 100644 --- a/src/parse.c +++ b/src/parse.c @@ -1636,6 +1636,23 @@ static struct bfs_expr *parse_inum(struct parser_state *state, int arg1, int arg return parse_test_icmp(state, eval_inum); } +/** + * Parse -j. + */ +static struct bfs_expr *parse_jobs(struct parser_state *state, int arg1, int arg2) { + struct bfs_expr *expr = parse_nullary_flag(state); + if (!expr) { + return NULL; + } + + if (!parse_int(state, expr->argv, expr->argv[0] + 2, &state->ctx->threads, IF_INT | IF_UNSIGNED)) { + bfs_expr_free(expr); + return NULL; + } + + return expr; +} + /** * Parse -links N. */ @@ -2753,7 +2770,9 @@ static struct bfs_expr *parse_help(struct parser_state *state, int arg1, int arg cfprintf(cout, " Enable optimization level ${bld}N${rs} (default: ${bld}3${rs})\n"); cfprintf(cout, " ${cyn}-S${rs} ${bld}bfs${rs}|${bld}dfs${rs}|${bld}ids${rs}|${bld}eds${rs}\n"); cfprintf(cout, " Use ${bld}b${rs}readth-${bld}f${rs}irst/${bld}d${rs}epth-${bld}f${rs}irst/${bld}i${rs}terative/${bld}e${rs}xponential ${bld}d${rs}eepening ${bld}s${rs}earch\n"); - cfprintf(cout, " (default: ${cyn}-S${rs} ${bld}bfs${rs})\n\n"); + cfprintf(cout, " (default: ${cyn}-S${rs} ${bld}bfs${rs})\n"); + cfprintf(cout, " ${cyn}-j${bld}N${rs}\n"); + cfprintf(cout, " Search with ${bld}N${rs} threads in parallel (default: number of CPUs, up to ${bld}8${rs})\n\n"); cfprintf(cout, "${bld}Operators:${rs}\n\n"); @@ -3060,6 +3079,7 @@ static const struct table_entry parse_table[] = { {"-ipath", T_TEST, parse_path, true}, {"-iregex", T_TEST, parse_regex, BFS_REGEX_ICASE}, {"-iwholename", T_TEST, parse_path, true}, + {"-j", T_FLAG, parse_jobs, 0, 0, true}, {"-links", T_TEST, parse_links}, {"-lname", T_TEST, parse_lname, false}, {"-ls", T_ACTION, parse_ls}, @@ -3552,6 +3572,10 @@ void bfs_ctx_dump(const struct bfs_ctx *ctx, enum debug_flags flag) { cfprintf(cerr, " ${cyn}-O${bld}%d${rs}", ctx->optlevel); } + if (ctx->threads > 0) { + cfprintf(cerr, " ${cyn}-j${bld}%d${rs}", ctx->threads); + } + cfprintf(cerr, " ${cyn}-S${rs} ${bld}%s${rs}", bftw_strategy_name(ctx->strategy)); enum debug_flags debug = ctx->debug; -- cgit v1.2.3