You could have invented futexes
The futex (fast userspace mutex) is a Linux kernel feature designed for building synchronization primitives (mutexes, condition variables, semaphores, etc.). Like many topics in concurrency, futexes have a reputation for being tricky (for example, see the paper Futexes Are Tricky). Despite that, the futex really is a well-motivated and simple but powerful API. This post tries to explain that motivation, and even shows how to implement something similar yourself.
Spinlocks
Let's say you've found yourself implementing the C standard library (my condolences; hopefully this doesn't happen to you too often). If your libc has threads, it's also going to need locks. The easiest one to implement is probably a spinlock, so you start there:
#include <stdatomic.h>
typedef atomic_flag spinlock_t;
#define SPINLOCK_INITIALIZER ATOMIC_FLAG_INIT
void spin_lock(spinlock_t *lock) {
    while (atomic_flag_test_and_set_explicit(lock, memory_order_acquire)) {
        // spin...
    }
}

void spin_unlock(spinlock_t *lock) {
    atomic_flag_clear_explicit(lock, memory_order_release);
}
This is not the best spinlock implementation (it could be more efficient with TTAS, or more fair as a ticket lock), but it works.
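As an aside, here's roughly what a TTAS variant looks like. This is just a sketch (the ttas_* names are made up for this example), and it uses atomic_bool rather than atomic_flag, since an atomic_flag can't be read without also setting it:
#include <stdatomic.h>
#include <stdbool.h>

typedef atomic_bool ttas_lock_t;

void ttas_lock(ttas_lock_t *lock) {
    while (atomic_exchange_explicit(lock, true, memory_order_acquire)) {
        // Spin on plain loads so contended waiters don't keep writing to the
        // cache line, and only retry the exchange once the lock looks free
        while (atomic_load_explicit(lock, memory_order_relaxed)) {
            // spin...
        }
    }
}

void ttas_unlock(ttas_lock_t *lock) {
    atomic_store_explicit(lock, false, memory_order_release);
}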
Maybe you're not a memory ordering expert, but acquire sounds right for acquiring a lock, and release sounds right for releasing it, so that's probably correct (it is).
Mutexes
Spinlocks are great when they don't actually have to spin. But as soon as two threads are contending for the same lock, the one that loses the race starts spinning, at 100% CPU utilization, running up your power bill/killing your battery and contributing to global warming. It would be better for the spinning thread to go to sleep, using 0% CPU, until the lock gets unlocked.
Okay, but how do you make a thread "go to sleep"?
You could literally sleep(1), but that's way too much latency. You could try usleep(1), but then you're waking up a million times a second. Ideally, we would go to sleep indefinitely, and get whoever unlocks the lock to wake us up.
There are a few ways to implement the "go to sleep" and "wake another thread up" operations, but they all involve system calls. You might have heard that syscalls are slow, particularly since the Meltdown/Spectre mitigations made them slower, so we only want to make them when we have to: mutex_lock() should avoid sleeping if possible, and mutex_unlock() should avoid waking anyone up if no one's asleep.
Whether any threads are asleep is a third state (in addition to locked and unlocked), so we can't use an atomic_flag any more.
typedef atomic_int mutex_t;
enum {
    UNLOCKED,
    LOCKED,
    SLEEPING,
};
#define MUTEX_INITIALIZER UNLOCKED
Moving from the UNLOCKED to the LOCKED state is cheap, so that's the first thing we try. Otherwise, we move to the SLEEPING state, and wait to be woken up.
void mutex_lock(mutex_t *mutex) {
    int old = UNLOCKED;
    if (atomic_compare_exchange_weak_explicit(
            mutex, &old, LOCKED,
            memory_order_acquire,  // Memory order if it succeeds
            memory_order_relaxed)) // Memory order if it fails
    {
        return;
    }

    while (true) {
        old = atomic_exchange_explicit(mutex, SLEEPING, memory_order_acquire);
        if (old == UNLOCKED) {
            return;
        }
        // π
        go_to_sleep();
    }
}
void mutex_unlock(mutex_t *mutex) {
    int old = atomic_exchange_explicit(mutex, UNLOCKED, memory_order_release);
    if (old == SLEEPING) {
        wake_someone_up();
    }
}
You might have spotted a bug in this code: if one thread runs wake_someone_up() while another is at the line marked π, before go_to_sleep() gets called, then no one will wake up (because no one is asleep yet). Then when go_to_sleep() does get called, it might sleep forever, as no one else will wake it up. This condition is known as a lost wakeup, and left untreated, it would cause a deadlock.
// mutex_lock()
old = exchange(mutex, SLEEPING, acquire);
if (old == UNLOCKED) {
    return;
}
// π
go_to_sleep();

// mutex_unlock()
old = exchange(mutex, UNLOCKED, release);
if (old == SLEEPING) {
    wake_someone_up();
}
It's not obvious how to fix this. We could try to detect the race and avoid sleeping, but that just moves the race window, rather than closing it completely.
// mutex_lock()
old = exchange(mutex, SLEEPING, acquire);
if (old == UNLOCKED) {
    return;
}
if (load(mutex, relaxed) == SLEEPING) {
    // π
    go_to_sleep();
}

// mutex_unlock()
old = exchange(mutex, UNLOCKED, release);
if (old == SLEEPING) {
    wake_someone_up();
}
If you had an atomic version of
if (atomic_load_explicit(mutex, memory_order_relaxed) == SLEEPING) {
    go_to_sleep();
}
so that the wakeup could not possibly happen between the if and the go_to_sleep(), that would fix the bug.
That's what a futex is!
Futexes
A minimal futex API looks something like this:
// Atomically check if `*futex == value`, and if so, go to sleep
void futex_wait(atomic_int *futex, int value);
// Wake up a thread currently waiting on `futex`
void futex_wake(atomic_int *futex);
Your mutex implementation could use them like this:
void mutex_lock(mutex_t *mutex) {
    ...

    while (true) {
        old = atomic_exchange_explicit(mutex, SLEEPING, memory_order_acquire);
        if (old == UNLOCKED) {
            return;
        }
        futex_wait(mutex, SLEEPING);
    }
}
void mutex_unlock(mutex_t *mutex) {
    int old = atomic_exchange_explicit(mutex, UNLOCKED, memory_order_release);
    if (old == SLEEPING) {
        futex_wake(mutex);
    }
}
This implementation is finally bug-free (I hope, anyway; after all, futexes are tricky).
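If you want to convince yourself it works, a quick smoke test might look something like this (just a sketch; the thread and iteration counts are arbitrary):
#include <pthread.h>
#include <stdio.h>

static mutex_t mutex = MUTEX_INITIALIZER;
static long counter = 0;

static void *work(void *arg) {
    (void)arg;
    for (int i = 0; i < 100000; ++i) {
        mutex_lock(&mutex);
        ++counter;
        mutex_unlock(&mutex);
    }
    return NULL;
}

int main(void) {
    pthread_t threads[4];
    for (int i = 0; i < 4; ++i) {
        pthread_create(&threads[i], NULL, work, NULL);
    }
    for (int i = 0; i < 4; ++i) {
        pthread_join(threads[i], NULL);
    }
    // With working locks this always prints 400000
    printf("%ld\n", counter);
    return 0;
}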
You could implement futex_wait() and futex_wake() using the futex() system call (as sketched above), but it's instructive to implement them a different way: with signals. futex_wait() will go to sleep until a signal arrives, and futex_wake() will send a signal to the waiting thread(s).
To do this, we'll keep track of the waiting threads on a wait queue:
#include <pthread.h>
#include <signal.h>

// A single waiting thread
struct waiter {
    // The waiting thread
    pthread_t thread;
    // The futex it's waiting on
    atomic_int *futex;
    // A linked list of waiters
    struct waiter *prev, *next;
};

// A wait queue
struct waitq {
    // Lock that protects the wait queue (can't be a mutex,
    // since we're implementing mutexes)
    spinlock_t lock;
    // A circular linked list of waiters
    struct waiter list;
};

// The global wait queue
struct waitq waitq = {
    .lock = SPINLOCK_INITIALIZER,
    .list = {
        .prev = &waitq.list,
        .next = &waitq.list,
    },
};
futex_wait() adds the calling thread to the wait queue and sleeps until a signal arrives with sigwait().
void futex_wait(atomic_int *futex, int value) {
    spin_lock(&waitq.lock);

    struct waiter *head = &waitq.list;
    struct waiter waiter = {
        .thread = pthread_self(),
        .futex = futex,
        .prev = head,
        .next = head->next,
    };
    // Insert the waiter into the list
    waiter.prev->next = &waiter;
    waiter.next->prev = &waiter;

    // Block the signal in the current thread so we can wait for it
    sigset_t old_mask, mask;
    sigemptyset(&mask);
    sigaddset(&mask, SIGCONT);
    pthread_sigmask(SIG_BLOCK, &mask, &old_mask);

    if (atomic_load_explicit(futex, memory_order_relaxed) == value) {
        // Unlock the wait queue before we sleep
        spin_unlock(&waitq.lock);
        // Sleep until we receive SIGCONT
        int sig;
        sigwait(&mask, &sig);
        // Re-lock the wait queue
        spin_lock(&waitq.lock);
    }

    // Remove ourselves from the wait queue
    waiter.prev->next = waiter.next;
    waiter.next->prev = waiter.prev;

    // Restore the old signal mask
    pthread_sigmask(SIG_SETMASK, &old_mask, NULL);

    spin_unlock(&waitq.lock);
}
futex_wake() walks the wait queue, signalling the first thread it finds waiting on the same futex:
void futex_wake(atomic_int *futex) {
    spin_lock(&waitq.lock);

    struct waiter *head = &waitq.list;
    for (struct waiter *waiter = head->next; waiter != head; waiter = waiter->next) {
        if (waiter->futex == futex) {
            pthread_kill(waiter->thread, SIGCONT);
            break;
        }
    }

    spin_unlock(&waitq.lock);
}
You might think this has the same race condition as before: that if pthread_kill() happens right before sigwait(), the wakeup will be lost. But in fact, because we blocked SIGCONT first, it will remain pending, and sigwait() will return immediately.
// futex_wait()
if (load(futex, relaxed) == value) {
    spin_unlock(&waitq.lock);
    int sig;
    sigwait(&mask, &sig);
    spin_lock(&waitq.lock);
}

// futex_wake()
spin_lock(&waitq.lock);
...
pthread_kill(waiter->thread, SIGCONT);
...
spin_unlock(&waitq.lock);
Scaling
The above implementation works, but the single wait queue could cause a lot of unnecessary contention if many threads are waiting on different futexes. We could reduce that contention by having multiple wait queues, and using a hash function to assign each futex to a (hopefully) different wait queue.
struct waitq table[TABLE_SIZE];

// Use the address of the futex to pick a wait queue
struct waitq *get_waitq(atomic_int *futex) {
    size_t i = hash((uintptr_t)futex);
    return &table[i % TABLE_SIZE];
}
That's basically what the actual futex implementation does. It's also what WebKit's ParkingLot does, and in general it's a very useful trick for reducing contention.