From 43ec427535e2d21a3d8ec36d98889f1bc0515938 Mon Sep 17 00:00:00 2001 From: Tavian Barnes Date: Fri, 26 Jan 2024 13:32:26 -0500 Subject: bftw: New bftw_queue abstraction --- src/bftw.c | 366 ++++++++++++++++++++++++++++++++++++++++++++++++------------- 1 file changed, 292 insertions(+), 74 deletions(-) (limited to 'src/bftw.c') diff --git a/src/bftw.c b/src/bftw.c index 02dd051..ce08dba 100644 --- a/src/bftw.c +++ b/src/bftw.c @@ -9,6 +9,8 @@ * * - struct bftw_list: A linked list of bftw_file's. * + * - struct bftw_queue: A multi-stage queue of bftw_file's. + * * - struct bftw_cache: An LRU list of bftw_file's with open file descriptors, * used for openat() to minimize the amount of path re-traversals. * @@ -125,7 +127,7 @@ struct bftw_file { /** The next file to open/close/visit. */ struct bftw_file *next; /** The next directory to read. */ - struct { struct bftw_file *next; } to_read; + struct { struct bftw_file *next; } ready; /** LRU list node. */ struct { @@ -170,6 +172,254 @@ struct bftw_list { struct bftw_file **tail; }; +/** + * bftw_queue flags. + */ +enum bftw_qflags { + /** Track the sync/async service balance. */ + BFTW_QBALANCE = 1 << 0, + /** Buffer files before adding them to the queue. */ + BFTW_QBUFFER = 1 << 1, + /** Use LIFO (stack/DFS) ordering. */ + BFTW_QLIFO = 1 << 2, + /** Maintain a strict order. */ + BFTW_QORDER = 1 << 3, +}; + +/** + * A queue of bftw_file's that may be serviced asynchronously. + * + * A bftw_queue comprises three linked lists each tracking different stages. + * When BFTW_QBUFFER is set, files are initially pushed to the buffer: + * + * ╔═══╗ ╔═══╦═══╗ + * buffer: ║ 𝘩 ║ ║ 𝘩 ║ 𝘪 ║ + * ╠═══╬═══╦═══╗ ╠═══╬═══╬═══╗ + * waiting: ║ e ║ f ║ g ║ → ║ e ║ f ║ g ║ + * ╠═══╬═══╬═══╬═══╗ ╠═══╬═══╬═══╬═══╗ + * ready: ║ 𝕒 ║ 𝕓 ║ 𝕔 ║ 𝕕 ║ ║ 𝕒 ║ 𝕓 ║ 𝕔 ║ 𝕕 ║ + * ╚═══╩═══╩═══╩═══╝ ╚═══╩═══╩═══╩═══╝ + * + * When bftw_queue_flush() is called, the files in the buffer are appended to + * the waiting list (or prepended, if BFTW_QLIFO is set): + * + * ╔═╗ + * buffer: ║ ║ + * ╠═╩═╦═══╦═══╦═══╦═══╗ + * waiting: ║ e ║ f ║ g ║ h ║ i ║ + * ╠═══╬═══╬═══╬═══╬═══╝ + * ready: ║ 𝕒 ║ 𝕓 ║ 𝕔 ║ 𝕕 ║ + * ╚═══╩═══╩═══╩═══╝ + * + * Using the buffer gives a more natural ordering for BFTW_QLIFO, and allows + * files to be sorted before adding them to the waiting list. If BFTW_QBUFFER + * is not set, files are pushed directly to the waiting list instead. + * + * Files on the waiting list are waiting to be "serviced" asynchronously by the + * ioq (for example, by an ioq_opendir() or ioq_stat() call). While they are + * being serviced, they are detached from the queue by bftw_queue_detach() and + * are not tracked by the queue at all: + * + * ╔═╗ + * buffer: ║ ║ + * ╠═╩═╦═══╦═══╗ ⎛ ┌───┬───┐ ⎞ + * waiting: ║ g ║ h ║ i ║ ⎜ ioq: │ 𝓮 │ 𝓯 │ ⎟ + * ╠═══╬═══╬═══╬═══╗ ⎝ └───┴───┘ ⎠ + * ready: ║ 𝕒 ║ 𝕓 ║ 𝕔 ║ 𝕕 ║ + * ╚═══╩═══╩═══╩═══╝ + * + * When their async service is complete, files are reattached to the queue by + * bftw_queue_attach(), this time on the ready list: + * + * ╔═╗ + * buffer: ║ ║ + * ╠═╩═╦═══╦═══╗ ⎛ ┌───┐ ⎞ + * waiting: ║ g ║ h ║ i ║ ⎜ ioq: │ 𝓮 │ ⎟ + * ╠═══╬═══╬═══╬═══╦═══╗ ⎝ └───┘ ⎠ + * ready: ║ 𝕒 ║ 𝕓 ║ 𝕔 ║ 𝕕 ║ 𝕗 ║ + * ╚═══╩═══╩═══╩═══╩═══╝ + * + * Files are added to the ready list in the order they are finished by the ioq. + * bftw_queue_pop() pops a file from the ready list if possible. Otherwise, it + * pops from the waiting list, and the file must be serviced synchronously. + * + * However, if BFTW_QORDER is set, files must be popped in the exact order they + * are added to the waiting list (to maintain sorted order). In this case, + * files are added to the waiting and ready lists at the same time. The + * file->ioqueued flag is set while it is in-service, so that bftw() can wait + * for it to be truly ready before using it. + * + * ╔═╗ + * buffer: ║ ║ + * ╠═╩═╦═══╦═══╗ ⎛ ┌───┐ ⎞ + * waiting: ║ g ║ h ║ i ║ ⎜ ioq: │ 𝓮 │ ⎟ + * ╠═══╬═══╬═══╬═══╦═══╦═══╦═══╦═══╦═══╗ ⎝ └───┘ ⎠ + * ready: ║ 𝕒 ║ 𝕓 ║ 𝕔 ║ 𝕕 ║ 𝓮 ║ 𝕗 ║ g ║ h ║ i ║ + * ╚═══╩═══╩═══╩═══╩═══╩═══╩═══╩═══╩═══╝ + * + * If BFTW_QBALANCE is set, queue->imbalance tracks the delta between async + * service (negative) and synchronous service (positive). The queue is + * considered "balanced" when this number is non-negative. Only a balanced + * queue will perform any async service, ensuring work is fairly distributed + * between the main thread and the ioq. + * + * BFTW_QBALANCE is only set for single-threaded ioqs. When an ioq has multiple + * threads, it is faster to wait for the ioq to complete an operation than it is + * to perform it on the main thread. + */ +struct bftw_queue { + /** Queue flags. */ + enum bftw_qflags flags; + /** A buffer of files to be enqueued together. */ + struct bftw_list buffer; + /** A list of files which are waiting to be serviced. */ + struct bftw_list waiting; + /** A list of already-serviced files. */ + struct bftw_list ready; + /** Tracks the imbalance between synchronous and async service. */ + unsigned long imbalance; +}; + +/** Initialize a queue. */ +static void bftw_queue_init(struct bftw_queue *queue, enum bftw_qflags flags) { + queue->flags = flags; + SLIST_INIT(&queue->buffer); + SLIST_INIT(&queue->waiting); + SLIST_INIT(&queue->ready); + queue->imbalance = 0; +} + +/** Add a file to the queue. */ +static void bftw_queue_push(struct bftw_queue *queue, struct bftw_file *file) { + if (queue->flags & BFTW_QBUFFER) { + SLIST_APPEND(&queue->buffer, file); + } else if (queue->flags & BFTW_QLIFO) { + SLIST_PREPEND(&queue->waiting, file); + if (queue->flags & BFTW_QORDER) { + SLIST_PREPEND(&queue->ready, file, ready); + } + } else { + SLIST_APPEND(&queue->waiting, file); + if (queue->flags & BFTW_QORDER) { + SLIST_APPEND(&queue->ready, file, ready); + } + } +} + +/** Add any buffered files to the queue. */ +static void bftw_queue_flush(struct bftw_queue *queue) { + if (!(queue->flags & BFTW_QBUFFER)) { + return; + } + + if (queue->flags & BFTW_QORDER) { + // When sorting, add files to the ready list at the same time + // (and in the same order) as they are added to the waiting list + struct bftw_file **cursor = (queue->flags & BFTW_QLIFO) + ? &queue->ready.head + : queue->ready.tail; + for_slist (struct bftw_file, file, &queue->buffer) { + cursor = SLIST_INSERT(&queue->ready, cursor, file, ready); + } + } + + if (queue->flags & BFTW_QLIFO) { + SLIST_EXTEND(&queue->buffer, &queue->waiting); + } + + SLIST_EXTEND(&queue->waiting, &queue->buffer); +} + +/** Update the queue imbalance. */ +static void bftw_queue_balance(struct bftw_queue *queue, long delta) { + queue->imbalance += delta; +} + +/** Check if the queue is properly balanced for async work. */ +static bool bftw_queue_balanced(const struct bftw_queue *queue) { + if (queue->flags & BFTW_QBALANCE) { + return (long)queue->imbalance >= 0; + } else { + return true; + } +} + +/** Detatch the next waiting file to service it asynchronously. */ +static void bftw_queue_detach(struct bftw_queue *queue, struct bftw_file *file) { + if (file == SLIST_HEAD(&queue->buffer)) { + // To maintain order, we can't detach any files until they're + // added to the waiting/ready lists + bfs_assert(!(queue->flags & BFTW_QORDER)); + SLIST_POP(&queue->buffer); + } else if (file == SLIST_HEAD(&queue->waiting)) { + SLIST_POP(&queue->waiting); + } else { + bfs_bug("Detached file was not buffered or waiting"); + } + + file->ioqueued = true; + bftw_queue_balance(queue, -1); +} + +/** Reattach a serviced file to the queue. */ +static void bftw_queue_attach(struct bftw_queue *queue, struct bftw_file *file) { + file->ioqueued = false; + + if (!(queue->flags & BFTW_QORDER)) { + SLIST_APPEND(&queue->ready, file, ready); + } +} + +/** Get the next waiting file. */ +static struct bftw_file *bftw_queue_waiting(const struct bftw_queue *queue) { + if (!(queue->flags & BFTW_QBUFFER)) { + return SLIST_HEAD(&queue->waiting); + } + + if (queue->flags & BFTW_QORDER) { + // Don't detach files until they're on the waiting/ready lists + return SLIST_HEAD(&queue->waiting); + } + + const struct bftw_list *prefix = &queue->waiting; + const struct bftw_list *suffix = &queue->buffer; + if (queue->flags & BFTW_QLIFO) { + prefix = &queue->buffer; + suffix = &queue->waiting; + } + + struct bftw_file *file = SLIST_HEAD(prefix); + if (!file) { + file = SLIST_HEAD(suffix); + } + return file; +} + +/** Get the next ready file. */ +static struct bftw_file *bftw_queue_ready(const struct bftw_queue *queue) { + return SLIST_HEAD(&queue->ready); +} + +/** Pop a file from the queue. */ +static struct bftw_file *bftw_queue_pop(struct bftw_queue *queue) { + // Don't pop until we've had a chance to sort the buffer + bfs_assert(SLIST_EMPTY(&queue->buffer)); + + struct bftw_file *file = SLIST_POP(&queue->ready, ready); + + if (!file || file == SLIST_HEAD(&queue->waiting)) { + // If no files are ready, try the waiting list. Or, if + // BFTW_QORDER is set, we may need to pop from both lists. + file = SLIST_POP(&queue->waiting); + if (file) { + // This file will be serviced synchronously + bftw_queue_balance(queue, +1); + } + } + + return file; +} + /** * A cache of open directories. */ @@ -354,7 +604,7 @@ static struct bftw_file *bftw_file_new(struct bftw_cache *cache, struct bftw_fil } SLIST_ITEM_INIT(file); - SLIST_ITEM_INIT(file, to_read); + SLIST_ITEM_INIT(file, ready); LIST_ITEM_INIT(file, lru); file->refcount = 1; @@ -430,17 +680,11 @@ struct bftw_state { struct ioq *ioq; /** The number of I/O threads. */ size_t nthreads; - /** Tracks the imbalance between main thread and background I/O. */ - long imbalance; - - /** A batch of directories to open. */ - struct bftw_list dir_batch; - /** The queue of directories to open. */ - struct bftw_list to_open; - /** The queue of directories to read. */ - struct bftw_list to_read; + /** The queue of unpinned directories to unwrap. */ struct bftw_list to_close; + /** The queue of directories to open/read. */ + struct bftw_queue dirq; /** A batch of files to enqueue. */ struct bftw_list file_batch; @@ -543,13 +787,22 @@ static int bftw_state_init(struct bftw_state *state, const struct bftw_args *arg state->dir_flags |= BFS_DIR_WHITEOUTS; } - state->imbalance = 0; - - SLIST_INIT(&state->dir_batch); - SLIST_INIT(&state->to_open); - SLIST_INIT(&state->to_read); SLIST_INIT(&state->to_close); + enum bftw_qflags qflags = 0; + if (state->strategy != BFTW_BFS && !(state->flags & BFTW_BUFFER)) { + // For a depth-first, unbuffered search, queue directories in + // LIFO order + qflags |= BFTW_QBUFFER | BFTW_QLIFO; + } + if (state->flags & BFTW_SORT) { + qflags |= BFTW_QORDER; + } + if (nthreads == 1) { + qflags |= BFTW_QBALANCE; + } + bftw_queue_init(&state->dirq, qflags); + SLIST_INIT(&state->file_batch); SLIST_INIT(&state->to_visit); @@ -575,12 +828,6 @@ static void bftw_unpin_dir(struct bftw_state *state, struct bftw_file *file, boo } } -/** Adjust the I/O queue balance. */ -static void bftw_ioq_balance(struct bftw_state *state, long delta) { - // Avoid signed overflow - state->imbalance = (unsigned long)state->imbalance + (unsigned long)delta; -} - /** Pop a response from the I/O queue. */ static int bftw_ioq_pop(struct bftw_state *state, bool block) { struct ioq *ioq = state->ioq; @@ -612,7 +859,6 @@ static int bftw_ioq_pop(struct bftw_state *state, bool block) { case IOQ_OPENDIR: file = ent->ptr; - file->ioqueued = false; ++cache->capacity; parent = file->parent; @@ -627,9 +873,7 @@ static int bftw_ioq_pop(struct bftw_state *state, bool block) { bftw_freedir(cache, dir); } - if (!(state->flags & BFTW_SORT)) { - SLIST_APPEND(&state->to_read, file, to_read); - } + bftw_queue_attach(&state->dirq, file); break; case IOQ_STAT: @@ -647,19 +891,14 @@ static int bftw_ioq_reserve(struct bftw_state *state) { return -1; } - // With only one background thread, we should balance I/O between it and - // the main thread. With more than one background thread, it's faster - // to wait on background I/O than it is to do it on the main thread. - bool balance = state->nthreads <= 1; - if (balance && state->imbalance < 0) { - return -1; - } - if (ioq_capacity(ioq) > 0) { return 0; } - if (bftw_ioq_pop(state, !balance) < 0) { + // With more than one background thread, it's faster to wait on + // background I/O than it is to do it on the main thread + bool block = state->nthreads > 1; + if (bftw_ioq_pop(state, block) < 0) { return -1; } @@ -892,9 +1131,7 @@ static int bftw_ioq_opendir(struct bftw_state *state, struct bftw_file *file) { goto free; } - file->ioqueued = true; --cache->capacity; - bftw_ioq_balance(state, -1); return 0; free: @@ -908,39 +1145,26 @@ fail: } /** Open a batch of directories asynchronously. */ -static void bftw_ioq_opendirs(struct bftw_state *state, struct bftw_list *queue) { - for_slist (struct bftw_file, dir, queue) { - if (bftw_ioq_opendir(state, dir) != 0) { +static void bftw_ioq_opendirs(struct bftw_state *state) { + while (bftw_queue_balanced(&state->dirq)) { + struct bftw_file *dir = bftw_queue_waiting(&state->dirq); + if (!dir) { + break; + } + + if (bftw_ioq_opendir(state, dir) == 0) { + bftw_queue_detach(&state->dirq, dir); + } else { break; } - SLIST_POP(queue); } } /** Push a directory onto the queue. */ static void bftw_push_dir(struct bftw_state *state, struct bftw_file *file) { bfs_assert(file->type == BFS_DIR); - - struct bftw_list *queue; - if (state->strategy == BFTW_BFS || (state->flags & BFTW_BUFFER)) { - // In breadth-first mode, or if we're already buffering files, - // we can push directly to the to_open queue - queue = &state->to_open; - } else { - // For a depth-first, unbuffered search, add directories to a - // batch, then push the patch to the front of the queue - queue = &state->dir_batch; - } - - SLIST_APPEND(queue, file); - - if (state->flags & BFTW_SORT) { - // When sorting, directories are kept in order on the to_read - // list; otherwise, they are only added once they are open - SLIST_APPEND(&state->to_read, file, to_read); - } - - bftw_ioq_opendirs(state, queue); + bftw_queue_push(&state->dirq, file); + bftw_ioq_opendirs(state); } /** Pop a directory to read from the queue. */ @@ -956,9 +1180,9 @@ static bool bftw_pop_dir(struct bftw_state *state) { return false; } } else { - while (SLIST_EMPTY(&state->to_read)) { + while (!bftw_queue_ready(&state->dirq)) { // Block if we have no other files/dirs to visit, or no room in the cache - bool have_dirs = !SLIST_EMPTY(&state->to_open); + bool have_dirs = bftw_queue_waiting(&state->dirq); bool have_room = cache->capacity > 0 && cache->dirlimit > 0; bool block = !(have_dirs || have_files) || !have_room; @@ -968,10 +1192,7 @@ static bool bftw_pop_dir(struct bftw_state *state) { } } - struct bftw_file *file = SLIST_POP(&state->to_read, to_read); - if (!file || file == state->to_open.head) { - file = SLIST_POP(&state->to_open); - } + struct bftw_file *file = bftw_queue_pop(&state->dirq); if (!file) { return false; } @@ -1051,8 +1272,6 @@ static struct bfs_dir *bftw_file_opendir(struct bftw_state *state, struct bftw_f return NULL; } - bftw_ioq_balance(state, +1); - if (bfs_opendir(dir, fd, NULL, state->dir_flags) != 0) { bftw_freedir(cache, dir); return NULL; @@ -1428,14 +1647,13 @@ static void bftw_batch_finish(struct bftw_state *state) { } if (state->strategy != BFTW_BFS) { - SLIST_EXTEND(&state->dir_batch, &state->to_open); SLIST_EXTEND(&state->file_batch, &state->to_visit); } - SLIST_EXTEND(&state->to_open, &state->dir_batch); SLIST_EXTEND(&state->to_visit, &state->file_batch); - bftw_ioq_opendirs(state, &state->to_open); + bftw_queue_flush(&state->dirq); + bftw_ioq_opendirs(state); } /** Close the current directory. */ @@ -1525,7 +1743,7 @@ static int bftw_state_destroy(struct bftw_state *state) { state->ioq = NULL; } - SLIST_EXTEND(&state->to_open, &state->dir_batch); + bftw_queue_flush(&state->dirq); SLIST_EXTEND(&state->to_visit, &state->file_batch); do { bftw_gc(state, BFTW_VISIT_NONE); -- cgit v1.2.3