diff options
Diffstat (limited to 'src')
-rw-r--r-- | src/ioq.c | 176 |
1 files changed, 127 insertions, 49 deletions
@@ -339,17 +339,6 @@ static bool ioq_slot_push(struct ioqq *ioqq, ioq_slot *slot, struct ioq_ent *ent return !(prev & IOQ_SKIP); } -/** Push an entry onto the queue. */ -static void ioqq_push(struct ioqq *ioqq, struct ioq_ent *ent) { - while (true) { - size_t i = fetch_add(&ioqq->head, 1, relaxed); - ioq_slot *slot = &ioqq->slots[i & ioqq->slot_mask]; - if (ioq_slot_push(ioqq, slot, ent)) { - break; - } - } -} - /** (Try to) pop an entry from a slot. */ static struct ioq_ent *ioq_slot_pop(struct ioqq *ioqq, ioq_slot *slot, bool block) { uintptr_t prev = load(slot, relaxed); @@ -383,6 +372,32 @@ static struct ioq_ent *ioq_slot_pop(struct ioqq *ioqq, ioq_slot *slot, bool bloc return (struct ioq_ent *)(prev << 1); } +/** Push an entry onto the queue. */ +static void ioqq_push(struct ioqq *ioqq, struct ioq_ent *ent) { + while (true) { + size_t i = fetch_add(&ioqq->head, 1, relaxed); + ioq_slot *slot = &ioqq->slots[i & ioqq->slot_mask]; + if (ioq_slot_push(ioqq, slot, ent)) { + break; + } + } +} + +/** Push a batch of entries to the queue. */ +static void ioqq_push_batch(struct ioqq *ioqq, struct ioq_ent *batch[], size_t size) { + size_t mask = ioqq->slot_mask; + do { + size_t i = fetch_add(&ioqq->head, size, relaxed); + for (size_t j = i + size; i != j; ++i) { + ioq_slot *slot = &ioqq->slots[i & mask]; + if (ioq_slot_push(ioqq, slot, *batch)) { + ++batch; + --size; + } + } + } while (size > 0); +} + /** Pop an entry from the queue. */ static struct ioq_ent *ioqq_pop(struct ioqq *ioqq, bool block) { size_t i = fetch_add(&ioqq->tail, 1, relaxed); @@ -390,6 +405,47 @@ static struct ioq_ent *ioqq_pop(struct ioqq *ioqq, bool block) { return ioq_slot_pop(ioqq, slot, block); } +/** Pop a batch of entries from the queue. */ +static void ioqq_pop_batch(struct ioqq *ioqq, struct ioq_ent *batch[], size_t size, bool block) { + size_t mask = ioqq->slot_mask; + size_t i = fetch_add(&ioqq->tail, size, relaxed); + for (size_t j = i + size; i != j; ++i) { + ioq_slot *slot = &ioqq->slots[i & mask]; + *batch++ = ioq_slot_pop(ioqq, slot, block); + block = false; + } +} + +/** Use cache-line-sized batches. */ +#define IOQ_BATCH (FALSE_SHARING_SIZE / sizeof(ioq_slot)) + +/** + * A batch of entries to send all at once. + */ +struct ioq_batch { + /** The current batch size. */ + size_t size; + /** The array of entries. */ + struct ioq_ent *entries[IOQ_BATCH]; +}; + +/** Send the batch to a queue. */ +static void ioq_batch_flush(struct ioqq *ioqq, struct ioq_batch *batch) { + if (batch->size > 0) { + ioqq_push_batch(ioqq, batch->entries, batch->size); + batch->size = 0; + } +} + +/** An an entry to a batch, flushing if necessary. */ +static void ioq_batch_push(struct ioqq *ioqq, struct ioq_batch *batch, struct ioq_ent *ent) { + if (batch->size >= IOQ_BATCH) { + ioq_batch_flush(ioqq, batch); + } + + batch->entries[batch->size++] = ent; +} + /** Sentinel stop command. */ static struct ioq_ent IOQ_STOP; @@ -494,28 +550,10 @@ struct ioq_ring_state { size_t submitted; /** Whether to stop the loop. */ bool stop; + /** A batch of ready entries. */ + struct ioq_batch ready; }; -/** Pop a request for ioq_ring_prep(). */ -static struct ioq_ent *ioq_ring_pop(struct ioq_ring_state *state) { - if (state->stop) { - return NULL; - } - - // Block if we have nothing else to do - bool block = !state->prepped && !state->submitted; - struct ioqq *pending = state->ioq->pending; - struct ioq_ent *ret = ioqq_pop(pending, block); - - if (ret == &IOQ_STOP) { - ioqq_push(pending, &IOQ_STOP); - state->stop = true; - ret = NULL; - } - - return ret; -} - /** Dispatch a single request asynchronously. */ static struct io_uring_sqe *ioq_dispatch_async(struct io_uring *ring, struct ioq_ent *ent) { struct io_uring_sqe *sqe = NULL; @@ -557,11 +595,16 @@ static struct io_uring_sqe *ioq_dispatch_async(struct io_uring *ring, struct ioq return NULL; } +/** Check if ioq_ring_reap() has work to do. */ +static bool ioq_ring_empty(struct ioq_ring_state *state) { + return !state->prepped && !state->submitted && !state->ready.size; +} + /** Prep a single SQE. */ static void ioq_prep_sqe(struct ioq_ring_state *state, struct ioq_ent *ent) { struct ioq *ioq = state->ioq; if (ioq_check_cancel(ioq, ent)) { - ioqq_push(ioq->ready, ent); + ioq_batch_push(ioq->ready, &state->ready, ent); return; } @@ -571,24 +614,44 @@ static void ioq_prep_sqe(struct ioq_ring_state *state, struct ioq_ent *ent) { ++state->prepped; } else { ioq_dispatch_sync(ioq, ent); - ioqq_push(ioq->ready, ent); + ioq_batch_push(ioq->ready, &state->ready, ent); } } /** Prep a batch of SQEs. */ static bool ioq_ring_prep(struct ioq_ring_state *state) { + if (state->stop) { + return false; + } + + struct ioq *ioq = state->ioq; struct io_uring *ring = state->ring; + struct ioq_ent *pending[IOQ_BATCH]; + + while (io_uring_sq_space_left(ring) >= IOQ_BATCH) { + bool block = ioq_ring_empty(state); + ioqq_pop_batch(ioq->pending, pending, IOQ_BATCH, block); + + bool any = false; + for (size_t i = 0; i < IOQ_BATCH; ++i) { + struct ioq_ent *ent = pending[i]; + if (ent == &IOQ_STOP) { + ioqq_push(ioq->pending, &IOQ_STOP); + state->stop = true; + goto done; + } else if (ent) { + ioq_prep_sqe(state, ent); + any = true; + } + } - while (io_uring_sq_space_left(ring)) { - struct ioq_ent *ent = ioq_ring_pop(state); - if (!ent) { + if (!any) { break; } - - ioq_prep_sqe(state, ent); } - return state->prepped || state->submitted; +done: + return !ioq_ring_empty(state); } /** Reap a single CQE. */ @@ -638,11 +701,12 @@ static void ioq_reap_cqe(struct ioq_ring_state *state, struct io_uring_cqe *cqe) } push: - ioqq_push(ioq->ready, ent); + ioq_batch_push(ioq->ready, &state->ready, ent); } /** Reap a batch of CQEs. */ static void ioq_ring_reap(struct ioq_ring_state *state) { + struct ioq *ioq = state->ioq; struct io_uring *ring = state->ring; while (state->prepped) { @@ -661,6 +725,8 @@ static void ioq_ring_reap(struct ioq_ring_state *state) { ioq_reap_cqe(state, cqe); } + + ioq_batch_flush(ioq->ready, &state->ready); } /** io_uring worker loop. */ @@ -681,17 +747,29 @@ static void ioq_ring_work(struct ioq_thread *thread) { static void ioq_sync_work(struct ioq_thread *thread) { struct ioq *ioq = thread->parent; - while (true) { - struct ioq_ent *ent = ioqq_pop(ioq->pending, true); - if (ent == &IOQ_STOP) { - ioqq_push(ioq->pending, &IOQ_STOP); - break; + bool stop = false; + while (!stop) { + struct ioq_ent *pending[IOQ_BATCH]; + ioqq_pop_batch(ioq->pending, pending, IOQ_BATCH, true); + + struct ioq_batch ready; + ready.size = 0; + + for (size_t i = 0; i < IOQ_BATCH; ++i) { + struct ioq_ent *ent = pending[i]; + if (ent == &IOQ_STOP) { + ioqq_push(ioq->pending, &IOQ_STOP); + stop = true; + break; + } else if (ent) { + if (!ioq_check_cancel(ioq, ent)) { + ioq_dispatch_sync(ioq, ent); + } + ioq_batch_push(ioq->ready, &ready, ent); + } } - if (!ioq_check_cancel(ioq, ent)) { - ioq_dispatch_sync(ioq, ent); - } - ioqq_push(ioq->ready, ent); + ioq_batch_flush(ioq->ready, &ready); } } |