From c0284b69fe0ee82d1f604a1b6f0511b4f129b919 Mon Sep 17 00:00:00 2001 From: Tavian Barnes Date: Fri, 19 Aug 2011 00:05:22 -0600 Subject: Support thread cancelation, and handle ^C in the client. --- dimension/dimension.in | 30 +++++++++++----- libdimension-python/dimension.pxd | 1 + libdimension-python/dimension.pyx | 12 +++++-- libdimension/dimension/progress.h | 6 ++++ libdimension/progress.c | 17 +++++++-- libdimension/threads.c | 74 +++++++++++++++++++++++++++++---------- libdimension/threads.h | 8 ++++- 7 files changed, 113 insertions(+), 35 deletions(-) diff --git a/dimension/dimension.in b/dimension/dimension.in index 01dc2f3..f0db4fc 100644 --- a/dimension/dimension.in +++ b/dimension/dimension.in @@ -111,21 +111,33 @@ from dimension import * # Display a progress bar def _progress_bar(str, progress): - if not _args.quiet: - print(str, end = ' ') - _sys.stdout.flush() + try: + if not _args.quiet: + print(str, end = ' ') + _sys.stdout.flush() + + term_width = terminal_width() + width = term_width - (len(str) + 1)%term_width + for i in range(width): + progress.wait((i + 1)/width) + print('.', end = '') + _sys.stdout.flush() - term_width = terminal_width() - width = term_width - (len(str) + 1)%term_width - for i in range(width): - progress.wait((i + 1)/width) - print('.', end = '') + print() _sys.stdout.flush() + progress.finish() + except KeyboardInterrupt: print() _sys.stdout.flush() - progress.finish() + progress.cancel() + try: + progress.finish() + except: + # Swallow the failure exception + pass + raise # --strict option die_on_warnings(_args.strict) diff --git a/libdimension-python/dimension.pxd b/libdimension-python/dimension.pxd index 5523cc0..a3523d0 100644 --- a/libdimension-python/dimension.pxd +++ b/libdimension-python/dimension.pxd @@ -71,6 +71,7 @@ cdef extern from "../libdimension/dimension.h": ctypedef struct dmnsn_progress int dmnsn_finish_progress(dmnsn_progress *progress) + void dmnsn_cancel_progress(dmnsn_progress *progress) double dmnsn_get_progress(dmnsn_progress *progress) void dmnsn_wait_progress(dmnsn_progress *progress, double prog) diff --git a/libdimension-python/dimension.pyx b/libdimension-python/dimension.pyx index 673dc50..0c0c214 100644 --- a/libdimension-python/dimension.pyx +++ b/libdimension-python/dimension.pyx @@ -60,15 +60,21 @@ cdef class Progress: finally: self._progress = NULL + def cancel(self): + self._assert_unfinished() + dmnsn_cancel_progress(self._progress) + def progress(self): - if self._progress == NULL: - raise RuntimeError("background task finished.") + self._assert_unfinished() return dmnsn_get_progress(self._progress) def wait(self, progress): + self._assert_unfinished() + dmnsn_wait_progress(self._progress, progress) + + def _assert_unfinished(self): if self._progress == NULL: raise RuntimeError("background task finished.") - dmnsn_wait_progress(self._progress, progress) cdef _Progress(dmnsn_progress *progress): cdef Progress self = Progress.__new__(Progress) diff --git a/libdimension/dimension/progress.h b/libdimension/dimension/progress.h index ff5fccd..e458c69 100644 --- a/libdimension/dimension/progress.h +++ b/libdimension/dimension/progress.h @@ -37,6 +37,12 @@ typedef struct dmnsn_progress dmnsn_progress; */ int dmnsn_finish_progress(dmnsn_progress *progress); +/** + * Interrupt the execution of a background thread. + * @param[in,out] progress The background task to cancel. + */ +void dmnsn_cancel_progress(dmnsn_progress *progress); + /** * Get the progress of the background task. * @param[in] progress The background task to examine. diff --git a/libdimension/progress.c b/libdimension/progress.c index 17fa7dc..0fa1c66 100644 --- a/libdimension/progress.c +++ b/libdimension/progress.c @@ -60,9 +60,8 @@ dmnsn_finish_progress(dmnsn_progress *progress) if (progress) { /* Get the thread's return value */ - if (pthread_join(progress->thread, &ptr) != 0) { - dmnsn_error("Joining worker thread failed."); - } else if (ptr) { + dmnsn_join_thread(progress->thread, &ptr); + if (ptr && ptr != PTHREAD_CANCELED) { retval = *(int *)ptr; dmnsn_free(ptr); } @@ -86,6 +85,13 @@ dmnsn_finish_progress(dmnsn_progress *progress) return retval; } +/* Cancel a background thread */ +void +dmnsn_cancel_progress(dmnsn_progress *progress) +{ + pthread_cancel(progress->thread); +} + /* Get the current progress of the worker thread, in [0.0, 1.0] */ double dmnsn_get_progress(const dmnsn_progress *progress) @@ -127,6 +133,11 @@ dmnsn_set_progress_total(dmnsn_progress *progress, size_t total) void dmnsn_increment_progress(dmnsn_progress *progress) { + /* Allow a thread to be canceled whenever it increments a progress object -- + this is close to PTHREAD_CANCEL_ASYNCHRONOUS but allows consistent state + on cancellation */ + pthread_testcancel(); + dmnsn_write_lock(progress->rwlock); ++progress->progress; dmnsn_unlock_rwlock(progress->rwlock); diff --git a/libdimension/threads.c b/libdimension/threads.c index 11b01cf..d4bddd3 100644 --- a/libdimension/threads.c +++ b/libdimension/threads.c @@ -52,9 +52,9 @@ dmnsn_thread(void *arg) int *ret; pthread_cleanup_push(dmnsn_thread_cleanup, payload); - ret = dmnsn_malloc(sizeof(int)); - *ret = payload->thread_fn(payload->arg); - pthread_cleanup_pop(1); + ret = dmnsn_malloc(sizeof(int)); + *ret = payload->thread_fn(payload->arg); + pthread_cleanup_pop(true); return ret; } @@ -78,6 +78,7 @@ typedef struct dmnsn_ccthread_payload { void *arg; unsigned int thread, nthreads; int ret; + bool started; } dmnsn_ccthread_payload; static void * @@ -89,6 +90,24 @@ dmnsn_concurrent_thread(void *ptr) return NULL; } +typedef struct dmnsn_ccthread_cleanup_payload { + pthread_t *threads; + dmnsn_ccthread_payload *payloads; + unsigned int nthreads; +} dmnsn_ccthread_cleanup_payload; + +static void +dmnsn_ccthread_cleanup(void *ptr) +{ + dmnsn_ccthread_cleanup_payload *payload = ptr; + for (unsigned int i = 0; i < payload->nthreads; ++i) { + if (payload->payloads[i].started) { + pthread_cancel(payload->threads[i]); + dmnsn_join_thread(payload->threads[i], NULL); + } + } +} + int dmnsn_execute_concurrently(dmnsn_ccthread_fn *ccthread_fn, void *arg, unsigned int nthreads) @@ -97,30 +116,39 @@ dmnsn_execute_concurrently(dmnsn_ccthread_fn *ccthread_fn, pthread_t threads[nthreads]; dmnsn_ccthread_payload payloads[nthreads]; - for (unsigned int i = 0; i < nthreads; ++i) { - payloads[i].ccthread_fn = ccthread_fn; - payloads[i].arg = arg; - payloads[i].thread = i; - payloads[i].nthreads = nthreads; - payloads[i].ret = -1; - if (pthread_create(&threads[i], NULL, dmnsn_concurrent_thread, - &payloads[i]) != 0) - { - dmnsn_error("Couldn't start worker thread."); - } + payloads[i].started = false; } int ret = 0; - for (unsigned int i = 0; i < nthreads; ++i) { - if (pthread_join(threads[i], NULL) == 0) { + dmnsn_ccthread_cleanup_payload cleanup_payload = { + .threads = threads, + .payloads = payloads, + .nthreads = nthreads, + }; + pthread_cleanup_push(dmnsn_ccthread_cleanup, &cleanup_payload); + for (unsigned int i = 0; i < nthreads; ++i) { + payloads[i].ccthread_fn = ccthread_fn; + payloads[i].arg = arg; + payloads[i].thread = i; + payloads[i].nthreads = nthreads; + payloads[i].ret = -1; + if (pthread_create(&threads[i], NULL, dmnsn_concurrent_thread, + &payloads[i]) != 0) + { + dmnsn_error("Couldn't start worker thread."); + } + payloads[i].started = true; + } + + for (unsigned int i = 0; i < nthreads; ++i) { + dmnsn_join_thread(threads[i], NULL); + payloads[i].started = false; if (payloads[i].ret != 0) { ret = payloads[i].ret; } - } else { - dmnsn_error("Couldn't join worker thread."); } - } + pthread_cleanup_pop(false); return ret; } @@ -262,3 +290,11 @@ dmnsn_key_delete(pthread_key_t key) dmnsn_warning("Couldn't destroy thread-specific pointer."); } } + +void +dmnsn_join_thread(pthread_t thread, void **retval) +{ + if (pthread_join(thread, retval) != 0) { + dmnsn_error("Couldn't join thread."); + } +} diff --git a/libdimension/threads.h b/libdimension/threads.h index 229acb9..ba1bb43 100644 --- a/libdimension/threads.h +++ b/libdimension/threads.h @@ -187,7 +187,13 @@ DMNSN_INTERNAL void dmnsn_key_create(pthread_key_t *key, DMNSN_INTERNAL void dmnsn_setspecific(pthread_key_t key, const void *value); /** - * Destroy a thread-local storage key, warning out on failure. + * Destroy a thread-local storage key, warning on failure. * @param[out] key The key to destroy. */ DMNSN_INTERNAL void dmnsn_key_delete(pthread_key_t key); + +/** + * Join a thread, bailing out on failure. + * @param[in,out] thread The thread to join. + */ +DMNSN_INTERNAL void dmnsn_join_thread(pthread_t thread, void **retval); -- cgit v1.2.3