Diffstat (limited to 'src/async/async_worker.c')
-rw-r--r--  src/async/async_worker.c  139
1 file changed, 65 insertions(+), 74 deletions(-)
diff --git a/src/async/async_worker.c b/src/async/async_worker.c
index 8d7dd5c1b24..0b2ccfaa224 100644
--- a/src/async/async_worker.c
+++ b/src/async/async_worker.c
@@ -8,28 +8,74 @@
#include "wt_internal.h"
/*
+ * __async_op_dequeue --
+ * Wait for work to be available. Then atomically take it off
+ * the work queue.
+ */
+static int
+__async_op_dequeue(WT_CONNECTION_IMPL *conn, WT_SESSION_IMPL *session,
+ WT_ASYNC_OP_IMPL **op)
+{
+ WT_ASYNC *async;
+ uint64_t my_slot, old_tail;
+ uint32_t max_tries, tries;
+
+ async = conn->async;
+ *op = NULL;
+ max_tries = 100;
+ tries = 0;
+ /*
+ * Wait for work to do. Work is available when async->head moves.
+ * Then grab the slot containing the work. If we lose, try again.
+ */
+retry:
+ while ((old_tail = async->tail) == async->head &&
+ ++tries < max_tries &&
+ !FLD_ISSET(async->opsq_flush, WT_ASYNC_FLUSHING))
+ __wt_yield();
+ if (tries >= max_tries ||
+ FLD_ISSET(async->opsq_flush, WT_ASYNC_FLUSHING))
+ return (0);
+ /*
+ * Try to increment the tail to claim this slot. If we lose
+ * a race, try again.
+ */
+ if (!WT_ATOMIC_CAS(async->tail, old_tail, old_tail + 1))
+ goto retry;
+ my_slot = (old_tail + 1) % conn->async_size;
+ *op = WT_ATOMIC_STORE(async->async_queue[my_slot], NULL);
+
+ WT_ASSERT(session, async->cur_queue > 0);
+ WT_ASSERT(session, *op != NULL);
+ WT_ASSERT(session, (*op)->state == WT_ASYNCOP_ENQUEUED);
+ --async->cur_queue;
+ (*op)->state = WT_ASYNCOP_WORKING;
+
+ if (*op == &async->flush_op) {
+ WT_ASSERT(session, FLD_ISSET(async->opsq_flush,
+ WT_ASYNC_FLUSH_IN_PROGRESS));
+ /*
+ * We're the worker to take the flush op off the queue.
+ * Set the flushing flag and set count to 1.
+ */
+ FLD_SET(async->opsq_flush, WT_ASYNC_FLUSHING);
+ WT_WRITE_BARRIER();
+ }
+ return (0);
+}
+
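[Note: the new function above is a lock-free multi-consumer dequeue: spin until head moves past tail, then CAS the tail to claim a slot. Below is a minimal standalone sketch of the same idea, using C11 atomics and hypothetical names in place of WiredTiger's WT_ATOMIC_* macros; it is an illustration, not the WiredTiger implementation.

	#include <sched.h>
	#include <stdatomic.h>
	#include <stddef.h>
	#include <stdint.h>

	#define RING_SIZE 64		/* must match the producer's sizing */

	struct ring {
		_Atomic uint64_t head;	/* bumped by the producer on enqueue */
		_Atomic uint64_t tail;	/* bumped by consumers on dequeue */
		void *_Atomic slots[RING_SIZE];
	};

	/* Spin until work appears, then CAS the tail to claim one slot. */
	static void *
	ring_dequeue(struct ring *r)
	{
		uint64_t tail;

		for (;;) {
			tail = atomic_load(&r->tail);
			if (tail == atomic_load(&r->head)) {
				sched_yield();	/* queue empty: yield, retry */
				continue;
			}
			/* Winning the CAS gives exclusive rights to the slot. */
			if (atomic_compare_exchange_weak(
			    &r->tail, &tail, tail + 1))
				return (atomic_exchange(
				    &r->slots[(tail + 1) % RING_SIZE], NULL));
		}
	}

As in the diff, the slot index is derived from the old tail value plus one, so the claim of the slot and the claim of the index happen in a single atomic step.]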
+/*
* __async_flush_wait --
* Wait for the final worker to finish flushing.
- * Assumes it is called with spinlock held and returns it locked.
*/
static int
-__async_flush_wait(WT_SESSION_IMPL *session, WT_ASYNC *async, int *locked)
+__async_flush_wait(WT_SESSION_IMPL *session, WT_ASYNC *async)
{
WT_DECL_RET;
- /*
- * We change the caller's locked setting so that if we return an
- * error from this function the caller can properly unlock or not
- * as needed.
- */
- while (FLD_ISSET(async->opsq_flush, WT_ASYNC_FLUSHING)) {
- __wt_spin_unlock(session, &async->opsq_lock);
- *locked= 0;
+ while (FLD_ISSET(async->opsq_flush, WT_ASYNC_FLUSHING))
WT_ERR_TIMEDOUT_OK(
__wt_cond_wait(session, async->flush_cond, 10000));
- __wt_spin_lock(session, &async->opsq_lock);
- *locked= 1;
- }
err: return (ret);
}
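[Note: the simplified wait loop above re-checks the flag on a 10000-microsecond timeout, so a missed signal cannot strand a worker. A rough pthread equivalent, with hypothetical names (WiredTiger's __wt_cond_wait takes a microsecond timeout and WT_ERR_TIMEDOUT_OK treats ETIMEDOUT as success):

	#include <errno.h>
	#include <pthread.h>
	#include <stdbool.h>
	#include <time.h>

	/* Wait until *flushing clears, waking every 10ms to re-check. */
	static int
	flush_wait(pthread_mutex_t *mtx, pthread_cond_t *cond, bool *flushing)
	{
		struct timespec ts;
		int ret;

		ret = 0;
		pthread_mutex_lock(mtx);
		while (*flushing) {
			clock_gettime(CLOCK_REALTIME, &ts);
			ts.tv_nsec += 10 * 1000 * 1000;	/* 10ms */
			if (ts.tv_nsec >= 1000000000L) {
				ts.tv_sec++;
				ts.tv_nsec -= 1000000000L;
			}
			ret = pthread_cond_timedwait(cond, mtx, &ts);
			if (ret != 0 && ret != ETIMEDOUT)
				break;		/* real error: bail out */
			ret = 0;		/* timeout is not an error */
		}
		pthread_mutex_unlock(mtx);
		return (ret);
	}
]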
@@ -166,8 +212,8 @@ __async_worker_op(WT_SESSION_IMPL *session, WT_ASYNC_OP_IMPL *op,
/*
 * After the callback returns and the transaction is resolved, release
 * the op back to the free pool and reset our cached cursor.
+	 * We do this regardless of success or failure.
 */
- ret = 0;
op->state = WT_ASYNCOP_FREE;
F_CLR(&asyncop->c, WT_CURSTD_KEY_SET | WT_CURSTD_VALUE_SET);
cursor->reset(cursor);
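[Note: the added comment makes the recycling unconditional: the op returns to the free pool whether the callback succeeded or not. A minimal sketch of that shape, with hypothetical names, not the WiredTiger API:

	enum op_state { OP_FREE, OP_WORKING };

	struct op {
		enum op_state state;
		int (*cb)(struct op *);
	};

	/* Run the callback, then recycle the op whether or not it failed. */
	static int
	run_op(struct op *op)
	{
		int ret;

		ret = op->cb(op);	/* may fail; we still release below */
		op->state = OP_FREE;	/* recycle unconditionally */
		return (ret);
	}
]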
@@ -188,19 +234,18 @@ __wt_async_worker(void *arg)
WT_CONNECTION_IMPL *conn;
WT_DECL_RET;
WT_SESSION_IMPL *session;
- int locked;
session = arg;
conn = S2C(session);
async = conn->async;
- locked = 0;
worker.num_cursors = 0;
STAILQ_INIT(&worker.cursorqh);
while (F_ISSET(conn, WT_CONN_SERVER_RUN)) {
- __wt_spin_lock(session, &async->opsq_lock);
- locked = 1;
- if (FLD_ISSET(async->opsq_flush, WT_ASYNC_FLUSHING)) {
+ WT_ERR(__async_op_dequeue(conn, session, &op));
+ if (op != NULL && op != &async->flush_op)
+ WT_ERR(__async_worker_op(session, op, &worker));
+ else if (FLD_ISSET(async->opsq_flush, WT_ASYNC_FLUSHING)) {
/*
* Worker flushing going on. Last worker to the party
* needs to clear the FLUSHING flag and signal the cond.
@@ -218,74 +263,20 @@ __wt_async_worker(void *arg)
FLD_SET(async->opsq_flush,
WT_ASYNC_FLUSH_COMPLETE);
FLD_CLR(async->opsq_flush, WT_ASYNC_FLUSHING);
- __wt_spin_unlock(session, &async->opsq_lock);
- locked = 0;
WT_ERR(__wt_cond_signal(session,
async->flush_cond));
- __wt_spin_lock(session, &async->opsq_lock);
- locked = 1;
} else {
/*
* We need to wait for the last worker to
* signal the condition.
*/
- WT_ERR(__async_flush_wait(
- session, async, &locked));
+ WT_ERR(__async_flush_wait(session, async));
}
}
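[Note: the branch above is effectively a barrier: the last worker to notice the flush publishes completion and wakes the others, while earlier arrivals wait. A standalone sketch of that handoff, with hypothetical names, C11 atomics, and a pthread cond standing in for async->flush_cond:

	#include <pthread.h>
	#include <stdatomic.h>
	#include <stdbool.h>

	struct flush_state {
		_Atomic unsigned count;	/* workers that have seen the flush */
		unsigned workers;	/* total worker threads */
		atomic_bool flushing;	/* set by the flush op's dequeuer */
		atomic_bool complete;
		pthread_mutex_t mtx;
		pthread_cond_t cond;
	};

	/* Each worker calls this once it notices a flush in progress. */
	static void
	flush_rendezvous(struct flush_state *fs)
	{
		if (atomic_fetch_add(&fs->count, 1) + 1 == fs->workers) {
			/* Last worker in: publish completion, wake the rest. */
			atomic_store(&fs->complete, true);
			atomic_store(&fs->flushing, false);
			pthread_mutex_lock(&fs->mtx);
			pthread_cond_broadcast(&fs->cond);
			pthread_mutex_unlock(&fs->mtx);
		} else {
			/* Not last: wait for the final worker's signal. */
			pthread_mutex_lock(&fs->mtx);
			while (atomic_load(&fs->flushing))
				pthread_cond_wait(&fs->cond, &fs->mtx);
			pthread_mutex_unlock(&fs->mtx);
		}
	}

Broadcasting while holding the mutex avoids a lost wakeup: a waiter only releases the mutex atomically when it enters pthread_cond_wait.]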
- /*
- * Get next op. We get here with the opsq lock held.
- * Remove from the head of the queue.
- */
- op = STAILQ_FIRST(&async->opqh);
- if (op == NULL) {
- __wt_spin_unlock(session, &async->opsq_lock);
- locked = 0;
- goto wait_for_work;
- }
-
- /*
- * There is work to do.
- */
- STAILQ_REMOVE_HEAD(&async->opqh, q);
- WT_ASSERT(session, async->cur_queue > 0);
- --async->cur_queue;
- WT_ASSERT(session, op->state == WT_ASYNCOP_ENQUEUED);
- op->state = WT_ASYNCOP_WORKING;
- if (op == &async->flush_op) {
- WT_ASSERT(session, FLD_ISSET(async->opsq_flush,
- WT_ASYNC_FLUSH_IN_PROGRESS));
- /*
- * We're the worker to take the flush op off the queue.
- * Set the flushing flag and set count to 1.
- */
- FLD_SET(async->opsq_flush, WT_ASYNC_FLUSHING);
- async->flush_count = 1;
- WT_ERR(__async_flush_wait(session, async, &locked));
- }
- /*
- * Release the lock before performing the op.
- */
- __wt_spin_unlock(session, &async->opsq_lock);
- locked = 0;
- if (op != &async->flush_op)
- (void)__async_worker_op(session, op, &worker);
-
-wait_for_work:
- WT_ASSERT(session, locked == 0);
- /* Wait until the next event. */
-#if 0
- WT_ERR_TIMEDOUT_OK(
- __wt_cond_wait(session, async->ops_cond, 100000));
-#else
- __wt_yield();
-#endif
}
if (0) {
err: __wt_err(session, ret, "async worker error");
- if (locked)
- __wt_spin_unlock(session, &async->opsq_lock);
}
/*
* Worker thread cleanup, close our cached cursors and