diff options
-rw-r--r-- | examples/c/ex_async.c    |   3 |
-rw-r--r-- | src/async/async_api.c    |  38 |
-rw-r--r-- | src/async/async_worker.c | 139 |
-rw-r--r-- | src/conn/conn_async.c    |  26 |
-rw-r--r-- | src/include/async.h      |  18 |
-rw-r--r-- | src/include/extern.h     |   3 |
6 files changed, 106 insertions, 121 deletions
diff --git a/examples/c/ex_async.c b/examples/c/ex_async.c index c185fdbe9c6..8e29bcf93e8 100644 --- a/examples/c/ex_async.c +++ b/examples/c/ex_async.c @@ -109,7 +109,8 @@ retry: * If we used up all the ops, pause and retry to * give the workers a chance to process them. */ - fprintf(stderr, "Iteration %d: table 1 ret %d\n",i,ret); + fprintf(stderr, + "Iteration %d: async_new_op ret %d\n",i,ret); sleep(1); goto retry; } diff --git a/src/async/async_api.c b/src/async/async_api.c index ed8e3b74245..f759901ace5 100644 --- a/src/async/async_api.c +++ b/src/async/async_api.c @@ -74,7 +74,7 @@ static int __async_op_wrap(WT_ASYNC_OP_IMPL *op, WT_ASYNC_OPTYPE type) { op->optype = type; - return (__wt_async_op_enqueue(O2C(op), op, 0)); + return (__wt_async_op_enqueue(O2C(op), op)); } /* @@ -211,35 +211,34 @@ __async_op_init(WT_CONNECTION_IMPL *conn, WT_ASYNC_OP_IMPL *op, uint32_t id) * Enqueue an operation onto the work queue. */ int -__wt_async_op_enqueue(WT_CONNECTION_IMPL *conn, - WT_ASYNC_OP_IMPL *op, int locked) +__wt_async_op_enqueue(WT_CONNECTION_IMPL *conn, WT_ASYNC_OP_IMPL *op) { WT_ASYNC *async; WT_DECL_RET; + uint64_t my_alloc, my_slot; async = conn->async; - if (!locked) - __wt_spin_lock(conn->default_session, &async->opsq_lock); /* * Enqueue op at the tail of the work queue. */ WT_ASSERT(conn->default_session, op->state == WT_ASYNCOP_READY); - STAILQ_INSERT_TAIL(&async->opqh, op, q); + /* + * We get our slot in the ring buffer to use. + */ + my_alloc = WT_ATOMIC_ADD(async->alloc_head, 1); + my_slot = my_alloc % conn->async_size; + WT_ASSERT(conn->default_session, async->async_queue[my_slot] == NULL); + async->async_queue[my_slot] = op; op->state = WT_ASYNCOP_ENQUEUED; if (++async->cur_queue > async->max_queue) async->max_queue = async->cur_queue; /* - * Signal the worker threads something is on the queue. 
- */ - __wt_spin_unlock(conn->default_session, &async->opsq_lock); -#if 0 - WT_ERR(__wt_cond_signal(conn->default_session, async->ops_cond)); -#endif - /* - * Relock if we need to for the caller. + * Multiple threads may be adding ops to the queue. We need to wait + * our turn to make our slot visible to workers. */ -err: if (locked) - __wt_spin_lock(conn->default_session, &async->opsq_lock); + while (async->head != (my_alloc - 1)) + __wt_yield(); + WT_ATOMIC_STORE(async->head, my_alloc); return (ret); } @@ -261,6 +260,13 @@ __wt_async_op_init(WT_CONNECTION_IMPL *conn) __async_op_init(conn, &async->flush_op, OPS_INVALID_INDEX); /* + * Allocate and initialize the work queue. This is sized so that + * the ring buffer is known to be big enough such that the head + * can never overlap the tail. Include 1 extra for the flush op. + */ + WT_RET(__wt_calloc_def(conn->default_session, + conn->async_size + 1, &async->async_queue)); + /* * Allocate and initialize all the user ops. */ WT_RET(__wt_calloc_def(conn->default_session, diff --git a/src/async/async_worker.c b/src/async/async_worker.c index 8d7dd5c1b24..0b2ccfaa224 100644 --- a/src/async/async_worker.c +++ b/src/async/async_worker.c @@ -8,28 +8,74 @@ #include "wt_internal.h" /* + * __async_op_dequeue -- + * Wait for work to be available. Then atomically take it off + * the work queue. + */ +static int +__async_op_dequeue(WT_CONNECTION_IMPL *conn, WT_SESSION_IMPL *session, + WT_ASYNC_OP_IMPL **op) +{ + WT_ASYNC *async; + uint64_t my_slot, old_tail; + uint32_t max_tries, tries; + + async = conn->async; + *op = NULL; + max_tries = 100; + tries = 0; + /* + * Wait for work to do. Work is available when async->head moves. + * Then grab the slot containing the work. If we lose, try again. 
+ */ +retry: + while ((old_tail = async->tail) == async->head && + ++tries < max_tries && + !FLD_ISSET(async->opsq_flush, WT_ASYNC_FLUSHING)) + __wt_yield(); + if (tries >= max_tries || + FLD_ISSET(async->opsq_flush, WT_ASYNC_FLUSHING)) + return (0); + /* + * Try to increment the tail to claim this slot. If we lose + * a race, try again. + */ + if (!WT_ATOMIC_CAS(async->tail, old_tail, old_tail + 1)) + goto retry; + my_slot = (old_tail + 1) % conn->async_size; + *op = WT_ATOMIC_STORE(async->async_queue[my_slot], NULL); + + WT_ASSERT(session, async->cur_queue > 0); + WT_ASSERT(session, *op != NULL); + WT_ASSERT(session, (*op)->state == WT_ASYNCOP_ENQUEUED); + --async->cur_queue; + (*op)->state = WT_ASYNCOP_WORKING; + + if (*op == &async->flush_op) { + WT_ASSERT(session, FLD_ISSET(async->opsq_flush, + WT_ASYNC_FLUSH_IN_PROGRESS)); + /* + * We're the worker to take the flush op off the queue. + * Set the flushing flag and set count to 1. + */ + FLD_SET(async->opsq_flush, WT_ASYNC_FLUSHING); + WT_WRITE_BARRIER(); + } + return (0); +} + +/* * __async_flush_wait -- * Wait for the final worker to finish flushing. - * Assumes it is called with spinlock held and returns it locked. */ static int -__async_flush_wait(WT_SESSION_IMPL *session, WT_ASYNC *async, int *locked) +__async_flush_wait(WT_SESSION_IMPL *session, WT_ASYNC *async) { WT_DECL_RET; - /* - * We change the caller's locked setting so that if we return an - * error from this function the caller can properly unlock or not - * as needed. 
- */ - while (FLD_ISSET(async->opsq_flush, WT_ASYNC_FLUSHING)) { - __wt_spin_unlock(session, &async->opsq_lock); - *locked= 0; + while (FLD_ISSET(async->opsq_flush, WT_ASYNC_FLUSHING)) WT_ERR_TIMEDOUT_OK( __wt_cond_wait(session, async->flush_cond, 10000)); - __wt_spin_lock(session, &async->opsq_lock); - *locked= 1; - } err: return (ret); } @@ -166,8 +212,8 @@ __async_worker_op(WT_SESSION_IMPL *session, WT_ASYNC_OP_IMPL *op, /* * After the callback returns, and the transaction resolved release * the op back to the free pool and reset our cached cursor. + * We do this regardless of success or failure. */ - ret = 0; op->state = WT_ASYNCOP_FREE; F_CLR(&asyncop->c, WT_CURSTD_KEY_SET | WT_CURSTD_VALUE_SET); cursor->reset(cursor); @@ -188,19 +234,18 @@ __wt_async_worker(void *arg) WT_CONNECTION_IMPL *conn; WT_DECL_RET; WT_SESSION_IMPL *session; - int locked; session = arg; conn = S2C(session); async = conn->async; - locked = 0; worker.num_cursors = 0; STAILQ_INIT(&worker.cursorqh); while (F_ISSET(conn, WT_CONN_SERVER_RUN)) { - __wt_spin_lock(session, &async->opsq_lock); - locked = 1; - if (FLD_ISSET(async->opsq_flush, WT_ASYNC_FLUSHING)) { + WT_ERR(__async_op_dequeue(conn, session, &op)); + if (op != NULL && op != &async->flush_op) + WT_ERR(__async_worker_op(session, op, &worker)); + else if (FLD_ISSET(async->opsq_flush, WT_ASYNC_FLUSHING)) { /* * Worker flushing going on. Last worker to the party * needs to clear the FLUSHING flag and signal the cond. @@ -218,74 +263,20 @@ __wt_async_worker(void *arg) FLD_SET(async->opsq_flush, WT_ASYNC_FLUSH_COMPLETE); FLD_CLR(async->opsq_flush, WT_ASYNC_FLUSHING); - __wt_spin_unlock(session, &async->opsq_lock); - locked = 0; WT_ERR(__wt_cond_signal(session, async->flush_cond)); - __wt_spin_lock(session, &async->opsq_lock); - locked = 1; } else { /* * We need to wait for the last worker to * signal the condition. 
*/ - WT_ERR(__async_flush_wait( - session, async, &locked)); + WT_ERR(__async_flush_wait(session, async)); } } - /* - * Get next op. We get here with the opsq lock held. - * Remove from the head of the queue. - */ - op = STAILQ_FIRST(&async->opqh); - if (op == NULL) { - __wt_spin_unlock(session, &async->opsq_lock); - locked = 0; - goto wait_for_work; - } - - /* - * There is work to do. - */ - STAILQ_REMOVE_HEAD(&async->opqh, q); - WT_ASSERT(session, async->cur_queue > 0); - --async->cur_queue; - WT_ASSERT(session, op->state == WT_ASYNCOP_ENQUEUED); - op->state = WT_ASYNCOP_WORKING; - if (op == &async->flush_op) { - WT_ASSERT(session, FLD_ISSET(async->opsq_flush, - WT_ASYNC_FLUSH_IN_PROGRESS)); - /* - * We're the worker to take the flush op off the queue. - * Set the flushing flag and set count to 1. - */ - FLD_SET(async->opsq_flush, WT_ASYNC_FLUSHING); - async->flush_count = 1; - WT_ERR(__async_flush_wait(session, async, &locked)); - } - /* - * Release the lock before performing the op. - */ - __wt_spin_unlock(session, &async->opsq_lock); - locked = 0; - if (op != &async->flush_op) - (void)__async_worker_op(session, op, &worker); - -wait_for_work: - WT_ASSERT(session, locked == 0); - /* Wait until the next event. */ -#if 0 - WT_ERR_TIMEDOUT_OK( - __wt_cond_wait(session, async->ops_cond, 100000)); -#else - __wt_yield(); -#endif } if (0) { err: __wt_err(session, ret, "async worker error"); - if (locked) - __wt_spin_unlock(session, &async->opsq_lock); } /* * Worker thread cleanup, close our cached cursors and diff --git a/src/conn/conn_async.c b/src/conn/conn_async.c index 3fafde17815..34242db6626 100644 --- a/src/conn/conn_async.c +++ b/src/conn/conn_async.c @@ -10,7 +10,7 @@ /* * __async_get_format -- * Find or allocate the uri/config/format structure. - * Called with the async opsq lock. + * Called with the async ops lock. 
*/ static int __async_get_format(WT_CONNECTION_IMPL *conn, const char *uri, @@ -235,10 +235,7 @@ __wt_async_create(WT_CONNECTION_IMPL *conn, const char *cfg[]) WT_RET(__wt_calloc(session, 1, sizeof(WT_ASYNC), &conn->async)); async = conn->async; STAILQ_INIT(&async->formatqh); - STAILQ_INIT(&async->opqh); WT_RET(__wt_spin_init(session, &async->ops_lock, "ops")); - WT_RET(__wt_spin_init(session, &async->opsq_lock, "ops queue")); - WT_RET(__wt_cond_alloc(session, "async op", 0, &async->ops_cond)); WT_RET(__wt_cond_alloc(session, "async flush", 0, &async->flush_cond)); WT_RET(__wt_async_op_init(conn)); @@ -283,17 +280,12 @@ __wt_async_destroy(WT_CONNECTION_IMPL *conn) if (!conn->async_cfg) return (0); - for (i = 0; i < conn->async_workers; i++) if (async->worker_tids[i] != 0) { -#if 0 - WT_TRET(__wt_cond_signal(session, async->ops_cond)); -#endif WT_TRET(__wt_thread_join( session, async->worker_tids[i])); async->worker_tids[i] = 0; } - WT_TRET(__wt_cond_destroy(session, &async->ops_cond)); WT_TRET(__wt_cond_destroy(session, &async->flush_cond)); /* Close the server threads' sessions. */ @@ -316,7 +308,6 @@ __wt_async_destroy(WT_CONNECTION_IMPL *conn) } __wt_free(session, async->async_ops); __wt_spin_destroy(session, &async->ops_lock); - __wt_spin_destroy(session, &async->opsq_lock); __wt_free(session, conn->async); return (ret); @@ -347,18 +338,14 @@ __wt_async_flush(WT_CONNECTION_IMPL *conn) * that the flush is complete on their side. Then we * clear the flush flags and return. */ - __wt_spin_lock(session, &async->opsq_lock); - while (FLD_ISSET(async->opsq_flush, WT_ASYNC_FLUSH_IN_PROGRESS)) { + while (FLD_ISSET(async->opsq_flush, WT_ASYNC_FLUSH_IN_PROGRESS)) /* * We're racing an in-progress flush. We need to wait * our turn to start our own. We need to convoy the * racing calls because a later call may be waiting for * specific enqueued ops to be complete before this returns. 
*/ - __wt_spin_unlock(session, &async->opsq_lock); __wt_sleep(0, 100000); - __wt_spin_lock(session, &async->opsq_lock); - } /* * We're the owner of this flush operation. Set the @@ -371,20 +358,17 @@ __wt_async_flush(WT_CONNECTION_IMPL *conn) WT_ASSERT(conn->default_session, async->flush_op.state == WT_ASYNCOP_FREE); async->flush_op.state = WT_ASYNCOP_READY; - WT_ERR(__wt_async_op_enqueue(conn, &async->flush_op, 1)); - while (!FLD_ISSET(async->opsq_flush, WT_ASYNC_FLUSH_COMPLETE)) { - __wt_spin_unlock(session, &async->opsq_lock); + WT_ERR(__wt_async_op_enqueue(conn, &async->flush_op)); + while (!FLD_ISSET(async->opsq_flush, WT_ASYNC_FLUSH_COMPLETE)) WT_ERR_TIMEDOUT_OK( __wt_cond_wait(NULL, async->flush_cond, 100000)); - __wt_spin_lock(session, &async->opsq_lock); - } /* * Flush is done. Clear the flags. */ async->flush_op.state = WT_ASYNCOP_FREE; FLD_CLR(async->opsq_flush, (WT_ASYNC_FLUSH_COMPLETE | WT_ASYNC_FLUSH_IN_PROGRESS)); -err: __wt_spin_unlock(session, &async->opsq_lock); +err: return (ret); } diff --git a/src/include/async.h b/src/include/async.h index 55e93b6a550..a8e364b9a81 100644 --- a/src/include/async.h +++ b/src/include/async.h @@ -44,7 +44,6 @@ struct __wt_async_format { struct __wt_async_op_impl { WT_ASYNC_OP iface; - STAILQ_ENTRY(__wt_async_op_impl) q; /* Work queue links. */ WT_ASYNC_CALLBACK *cb; uint32_t internal_id; /* Array position id. */ @@ -68,20 +67,25 @@ struct __wt_async { uint32_t ops_index; /* Active slot index */ uint64_t op_id; /* Unique ID counter */ /* - * Everything relating to the work queue and flushing is - * protected by the opsq_lock. + * We need to have two head pointers. One that is allocated to + * enqueue threads and one that indicates ready slots to the + * consumers. We only need one tail pointer to coordinate + * consuming threads against each other. We don't need to worry + * about head catch up with tail because we know the queue is + * large enough for all possible work. 
*/ - WT_SPINLOCK opsq_lock; /* Locked: work queue */ + WT_ASYNC_OP_IMPL **async_queue; /* Async ops work queue */ + uint64_t alloc_head; /* Next slot to enqueue */ + uint64_t head; /* Head visible to worker */ + uint64_t tail; /* Worker consumed tail */ + STAILQ_HEAD(__wt_async_format_qh, __wt_async_format) formatqh; - STAILQ_HEAD(__wt_async_qh, __wt_async_op_impl) opqh; int cur_queue; /* Currently enqueued */ int max_queue; /* Maximum enqueued */ #define WT_ASYNC_FLUSH_COMPLETE 0x0001 /* Notify flush caller */ #define WT_ASYNC_FLUSH_IN_PROGRESS 0x0002 /* Prevent more callers */ #define WT_ASYNC_FLUSHING 0x0004 /* Notify workers */ uint32_t opsq_flush; /* Queue flush state */ - /* Notify any waiting threads when work is enqueued. */ - WT_CONDVAR *ops_cond; /* Notify any waiting threads when flushing is done. */ WT_CONDVAR *flush_cond; WT_ASYNC_OP_IMPL flush_op; /* Special flush op */ diff --git a/src/include/extern.h b/src/include/extern.h index 67b1e94b7a4..4254f8e24d3 100644 --- a/src/include/extern.h +++ b/src/include/extern.h @@ -1,8 +1,7 @@ /* DO NOT EDIT: automatically built by dist/s_prototypes. */ extern int __wt_async_op_enqueue(WT_CONNECTION_IMPL *conn, - WT_ASYNC_OP_IMPL *op, - int locked); + WT_ASYNC_OP_IMPL *op); extern int __wt_async_op_init(WT_CONNECTION_IMPL *conn); extern void *__wt_async_worker(void *arg); extern int __wt_block_addr_to_buffer(WT_BLOCK *block, |