diff options
-rw-r--r-- | src/ioq.c | 205 |
1 files changed, 107 insertions, 98 deletions
@@ -41,114 +41,38 @@ union ioq_cmd { */ struct ioq_monitor { cache_align pthread_mutex_t mutex; - pthread_cond_t full; - pthread_cond_t empty; + pthread_cond_t cond; }; /** Initialize an ioq_monitor. */ static int ioq_monitor_init(struct ioq_monitor *monitor) { if (mutex_init(&monitor->mutex, NULL) != 0) { - goto fail; - } - - if (cond_init(&monitor->full, NULL) != 0) { - goto fail_mutex; + return -1; } - if (cond_init(&monitor->empty, NULL) != 0) { - goto fail_full; + if (cond_init(&monitor->cond, NULL) != 0) { + mutex_destroy(&monitor->mutex); + return -1; } return 0; - -fail_full: - cond_destroy(&monitor->full); -fail_mutex: - mutex_destroy(&monitor->mutex); -fail: - return -1; } /** Destroy an ioq_monitor. */ static void ioq_monitor_destroy(struct ioq_monitor *monitor) { - cond_destroy(&monitor->empty); - cond_destroy(&monitor->full); + cond_destroy(&monitor->cond); mutex_destroy(&monitor->mutex); } /** - * A slot in an I/O queue. - */ -struct ioq_slot { - struct ioq_monitor *monitor; - union ioq_cmd *cmd; -}; - -/** Initialize an ioq_slot. */ -static void ioq_slot_init(struct ioq_slot *slot, struct ioq_monitor *monitor) { - slot->monitor = monitor; - slot->cmd = NULL; -} - -/** Push a command into a slot. */ -static void ioq_slot_push(struct ioq_slot *slot, union ioq_cmd *cmd) { - struct ioq_monitor *monitor = slot->monitor; - - mutex_lock(&monitor->mutex); - while (slot->cmd) { - cond_wait(&monitor->empty, &monitor->mutex); - } - slot->cmd = cmd; - mutex_unlock(&monitor->mutex); - - cond_broadcast(&monitor->full); -} - -/** Pop a command from a slot. */ -static union ioq_cmd *ioq_slot_pop(struct ioq_slot *slot) { - struct ioq_monitor *monitor = slot->monitor; - - mutex_lock(&monitor->mutex); - while (!slot->cmd) { - cond_wait(&monitor->full, &monitor->mutex); - } - union ioq_cmd *ret = slot->cmd; - slot->cmd = NULL; - mutex_unlock(&monitor->mutex); - - cond_broadcast(&monitor->empty); - - return ret; -} - -/** Pop a command from a slot, if one exists. */ -static union ioq_cmd *ioq_slot_trypop(struct ioq_slot *slot) { - struct ioq_monitor *monitor = slot->monitor; - - if (!mutex_trylock(&monitor->mutex)) { - return NULL; - } - - union ioq_cmd *ret = slot->cmd; - slot->cmd = NULL; - - mutex_unlock(&monitor->mutex); - - if (ret) { - cond_broadcast(&monitor->empty); - } - return ret; -} - -/** * An MPMC queue of I/O commands. */ struct ioqq { /** Circular buffer index mask. */ - size_t mask; + size_t slot_mask; - /** Number of monitors. */ - size_t nmonitors; + /** Monitor index mask. */ + size_t monitor_mask; /** Array of monitors used by the slots. */ struct ioq_monitor *monitors; @@ -158,7 +82,7 @@ struct ioqq { cache_align atomic size_t tail; /** The circular buffer itself. */ - cache_align struct ioq_slot slots[]; + cache_align atomic uintptr_t slots[]; }; // If we assign slots sequentially, threads will likely be operating on @@ -169,12 +93,16 @@ struct ioqq { // still use every available slot. Since the length is a power of two, that // means the stride must be odd. -#define IOQ_STRIDE ((FALSE_SHARING_SIZE / sizeof(struct ioq_slot)) | 1) +#define IOQ_STRIDE ((FALSE_SHARING_SIZE / sizeof(atomic uintptr_t)) | 1) bfs_static_assert(IOQ_STRIDE % 2 == 1); +/** Slot flag bit to indicate waiters. */ +#define IOQ_BLOCKED ((uintptr_t)1) +bfs_static_assert(alignof(union ioq_cmd) > 1); + /** Destroy an I/O command queue. */ static void ioqq_destroy(struct ioqq *ioqq) { - for (size_t i = 0; i < ioqq->nmonitors; ++i) { + for (size_t i = 0; i < ioqq->monitor_mask + 1; ++i) { ioq_monitor_destroy(&ioqq->monitors[i]); } free(ioqq->monitors); @@ -191,9 +119,11 @@ static struct ioqq *ioqq_create(size_t size) { return NULL; } + ioqq->slot_mask = size - 1; + ioqq->monitor_mask = -1; + // Use a pool of monitors size_t nmonitors = size < 64 ? size : 64; - ioqq->nmonitors = 0; ioqq->monitors = xmemalign(alignof(struct ioq_monitor), nmonitors * sizeof(struct ioq_monitor)); if (!ioqq->monitors) { ioqq_destroy(ioqq); @@ -205,38 +135,116 @@ static struct ioqq *ioqq_create(size_t size) { ioqq_destroy(ioqq); return NULL; } - ++ioqq->nmonitors; + ++ioqq->monitor_mask; } - ioqq->mask = size - 1; - atomic_init(&ioqq->head, 0); atomic_init(&ioqq->tail, 0); for (size_t i = 0; i < size; ++i) { - ioq_slot_init(&ioqq->slots[i], &ioqq->monitors[i % nmonitors]); + atomic_init(&ioqq->slots[i], 0); } return ioqq; } +/** Atomically wait for a slot to change. */ +static uintptr_t ioqq_wait(struct ioqq *ioqq, size_t i, uintptr_t value) { + atomic uintptr_t *slot = &ioqq->slots[i & ioqq->slot_mask]; + + struct ioq_monitor *monitor = &ioqq->monitors[i & ioqq->monitor_mask]; + mutex_lock(&monitor->mutex); + + uintptr_t ret; + while ((ret = load(slot, relaxed)) == value) { + // To avoid missed wakeups, it is important that + // cond_broadcast() is not called right here + cond_wait(&monitor->cond, &monitor->mutex); + } + + mutex_unlock(&monitor->mutex); + return ret; +} + +/** Wake up any threads waiting on a slot. */ +static void ioqq_wake(struct ioqq *ioqq, size_t i) { + struct ioq_monitor *monitor = &ioqq->monitors[i & ioqq->monitor_mask]; + + // The following implementation would clearly avoid the missed wakeup + // issue mentioned above in ioqq_wait(): + // + // mutex_lock(&monitor->mutex); + // cond_broadcast(&monitor->cond); + // mutex_unlock(&monitor->mutex); + // + // As a minor optimization, we move the broadcast outside of the lock. + // This optimization is correct, even though it leads to a seemingly- + // useless empty critical section. + + mutex_lock(&monitor->mutex); + mutex_unlock(&monitor->mutex); + cond_broadcast(&monitor->cond); +} + /** Push a command onto the queue. */ static void ioqq_push(struct ioqq *ioqq, union ioq_cmd *cmd) { size_t i = fetch_add(&ioqq->head, IOQ_STRIDE, relaxed); - ioq_slot_push(&ioqq->slots[i & ioqq->mask], cmd); + atomic uintptr_t *slot = &ioqq->slots[i & ioqq->slot_mask]; + + uintptr_t addr = (uintptr_t)cmd; + bfs_assert(!(addr & IOQ_BLOCKED)); + + uintptr_t prev = load(slot, relaxed); + do { + while (prev & ~IOQ_BLOCKED) { + prev = fetch_or(slot, IOQ_BLOCKED, relaxed); + if (prev & ~IOQ_BLOCKED) { + prev = ioqq_wait(ioqq, i, prev | IOQ_BLOCKED); + } + } + } while (!compare_exchange_weak(slot, &prev, addr, release, relaxed)); + + if (prev & IOQ_BLOCKED) { + ioqq_wake(ioqq, i); + } } /** Pop a command from a queue. */ static union ioq_cmd *ioqq_pop(struct ioqq *ioqq) { size_t i = fetch_add(&ioqq->tail, IOQ_STRIDE, relaxed); - return ioq_slot_pop(&ioqq->slots[i & ioqq->mask]); + atomic uintptr_t *slot = &ioqq->slots[i & ioqq->slot_mask]; + + uintptr_t prev = load(slot, relaxed); + do { + while (!(prev & ~IOQ_BLOCKED)) { + prev = fetch_or(slot, IOQ_BLOCKED, relaxed); + if (!(prev & ~IOQ_BLOCKED)) { + prev = ioqq_wait(ioqq, i, IOQ_BLOCKED); + } + } + } while (!compare_exchange_weak(slot, &prev, 0, acquire, relaxed)); + + if (prev & IOQ_BLOCKED) { + ioqq_wake(ioqq, i); + } + prev &= ~IOQ_BLOCKED; + + return (union ioq_cmd *)prev; } /** Pop a command from a queue if one is available. */ static union ioq_cmd *ioqq_trypop(struct ioqq *ioqq) { size_t i = load(&ioqq->tail, relaxed); - union ioq_cmd *cmd = ioq_slot_trypop(&ioqq->slots[i & ioqq->mask]); - if (cmd) { + atomic uintptr_t *slot = &ioqq->slots[i & ioqq->slot_mask]; + + uintptr_t prev = exchange(slot, 0, acquire); + + if (prev & IOQ_BLOCKED) { + ioqq_wake(ioqq, i); + } + prev &= ~IOQ_BLOCKED; + + if (prev) { #ifdef NDEBUG store(&ioqq->tail, i + IOQ_STRIDE, relaxed); #else @@ -244,7 +252,8 @@ static union ioq_cmd *ioqq_trypop(struct ioqq *ioqq) { bfs_assert(j == i, "ioqq_trypop() only supports a single consumer"); #endif } - return cmd; + + return (union ioq_cmd *)prev; } /** Sentinel stop command. */ |