-rw-r--r--   src/async/async_api.c    | 33
-rw-r--r--   src/async/async_worker.c | 33
-rw-r--r--   src/conn/conn_async.c    |  2
-rw-r--r--   src/include/async.h      | 23
4 files changed, 64 insertions, 27 deletions
diff --git a/src/async/async_api.c b/src/async/async_api.c
index d69e11fc3c1..c1af49363e5 100644
--- a/src/async/async_api.c
+++ b/src/async/async_api.c
@@ -219,7 +219,10 @@ __wt_async_op_enqueue(WT_CONNECTION_IMPL *conn, WT_ASYNC_OP_IMPL *op)
 {
 	WT_ASYNC *async;
 	WT_DECL_RET;
-	uint64_t my_alloc, my_slot;
+	uint64_t cur_head, cur_tail, my_alloc, my_slot;
+#ifdef HAVE_DIAGNOSTIC
+	WT_ASYNC_OP_IMPL *my_op;
+#endif
 
 	async = conn->async;
 	/*
@@ -229,21 +232,37 @@ __wt_async_op_enqueue(WT_CONNECTION_IMPL *conn, WT_ASYNC_OP_IMPL *op)
 	/*
 	 * We get our slot in the ring buffer to use.
 	 */
-	WT_WRITE_BARRIER();
 	my_alloc = WT_ATOMIC_ADD(async->alloc_head, 1);
 	my_slot = my_alloc % async->async_qsize;
-	WT_ASSERT(conn->default_session, async->async_queue[my_slot] == NULL);
-	async->async_queue[my_slot] = op;
+
+	/*
+	 * Make sure we haven't wrapped around the queue.
+	 * If so, wait for the tail to advance off this slot.
+	 */
+	WT_ORDERED_READ(cur_tail, async->tail_slot);
+	while (cur_tail == my_slot) {
+		__wt_yield();
+		WT_ORDERED_READ(cur_tail, async->tail_slot);
+	}
+
+#ifdef HAVE_DIAGNOSTIC
+	WT_ORDERED_READ(my_op, async->async_queue[my_slot]);
+	if (my_op != NULL)
+		__wt_panic(conn->default_session);
+#endif
+	WT_PUBLISH(async->async_queue[my_slot], op);
 	op->state = WT_ASYNCOP_ENQUEUED;
-	WT_WRITE_BARRIER();
 	if (WT_ATOMIC_ADD(async->cur_queue, 1) > async->max_queue)
-		async->max_queue = async->cur_queue;
+		WT_PUBLISH(async->max_queue, async->cur_queue);
 	/*
 	 * Multiple threads may be adding ops to the queue.  We need to wait
 	 * our turn to make our slot visible to workers.
 	 */
-	while (async->head != (my_alloc - 1))
+	WT_ORDERED_READ(cur_head, async->head);
+	while (cur_head != (my_alloc - 1)) {
 		__wt_yield();
+		WT_ORDERED_READ(cur_head, async->head);
+	}
 	WT_PUBLISH(async->head, my_alloc);
 	return (ret);
 }
diff --git a/src/async/async_worker.c b/src/async/async_worker.c
index 2110244cd2f..3240f7845da 100644
--- a/src/async/async_worker.c
+++ b/src/async/async_worker.c
@@ -17,7 +17,7 @@ __async_op_dequeue(WT_CONNECTION_IMPL *conn, WT_SESSION_IMPL *session,
     WT_ASYNC_OP_IMPL **op)
 {
 	WT_ASYNC *async;
-	uint64_t my_slot, old_tail;
+	uint64_t cur_tail, last_consume, my_consume, my_slot, prev_slot;
 	uint32_t max_tries, tries;
 
 	async = conn->async;
@@ -29,11 +29,14 @@ __async_op_dequeue(WT_CONNECTION_IMPL *conn, WT_SESSION_IMPL *session,
 	 * Then grab the slot containing the work.  If we lose, try again.
 	 */
 retry:
-	WT_READ_BARRIER();
-	while ((old_tail = async->tail) == async->head &&
-	    ++tries < max_tries &&
-	    async->flush_state != WT_ASYNC_FLUSHING)
+	WT_ORDERED_READ(last_consume, async->alloc_tail);
+	while (last_consume == async->head && ++tries < max_tries &&
+	    async->flush_state != WT_ASYNC_FLUSHING) {
 		__wt_yield();
+		WT_ORDERED_READ(last_consume, async->alloc_tail);
+	}
+	if (F_ISSET(conn, WT_CONN_PANIC))
+		__wt_panic(session);
 	if (tries >= max_tries ||
 	    async->flush_state == WT_ASYNC_FLUSHING)
 		return (0);
@@ -41,16 +44,20 @@ retry:
 	 * Try to increment the tail to claim this slot.  If we lose
 	 * a race, try again.
 	 */
-	WT_WRITE_BARRIER();
-	if (!WT_ATOMIC_CAS(async->tail, old_tail, old_tail + 1))
+	my_consume = last_consume + 1;
+	if (!WT_ATOMIC_CAS(async->alloc_tail, last_consume, my_consume))
 		goto retry;
-	my_slot = (old_tail + 1) % async->async_qsize;
+	/*
+	 * This item of work is ours to process.  Clear it out of the
+	 * queue and return.
+	 */
+	my_slot = my_consume % async->async_qsize;
+	prev_slot = last_consume % async->async_qsize;
 	*op = WT_ATOMIC_STORE(async->async_queue[my_slot], NULL);
 
 	WT_ASSERT(session, async->cur_queue > 0);
 	WT_ASSERT(session, *op != NULL);
 	WT_ASSERT(session, (*op)->state == WT_ASYNCOP_ENQUEUED);
-	WT_WRITE_BARRIER();
 	WT_ATOMIC_SUB(async->cur_queue, 1);
 	(*op)->state = WT_ASYNCOP_WORKING;
 
@@ -59,6 +66,12 @@ retry:
 		 * We're the worker to take the flush op off the queue.
 		 */
 		WT_PUBLISH(async->flush_state, WT_ASYNC_FLUSHING);
 
+	WT_ORDERED_READ(cur_tail, async->tail_slot);
+	while (cur_tail != prev_slot) {
+		__wt_yield();
+		WT_ORDERED_READ(cur_tail, async->tail_slot);
+	}
+	WT_PUBLISH(async->tail_slot, my_slot);
 	return (0);
 }
@@ -213,8 +226,8 @@ __async_worker_op(WT_SESSION_IMPL *session, WT_ASYNC_OP_IMPL *op,
 	 * the op back to the free pool and reset our cached cursor.
 	 * We do this regardless of success or failure.
 	 */
-	op->state = WT_ASYNCOP_FREE;
 	F_CLR(&asyncop->c, WT_CURSTD_KEY_SET | WT_CURSTD_VALUE_SET);
+	WT_PUBLISH(op->state, WT_ASYNCOP_FREE);
 	cursor->reset(cursor);
 	return (ret);
 }
diff --git a/src/conn/conn_async.c b/src/conn/conn_async.c
index b45e0aa4f3c..9e134ce7352 100644
--- a/src/conn/conn_async.c
+++ b/src/conn/conn_async.c
@@ -116,7 +116,7 @@ __async_new_op_alloc(WT_CONNECTION_IMPL *conn, const char *uri,
 	*opp = NULL;
 retry:
 	ret = 0;
-	save_i = async->ops_index;
+	WT_ORDERED_READ(save_i, async->ops_index);
 	/*
 	 * Look after the last one allocated for a free one.  We'd expect
 	 * ops to be freed mostly FIFO so we should quickly find one.
diff --git a/src/include/async.h b/src/include/async.h
index a8760319bac..d8f0b68f766 100644
--- a/src/include/async.h
+++ b/src/include/async.h
@@ -73,19 +73,24 @@ struct __wt_async {
 #define	OPS_INVALID_INDEX	0xffffffff
 	uint32_t ops_index;		/* Active slot index */
 	uint64_t op_id;			/* Unique ID counter */
 
-	/*
-	 * We need to have two head pointers.  One that is allocated to
-	 * enqueue threads and one that indicates ready slots to the
-	 * consumers.  We only need one tail pointer to coordinate
-	 * consuming threads against each other.  We don't need to worry
-	 * about head catch up with tail because we know the queue is
-	 * large enough for all possible work.
-	 */
 	WT_ASYNC_OP_IMPL **async_queue;	/* Async ops work queue */
 	uint32_t async_qsize;		/* Async work queue size */
+	/*
+	 * We need to have two head and tail values.  All but one is
+	 * maintained as an ever increasing value to ease wrap around.
+	 *
+	 * alloc_head: the next one to allocate for producers.
+	 * head: the current head visible to consumers.
+	 *       head is always <= alloc_head.
+	 * alloc_tail: the next slot for consumers to dequeue.
+	 *             alloc_tail is always <= head.
+	 * tail_slot: the last slot consumed.
+	 *            A producer may need to wait for tail_slot to advance.
+	 */
 	uint64_t alloc_head;		/* Next slot to enqueue */
 	uint64_t head;			/* Head visible to worker */
-	uint64_t tail;			/* Worker consumed tail */
+	uint64_t alloc_tail;		/* Next slot to dequeue */
+	uint64_t tail_slot;		/* Worker slot consumed */
 	STAILQ_HEAD(__wt_async_format_qh, __wt_async_format) formatqh;
 	int cur_queue;			/* Currently enqueued */
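For readers less familiar with the pattern, the following is a minimal, self-contained sketch of the handoff protocol this diff implements, written against C11 <stdatomic.h> rather than WiredTiger's WT_ATOMIC_*, WT_ORDERED_READ, and WT_PUBLISH macros. All names here (ring, ring_enqueue, ring_dequeue, QSIZE) are illustrative, not WiredTiger API, and the retry bound, flush handling, and HAVE_DIAGNOSTIC checks of the real code are omitted. Like the original, it assumes the queue is sized so it can never hold more concurrent operations than it has slots.

#include <sched.h>
#include <stdatomic.h>
#include <stdint.h>

#define	QSIZE	1024		/* Illustrative; WiredTiger sizes this from the op count */

struct ring {
	void *_Atomic queue[QSIZE];		/* Slot contents, NULL when empty */
	atomic_uint_fast64_t alloc_head;	/* Next ticket for producers */
	atomic_uint_fast64_t head;		/* Last slot published to consumers */
	atomic_uint_fast64_t alloc_tail;	/* Most recently claimed consumer ticket */
	atomic_uint_fast64_t tail_slot;		/* Last slot a consumer vacated */
};

static void
ring_enqueue(struct ring *r, void *op)
{
	uint64_t my_alloc, my_slot;

	/* Claim a unique, ever-increasing ticket; modulo maps it to a slot. */
	my_alloc = atomic_fetch_add(&r->alloc_head, 1) + 1;
	my_slot = my_alloc % QSIZE;

	/* If we have wrapped, wait until the consumers vacate our slot. */
	while (atomic_load(&r->tail_slot) == my_slot)
		sched_yield();

	/* Store the payload; release ordering plays the WT_PUBLISH role. */
	atomic_store_explicit(&r->queue[my_slot], op, memory_order_release);

	/* Wait our turn among producers, then make the slot visible. */
	while (atomic_load(&r->head) != my_alloc - 1)
		sched_yield();
	atomic_store_explicit(&r->head, my_alloc, memory_order_release);
}

static void *
ring_dequeue(struct ring *r)
{
	uint64_t last, mine, my_slot, prev_slot;
	void *op;

	/* Claim the next consumer ticket; the CAS arbitrates among workers. */
	do {
		last = atomic_load(&r->alloc_tail);
		while (last == atomic_load(&r->head)) {	/* Queue is empty */
			sched_yield();
			last = atomic_load(&r->alloc_tail);
		}
		mine = last + 1;
	} while (!atomic_compare_exchange_weak(&r->alloc_tail, &last, mine));

	my_slot = mine % QSIZE;
	prev_slot = last % QSIZE;

	/* The slot is ours alone: take the payload and empty the slot. */
	op = atomic_exchange_explicit(&r->queue[my_slot], NULL,
	    memory_order_acquire);

	/*
	 * Advance tail_slot strictly in ticket order, so a producer that
	 * has wrapped only reuses a slot after every earlier slot has
	 * been vacated.
	 */
	while (atomic_load(&r->tail_slot) != prev_slot)
		sched_yield();
	atomic_store_explicit(&r->tail_slot, my_slot, memory_order_release);
	return (op);
}

The sketch shows why the header comment describes two head and two tail values: each end of the queue has two coordination problems. Among producers, the atomic add on alloc_head hands out unique tickets while the in-order publish of head keeps partially filled slots invisible to consumers; among consumers, the CAS on alloc_tail decides who owns a ticket while the in-order publish of tail_slot tells a wrapped producer when its slot is truly free to reuse.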