diff options
Diffstat (limited to 'src/async/async_worker.c')
-rw-r--r-- | src/async/async_worker.c | 139 |
1 files changed, 65 insertions, 74 deletions
diff --git a/src/async/async_worker.c b/src/async/async_worker.c index 8d7dd5c1b24..0b2ccfaa224 100644 --- a/src/async/async_worker.c +++ b/src/async/async_worker.c @@ -8,28 +8,74 @@ #include "wt_internal.h" /* + * __async_op_dequeue -- + * Wait for work to be available. Then atomically take it off + * the work queue. + */ +static int +__async_op_dequeue(WT_CONNECTION_IMPL *conn, WT_SESSION_IMPL *session, + WT_ASYNC_OP_IMPL **op) +{ + WT_ASYNC *async; + uint64_t my_slot, old_tail; + uint32_t max_tries, tries; + + async = conn->async; + *op = NULL; + max_tries = 100; + tries = 0; + /* + * Wait for work to do. Work is available when async->head moves. + * Then grab the slot containing the work. If we lose, try again. + */ +retry: + while ((old_tail = async->tail) == async->head && + ++tries < max_tries && + !FLD_ISSET(async->opsq_flush, WT_ASYNC_FLUSHING)) + __wt_yield(); + if (tries >= max_tries || + FLD_ISSET(async->opsq_flush, WT_ASYNC_FLUSHING)) + return (0); + /* + * Try to increment the tail to claim this slot. If we lose + * a race, try again. + */ + if (!WT_ATOMIC_CAS(async->tail, old_tail, old_tail + 1)) + goto retry; + my_slot = (old_tail + 1) % conn->async_size; + *op = WT_ATOMIC_STORE(async->async_queue[my_slot], NULL); + + WT_ASSERT(session, async->cur_queue > 0); + WT_ASSERT(session, *op != NULL); + WT_ASSERT(session, (*op)->state == WT_ASYNCOP_ENQUEUED); + --async->cur_queue; + (*op)->state = WT_ASYNCOP_WORKING; + + if (*op == &async->flush_op) { + WT_ASSERT(session, FLD_ISSET(async->opsq_flush, + WT_ASYNC_FLUSH_IN_PROGRESS)); + /* + * We're the worker to take the flush op off the queue. + * Set the flushing flag and set count to 1. + */ + FLD_SET(async->opsq_flush, WT_ASYNC_FLUSHING); + WT_WRITE_BARRIER(); + } + return (0); +} + +/* * __async_flush_wait -- * Wait for the final worker to finish flushing. - * Assumes it is called with spinlock held and returns it locked. */ static int -__async_flush_wait(WT_SESSION_IMPL *session, WT_ASYNC *async, int *locked) +__async_flush_wait(WT_SESSION_IMPL *session, WT_ASYNC *async) { WT_DECL_RET; - /* - * We change the caller's locked setting so that if we return an - * error from this function the caller can properly unlock or not - * as needed. - */ - while (FLD_ISSET(async->opsq_flush, WT_ASYNC_FLUSHING)) { - __wt_spin_unlock(session, &async->opsq_lock); - *locked= 0; + while (FLD_ISSET(async->opsq_flush, WT_ASYNC_FLUSHING)) WT_ERR_TIMEDOUT_OK( __wt_cond_wait(session, async->flush_cond, 10000)); - __wt_spin_lock(session, &async->opsq_lock); - *locked= 1; - } err: return (ret); } @@ -166,8 +212,8 @@ __async_worker_op(WT_SESSION_IMPL *session, WT_ASYNC_OP_IMPL *op, /* * After the callback returns, and the transaction resolved release * the op back to the free pool and reset our cached cursor. + * We do this regardless of success or failure. */ - ret = 0; op->state = WT_ASYNCOP_FREE; F_CLR(&asyncop->c, WT_CURSTD_KEY_SET | WT_CURSTD_VALUE_SET); cursor->reset(cursor); @@ -188,19 +234,18 @@ __wt_async_worker(void *arg) WT_CONNECTION_IMPL *conn; WT_DECL_RET; WT_SESSION_IMPL *session; - int locked; session = arg; conn = S2C(session); async = conn->async; - locked = 0; worker.num_cursors = 0; STAILQ_INIT(&worker.cursorqh); while (F_ISSET(conn, WT_CONN_SERVER_RUN)) { - __wt_spin_lock(session, &async->opsq_lock); - locked = 1; - if (FLD_ISSET(async->opsq_flush, WT_ASYNC_FLUSHING)) { + WT_ERR(__async_op_dequeue(conn, session, &op)); + if (op != NULL && op != &async->flush_op) + WT_ERR(__async_worker_op(session, op, &worker)); + else if (FLD_ISSET(async->opsq_flush, WT_ASYNC_FLUSHING)) { /* * Worker flushing going on. Last worker to the party * needs to clear the FLUSHING flag and signal the cond. @@ -218,74 +263,20 @@ __wt_async_worker(void *arg) FLD_SET(async->opsq_flush, WT_ASYNC_FLUSH_COMPLETE); FLD_CLR(async->opsq_flush, WT_ASYNC_FLUSHING); - __wt_spin_unlock(session, &async->opsq_lock); - locked = 0; WT_ERR(__wt_cond_signal(session, async->flush_cond)); - __wt_spin_lock(session, &async->opsq_lock); - locked = 1; } else { /* * We need to wait for the last worker to * signal the condition. */ - WT_ERR(__async_flush_wait( - session, async, &locked)); + WT_ERR(__async_flush_wait(session, async)); } } - /* - * Get next op. We get here with the opsq lock held. - * Remove from the head of the queue. - */ - op = STAILQ_FIRST(&async->opqh); - if (op == NULL) { - __wt_spin_unlock(session, &async->opsq_lock); - locked = 0; - goto wait_for_work; - } - - /* - * There is work to do. - */ - STAILQ_REMOVE_HEAD(&async->opqh, q); - WT_ASSERT(session, async->cur_queue > 0); - --async->cur_queue; - WT_ASSERT(session, op->state == WT_ASYNCOP_ENQUEUED); - op->state = WT_ASYNCOP_WORKING; - if (op == &async->flush_op) { - WT_ASSERT(session, FLD_ISSET(async->opsq_flush, - WT_ASYNC_FLUSH_IN_PROGRESS)); - /* - * We're the worker to take the flush op off the queue. - * Set the flushing flag and set count to 1. - */ - FLD_SET(async->opsq_flush, WT_ASYNC_FLUSHING); - async->flush_count = 1; - WT_ERR(__async_flush_wait(session, async, &locked)); - } - /* - * Release the lock before performing the op. - */ - __wt_spin_unlock(session, &async->opsq_lock); - locked = 0; - if (op != &async->flush_op) - (void)__async_worker_op(session, op, &worker); - -wait_for_work: - WT_ASSERT(session, locked == 0); - /* Wait until the next event. */ -#if 0 - WT_ERR_TIMEDOUT_OK( - __wt_cond_wait(session, async->ops_cond, 100000)); -#else - __wt_yield(); -#endif } if (0) { err: __wt_err(session, ret, "async worker error"); - if (locked) - __wt_spin_unlock(session, &async->opsq_lock); } /* * Worker thread cleanup, close our cached cursors and |