summaryrefslogtreecommitdiffstats
path: root/src
diff options
context:
space:
mode:
Diffstat (limited to 'src')
-rw-r--r--src/ioq.c176
1 files changed, 127 insertions, 49 deletions
diff --git a/src/ioq.c b/src/ioq.c
index b57daba..cf0b927 100644
--- a/src/ioq.c
+++ b/src/ioq.c
@@ -339,17 +339,6 @@ static bool ioq_slot_push(struct ioqq *ioqq, ioq_slot *slot, struct ioq_ent *ent
return !(prev & IOQ_SKIP);
}
-/** Push an entry onto the queue. */
-static void ioqq_push(struct ioqq *ioqq, struct ioq_ent *ent) {
- while (true) {
- size_t i = fetch_add(&ioqq->head, 1, relaxed);
- ioq_slot *slot = &ioqq->slots[i & ioqq->slot_mask];
- if (ioq_slot_push(ioqq, slot, ent)) {
- break;
- }
- }
-}
-
/** (Try to) pop an entry from a slot. */
static struct ioq_ent *ioq_slot_pop(struct ioqq *ioqq, ioq_slot *slot, bool block) {
uintptr_t prev = load(slot, relaxed);
@@ -383,6 +372,32 @@ static struct ioq_ent *ioq_slot_pop(struct ioqq *ioqq, ioq_slot *slot, bool bloc
return (struct ioq_ent *)(prev << 1);
}
+/** Push an entry onto the queue. */
+static void ioqq_push(struct ioqq *ioqq, struct ioq_ent *ent) {
+ while (true) {
+ size_t i = fetch_add(&ioqq->head, 1, relaxed);
+ ioq_slot *slot = &ioqq->slots[i & ioqq->slot_mask];
+ if (ioq_slot_push(ioqq, slot, ent)) {
+ break;
+ }
+ }
+}
+
+/** Push a batch of entries to the queue. */
+static void ioqq_push_batch(struct ioqq *ioqq, struct ioq_ent *batch[], size_t size) {
+ size_t mask = ioqq->slot_mask;
+ do {
+ size_t i = fetch_add(&ioqq->head, size, relaxed);
+ for (size_t j = i + size; i != j; ++i) {
+ ioq_slot *slot = &ioqq->slots[i & mask];
+ if (ioq_slot_push(ioqq, slot, *batch)) {
+ ++batch;
+ --size;
+ }
+ }
+ } while (size > 0);
+}
+
/** Pop an entry from the queue. */
static struct ioq_ent *ioqq_pop(struct ioqq *ioqq, bool block) {
size_t i = fetch_add(&ioqq->tail, 1, relaxed);
@@ -390,6 +405,47 @@ static struct ioq_ent *ioqq_pop(struct ioqq *ioqq, bool block) {
return ioq_slot_pop(ioqq, slot, block);
}
+/** Pop a batch of entries from the queue. */
+static void ioqq_pop_batch(struct ioqq *ioqq, struct ioq_ent *batch[], size_t size, bool block) {
+ size_t mask = ioqq->slot_mask;
+ size_t i = fetch_add(&ioqq->tail, size, relaxed);
+ for (size_t j = i + size; i != j; ++i) {
+ ioq_slot *slot = &ioqq->slots[i & mask];
+ *batch++ = ioq_slot_pop(ioqq, slot, block);
+ block = false;
+ }
+}
+
+/** Use cache-line-sized batches. */
+#define IOQ_BATCH (FALSE_SHARING_SIZE / sizeof(ioq_slot))
+
+/**
+ * A batch of entries to send all at once.
+ */
+struct ioq_batch {
+ /** The current batch size. */
+ size_t size;
+ /** The array of entries. */
+ struct ioq_ent *entries[IOQ_BATCH];
+};
+
+/** Send the batch to a queue. */
+static void ioq_batch_flush(struct ioqq *ioqq, struct ioq_batch *batch) {
+ if (batch->size > 0) {
+ ioqq_push_batch(ioqq, batch->entries, batch->size);
+ batch->size = 0;
+ }
+}
+
+/** An an entry to a batch, flushing if necessary. */
+static void ioq_batch_push(struct ioqq *ioqq, struct ioq_batch *batch, struct ioq_ent *ent) {
+ if (batch->size >= IOQ_BATCH) {
+ ioq_batch_flush(ioqq, batch);
+ }
+
+ batch->entries[batch->size++] = ent;
+}
+
/** Sentinel stop command. */
static struct ioq_ent IOQ_STOP;
@@ -494,28 +550,10 @@ struct ioq_ring_state {
size_t submitted;
/** Whether to stop the loop. */
bool stop;
+ /** A batch of ready entries. */
+ struct ioq_batch ready;
};
-/** Pop a request for ioq_ring_prep(). */
-static struct ioq_ent *ioq_ring_pop(struct ioq_ring_state *state) {
- if (state->stop) {
- return NULL;
- }
-
- // Block if we have nothing else to do
- bool block = !state->prepped && !state->submitted;
- struct ioqq *pending = state->ioq->pending;
- struct ioq_ent *ret = ioqq_pop(pending, block);
-
- if (ret == &IOQ_STOP) {
- ioqq_push(pending, &IOQ_STOP);
- state->stop = true;
- ret = NULL;
- }
-
- return ret;
-}
-
/** Dispatch a single request asynchronously. */
static struct io_uring_sqe *ioq_dispatch_async(struct io_uring *ring, struct ioq_ent *ent) {
struct io_uring_sqe *sqe = NULL;
@@ -557,11 +595,16 @@ static struct io_uring_sqe *ioq_dispatch_async(struct io_uring *ring, struct ioq
return NULL;
}
+/** Check if ioq_ring_reap() has work to do. */
+static bool ioq_ring_empty(struct ioq_ring_state *state) {
+ return !state->prepped && !state->submitted && !state->ready.size;
+}
+
/** Prep a single SQE. */
static void ioq_prep_sqe(struct ioq_ring_state *state, struct ioq_ent *ent) {
struct ioq *ioq = state->ioq;
if (ioq_check_cancel(ioq, ent)) {
- ioqq_push(ioq->ready, ent);
+ ioq_batch_push(ioq->ready, &state->ready, ent);
return;
}
@@ -571,24 +614,44 @@ static void ioq_prep_sqe(struct ioq_ring_state *state, struct ioq_ent *ent) {
++state->prepped;
} else {
ioq_dispatch_sync(ioq, ent);
- ioqq_push(ioq->ready, ent);
+ ioq_batch_push(ioq->ready, &state->ready, ent);
}
}
/** Prep a batch of SQEs. */
static bool ioq_ring_prep(struct ioq_ring_state *state) {
+ if (state->stop) {
+ return false;
+ }
+
+ struct ioq *ioq = state->ioq;
struct io_uring *ring = state->ring;
+ struct ioq_ent *pending[IOQ_BATCH];
+
+ while (io_uring_sq_space_left(ring) >= IOQ_BATCH) {
+ bool block = ioq_ring_empty(state);
+ ioqq_pop_batch(ioq->pending, pending, IOQ_BATCH, block);
+
+ bool any = false;
+ for (size_t i = 0; i < IOQ_BATCH; ++i) {
+ struct ioq_ent *ent = pending[i];
+ if (ent == &IOQ_STOP) {
+ ioqq_push(ioq->pending, &IOQ_STOP);
+ state->stop = true;
+ goto done;
+ } else if (ent) {
+ ioq_prep_sqe(state, ent);
+ any = true;
+ }
+ }
- while (io_uring_sq_space_left(ring)) {
- struct ioq_ent *ent = ioq_ring_pop(state);
- if (!ent) {
+ if (!any) {
break;
}
-
- ioq_prep_sqe(state, ent);
}
- return state->prepped || state->submitted;
+done:
+ return !ioq_ring_empty(state);
}
/** Reap a single CQE. */
@@ -638,11 +701,12 @@ static void ioq_reap_cqe(struct ioq_ring_state *state, struct io_uring_cqe *cqe)
}
push:
- ioqq_push(ioq->ready, ent);
+ ioq_batch_push(ioq->ready, &state->ready, ent);
}
/** Reap a batch of CQEs. */
static void ioq_ring_reap(struct ioq_ring_state *state) {
+ struct ioq *ioq = state->ioq;
struct io_uring *ring = state->ring;
while (state->prepped) {
@@ -661,6 +725,8 @@ static void ioq_ring_reap(struct ioq_ring_state *state) {
ioq_reap_cqe(state, cqe);
}
+
+ ioq_batch_flush(ioq->ready, &state->ready);
}
/** io_uring worker loop. */
@@ -681,17 +747,29 @@ static void ioq_ring_work(struct ioq_thread *thread) {
static void ioq_sync_work(struct ioq_thread *thread) {
struct ioq *ioq = thread->parent;
- while (true) {
- struct ioq_ent *ent = ioqq_pop(ioq->pending, true);
- if (ent == &IOQ_STOP) {
- ioqq_push(ioq->pending, &IOQ_STOP);
- break;
+ bool stop = false;
+ while (!stop) {
+ struct ioq_ent *pending[IOQ_BATCH];
+ ioqq_pop_batch(ioq->pending, pending, IOQ_BATCH, true);
+
+ struct ioq_batch ready;
+ ready.size = 0;
+
+ for (size_t i = 0; i < IOQ_BATCH; ++i) {
+ struct ioq_ent *ent = pending[i];
+ if (ent == &IOQ_STOP) {
+ ioqq_push(ioq->pending, &IOQ_STOP);
+ stop = true;
+ break;
+ } else if (ent) {
+ if (!ioq_check_cancel(ioq, ent)) {
+ ioq_dispatch_sync(ioq, ent);
+ }
+ ioq_batch_push(ioq->ready, &ready, ent);
+ }
}
- if (!ioq_check_cancel(ioq, ent)) {
- ioq_dispatch_sync(ioq, ent);
- }
- ioqq_push(ioq->ready, ent);
+ ioq_batch_flush(ioq->ready, &ready);
}
}