summaryrefslogtreecommitdiffstats
path: root/libdimension/concurrency
diff options
context:
space:
mode:
authorTavian Barnes <tavianator@tavianator.com>2014-08-19 17:10:03 -0400
committerTavian Barnes <tavianator@tavianator.com>2015-10-25 11:03:56 -0400
commit7b09710392d35fb55b52031d447a542d99fc6b4b (patch)
tree270eb927ee8c52ceeb99926ebf4843704775a610 /libdimension/concurrency
parent200c86b91ea7063d35be3bffc11c5da53c054653 (diff)
downloaddimension-7b09710392d35fb55b52031d447a542d99fc6b4b.tar.xz
Modularize the libdimension codebase.
Diffstat (limited to 'libdimension/concurrency')
-rw-r--r--libdimension/concurrency/future.c281
-rw-r--r--libdimension/concurrency/threads.c326
2 files changed, 607 insertions, 0 deletions
diff --git a/libdimension/concurrency/future.c b/libdimension/concurrency/future.c
new file mode 100644
index 0000000..90ffa24
--- /dev/null
+++ b/libdimension/concurrency/future.c
@@ -0,0 +1,281 @@
+/*************************************************************************
+ * Copyright (C) 2009-2014 Tavian Barnes <tavianator@tavianator.com> *
+ * *
+ * This file is part of The Dimension Library. *
+ * *
+ * The Dimension Library is free software; you can redistribute it and/ *
+ * or modify it under the terms of the GNU Lesser General Public License *
+ * as published by the Free Software Foundation; either version 3 of the *
+ * License, or (at your option) any later version. *
+ * *
+ * The Dimension Library is distributed in the hope that it will be *
+ * useful, but WITHOUT ANY WARRANTY; without even the implied warranty *
+ * of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU *
+ * Lesser General Public License for more details. *
+ * *
+ * You should have received a copy of the GNU Lesser General Public *
+ * License along with this program. If not, see *
+ * <http://www.gnu.org/licenses/>. *
+ *************************************************************************/
+
+/**
+ * @file
+ * Future objects.
+ */
+
+#include "internal.h"
+#include "internal/concurrency.h"
+#include "internal/future.h"
+#include <pthread.h>
+
+/**
+ * Since C doesn't support anything like C++'s mutable, we fake it by casting
+ * away the constness. This is okay since all valid dmnsn_futures live on the
+ * heap, so cannot be const.
+ */
+#define MUTATE(future) ((dmnsn_future *)(future))
+
+// Allocate a new dmnsn_future*
+dmnsn_future *
+dmnsn_new_future(void)
+{
+ dmnsn_future *future = DMNSN_MALLOC(dmnsn_future);
+ future->progress = 0;
+ future->total = 1;
+
+ dmnsn_initialize_mutex(&future->mutex);
+ dmnsn_initialize_cond(&future->cond);
+
+ future->min_wait = 1.0;
+
+ future->nthreads = future->nrunning = 1;
+ future->npaused = 0;
+ dmnsn_initialize_cond(&future->none_running_cond);
+ dmnsn_initialize_cond(&future->all_running_cond);
+ dmnsn_initialize_cond(&future->resume_cond);
+
+ return future;
+}
+
+static void
+dmnsn_delete_future(dmnsn_future *future)
+{
+ if (future) {
+ dmnsn_destroy_cond(&future->resume_cond);
+ dmnsn_destroy_cond(&future->all_running_cond);
+ dmnsn_destroy_cond(&future->none_running_cond);
+ dmnsn_destroy_cond(&future->cond);
+ dmnsn_destroy_mutex(&future->mutex);
+ dmnsn_free(future);
+ }
+}
+
+// Join the worker thread and delete `future'.
+int
+dmnsn_future_join(dmnsn_future *future)
+{
+ void *ptr;
+ int retval = -1;
+
+ if (future) {
+ dmnsn_assert(future->npaused == 0, "Attempt to join future while paused");
+
+ // Get the thread's return value
+ dmnsn_join_thread(future->thread, &ptr);
+ if (ptr && ptr != PTHREAD_CANCELED) {
+ retval = *(int *)ptr;
+ dmnsn_free(ptr);
+ }
+
+ // Free the future object
+ dmnsn_delete_future(future);
+ }
+
+ return retval;
+}
+
+// Cancel a background thread
+void
+dmnsn_future_cancel(dmnsn_future *future)
+{
+ pthread_cancel(future->thread);
+}
+
+/**
+ * Get the current progress, without locking anything.
+ *
+ * future->mutex must be locked for this call to be safe.
+ */
+static inline double
+dmnsn_future_progress_unlocked(const dmnsn_future *future)
+{
+ return (double)future->progress/future->total;
+}
+
+// Get the current progress of the worker thread, in [0.0, 1.0]
+double
+dmnsn_future_progress(const dmnsn_future *future)
+{
+ dmnsn_future *mfuture = MUTATE(future);
+ double progress;
+
+ dmnsn_lock_mutex(&mfuture->mutex);
+ progress = dmnsn_future_progress_unlocked(mfuture);
+ dmnsn_unlock_mutex(&mfuture->mutex);
+
+ return progress;
+}
+
+// Find out whether the task is complete.
+bool
+dmnsn_future_is_done(const dmnsn_future *future)
+{
+ dmnsn_future *mfuture = MUTATE(future);
+ bool result;
+
+ dmnsn_lock_mutex(&mfuture->mutex);
+ result = future->progress == future->total;
+ dmnsn_unlock_mutex(&mfuture->mutex);
+
+ return result;
+}
+
+// Wait until dmnsn_future_progress(future) >= progress
+void
+dmnsn_future_wait(const dmnsn_future *future, double progress)
+{
+ dmnsn_future *mfuture = MUTATE(future);
+
+ dmnsn_lock_mutex(&mfuture->mutex);
+ while (dmnsn_future_progress_unlocked(mfuture) < progress) {
+ // Set the minimum waited-on value
+ if (progress < mfuture->min_wait) {
+ mfuture->min_wait = progress;
+ }
+
+ dmnsn_cond_wait_safely(&mfuture->cond, &mfuture->mutex);
+ }
+ dmnsn_unlock_mutex(&mfuture->mutex);
+}
+
+// Pause all threads working on a future.
+void
+dmnsn_future_pause(dmnsn_future *future)
+{
+ dmnsn_lock_mutex(&future->mutex);
+ while (future->nrunning < future->nthreads) {
+ dmnsn_cond_wait_safely(&future->all_running_cond, &future->mutex);
+ }
+ ++future->npaused;
+ while (future->nrunning > 0) {
+ dmnsn_cond_wait_safely(&future->none_running_cond, &future->mutex);
+ }
+ dmnsn_unlock_mutex(&future->mutex);
+}
+
+// Resume all threads working on a future.
+void
+dmnsn_future_resume(dmnsn_future *future)
+{
+ dmnsn_lock_mutex(&future->mutex);
+ dmnsn_assert(future->npaused > 0, "dmnsn_future_resume() without matching dmnsn_future_pause()");
+ if (--future->npaused == 0) {
+ dmnsn_cond_broadcast(&future->resume_cond);
+ }
+ dmnsn_unlock_mutex(&future->mutex);
+}
+
+// Set the total number of loop iterations
+void
+dmnsn_future_set_total(dmnsn_future *future, size_t total)
+{
+ dmnsn_lock_mutex(&future->mutex);
+ future->total = total;
+ dmnsn_unlock_mutex(&future->mutex);
+}
+
+static void
+dmnsn_future_increment_cleanup(void *ptr)
+{
+ dmnsn_future *future = ptr;
+ ++future->nrunning;
+ dmnsn_unlock_mutex_impl(&future->mutex);
+}
+
+// Increment the number of completed loop iterations
+void
+dmnsn_future_increment(dmnsn_future *future)
+{
+ // Allow a thread to be canceled whenever it increments a future object --
+ // this is close to PTHREAD_CANCEL_ASYNCHRONOUS but allows consistent state
+ // on cancellation
+ pthread_testcancel();
+
+ dmnsn_lock_mutex(&future->mutex);
+ ++future->progress;
+
+ if (dmnsn_future_progress_unlocked(future) >= future->min_wait) {
+ future->min_wait = 1.0;
+ dmnsn_cond_broadcast(&future->cond);
+ }
+
+ if (future->npaused > 0) {
+ dmnsn_assert(future->nrunning > 0, "More worker threads than expected");
+
+ if (--future->nrunning == 0) {
+ dmnsn_cond_broadcast(&future->none_running_cond);
+ }
+
+ pthread_cleanup_push(dmnsn_future_increment_cleanup, future);
+ do {
+ dmnsn_cond_wait(&future->resume_cond, &future->mutex);
+ } while (future->npaused > 0);
+ pthread_cleanup_pop(false);
+
+ if (++future->nrunning == future->nthreads) {
+ dmnsn_cond_broadcast(&future->all_running_cond);
+ }
+ }
+ dmnsn_unlock_mutex(&future->mutex);
+}
+
+// Immediately set to 100% completion
+void
+dmnsn_future_finish(dmnsn_future *future)
+{
+ dmnsn_lock_mutex(&future->mutex);
+ future->progress = future->total;
+ future->nthreads = future->nrunning = 0;
+ dmnsn_cond_broadcast(&future->cond);
+ dmnsn_cond_broadcast(&future->none_running_cond);
+ dmnsn_cond_broadcast(&future->all_running_cond);
+ dmnsn_unlock_mutex(&future->mutex);
+}
+
+// Set the number of threads
+void
+dmnsn_future_set_nthreads(dmnsn_future *future, unsigned int nthreads)
+{
+ dmnsn_lock_mutex(&future->mutex);
+ dmnsn_assert(future->nrunning == future->nthreads,
+ "dmnsn_future_set_nthreads() called with paused threads");
+ future->nthreads = future->nrunning = nthreads;
+ dmnsn_unlock_mutex(&future->mutex);
+}
+
+// Notify completion of a worker thread
+void
+dmnsn_future_finish_thread(dmnsn_future *future)
+{
+ dmnsn_lock_mutex(&future->mutex);
+ dmnsn_assert(future->nthreads > 0,
+ "dmnsn_future_finish_thread() called with no threads");
+ --future->nthreads;
+
+ dmnsn_assert(future->nrunning > 0,
+ "dmnsn_future_finish_thread() called with no running threads");
+ if (--future->nrunning == 0) {
+ dmnsn_cond_broadcast(&future->none_running_cond);
+ }
+ dmnsn_unlock_mutex(&future->mutex);
+}
diff --git a/libdimension/concurrency/threads.c b/libdimension/concurrency/threads.c
new file mode 100644
index 0000000..93d2ea9
--- /dev/null
+++ b/libdimension/concurrency/threads.c
@@ -0,0 +1,326 @@
+/*************************************************************************
+ * Copyright (C) 2010-2014 Tavian Barnes <tavianator@tavianator.com> *
+ * *
+ * This file is part of The Dimension Library. *
+ * *
+ * The Dimension Library is free software; you can redistribute it and/ *
+ * or modify it under the terms of the GNU Lesser General Public License *
+ * as published by the Free Software Foundation; either version 3 of the *
+ * License, or (at your option) any later version. *
+ * *
+ * The Dimension Library is distributed in the hope that it will be *
+ * useful, but WITHOUT ANY WARRANTY; without even the implied warranty *
+ * of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU *
+ * Lesser General Public License for more details. *
+ * *
+ * You should have received a copy of the GNU Lesser General Public *
+ * License along with this program. If not, see *
+ * <http://www.gnu.org/licenses/>. *
+ *************************************************************************/
+
+/**
+ * @file
+ * Background threading.
+ */
+
+#include "internal.h"
+#include "internal/concurrency.h"
+#include "internal/future.h"
+#include <pthread.h>
+
+/// The payload to pass to the pthread callback.
+typedef struct dmnsn_thread_payload {
+ dmnsn_thread_fn *thread_fn;
+ void *arg;
+ dmnsn_future *future;
+} dmnsn_thread_payload;
+
+/// Clean up after a thread.
+static void
+dmnsn_thread_cleanup(void *arg)
+{
+ dmnsn_thread_payload *payload = arg;
+ dmnsn_future *future = payload->future;
+ dmnsn_free(payload);
+
+ dmnsn_future_finish(future);
+}
+
+/// pthread callback -- call the real thread callback.
+static void *
+dmnsn_thread(void *arg)
+{
+ dmnsn_thread_payload *payload = arg;
+ int *ret;
+
+ pthread_cleanup_push(dmnsn_thread_cleanup, payload);
+ ret = DMNSN_MALLOC(int);
+ *ret = payload->thread_fn(payload->arg);
+ pthread_cleanup_pop(true);
+ return ret;
+}
+
+void
+dmnsn_new_thread(dmnsn_future *future, dmnsn_thread_fn *thread_fn, void *arg)
+{
+ dmnsn_thread_payload *payload = DMNSN_MALLOC(dmnsn_thread_payload);
+ payload->thread_fn = thread_fn;
+ payload->arg = arg;
+ payload->future = future;
+
+ if (pthread_create(&future->thread, NULL, dmnsn_thread, payload) != 0) {
+ dmnsn_error("Couldn't start thread.");
+ }
+}
+
+/// Payload for threads executed by dmnsn_execute_concurrently().
+typedef struct dmnsn_ccthread_payload {
+ dmnsn_future *future;
+ dmnsn_ccthread_fn *ccthread_fn;
+ void *arg;
+ unsigned int thread, nthreads;
+ int ret;
+ bool running;
+} dmnsn_ccthread_payload;
+
+static void *
+dmnsn_concurrent_thread(void *ptr)
+{
+ dmnsn_ccthread_payload *payload = ptr;
+ payload->ret = payload->ccthread_fn(payload->arg, payload->thread,
+ payload->nthreads);
+ if (payload->future) {
+ dmnsn_future_finish_thread(payload->future);
+ }
+ return NULL;
+}
+
+typedef struct dmnsn_ccthread_cleanup_payload {
+ dmnsn_future *future;
+ pthread_t *threads;
+ dmnsn_ccthread_payload *payloads;
+ unsigned int nthreads;
+} dmnsn_ccthread_cleanup_payload;
+
+static void
+dmnsn_ccthread_cleanup(void *ptr)
+{
+ dmnsn_ccthread_cleanup_payload *payload = ptr;
+
+ for (unsigned int i = 0; i < payload->nthreads; ++i) {
+ if (payload->payloads[i].running) {
+ pthread_cancel(payload->threads[i]);
+ }
+ }
+
+ for (unsigned int i = 0; i < payload->nthreads; ++i) {
+ if (payload->payloads[i].running) {
+ dmnsn_join_thread(payload->threads[i], NULL);
+ }
+ }
+
+ if (payload->future) {
+ dmnsn_future_set_nthreads(payload->future, 1);
+ }
+}
+
+int
+dmnsn_execute_concurrently(dmnsn_future *future, dmnsn_ccthread_fn *ccthread_fn,
+ void *arg, unsigned int nthreads)
+{
+ dmnsn_assert(nthreads > 0, "Attempt to execute using 0 concurrent threads.");
+
+ if (future) {
+ dmnsn_future_set_nthreads(future, nthreads);
+ }
+
+ pthread_t threads[nthreads];
+ dmnsn_ccthread_payload payloads[nthreads];
+ for (unsigned int i = 0; i < nthreads; ++i) {
+ payloads[i].running = false;
+ }
+
+ int ret = 0;
+ dmnsn_ccthread_cleanup_payload cleanup_payload = {
+ .future = future,
+ .threads = threads,
+ .payloads = payloads,
+ .nthreads = nthreads,
+ };
+ pthread_cleanup_push(dmnsn_ccthread_cleanup, &cleanup_payload);
+ for (unsigned int i = 0; i < nthreads; ++i) {
+ payloads[i].future = future;
+ payloads[i].ccthread_fn = ccthread_fn;
+ payloads[i].arg = arg;
+ payloads[i].thread = i;
+ payloads[i].nthreads = nthreads;
+ payloads[i].ret = -1;
+ if (pthread_create(&threads[i], NULL, dmnsn_concurrent_thread,
+ &payloads[i]) != 0)
+ {
+ dmnsn_error("Couldn't start worker thread.");
+ }
+ payloads[i].running = true;
+ }
+
+ for (unsigned int i = 0; i < nthreads; ++i) {
+ dmnsn_join_thread(threads[i], NULL);
+ payloads[i].running = false;
+ if (payloads[i].ret != 0) {
+ ret = payloads[i].ret;
+ }
+ }
+ pthread_cleanup_pop(false);
+
+ if (future) {
+ dmnsn_future_set_nthreads(future, 1);
+ }
+
+ return ret;
+}
+
+// pthread wrappers
+
+void
+dmnsn_initialize_mutex(pthread_mutex_t *mutex)
+{
+ if (pthread_mutex_init(mutex, NULL) != 0) {
+ dmnsn_error("Couldn't initialize mutex.");
+ }
+}
+
+void
+dmnsn_lock_mutex_impl(pthread_mutex_t *mutex)
+{
+ if (pthread_mutex_lock(mutex) != 0) {
+ dmnsn_error("Couldn't lock mutex.");
+ }
+}
+
+void
+dmnsn_unlock_mutex_impl(void *mutex)
+{
+ if (pthread_mutex_unlock(mutex) != 0) {
+ dmnsn_error("Couldn't unlock mutex.");
+ }
+}
+
+void
+dmnsn_destroy_mutex(pthread_mutex_t *mutex)
+{
+ if (pthread_mutex_destroy(mutex) != 0) {
+ dmnsn_warning("Couldn't destroy mutex.");
+ }
+}
+
+void
+dmnsn_initialize_rwlock(pthread_rwlock_t *rwlock)
+{
+ if (pthread_rwlock_init(rwlock, NULL) != 0) {
+ dmnsn_error("Couldn't initialize read-write lock.");
+ }
+}
+
+void
+dmnsn_read_lock_impl(pthread_rwlock_t *rwlock)
+{
+ if (pthread_rwlock_rdlock(rwlock) != 0) {
+ dmnsn_error("Couldn't acquire read lock.");
+ }
+}
+
+void
+dmnsn_write_lock_impl(pthread_rwlock_t *rwlock)
+{
+ if (pthread_rwlock_wrlock(rwlock) != 0) {
+ dmnsn_error("Couldn't acquire write lock.");
+ }
+}
+
+void
+dmnsn_unlock_rwlock_impl(pthread_rwlock_t *rwlock)
+{
+ if (pthread_rwlock_unlock(rwlock) != 0) {
+ dmnsn_error("Couldn't unlock read-write lock.");
+ }
+}
+
+void
+dmnsn_destroy_rwlock(pthread_rwlock_t *rwlock)
+{
+ if (pthread_rwlock_destroy(rwlock) != 0) {
+ dmnsn_warning("Couldn't destroy read-write lock.");
+ }
+}
+
+void
+dmnsn_initialize_cond(pthread_cond_t *cond)
+{
+ if (pthread_cond_init(cond, NULL) != 0) {
+ dmnsn_error("Couldn't initialize condition variable.");
+ }
+}
+
+void
+dmnsn_cond_wait(pthread_cond_t *cond, pthread_mutex_t *mutex)
+{
+ if (pthread_cond_wait(cond, mutex) != 0) {
+ dmnsn_error("Couldn't wait on condition variable.");
+ }
+}
+
+void
+dmnsn_cond_broadcast(pthread_cond_t *cond)
+{
+ if (pthread_cond_broadcast(cond) != 0) {
+ dmnsn_error("Couldn't signal condition variable.");
+ }
+}
+
+void
+dmnsn_destroy_cond(pthread_cond_t *cond)
+{
+ if (pthread_cond_destroy(cond) != 0) {
+ dmnsn_warning("Couldn't destroy condition variable.");
+ }
+}
+
+void
+dmnsn_once(pthread_once_t *once, dmnsn_once_fn *once_fn)
+{
+ if (pthread_once(once, once_fn) != 0) {
+ dmnsn_error("Couldn't call one-shot function.");
+ }
+}
+
+void
+dmnsn_key_create(pthread_key_t *key, dmnsn_callback_fn *destructor)
+{
+ if (pthread_key_create(key, destructor) != 0) {
+ dmnsn_error("Couldn't initialize thread-specific pointer.");
+ }
+}
+
+void
+dmnsn_setspecific(pthread_key_t key, const void *value)
+{
+ if (pthread_setspecific(key, value) != 0) {
+ dmnsn_error("Couldn't set thread-specific pointer.");
+ }
+}
+
+void
+dmnsn_key_delete(pthread_key_t key)
+{
+ if (pthread_key_delete(key) != 0) {
+ dmnsn_warning("Couldn't destroy thread-specific pointer.");
+ }
+}
+
+void
+dmnsn_join_thread(pthread_t thread, void **retval)
+{
+ if (pthread_join(thread, retval) != 0) {
+ dmnsn_error("Couldn't join thread.");
+ }
+}