-rw-r--r--  src/async/async_api.c     33
-rw-r--r--  src/async/async_worker.c  33
-rw-r--r--  src/conn/conn_async.c      2
-rw-r--r--  src/include/async.h       23
4 files changed, 64 insertions, 27 deletions
diff --git a/src/async/async_api.c b/src/async/async_api.c
index d69e11fc3c1..c1af49363e5 100644
--- a/src/async/async_api.c
+++ b/src/async/async_api.c
@@ -219,7 +219,10 @@ __wt_async_op_enqueue(WT_CONNECTION_IMPL *conn, WT_ASYNC_OP_IMPL *op)
{
WT_ASYNC *async;
WT_DECL_RET;
- uint64_t my_alloc, my_slot;
+ uint64_t cur_head, cur_tail, my_alloc, my_slot;
+#ifdef HAVE_DIAGNOSTIC
+ WT_ASYNC_OP_IMPL *my_op;
+#endif

async = conn->async;
/*
@@ -229,21 +232,37 @@ __wt_async_op_enqueue(WT_CONNECTION_IMPL *conn, WT_ASYNC_OP_IMPL *op)
/*
* We get our slot in the ring buffer to use.
*/
- WT_WRITE_BARRIER();
my_alloc = WT_ATOMIC_ADD(async->alloc_head, 1);
my_slot = my_alloc % async->async_qsize;
- WT_ASSERT(conn->default_session, async->async_queue[my_slot] == NULL);
- async->async_queue[my_slot] = op;
+
+ /*
+ * Make sure we haven't wrapped around the queue.
+ * If we have, wait for the tail to advance off this slot.
+ */
+ WT_ORDERED_READ(cur_tail, async->tail_slot);
+ while (cur_tail == my_slot) {
+ __wt_yield();
+ WT_ORDERED_READ(cur_tail, async->tail_slot);
+ }
+
+#ifdef HAVE_DIAGNOSTIC
+ WT_ORDERED_READ(my_op, async->async_queue[my_slot]);
+ if (my_op != NULL)
+ __wt_panic(conn->default_session);
+#endif
+ WT_PUBLISH(async->async_queue[my_slot], op);
op->state = WT_ASYNCOP_ENQUEUED;
- WT_WRITE_BARRIER();
if (WT_ATOMIC_ADD(async->cur_queue, 1) > async->max_queue)
- async->max_queue = async->cur_queue;
+ WT_PUBLISH(async->max_queue, async->cur_queue);
/*
* Multiple threads may be adding ops to the queue. We need to wait
* our turn to make our slot visible to workers.
*/
- while (async->head != (my_alloc - 1))
+ WT_ORDERED_READ(cur_head, async->head);
+ while (cur_head != (my_alloc - 1)) {
__wt_yield();
+ WT_ORDERED_READ(cur_head, async->head);
+ }
WT_PUBLISH(async->head, my_alloc);
return (ret);
}
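The enqueue path above is the producer half of a multi-producer ring buffer: claim a ticket with an atomic add, wait out wrap-around against tail_slot, publish the op into the slot, then advance the shared head strictly in ticket order. Below is a minimal standalone sketch of that protocol using C11 atomics; struct ring, QSIZE, and enqueue are illustrative names, the seq_cst atomics stand in for the WT_ATOMIC_ADD/WT_ORDERED_READ/WT_PUBLISH macros, and this is a sketch rather than WiredTiger's actual code.

#include <sched.h>
#include <stdatomic.h>
#include <stdint.h>

#define QSIZE 1024                        /* illustrative queue size */

struct ring {
    void *_Atomic slots[QSIZE];           /* published work items */
    atomic_uint_fast64_t alloc_head;      /* last ticket handed to a producer */
    atomic_uint_fast64_t head;            /* last ticket visible to consumers */
    atomic_uint_fast64_t alloc_tail;      /* last ticket claimed by a consumer */
    atomic_uint_fast64_t tail_slot;       /* last slot retired by a consumer */
};

static void
enqueue(struct ring *r, void *item)
{
    uint64_t my_alloc, my_slot;

    /* Claim a unique, ever-increasing ticket; the slot is ticket mod size. */
    my_alloc = atomic_fetch_add(&r->alloc_head, 1) + 1;
    my_slot = my_alloc % QSIZE;

    /* Wrap protection: if the ring has come all the way around, wait
     * for consumers to retire this slot before overwriting it. */
    while (atomic_load(&r->tail_slot) == my_slot)
        sched_yield();

    /* Publish the item into our slot. */
    atomic_store(&r->slots[my_slot], item);

    /* Wait our turn, then advance head so consumers always see a
     * contiguous run of published slots. */
    while (atomic_load(&r->head) != my_alloc - 1)
        sched_yield();
    atomic_store(&r->head, my_alloc);
}

The in-order head advance is what lets a consumer treat alloc_tail == head as "queue empty" without scanning slots.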
diff --git a/src/async/async_worker.c b/src/async/async_worker.c
index 2110244cd2f..3240f7845da 100644
--- a/src/async/async_worker.c
+++ b/src/async/async_worker.c
@@ -17,7 +17,7 @@ __async_op_dequeue(WT_CONNECTION_IMPL *conn, WT_SESSION_IMPL *session,
WT_ASYNC_OP_IMPL **op)
{
WT_ASYNC *async;
- uint64_t my_slot, old_tail;
+ uint64_t cur_tail, last_consume, my_consume, my_slot, prev_slot;
uint32_t max_tries, tries;

async = conn->async;
@@ -29,11 +29,14 @@ __async_op_dequeue(WT_CONNECTION_IMPL *conn, WT_SESSION_IMPL *session,
* Then grab the slot containing the work. If we lose, try again.
*/
retry:
- WT_READ_BARRIER();
- while ((old_tail = async->tail) == async->head &&
- ++tries < max_tries &&
- async->flush_state != WT_ASYNC_FLUSHING)
+ WT_ORDERED_READ(last_consume, async->alloc_tail);
+ while (last_consume == async->head && ++tries < max_tries &&
+ async->flush_state != WT_ASYNC_FLUSHING) {
__wt_yield();
+ WT_ORDERED_READ(last_consume, async->alloc_tail);
+ }
+ if (F_ISSET(conn, WT_CONN_PANIC))
+ __wt_panic(session);
if (tries >= max_tries ||
async->flush_state == WT_ASYNC_FLUSHING)
return (0);
@@ -41,16 +44,20 @@ retry:
* Try to increment the tail to claim this slot. If we lose
* a race, try again.
*/
- WT_WRITE_BARRIER();
- if (!WT_ATOMIC_CAS(async->tail, old_tail, old_tail + 1))
+ my_consume = last_consume + 1;
+ if (!WT_ATOMIC_CAS(async->alloc_tail, last_consume, my_consume))
goto retry;
- my_slot = (old_tail + 1) % async->async_qsize;
+ /*
+ * This item of work is ours to process. Clear it out of the
+ * queue and return.
+ */
+ my_slot = my_consume % async->async_qsize;
+ prev_slot = last_consume % async->async_qsize;
*op = WT_ATOMIC_STORE(async->async_queue[my_slot], NULL);
WT_ASSERT(session, async->cur_queue > 0);
WT_ASSERT(session, *op != NULL);
WT_ASSERT(session, (*op)->state == WT_ASYNCOP_ENQUEUED);
- WT_WRITE_BARRIER();
WT_ATOMIC_SUB(async->cur_queue, 1);
(*op)->state = WT_ASYNCOP_WORKING;

@@ -59,6 +66,12 @@ retry:
* We're the worker to take the flush op off the queue.
*/
WT_PUBLISH(async->flush_state, WT_ASYNC_FLUSHING);
+ WT_ORDERED_READ(cur_tail, async->tail_slot);
+ while (cur_tail != prev_slot) {
+ __wt_yield();
+ WT_ORDERED_READ(cur_tail, async->tail_slot);
+ }
+ WT_PUBLISH(async->tail_slot, my_slot);
return (0);
}

@@ -213,8 +226,8 @@ __async_worker_op(WT_SESSION_IMPL *session, WT_ASYNC_OP_IMPL *op,
* the op back to the free pool and reset our cached cursor.
* We do this regardless of success or failure.
*/
- op->state = WT_ASYNCOP_FREE;
F_CLR(&asyncop->c, WT_CURSTD_KEY_SET | WT_CURSTD_VALUE_SET);
+ WT_PUBLISH(op->state, WT_ASYNCOP_FREE);
cursor->reset(cursor);
return (ret);
}
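The dequeue above is the matching consumer half: claim the next ticket with a compare-and-swap on alloc_tail, empty the slot, then retire slots in strict ticket order through tail_slot so the producer-side wrap check stays sound. Continuing the hypothetical C11 sketch from above (same struct ring; dequeue is an invented name and the flush_op handling is deliberately omitted):

static void *
dequeue(struct ring *r)
{
    uint64_t last, mine, my_slot, prev_slot;
    void *item;

    for (;;) {
        /* The ring is empty while the last claimed ticket equals head. */
        last = atomic_load(&r->alloc_tail);
        if (last == atomic_load(&r->head)) {
            sched_yield();
            continue;
        }
        mine = last + 1;
        /* If the CAS fails, another consumer claimed the ticket; retry. */
        if (atomic_compare_exchange_strong(&r->alloc_tail, &last, mine))
            break;
    }

    /* The slot is exclusively ours: take the item and clear the slot. */
    my_slot = mine % QSIZE;
    prev_slot = last % QSIZE;
    item = atomic_exchange(&r->slots[my_slot], NULL);

    /* Retire in order: wait for the previous ticket's slot to be
     * retired, then record ours so producers may reuse it. */
    while (atomic_load(&r->tail_slot) != prev_slot)
        sched_yield();
    atomic_store(&r->tail_slot, my_slot);
    return (item);
}

Claiming (alloc_tail) and retiring (tail_slot) are separate steps because a consumer clears its slot only after winning the CAS; producers must therefore wait on retirement, not on claiming.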
diff --git a/src/conn/conn_async.c b/src/conn/conn_async.c
index b45e0aa4f3c..9e134ce7352 100644
--- a/src/conn/conn_async.c
+++ b/src/conn/conn_async.c
@@ -116,7 +116,7 @@ __async_new_op_alloc(WT_CONNECTION_IMPL *conn, const char *uri,
*opp = NULL;
retry:
ret = 0;
- save_i = async->ops_index;
+ WT_ORDERED_READ(save_i, async->ops_index);
/*
* Look after the last one allocated for a free one. We'd expect
* ops to be freed mostly FIFO so we should quickly find one.
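For context, the scan this comment describes starts just past the last allocated slot and wraps around a fixed array of ops, so the ordered read of ops_index added here only changes where the search starts, never its correctness. A hypothetical sketch of that pattern, reusing the C11 headers from the earlier sketches; pool, alloc_op, and OPS_COUNT are invented names, and WiredTiger's op state machine is reduced to free/in-use:

#define OPS_COUNT 1024                    /* illustrative pool size */

struct pool {
    atomic_int states[OPS_COUNT];         /* 0 = free, 1 = in use */
    atomic_uint ops_index;                /* last slot allocated */
};

static int
alloc_op(struct pool *p, uint32_t *slotp)
{
    uint32_t i, save_i, slot;
    int expected;

    /* Snapshot the index; ops are freed roughly FIFO, so the slot just
     * past the last allocation is the most likely one to be free. */
    save_i = atomic_load(&p->ops_index);
    for (i = 1; i <= OPS_COUNT; i++) {
        slot = (save_i + i) % OPS_COUNT;
        expected = 0;
        /* CAS from free to in-use claims the op against racing callers. */
        if (atomic_compare_exchange_strong(&p->states[slot], &expected, 1)) {
            atomic_store(&p->ops_index, slot);
            *slotp = slot;
            return (0);
        }
    }
    return (-1);                          /* pool exhausted */
}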
diff --git a/src/include/async.h b/src/include/async.h
index a8760319bac..d8f0b68f766 100644
--- a/src/include/async.h
+++ b/src/include/async.h
@@ -73,19 +73,24 @@ struct __wt_async {
#define OPS_INVALID_INDEX 0xffffffff
uint32_t ops_index; /* Active slot index */
uint64_t op_id; /* Unique ID counter */
- /*
- * We need to have two head pointers. One that is allocated to
- * enqueue threads and one that indicates ready slots to the
- * consumers. We only need one tail pointer to coordinate
- * consuming threads against each other. We don't need to worry
- * about head catch up with tail because we know the queue is
- * large enough for all possible work.
- */
WT_ASYNC_OP_IMPL **async_queue; /* Async ops work queue */
uint32_t async_qsize; /* Async work queue size */
+ /*
+ * We need two head values and two tail values. All but one is
+ * maintained as an ever-increasing counter to ease wraparound.
+ *
+ * alloc_head: the next one to allocate for producers.
+ * head: the current head visible to consumers.
+ * head is always <= alloc_head.
+ * alloc_tail: the next slot for consumers to dequeue.
+ * alloc_tail is always <= head.
+ * tail_slot: the last slot consumed.
+ * A producer may need to wait for tail_slot to advance.
+ */
uint64_t alloc_head; /* Next slot to enqueue */
uint64_t head; /* Head visible to worker */
- uint64_t tail; /* Worker consumed tail */
+ uint64_t alloc_tail; /* Next slot to dequeue */
+ uint64_t tail_slot; /* Worker slot consumed */
STAILQ_HEAD(__wt_async_format_qh, __wt_async_format) formatqh;
int cur_queue; /* Currently enqueued */
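The ordering relationships spelled out in the new comment read as invariants over the four counters. A tiny hypothetical checker over the sketch's struct ring from earlier; it is only meaningful at a quiesced moment, since under load the counters move independently:

#include <assert.h>

static void
check_ring_invariants(struct ring *r)
{
    uint64_t alloc_head = atomic_load(&r->alloc_head);
    uint64_t head = atomic_load(&r->head);
    uint64_t alloc_tail = atomic_load(&r->alloc_tail);

    assert(head <= alloc_head);           /* head trails producer allocation */
    assert(alloc_tail <= head);           /* consumers never pass the head */
    /* tail_slot is a slot index rather than a ticket, so it is only
     * comparable modulo QSIZE; enqueue() waits on it directly. */
}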