From 0acff566213fdddbc8f4561887aced121f82dc26 Mon Sep 17 00:00:00 2001 From: Tavian Barnes Date: Wed, 23 Apr 2014 18:12:53 -0400 Subject: future: Add a race-free way to examine a partial computation. This allows safe OpenGL previews, for example. dmnsn_future* learned the dmnsn_future_{pause,resume}() functions which cause all worker threads to block. render.test now survives Helgrind with no errors. --- libdimension/bench/future.c | 5 +- libdimension/dimension/future.h | 15 +++++- libdimension/future-internal.h | 20 +++++++- libdimension/future.c | 104 ++++++++++++++++++++++++++++++++++++++-- libdimension/prtree.c | 4 +- libdimension/ray_trace.c | 5 +- libdimension/tests/render.c | 17 ++++--- libdimension/threads.c | 37 ++++++++++---- libdimension/threads.h | 30 +++++++++--- 9 files changed, 202 insertions(+), 35 deletions(-) (limited to 'libdimension') diff --git a/libdimension/bench/future.c b/libdimension/bench/future.c index 5937c3c..0e3acc6 100644 --- a/libdimension/bench/future.c +++ b/libdimension/bench/future.c @@ -1,5 +1,5 @@ /************************************************************************* - * Copyright (C) 2013 Tavian Barnes * + * Copyright (C) 2013-2014 Tavian Barnes * * * * This file is part of The Dimension Benchmark Suite. * * * @@ -58,7 +58,8 @@ dmnsn_bench_thread(void *ptr) dmnsn_unlock_mutex(&future->mutex); /* Now run a bunch of increments concurrently. */ - return dmnsn_execute_concurrently(&dmnsn_bench_future, future, nthreads); + return dmnsn_execute_concurrently(future, &dmnsn_bench_future, future, + nthreads); } int diff --git a/libdimension/dimension/future.h b/libdimension/dimension/future.h index f0693b9..9f1f14a 100644 --- a/libdimension/dimension/future.h +++ b/libdimension/dimension/future.h @@ -1,5 +1,5 @@ /************************************************************************* - * Copyright (C) 2009-2011 Tavian Barnes * + * Copyright (C) 2009-2014 Tavian Barnes * * * * This file is part of The Dimension Library. * * * @@ -57,3 +57,16 @@ double dmnsn_future_progress(const dmnsn_future *future); * @param[in] progress The progress value to wait for. */ void dmnsn_future_wait(const dmnsn_future *future, double progress); + +/** + * Pause all threads working on the given future. Once this function returns, + * it is safe to examine the intermediate state of the asynchronous computation. + * @param[in,out] future The background task to pause. + */ +void dmnsn_future_pause(dmnsn_future *future); + +/** + * Resume a previously paused future object. + * @param[in,out] future The background task to resume. + */ +void dmnsn_future_resume(dmnsn_future *future); diff --git a/libdimension/future-internal.h b/libdimension/future-internal.h index 4ec6f75..644a486 100644 --- a/libdimension/future-internal.h +++ b/libdimension/future-internal.h @@ -1,5 +1,5 @@ /************************************************************************* - * Copyright (C) 2010-2013 Tavian Barnes * + * Copyright (C) 2010-2014 Tavian Barnes * * * * This file is part of The Dimension Library. * * * @@ -34,6 +34,11 @@ DMNSN_INTERNAL void dmnsn_future_set_total(dmnsn_future *future, size_t total); DMNSN_INTERNAL void dmnsn_future_increment(dmnsn_future *future); /** Instantly complete the background teask. */ DMNSN_INTERNAL void dmnsn_future_done(dmnsn_future *future); +/** Set the number of worker threads. */ +DMNSN_INTERNAL void dmnsn_future_set_nthreads(dmnsn_future *future, + unsigned int nthreads); +/** Notify completion of a worker thread. */ +DMNSN_INTERNAL void dmnsn_future_thread_done(dmnsn_future *future); struct dmnsn_future { size_t progress; /**< Completed loop iterations. */ @@ -50,4 +55,17 @@ struct dmnsn_future { /** Minimum waited-on value. */ double min_wait; + + /** Number of threads working on the future's background task. */ + unsigned int nthreads; + /** Number of threads not yet paused. */ + unsigned int nrunning; + /** Count of threads holding the future paused. */ + unsigned int npaused; + /** Condition variable for waiting for nrunning == 0. */ + pthread_cond_t none_running_cond; + /** Condition variable for waiting for nrunning == nthreads. */ + pthread_cond_t all_running_cond; + /** Condition variable for waiting for npaused == 0. */ + pthread_cond_t resume_cond; }; diff --git a/libdimension/future.c b/libdimension/future.c index 6ea15ae..8d9c0c8 100644 --- a/libdimension/future.c +++ b/libdimension/future.c @@ -1,5 +1,5 @@ /************************************************************************* - * Copyright (C) 2009-2013 Tavian Barnes * + * Copyright (C) 2009-2014 Tavian Barnes * * * * This file is part of The Dimension Library. * * * @@ -31,7 +31,7 @@ * away the constness. This is okay since all valid dmnsn_futures live on the * heap, so cannot be const. */ -#define MUTATE(future) ((dmnsn_future *)future) +#define MUTATE(future) ((dmnsn_future *)(future)) /* Allocate a new dmnsn_future* */ dmnsn_future * @@ -46,6 +46,12 @@ dmnsn_new_future(void) future->min_wait = 1.0; + future->nthreads = future->nrunning = 1; + future->npaused = 0; + dmnsn_initialize_cond(&future->none_running_cond); + dmnsn_initialize_cond(&future->all_running_cond); + dmnsn_initialize_cond(&future->resume_cond); + return future; } @@ -65,6 +71,9 @@ dmnsn_future_join(dmnsn_future *future) } /* Free the future object */ + dmnsn_destroy_cond(&future->resume_cond); + dmnsn_destroy_cond(&future->all_running_cond); + dmnsn_destroy_cond(&future->none_running_cond); dmnsn_destroy_cond(&future->cond); dmnsn_destroy_mutex(&future->mutex); dmnsn_free(future); @@ -114,14 +123,44 @@ dmnsn_future_wait(const dmnsn_future *future, double progress) dmnsn_lock_mutex(&mfuture->mutex); while (dmnsn_future_progress_unlocked(mfuture) < progress) { /* Set the minimum waited-on value */ - if (progress < mfuture->min_wait) + if (progress < mfuture->min_wait) { mfuture->min_wait = progress; + } - dmnsn_cond_wait(&mfuture->cond, &mfuture->mutex); + dmnsn_cond_wait_safely(&mfuture->cond, &mfuture->mutex); } dmnsn_unlock_mutex(&mfuture->mutex); } +/* Pause all threads working on a future. */ +void +dmnsn_future_pause(dmnsn_future *future) +{ + dmnsn_lock_mutex(&future->mutex); + while (future->nrunning < future->nthreads) { + dmnsn_cond_wait_safely(&future->all_running_cond, &future->mutex); + } + if (future->npaused++ == 0) { + while (future->nrunning > 0) { + dmnsn_cond_wait_safely(&future->none_running_cond, &future->mutex); + } + } + dmnsn_unlock_mutex(&future->mutex); +} + +/* Resume all threads working on a future. */ +void +dmnsn_future_resume(dmnsn_future *future) +{ + dmnsn_lock_mutex(&future->mutex); + dmnsn_assert(future->npaused > 0, + "dmnsn_future_resume() without matching dmnsn_future_pause()"); + if (--future->npaused == 0) { + dmnsn_cond_broadcast(&future->resume_cond); + } + dmnsn_unlock_mutex(&future->mutex); +} + /* Set the total number of loop iterations */ void dmnsn_future_set_total(dmnsn_future *future, size_t total) @@ -131,6 +170,14 @@ dmnsn_future_set_total(dmnsn_future *future, size_t total) dmnsn_unlock_mutex(&future->mutex); } +static void +dmnsn_future_increment_cleanup(void *ptr) +{ + dmnsn_future *future = ptr; + ++future->nrunning; + dmnsn_unlock_mutex_impl(&future->mutex); +} + /* Increment the number of completed loop iterations */ void dmnsn_future_increment(dmnsn_future *future) @@ -147,6 +194,24 @@ dmnsn_future_increment(dmnsn_future *future) future->min_wait = 1.0; dmnsn_cond_broadcast(&future->cond); } + + if (future->npaused > 0) { + dmnsn_assert(future->nrunning > 0, "More worker threads than expected"); + + if (--future->nrunning == 0) { + dmnsn_cond_broadcast(&future->none_running_cond); + } + + pthread_cleanup_push(dmnsn_future_increment_cleanup, future); + do { + dmnsn_cond_wait(&future->resume_cond, &future->mutex); + } while (future->npaused > 0); + pthread_cleanup_pop(false); + + if (++future->nrunning == future->nthreads) { + dmnsn_cond_broadcast(&future->all_running_cond); + } + } dmnsn_unlock_mutex(&future->mutex); } @@ -156,6 +221,37 @@ dmnsn_future_done(dmnsn_future *future) { dmnsn_lock_mutex(&future->mutex); future->progress = future->total; + future->nthreads = future->nrunning = 0; dmnsn_cond_broadcast(&future->cond); + dmnsn_cond_broadcast(&future->none_running_cond); + dmnsn_cond_broadcast(&future->all_running_cond); + dmnsn_unlock_mutex(&future->mutex); +} + +/* Set the number of threads */ +void +dmnsn_future_set_nthreads(dmnsn_future *future, unsigned int nthreads) +{ + dmnsn_lock_mutex(&future->mutex); + dmnsn_assert(future->nrunning == future->nthreads, + "dmnsn_future_set_nthreads() called with paused threads"); + future->nthreads = future->nrunning = nthreads; + dmnsn_unlock_mutex(&future->mutex); +} + +/* Notify completion of a worker thread */ +void +dmnsn_future_thread_done(dmnsn_future *future) +{ + dmnsn_lock_mutex(&future->mutex); + dmnsn_assert(future->nthreads > 0, + "dmnsn_future_thread_done() called with no threads"); + --future->nthreads; + + dmnsn_assert(future->nrunning > 0, + "dmnsn_future_thread_done() called with no running threads"); + if (--future->nrunning == 0) { + dmnsn_cond_broadcast(&future->none_running_cond); + } dmnsn_unlock_mutex(&future->mutex); } diff --git a/libdimension/prtree.c b/libdimension/prtree.c index 0d459eb..a726982 100644 --- a/libdimension/prtree.c +++ b/libdimension/prtree.c @@ -1,5 +1,5 @@ /************************************************************************* - * Copyright (C) 2010-2013 Tavian Barnes * + * Copyright (C) 2010-2014 Tavian Barnes * * * * This file is part of The Dimension Library. * * * @@ -307,7 +307,7 @@ dmnsn_priority_leaves(const dmnsn_array *leaves, unsigned int nthreads) .sorted_leaves = sorted_leaves, .nleaves = nleaves, }; - dmnsn_execute_concurrently(dmnsn_sort_leaves, &payload, nthreads); + dmnsn_execute_concurrently(NULL, dmnsn_sort_leaves, &payload, nthreads); } else { for (size_t i = 0; i < DMNSN_PSEUDO_B; ++i) { sorted_leaves[i] = dmnsn_sort_leaf_array(leaves_arr, nleaves, i); diff --git a/libdimension/ray_trace.c b/libdimension/ray_trace.c index 7587e2c..c9f7da4 100644 --- a/libdimension/ray_trace.c +++ b/libdimension/ray_trace.c @@ -1,5 +1,5 @@ /************************************************************************* - * Copyright (C) 2010-2012 Tavian Barnes * + * Copyright (C) 2010-2014 Tavian Barnes * * * * This file is part of The Dimension Library. * * * @@ -89,7 +89,8 @@ dmnsn_ray_trace_scene_thread(void *ptr) /* Time the render itself */ dmnsn_timer_start(&payload->scene->render_timer); - int ret = dmnsn_execute_concurrently(dmnsn_ray_trace_scene_concurrent, + int ret = dmnsn_execute_concurrently(payload->future, + dmnsn_ray_trace_scene_concurrent, payload, payload->scene->nthreads); dmnsn_timer_stop(&payload->scene->render_timer); diff --git a/libdimension/tests/render.c b/libdimension/tests/render.c index 58a7930..1efff28 100644 --- a/libdimension/tests/render.c +++ b/libdimension/tests/render.c @@ -1,5 +1,5 @@ /************************************************************************* - * Copyright (C) 2010-2011 Tavian Barnes * + * Copyright (C) 2010-2014 Tavian Barnes * * * * This file is part of The Dimension Test Suite. * * * @@ -367,12 +367,15 @@ main(void) /* Display the scene as it's rendered */ if (display) { while (dmnsn_future_progress(future) < 1.0) { - if (dmnsn_gl_write_canvas(scene->canvas) != 0) { - dmnsn_delete_display(display); - dmnsn_delete_scene(scene); - fprintf(stderr, "--- Drawing to OpenGL failed! ---\n"); - return EXIT_FAILURE; - } + dmnsn_future_pause(future); + if (dmnsn_gl_write_canvas(scene->canvas) != 0) { + dmnsn_delete_display(display); + dmnsn_delete_scene(scene); + fprintf(stderr, "--- Drawing to OpenGL failed! ---\n"); + return EXIT_FAILURE; + } + dmnsn_future_resume(future); + dmnsn_display_flush(display); } } diff --git a/libdimension/threads.c b/libdimension/threads.c index 0aed16d..76f4796 100644 --- a/libdimension/threads.c +++ b/libdimension/threads.c @@ -1,5 +1,5 @@ /************************************************************************* - * Copyright (C) 2010-2011 Tavian Barnes * + * Copyright (C) 2010-2014 Tavian Barnes * * * * This file is part of The Dimension Library. * * * @@ -73,11 +73,12 @@ dmnsn_new_thread(dmnsn_future *future, dmnsn_thread_fn *thread_fn, void *arg) /** Payload for threads executed by dmnsn_execute_concurrently(). */ typedef struct dmnsn_ccthread_payload { + dmnsn_future *future; dmnsn_ccthread_fn *ccthread_fn; void *arg; unsigned int thread, nthreads; int ret; - bool started; + bool running; } dmnsn_ccthread_payload; static void * @@ -86,10 +87,14 @@ dmnsn_concurrent_thread(void *ptr) dmnsn_ccthread_payload *payload = ptr; payload->ret = payload->ccthread_fn(payload->arg, payload->thread, payload->nthreads); + if (payload->future) { + dmnsn_future_thread_done(payload->future); + } return NULL; } typedef struct dmnsn_ccthread_cleanup_payload { + dmnsn_future *future; pthread_t *threads; dmnsn_ccthread_payload *payloads; unsigned int nthreads; @@ -101,38 +106,48 @@ dmnsn_ccthread_cleanup(void *ptr) dmnsn_ccthread_cleanup_payload *payload = ptr; for (unsigned int i = 0; i < payload->nthreads; ++i) { - if (payload->payloads[i].started) { + if (payload->payloads[i].running) { pthread_cancel(payload->threads[i]); } } for (unsigned int i = 0; i < payload->nthreads; ++i) { - if (payload->payloads[i].started) { + if (payload->payloads[i].running) { dmnsn_join_thread(payload->threads[i], NULL); } } + + if (payload->future) { + dmnsn_future_set_nthreads(payload->future, 1); + } } int -dmnsn_execute_concurrently(dmnsn_ccthread_fn *ccthread_fn, +dmnsn_execute_concurrently(dmnsn_future *future, dmnsn_ccthread_fn *ccthread_fn, void *arg, unsigned int nthreads) { dmnsn_assert(nthreads > 0, "Attempt to execute using 0 concurrent threads."); + if (future) { + dmnsn_future_set_nthreads(future, nthreads); + } + pthread_t threads[nthreads]; dmnsn_ccthread_payload payloads[nthreads]; for (unsigned int i = 0; i < nthreads; ++i) { - payloads[i].started = false; + payloads[i].running = false; } int ret = 0; dmnsn_ccthread_cleanup_payload cleanup_payload = { + .future = future, .threads = threads, .payloads = payloads, .nthreads = nthreads, }; pthread_cleanup_push(dmnsn_ccthread_cleanup, &cleanup_payload); for (unsigned int i = 0; i < nthreads; ++i) { + payloads[i].future = future; payloads[i].ccthread_fn = ccthread_fn; payloads[i].arg = arg; payloads[i].thread = i; @@ -143,18 +158,22 @@ dmnsn_execute_concurrently(dmnsn_ccthread_fn *ccthread_fn, { dmnsn_error("Couldn't start worker thread."); } - payloads[i].started = true; + payloads[i].running = true; } for (unsigned int i = 0; i < nthreads; ++i) { dmnsn_join_thread(threads[i], NULL); - payloads[i].started = false; + payloads[i].running = false; if (payloads[i].ret != 0) { ret = payloads[i].ret; } } pthread_cleanup_pop(false); + if (future) { + dmnsn_future_set_nthreads(future, 1); + } + return ret; } @@ -177,7 +196,7 @@ dmnsn_lock_mutex_impl(pthread_mutex_t *mutex) } void -dmnsn_unlock_mutex_impl(pthread_mutex_t *mutex) +dmnsn_unlock_mutex_impl(void *mutex) { if (pthread_mutex_unlock(mutex) != 0) { dmnsn_error("Couldn't unlock mutex."); diff --git a/libdimension/threads.h b/libdimension/threads.h index 3fba3b9..0c9d6fd 100644 --- a/libdimension/threads.h +++ b/libdimension/threads.h @@ -53,12 +53,15 @@ typedef int dmnsn_ccthread_fn(void *ptr, unsigned int thread, /** * Run \p nthreads threads in parallel. + * @param[in,out] future The future object to associate with the threads, + * possibly NULL. * @param[in] ccthread_fn The routine to run in each concurrent thread. * @param[in,out] arg The pointer to pass to the thread callbacks. * @param[in] nthreads The number of concurrent threads to run. * @return 0 if all threads were successful, and an error code otherwise. */ -DMNSN_INTERNAL int dmnsn_execute_concurrently(dmnsn_ccthread_fn *ccthread_fn, +DMNSN_INTERNAL int dmnsn_execute_concurrently(dmnsn_future *future, + dmnsn_ccthread_fn *ccthread_fn, void *arg, unsigned int nthreads); /** @@ -70,21 +73,21 @@ DMNSN_INTERNAL void dmnsn_initialize_mutex(pthread_mutex_t *mutex); /** dmnsn_lock_mutex() implementation. */ DMNSN_INTERNAL void dmnsn_lock_mutex_impl(pthread_mutex_t *mutex); /** dmnsn_unlock_mutex() implementation. */ -DMNSN_INTERNAL void dmnsn_unlock_mutex_impl(pthread_mutex_t *mutex); +DMNSN_INTERNAL void dmnsn_unlock_mutex_impl(void *mutex); /** * Lock a mutex, bailing out on failure. * Contains a {, so must be used in the same block as dmnsn_unlock_mutex(). * @param[in,out] mutex The mutex to lock. */ -#define dmnsn_lock_mutex(mutex) dmnsn_lock_mutex_impl((mutex)); { +#define dmnsn_lock_mutex(mutex) do { dmnsn_lock_mutex_impl((mutex)) /** * Unlock a mutex, bailing out on failure. * Contains a }, so must be used in the same block as dmnsn_lock_mutex(). * @param[in,out] mutex The mutex to unlock. */ -#define dmnsn_unlock_mutex(mutex) dmnsn_unlock_mutex_impl((mutex)); } +#define dmnsn_unlock_mutex(mutex) dmnsn_unlock_mutex_impl((mutex)); } while (0) /** * Destroy a mutex, warning on failure. @@ -110,14 +113,14 @@ DMNSN_INTERNAL void dmnsn_unlock_rwlock_impl(pthread_rwlock_t *rwlock); * Contains a {, so must be used in the same block as dmnsn_unlock_rwlock(). * @param[in,out] rwlock The read-write lock to lock. */ -#define dmnsn_read_lock(rwlock) dmnsn_read_lock_impl((rwlock)); { +#define dmnsn_read_lock(rwlock) do { dmnsn_read_lock_impl((rwlock)) /** * Write-lock a read-write lock, bailing out on failure. * Contains a {, so must be used in the same block as dmnsn_unlock_rwlock(). * @param[in,out] rwlock The read-write lock to lock. */ -#define dmnsn_write_lock(rwlock) dmnsn_write_lock_impl((rwlock)); { +#define dmnsn_write_lock(rwlock) do { dmnsn_write_lock_impl((rwlock)) /** * Unlock a read-write lock, bailing out on failure. @@ -125,7 +128,8 @@ DMNSN_INTERNAL void dmnsn_unlock_rwlock_impl(pthread_rwlock_t *rwlock); * dmnsn_write_lock(). * @param[in,out] rwlock The read-write lock to lock. */ -#define dmnsn_unlock_rwlock(rwlock) dmnsn_unlock_rwlock_impl((rwlock)); } +#define dmnsn_unlock_rwlock(rwlock) \ + dmnsn_unlock_rwlock_impl((rwlock)); } while (0) /** * Destroy a read-write lock, warning on failure. @@ -146,6 +150,18 @@ DMNSN_INTERNAL void dmnsn_initialize_cond(pthread_cond_t *cond); */ DMNSN_INTERNAL void dmnsn_cond_wait(pthread_cond_t *cond, pthread_mutex_t *mutex); +/** + * Wait on a condition variable, bailing out on error, and unlock the mutex if + * cancelled. + * @param[in] cond The condition variable to wait on. + * @param[in] mutex The associated mutex. + */ +#define dmnsn_cond_wait_safely(cond, mutex) \ + do { \ + pthread_cleanup_push(dmnsn_unlock_mutex_impl, (mutex)); \ + dmnsn_cond_wait((cond), (mutex)); \ + pthread_cleanup_pop(false); \ + } while (0) /** * Signal a condition variable, bailing out on error. -- cgit v1.2.3