From 7b09710392d35fb55b52031d447a542d99fc6b4b Mon Sep 17 00:00:00 2001 From: Tavian Barnes Date: Tue, 19 Aug 2014 17:10:03 -0400 Subject: Modularize the libdimension codebase. --- libdimension/concurrency/future.c | 281 ++++++++++++++++++++++++++++++++++++++ 1 file changed, 281 insertions(+) create mode 100644 libdimension/concurrency/future.c (limited to 'libdimension/concurrency/future.c') diff --git a/libdimension/concurrency/future.c b/libdimension/concurrency/future.c new file mode 100644 index 0000000..90ffa24 --- /dev/null +++ b/libdimension/concurrency/future.c @@ -0,0 +1,281 @@ +/************************************************************************* + * Copyright (C) 2009-2014 Tavian Barnes * + * * + * This file is part of The Dimension Library. * + * * + * The Dimension Library is free software; you can redistribute it and/ * + * or modify it under the terms of the GNU Lesser General Public License * + * as published by the Free Software Foundation; either version 3 of the * + * License, or (at your option) any later version. * + * * + * The Dimension Library is distributed in the hope that it will be * + * useful, but WITHOUT ANY WARRANTY; without even the implied warranty * + * of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU * + * Lesser General Public License for more details. * + * * + * You should have received a copy of the GNU Lesser General Public * + * License along with this program. If not, see * + * . * + *************************************************************************/ + +/** + * @file + * Future objects. + */ + +#include "internal.h" +#include "internal/concurrency.h" +#include "internal/future.h" +#include + +/** + * Since C doesn't support anything like C++'s mutable, we fake it by casting + * away the constness. This is okay since all valid dmnsn_futures live on the + * heap, so cannot be const. + */ +#define MUTATE(future) ((dmnsn_future *)(future)) + +// Allocate a new dmnsn_future* +dmnsn_future * +dmnsn_new_future(void) +{ + dmnsn_future *future = DMNSN_MALLOC(dmnsn_future); + future->progress = 0; + future->total = 1; + + dmnsn_initialize_mutex(&future->mutex); + dmnsn_initialize_cond(&future->cond); + + future->min_wait = 1.0; + + future->nthreads = future->nrunning = 1; + future->npaused = 0; + dmnsn_initialize_cond(&future->none_running_cond); + dmnsn_initialize_cond(&future->all_running_cond); + dmnsn_initialize_cond(&future->resume_cond); + + return future; +} + +static void +dmnsn_delete_future(dmnsn_future *future) +{ + if (future) { + dmnsn_destroy_cond(&future->resume_cond); + dmnsn_destroy_cond(&future->all_running_cond); + dmnsn_destroy_cond(&future->none_running_cond); + dmnsn_destroy_cond(&future->cond); + dmnsn_destroy_mutex(&future->mutex); + dmnsn_free(future); + } +} + +// Join the worker thread and delete `future'. +int +dmnsn_future_join(dmnsn_future *future) +{ + void *ptr; + int retval = -1; + + if (future) { + dmnsn_assert(future->npaused == 0, "Attempt to join future while paused"); + + // Get the thread's return value + dmnsn_join_thread(future->thread, &ptr); + if (ptr && ptr != PTHREAD_CANCELED) { + retval = *(int *)ptr; + dmnsn_free(ptr); + } + + // Free the future object + dmnsn_delete_future(future); + } + + return retval; +} + +// Cancel a background thread +void +dmnsn_future_cancel(dmnsn_future *future) +{ + pthread_cancel(future->thread); +} + +/** + * Get the current progress, without locking anything. + * + * future->mutex must be locked for this call to be safe. + */ +static inline double +dmnsn_future_progress_unlocked(const dmnsn_future *future) +{ + return (double)future->progress/future->total; +} + +// Get the current progress of the worker thread, in [0.0, 1.0] +double +dmnsn_future_progress(const dmnsn_future *future) +{ + dmnsn_future *mfuture = MUTATE(future); + double progress; + + dmnsn_lock_mutex(&mfuture->mutex); + progress = dmnsn_future_progress_unlocked(mfuture); + dmnsn_unlock_mutex(&mfuture->mutex); + + return progress; +} + +// Find out whether the task is complete. +bool +dmnsn_future_is_done(const dmnsn_future *future) +{ + dmnsn_future *mfuture = MUTATE(future); + bool result; + + dmnsn_lock_mutex(&mfuture->mutex); + result = future->progress == future->total; + dmnsn_unlock_mutex(&mfuture->mutex); + + return result; +} + +// Wait until dmnsn_future_progress(future) >= progress +void +dmnsn_future_wait(const dmnsn_future *future, double progress) +{ + dmnsn_future *mfuture = MUTATE(future); + + dmnsn_lock_mutex(&mfuture->mutex); + while (dmnsn_future_progress_unlocked(mfuture) < progress) { + // Set the minimum waited-on value + if (progress < mfuture->min_wait) { + mfuture->min_wait = progress; + } + + dmnsn_cond_wait_safely(&mfuture->cond, &mfuture->mutex); + } + dmnsn_unlock_mutex(&mfuture->mutex); +} + +// Pause all threads working on a future. +void +dmnsn_future_pause(dmnsn_future *future) +{ + dmnsn_lock_mutex(&future->mutex); + while (future->nrunning < future->nthreads) { + dmnsn_cond_wait_safely(&future->all_running_cond, &future->mutex); + } + ++future->npaused; + while (future->nrunning > 0) { + dmnsn_cond_wait_safely(&future->none_running_cond, &future->mutex); + } + dmnsn_unlock_mutex(&future->mutex); +} + +// Resume all threads working on a future. +void +dmnsn_future_resume(dmnsn_future *future) +{ + dmnsn_lock_mutex(&future->mutex); + dmnsn_assert(future->npaused > 0, "dmnsn_future_resume() without matching dmnsn_future_pause()"); + if (--future->npaused == 0) { + dmnsn_cond_broadcast(&future->resume_cond); + } + dmnsn_unlock_mutex(&future->mutex); +} + +// Set the total number of loop iterations +void +dmnsn_future_set_total(dmnsn_future *future, size_t total) +{ + dmnsn_lock_mutex(&future->mutex); + future->total = total; + dmnsn_unlock_mutex(&future->mutex); +} + +static void +dmnsn_future_increment_cleanup(void *ptr) +{ + dmnsn_future *future = ptr; + ++future->nrunning; + dmnsn_unlock_mutex_impl(&future->mutex); +} + +// Increment the number of completed loop iterations +void +dmnsn_future_increment(dmnsn_future *future) +{ + // Allow a thread to be canceled whenever it increments a future object -- + // this is close to PTHREAD_CANCEL_ASYNCHRONOUS but allows consistent state + // on cancellation + pthread_testcancel(); + + dmnsn_lock_mutex(&future->mutex); + ++future->progress; + + if (dmnsn_future_progress_unlocked(future) >= future->min_wait) { + future->min_wait = 1.0; + dmnsn_cond_broadcast(&future->cond); + } + + if (future->npaused > 0) { + dmnsn_assert(future->nrunning > 0, "More worker threads than expected"); + + if (--future->nrunning == 0) { + dmnsn_cond_broadcast(&future->none_running_cond); + } + + pthread_cleanup_push(dmnsn_future_increment_cleanup, future); + do { + dmnsn_cond_wait(&future->resume_cond, &future->mutex); + } while (future->npaused > 0); + pthread_cleanup_pop(false); + + if (++future->nrunning == future->nthreads) { + dmnsn_cond_broadcast(&future->all_running_cond); + } + } + dmnsn_unlock_mutex(&future->mutex); +} + +// Immediately set to 100% completion +void +dmnsn_future_finish(dmnsn_future *future) +{ + dmnsn_lock_mutex(&future->mutex); + future->progress = future->total; + future->nthreads = future->nrunning = 0; + dmnsn_cond_broadcast(&future->cond); + dmnsn_cond_broadcast(&future->none_running_cond); + dmnsn_cond_broadcast(&future->all_running_cond); + dmnsn_unlock_mutex(&future->mutex); +} + +// Set the number of threads +void +dmnsn_future_set_nthreads(dmnsn_future *future, unsigned int nthreads) +{ + dmnsn_lock_mutex(&future->mutex); + dmnsn_assert(future->nrunning == future->nthreads, + "dmnsn_future_set_nthreads() called with paused threads"); + future->nthreads = future->nrunning = nthreads; + dmnsn_unlock_mutex(&future->mutex); +} + +// Notify completion of a worker thread +void +dmnsn_future_finish_thread(dmnsn_future *future) +{ + dmnsn_lock_mutex(&future->mutex); + dmnsn_assert(future->nthreads > 0, + "dmnsn_future_finish_thread() called with no threads"); + --future->nthreads; + + dmnsn_assert(future->nrunning > 0, + "dmnsn_future_finish_thread() called with no running threads"); + if (--future->nrunning == 0) { + dmnsn_cond_broadcast(&future->none_running_cond); + } + dmnsn_unlock_mutex(&future->mutex); +} -- cgit v1.2.3