From 0acff566213fdddbc8f4561887aced121f82dc26 Mon Sep 17 00:00:00 2001 From: Tavian Barnes Date: Wed, 23 Apr 2014 18:12:53 -0400 Subject: future: Add a race-free way to examine a partial computation. This allows safe OpenGL previews, for example. dmnsn_future* learned the dmnsn_future_{pause,resume}() functions which cause all worker threads to block. render.test now survives Helgrind with no errors. --- libdimension/future.c | 104 ++++++++++++++++++++++++++++++++++++++++++++++++-- 1 file changed, 100 insertions(+), 4 deletions(-) (limited to 'libdimension/future.c') diff --git a/libdimension/future.c b/libdimension/future.c index 6ea15ae..8d9c0c8 100644 --- a/libdimension/future.c +++ b/libdimension/future.c @@ -1,5 +1,5 @@ /************************************************************************* - * Copyright (C) 2009-2013 Tavian Barnes * + * Copyright (C) 2009-2014 Tavian Barnes * * * * This file is part of The Dimension Library. * * * @@ -31,7 +31,7 @@ * away the constness. This is okay since all valid dmnsn_futures live on the * heap, so cannot be const. */ -#define MUTATE(future) ((dmnsn_future *)future) +#define MUTATE(future) ((dmnsn_future *)(future)) /* Allocate a new dmnsn_future* */ dmnsn_future * @@ -46,6 +46,12 @@ dmnsn_new_future(void) future->min_wait = 1.0; + future->nthreads = future->nrunning = 1; + future->npaused = 0; + dmnsn_initialize_cond(&future->none_running_cond); + dmnsn_initialize_cond(&future->all_running_cond); + dmnsn_initialize_cond(&future->resume_cond); + return future; } @@ -65,6 +71,9 @@ dmnsn_future_join(dmnsn_future *future) } /* Free the future object */ + dmnsn_destroy_cond(&future->resume_cond); + dmnsn_destroy_cond(&future->all_running_cond); + dmnsn_destroy_cond(&future->none_running_cond); dmnsn_destroy_cond(&future->cond); dmnsn_destroy_mutex(&future->mutex); dmnsn_free(future); @@ -114,14 +123,44 @@ dmnsn_future_wait(const dmnsn_future *future, double progress) dmnsn_lock_mutex(&mfuture->mutex); while (dmnsn_future_progress_unlocked(mfuture) < progress) { /* Set the minimum waited-on value */ - if (progress < mfuture->min_wait) + if (progress < mfuture->min_wait) { mfuture->min_wait = progress; + } - dmnsn_cond_wait(&mfuture->cond, &mfuture->mutex); + dmnsn_cond_wait_safely(&mfuture->cond, &mfuture->mutex); } dmnsn_unlock_mutex(&mfuture->mutex); } +/* Pause all threads working on a future. */ +void +dmnsn_future_pause(dmnsn_future *future) +{ + dmnsn_lock_mutex(&future->mutex); + while (future->nrunning < future->nthreads) { + dmnsn_cond_wait_safely(&future->all_running_cond, &future->mutex); + } + if (future->npaused++ == 0) { + while (future->nrunning > 0) { + dmnsn_cond_wait_safely(&future->none_running_cond, &future->mutex); + } + } + dmnsn_unlock_mutex(&future->mutex); +} + +/* Resume all threads working on a future. */ +void +dmnsn_future_resume(dmnsn_future *future) +{ + dmnsn_lock_mutex(&future->mutex); + dmnsn_assert(future->npaused > 0, + "dmnsn_future_resume() without matching dmnsn_future_pause()"); + if (--future->npaused == 0) { + dmnsn_cond_broadcast(&future->resume_cond); + } + dmnsn_unlock_mutex(&future->mutex); +} + /* Set the total number of loop iterations */ void dmnsn_future_set_total(dmnsn_future *future, size_t total) @@ -131,6 +170,14 @@ dmnsn_future_set_total(dmnsn_future *future, size_t total) dmnsn_unlock_mutex(&future->mutex); } +static void +dmnsn_future_increment_cleanup(void *ptr) +{ + dmnsn_future *future = ptr; + ++future->nrunning; + dmnsn_unlock_mutex_impl(&future->mutex); +} + /* Increment the number of completed loop iterations */ void dmnsn_future_increment(dmnsn_future *future) @@ -147,6 +194,24 @@ dmnsn_future_increment(dmnsn_future *future) future->min_wait = 1.0; dmnsn_cond_broadcast(&future->cond); } + + if (future->npaused > 0) { + dmnsn_assert(future->nrunning > 0, "More worker threads than expected"); + + if (--future->nrunning == 0) { + dmnsn_cond_broadcast(&future->none_running_cond); + } + + pthread_cleanup_push(dmnsn_future_increment_cleanup, future); + do { + dmnsn_cond_wait(&future->resume_cond, &future->mutex); + } while (future->npaused > 0); + pthread_cleanup_pop(false); + + if (++future->nrunning == future->nthreads) { + dmnsn_cond_broadcast(&future->all_running_cond); + } + } dmnsn_unlock_mutex(&future->mutex); } @@ -156,6 +221,37 @@ dmnsn_future_done(dmnsn_future *future) { dmnsn_lock_mutex(&future->mutex); future->progress = future->total; + future->nthreads = future->nrunning = 0; dmnsn_cond_broadcast(&future->cond); + dmnsn_cond_broadcast(&future->none_running_cond); + dmnsn_cond_broadcast(&future->all_running_cond); + dmnsn_unlock_mutex(&future->mutex); +} + +/* Set the number of threads */ +void +dmnsn_future_set_nthreads(dmnsn_future *future, unsigned int nthreads) +{ + dmnsn_lock_mutex(&future->mutex); + dmnsn_assert(future->nrunning == future->nthreads, + "dmnsn_future_set_nthreads() called with paused threads"); + future->nthreads = future->nrunning = nthreads; + dmnsn_unlock_mutex(&future->mutex); +} + +/* Notify completion of a worker thread */ +void +dmnsn_future_thread_done(dmnsn_future *future) +{ + dmnsn_lock_mutex(&future->mutex); + dmnsn_assert(future->nthreads > 0, + "dmnsn_future_thread_done() called with no threads"); + --future->nthreads; + + dmnsn_assert(future->nrunning > 0, + "dmnsn_future_thread_done() called with no running threads"); + if (--future->nrunning == 0) { + dmnsn_cond_broadcast(&future->none_running_cond); + } dmnsn_unlock_mutex(&future->mutex); } -- cgit v1.2.3