summaryrefslogtreecommitdiffstats
path: root/src
diff options
context:
space:
mode:
Diffstat (limited to 'src')
-rw-r--r--src/bftw.c154
-rw-r--r--src/bftw.h2
-rw-r--r--src/ctx.c1
-rw-r--r--src/ctx.h2
-rw-r--r--src/eval.c22
-rw-r--r--src/parse.c26
6 files changed, 199 insertions, 8 deletions
diff --git a/src/bftw.c b/src/bftw.c
index 4a49457..e711963 100644
--- a/src/bftw.c
+++ b/src/bftw.c
@@ -22,6 +22,7 @@
#include "diag.h"
#include "dir.h"
#include "dstring.h"
+#include "ioq.h"
#include "list.h"
#include "mtab.h"
#include "stat.h"
@@ -58,6 +59,8 @@ struct bftw_file {
size_t pincount;
/** An open descriptor to this file, or -1. */
int fd;
+ /** Whether this file has a pending ioq request. */
+ bool ioqueued;
/** An open directory for this file, if any. */
struct bfs_dir *dir;
@@ -264,6 +267,7 @@ static struct bftw_file *bftw_file_new(struct bftw_file *parent, const char *nam
file->refcount = 1;
file->pincount = 0;
file->fd = -1;
+ file->ioqueued = false;
file->dir = NULL;
file->type = BFS_UNKNOWN;
@@ -439,6 +443,8 @@ struct bftw_state {
/** The cache of open directories. */
struct bftw_cache cache;
+ /** The async I/O queue. */
+ struct ioq *ioq;
/** The queue of directories to read. */
struct bftw_list dirs;
/** The queue of files to visit. */
@@ -494,6 +500,25 @@ static int bftw_state_init(struct bftw_state *state, const struct bftw_args *arg
bftw_cache_init(&state->cache, args->nopenfd);
+ size_t qdepth = args->nopenfd - 1;
+ if (qdepth > 1024) {
+ qdepth = 1024;
+ }
+
+ size_t nthreads = args->nthreads;
+ if (nthreads > qdepth) {
+ nthreads = qdepth;
+ }
+
+ state->ioq = NULL;
+ if (nthreads > 0) {
+ state->ioq = ioq_create(qdepth, nthreads);
+ if (!state->ioq) {
+ dstrfree(state->path);
+ return -1;
+ }
+ }
+
SLIST_INIT(&state->dirs);
SLIST_INIT(&state->files);
SLIST_INIT(&state->batch);
@@ -846,15 +871,122 @@ static enum bftw_action bftw_call_back(struct bftw_state *state, const char *nam
}
}
+/** Push a directory onto the queue. */
+static void bftw_push_dir(struct bftw_state *state, struct bftw_file *file) {
+ bfs_assert(file->type == BFS_DIR);
+
+ struct bftw_cache *cache = &state->cache;
+
+ if (!state->ioq) {
+ goto append;
+ }
+
+ int dfd = AT_FDCWD;
+ if (file->parent) {
+ dfd = file->parent->fd;
+ if (dfd < 0) {
+ goto append;
+ }
+ bftw_cache_pin(cache, file->parent);
+ }
+
+ if (cache->capacity == 0) {
+ if (bftw_cache_pop(cache) != 0) {
+ goto unpin;
+ }
+ }
+ --cache->capacity;
+
+ if (ioq_opendir(state->ioq, dfd, file->name, file) != 0) {
+ ++cache->capacity;
+ goto unpin;
+ }
+
+ file->ioqueued = true;
+
+ if (state->flags & BFTW_SORT) {
+ goto append;
+ } else {
+ return;
+ }
+
+unpin:
+ if (file->parent) {
+ bftw_cache_unpin(cache, file->parent);
+ }
+append:
+ SLIST_APPEND(&state->dirs, file);
+}
+
+/** Pop a response from the I/O queue. */
+static int bftw_ioq_pop(struct bftw_state *state, bool block) {
+ if (!state->ioq) {
+ return -1;
+ }
+
+ struct ioq_res *res;
+ if (block) {
+ res = ioq_pop(state->ioq);
+ } else {
+ res = ioq_trypop(state->ioq);
+ }
+
+ if (!res) {
+ return -1;
+ }
+
+ struct bftw_cache *cache = &state->cache;
+ ++cache->capacity;
+
+ struct bftw_file *file = res->ptr;
+ file->ioqueued = false;
+
+ if (file->parent) {
+ bftw_cache_unpin(cache, file->parent);
+ }
+
+ if (res->dir) {
+ bftw_file_set_dir(cache, file, res->dir);
+ }
+
+ ioq_free(state->ioq, res);
+
+ if (!(state->flags & BFTW_SORT)) {
+ SLIST_PREPEND(&state->dirs, file);
+ }
+
+ return 0;
+}
+
/** Pop a directory to read from the queue. */
static bool bftw_pop_dir(struct bftw_state *state) {
bfs_assert(!state->file);
- if (state->files.head && state->strategy == BFTW_BFS) {
+ bool have_dirs = state->dirs.head;
+ bool have_files = state->files.head;
+ bool have_room = state->cache.capacity > 0;
+
+ if (state->flags & BFTW_SORT) {
+ // Keep strict breadth-first order when sorting
+ if (state->strategy != BFTW_DFS && have_files) {
+ return false;
+ }
+ } else {
+ // Block if we have no other files/dirs to visit, or no room in the cache
+ bool block = !(have_dirs || have_files) || !have_room;
+ bftw_ioq_pop(state, block);
+ }
+
+ struct bftw_file *dir = state->file = SLIST_POP(&state->dirs);
+ if (!dir) {
return false;
}
- return (state->file = SLIST_POP(&state->dirs));
+ while (dir->ioqueued) {
+ bftw_ioq_pop(state, true);
+ }
+
+ return true;
}
/** Pop a file to visit from the queue. */
@@ -872,16 +1004,22 @@ static int bftw_opendir(struct bftw_state *state) {
state->direrror = 0;
- if (bftw_build_path(state, NULL) != 0) {
- return -1;
+ struct bftw_file *file = state->file;
+ if (file->dir) {
+ state->dir = file->dir;
+ } else {
+ if (bftw_build_path(state, NULL) != 0) {
+ return -1;
+ }
+ state->dir = bftw_file_opendir(&state->cache, file, state->path);
}
- state->dir = bftw_file_opendir(&state->cache, state->file, state->path);
if (state->dir) {
- bftw_cache_pin(&state->cache, state->file);
+ bftw_cache_pin(&state->cache, file);
} else {
state->direrror = errno;
}
+
return 0;
}
@@ -988,6 +1126,8 @@ static int bftw_state_destroy(struct bftw_state *state) {
bftw_gc(state, BFTW_VISIT_NONE);
} while (bftw_pop_dir(state) || bftw_pop_file(state));
+ ioq_destroy(state->ioq);
+
bftw_cache_destroy(&state->cache);
errno = state->error;
@@ -1100,7 +1240,7 @@ static int bftw_visit(struct bftw_state *state, const char *name) {
}
bftw_save_ftwbuf(file, &state->ftwbuf);
- SLIST_APPEND(&state->dirs, file);
+ bftw_push_dir(state, file);
return 0;
case BFTW_PRUNE:
diff --git a/src/bftw.h b/src/bftw.h
index 77697ed..940532c 100644
--- a/src/bftw.h
+++ b/src/bftw.h
@@ -186,6 +186,8 @@ struct bftw_args {
void *ptr;
/** The maximum number of file descriptors to keep open. */
int nopenfd;
+ /** The maximum number of threads to use. */
+ int nthreads;
/** Flags that control bftw() behaviour. */
enum bftw_flags flags;
/** The search strategy to use. */
diff --git a/src/ctx.c b/src/ctx.c
index c4b2fb2..e8ce0e8 100644
--- a/src/ctx.c
+++ b/src/ctx.c
@@ -56,6 +56,7 @@ struct bfs_ctx *bfs_ctx_new(void) {
ctx->maxdepth = INT_MAX;
ctx->flags = BFTW_RECOVER;
ctx->strategy = BFTW_BFS;
+ ctx->threads = 0;
ctx->optlevel = 3;
ctx->debug = 0;
ctx->ignore_races = false;
diff --git a/src/ctx.h b/src/ctx.h
index 0dc9f08..2b8e8cb 100644
--- a/src/ctx.h
+++ b/src/ctx.h
@@ -68,6 +68,8 @@ struct bfs_ctx {
/** bftw() search strategy. */
enum bftw_strategy strategy;
+ /** Threads (-j). */
+ int threads;
/** Optimization level (-O). */
int optlevel;
/** Debugging flags (-D). */
diff --git a/src/eval.c b/src/eval.c
index e2c19a9..b9bce6c 100644
--- a/src/eval.c
+++ b/src/eval.c
@@ -1505,6 +1505,19 @@ static int infer_fdlimit(const struct bfs_ctx *ctx, int limit) {
return ret;
}
+static int infer_nproc(void) {
+ long nproc = sysconf(_SC_NPROCESSORS_ONLN);
+
+ if (nproc < 0) {
+ nproc = 0;
+ } else if (nproc > 8) {
+ // Not much speedup after 8 threads
+ nproc = 8;
+ }
+
+ return nproc;
+}
+
/**
* Dump the bftw() flags for -D search.
*/
@@ -1593,12 +1606,20 @@ int bfs_eval(const struct bfs_ctx *ctx) {
int fdlimit = raise_fdlimit(ctx);
fdlimit = infer_fdlimit(ctx, fdlimit);
+ int nthreads;
+ if (ctx->threads > 0) {
+ nthreads = ctx->threads - 1;
+ } else {
+ nthreads = infer_nproc();
+ }
+
struct bftw_args bftw_args = {
.paths = ctx->paths,
.npaths = darray_length(ctx->paths),
.callback = eval_callback,
.ptr = &args,
.nopenfd = fdlimit,
+ .nthreads = nthreads,
.flags = ctx->flags,
.strategy = ctx->strategy,
.mtab = bfs_ctx_mtab(ctx),
@@ -1618,6 +1639,7 @@ int bfs_eval(const struct bfs_ctx *ctx) {
fprintf(stderr, "\t.callback = eval_callback,\n");
fprintf(stderr, "\t.ptr = &args,\n");
fprintf(stderr, "\t.nopenfd = %d,\n", bftw_args.nopenfd);
+ fprintf(stderr, "\t.nthreads = %d,\n", bftw_args.nthreads);
fprintf(stderr, "\t.flags = ");
dump_bftw_flags(bftw_args.flags);
fprintf(stderr, ",\n\t.strategy = %s,\n", dump_bftw_strategy(bftw_args.strategy));
diff --git a/src/parse.c b/src/parse.c
index 59a1e7d..96def14 100644
--- a/src/parse.c
+++ b/src/parse.c
@@ -1637,6 +1637,23 @@ static struct bfs_expr *parse_inum(struct parser_state *state, int arg1, int arg
}
/**
+ * Parse -j<n>.
+ */
+static struct bfs_expr *parse_jobs(struct parser_state *state, int arg1, int arg2) {
+ struct bfs_expr *expr = parse_nullary_flag(state);
+ if (!expr) {
+ return NULL;
+ }
+
+ if (!parse_int(state, expr->argv, expr->argv[0] + 2, &state->ctx->threads, IF_INT | IF_UNSIGNED)) {
+ bfs_expr_free(expr);
+ return NULL;
+ }
+
+ return expr;
+}
+
+/**
* Parse -links N.
*/
static struct bfs_expr *parse_links(struct parser_state *state, int arg1, int arg2) {
@@ -2753,7 +2770,9 @@ static struct bfs_expr *parse_help(struct parser_state *state, int arg1, int arg
cfprintf(cout, " Enable optimization level ${bld}N${rs} (default: ${bld}3${rs})\n");
cfprintf(cout, " ${cyn}-S${rs} ${bld}bfs${rs}|${bld}dfs${rs}|${bld}ids${rs}|${bld}eds${rs}\n");
cfprintf(cout, " Use ${bld}b${rs}readth-${bld}f${rs}irst/${bld}d${rs}epth-${bld}f${rs}irst/${bld}i${rs}terative/${bld}e${rs}xponential ${bld}d${rs}eepening ${bld}s${rs}earch\n");
- cfprintf(cout, " (default: ${cyn}-S${rs} ${bld}bfs${rs})\n\n");
+ cfprintf(cout, " (default: ${cyn}-S${rs} ${bld}bfs${rs})\n");
+ cfprintf(cout, " ${cyn}-j${bld}N${rs}\n");
+ cfprintf(cout, " Search with ${bld}N${rs} threads in parallel (default: number of CPUs, up to ${bld}8${rs})\n\n");
cfprintf(cout, "${bld}Operators:${rs}\n\n");
@@ -3060,6 +3079,7 @@ static const struct table_entry parse_table[] = {
{"-ipath", T_TEST, parse_path, true},
{"-iregex", T_TEST, parse_regex, BFS_REGEX_ICASE},
{"-iwholename", T_TEST, parse_path, true},
+ {"-j", T_FLAG, parse_jobs, 0, 0, true},
{"-links", T_TEST, parse_links},
{"-lname", T_TEST, parse_lname, false},
{"-ls", T_ACTION, parse_ls},
@@ -3552,6 +3572,10 @@ void bfs_ctx_dump(const struct bfs_ctx *ctx, enum debug_flags flag) {
cfprintf(cerr, " ${cyn}-O${bld}%d${rs}", ctx->optlevel);
}
+ if (ctx->threads > 0) {
+ cfprintf(cerr, " ${cyn}-j${bld}%d${rs}", ctx->threads);
+ }
+
cfprintf(cerr, " ${cyn}-S${rs} ${bld}%s${rs}", bftw_strategy_name(ctx->strategy));
enum debug_flags debug = ctx->debug;