summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--examples/c/ex_async.c3
-rw-r--r--src/async/async_api.c38
-rw-r--r--src/async/async_worker.c139
-rw-r--r--src/conn/conn_async.c26
-rw-r--r--src/include/async.h18
-rw-r--r--src/include/extern.h3
6 files changed, 106 insertions, 121 deletions
diff --git a/examples/c/ex_async.c b/examples/c/ex_async.c
index c185fdbe9c6..8e29bcf93e8 100644
--- a/examples/c/ex_async.c
+++ b/examples/c/ex_async.c
@@ -109,7 +109,8 @@ retry:
* If we used up all the ops, pause and retry to
* give the workers a chance to process them.
*/
- fprintf(stderr, "Iteration %d: table 1 ret %d\n",i,ret);
+ fprintf(stderr,
+ "Iteration %d: async_new_op ret %d\n",i,ret);
sleep(1);
goto retry;
}
diff --git a/src/async/async_api.c b/src/async/async_api.c
index ed8e3b74245..f759901ace5 100644
--- a/src/async/async_api.c
+++ b/src/async/async_api.c
@@ -74,7 +74,7 @@ static int
__async_op_wrap(WT_ASYNC_OP_IMPL *op, WT_ASYNC_OPTYPE type)
{
op->optype = type;
- return (__wt_async_op_enqueue(O2C(op), op, 0));
+ return (__wt_async_op_enqueue(O2C(op), op));
}
/*
@@ -211,35 +211,34 @@ __async_op_init(WT_CONNECTION_IMPL *conn, WT_ASYNC_OP_IMPL *op, uint32_t id)
* Enqueue an operation onto the work queue.
*/
int
-__wt_async_op_enqueue(WT_CONNECTION_IMPL *conn,
- WT_ASYNC_OP_IMPL *op, int locked)
+__wt_async_op_enqueue(WT_CONNECTION_IMPL *conn, WT_ASYNC_OP_IMPL *op)
{
WT_ASYNC *async;
WT_DECL_RET;
+ uint64_t my_alloc, my_slot;
async = conn->async;
- if (!locked)
- __wt_spin_lock(conn->default_session, &async->opsq_lock);
/*
* Enqueue op at the tail of the work queue.
*/
WT_ASSERT(conn->default_session, op->state == WT_ASYNCOP_READY);
- STAILQ_INSERT_TAIL(&async->opqh, op, q);
+ /*
+ * We get our slot in the ring buffer to use.
+ */
+ my_alloc = WT_ATOMIC_ADD(async->alloc_head, 1);
+ my_slot = my_alloc % conn->async_size;
+ WT_ASSERT(conn->default_session, async->async_queue[my_slot] == NULL);
+ async->async_queue[my_slot] = op;
op->state = WT_ASYNCOP_ENQUEUED;
if (++async->cur_queue > async->max_queue)
async->max_queue = async->cur_queue;
/*
- * Signal the worker threads something is on the queue.
- */
- __wt_spin_unlock(conn->default_session, &async->opsq_lock);
-#if 0
- WT_ERR(__wt_cond_signal(conn->default_session, async->ops_cond));
-#endif
- /*
- * Relock if we need to for the caller.
+ * Multiple threads may be adding ops to the queue. We need to wait
+ * our turn to make our slot visible to workers.
*/
-err: if (locked)
- __wt_spin_lock(conn->default_session, &async->opsq_lock);
+ while (async->head != (my_alloc - 1))
+ __wt_yield();
+ WT_ATOMIC_STORE(async->head, my_alloc);
return (ret);
}
@@ -261,6 +260,13 @@ __wt_async_op_init(WT_CONNECTION_IMPL *conn)
__async_op_init(conn, &async->flush_op, OPS_INVALID_INDEX);
/*
+ * Allocate and initialize the work queue. This is sized so that
+ * the ring buffer is known to be big enough such that the head
+ * can never overlap the tail. Include 1 extra for the flush op.
+ */
+ WT_RET(__wt_calloc_def(conn->default_session,
+ conn->async_size + 1, &async->async_queue));
+ /*
* Allocate and initialize all the user ops.
*/
WT_RET(__wt_calloc_def(conn->default_session,
diff --git a/src/async/async_worker.c b/src/async/async_worker.c
index 8d7dd5c1b24..0b2ccfaa224 100644
--- a/src/async/async_worker.c
+++ b/src/async/async_worker.c
@@ -8,28 +8,74 @@
#include "wt_internal.h"
/*
+ * __async_op_dequeue --
+ * Wait for work to be available. Then atomically take it off
+ * the work queue.
+ */
+static int
+__async_op_dequeue(WT_CONNECTION_IMPL *conn, WT_SESSION_IMPL *session,
+ WT_ASYNC_OP_IMPL **op)
+{
+ WT_ASYNC *async;
+ uint64_t my_slot, old_tail;
+ uint32_t max_tries, tries;
+
+ async = conn->async;
+ *op = NULL;
+ max_tries = 100;
+ tries = 0;
+ /*
+ * Wait for work to do. Work is available when async->head moves.
+ * Then grab the slot containing the work. If we lose, try again.
+ */
+retry:
+ while ((old_tail = async->tail) == async->head &&
+ ++tries < max_tries &&
+ !FLD_ISSET(async->opsq_flush, WT_ASYNC_FLUSHING))
+ __wt_yield();
+ if (tries >= max_tries ||
+ FLD_ISSET(async->opsq_flush, WT_ASYNC_FLUSHING))
+ return (0);
+ /*
+ * Try to increment the tail to claim this slot. If we lose
+ * a race, try again.
+ */
+ if (!WT_ATOMIC_CAS(async->tail, old_tail, old_tail + 1))
+ goto retry;
+ my_slot = (old_tail + 1) % conn->async_size;
+ *op = WT_ATOMIC_STORE(async->async_queue[my_slot], NULL);
+
+ WT_ASSERT(session, async->cur_queue > 0);
+ WT_ASSERT(session, *op != NULL);
+ WT_ASSERT(session, (*op)->state == WT_ASYNCOP_ENQUEUED);
+ --async->cur_queue;
+ (*op)->state = WT_ASYNCOP_WORKING;
+
+ if (*op == &async->flush_op) {
+ WT_ASSERT(session, FLD_ISSET(async->opsq_flush,
+ WT_ASYNC_FLUSH_IN_PROGRESS));
+ /*
+ * We're the worker to take the flush op off the queue.
+ * Set the flushing flag and set count to 1.
+ */
+ FLD_SET(async->opsq_flush, WT_ASYNC_FLUSHING);
+ WT_WRITE_BARRIER();
+ }
+ return (0);
+}
+
+/*
* __async_flush_wait --
* Wait for the final worker to finish flushing.
- * Assumes it is called with spinlock held and returns it locked.
*/
static int
-__async_flush_wait(WT_SESSION_IMPL *session, WT_ASYNC *async, int *locked)
+__async_flush_wait(WT_SESSION_IMPL *session, WT_ASYNC *async)
{
WT_DECL_RET;
- /*
- * We change the caller's locked setting so that if we return an
- * error from this function the caller can properly unlock or not
- * as needed.
- */
- while (FLD_ISSET(async->opsq_flush, WT_ASYNC_FLUSHING)) {
- __wt_spin_unlock(session, &async->opsq_lock);
- *locked= 0;
+ while (FLD_ISSET(async->opsq_flush, WT_ASYNC_FLUSHING))
WT_ERR_TIMEDOUT_OK(
__wt_cond_wait(session, async->flush_cond, 10000));
- __wt_spin_lock(session, &async->opsq_lock);
- *locked= 1;
- }
err: return (ret);
}
@@ -166,8 +212,8 @@ __async_worker_op(WT_SESSION_IMPL *session, WT_ASYNC_OP_IMPL *op,
/*
* After the callback returns, and the transaction resolved release
* the op back to the free pool and reset our cached cursor.
+ * We do this regardless of success or failure.
*/
- ret = 0;
op->state = WT_ASYNCOP_FREE;
F_CLR(&asyncop->c, WT_CURSTD_KEY_SET | WT_CURSTD_VALUE_SET);
cursor->reset(cursor);
@@ -188,19 +234,18 @@ __wt_async_worker(void *arg)
WT_CONNECTION_IMPL *conn;
WT_DECL_RET;
WT_SESSION_IMPL *session;
- int locked;
session = arg;
conn = S2C(session);
async = conn->async;
- locked = 0;
worker.num_cursors = 0;
STAILQ_INIT(&worker.cursorqh);
while (F_ISSET(conn, WT_CONN_SERVER_RUN)) {
- __wt_spin_lock(session, &async->opsq_lock);
- locked = 1;
- if (FLD_ISSET(async->opsq_flush, WT_ASYNC_FLUSHING)) {
+ WT_ERR(__async_op_dequeue(conn, session, &op));
+ if (op != NULL && op != &async->flush_op)
+ WT_ERR(__async_worker_op(session, op, &worker));
+ else if (FLD_ISSET(async->opsq_flush, WT_ASYNC_FLUSHING)) {
/*
* Worker flushing going on. Last worker to the party
* needs to clear the FLUSHING flag and signal the cond.
@@ -218,74 +263,20 @@ __wt_async_worker(void *arg)
FLD_SET(async->opsq_flush,
WT_ASYNC_FLUSH_COMPLETE);
FLD_CLR(async->opsq_flush, WT_ASYNC_FLUSHING);
- __wt_spin_unlock(session, &async->opsq_lock);
- locked = 0;
WT_ERR(__wt_cond_signal(session,
async->flush_cond));
- __wt_spin_lock(session, &async->opsq_lock);
- locked = 1;
} else {
/*
* We need to wait for the last worker to
* signal the condition.
*/
- WT_ERR(__async_flush_wait(
- session, async, &locked));
+ WT_ERR(__async_flush_wait(session, async));
}
}
- /*
- * Get next op. We get here with the opsq lock held.
- * Remove from the head of the queue.
- */
- op = STAILQ_FIRST(&async->opqh);
- if (op == NULL) {
- __wt_spin_unlock(session, &async->opsq_lock);
- locked = 0;
- goto wait_for_work;
- }
-
- /*
- * There is work to do.
- */
- STAILQ_REMOVE_HEAD(&async->opqh, q);
- WT_ASSERT(session, async->cur_queue > 0);
- --async->cur_queue;
- WT_ASSERT(session, op->state == WT_ASYNCOP_ENQUEUED);
- op->state = WT_ASYNCOP_WORKING;
- if (op == &async->flush_op) {
- WT_ASSERT(session, FLD_ISSET(async->opsq_flush,
- WT_ASYNC_FLUSH_IN_PROGRESS));
- /*
- * We're the worker to take the flush op off the queue.
- * Set the flushing flag and set count to 1.
- */
- FLD_SET(async->opsq_flush, WT_ASYNC_FLUSHING);
- async->flush_count = 1;
- WT_ERR(__async_flush_wait(session, async, &locked));
- }
- /*
- * Release the lock before performing the op.
- */
- __wt_spin_unlock(session, &async->opsq_lock);
- locked = 0;
- if (op != &async->flush_op)
- (void)__async_worker_op(session, op, &worker);
-
-wait_for_work:
- WT_ASSERT(session, locked == 0);
- /* Wait until the next event. */
-#if 0
- WT_ERR_TIMEDOUT_OK(
- __wt_cond_wait(session, async->ops_cond, 100000));
-#else
- __wt_yield();
-#endif
}
if (0) {
err: __wt_err(session, ret, "async worker error");
- if (locked)
- __wt_spin_unlock(session, &async->opsq_lock);
}
/*
* Worker thread cleanup, close our cached cursors and
diff --git a/src/conn/conn_async.c b/src/conn/conn_async.c
index 3fafde17815..34242db6626 100644
--- a/src/conn/conn_async.c
+++ b/src/conn/conn_async.c
@@ -10,7 +10,7 @@
/*
* __async_get_format --
* Find or allocate the uri/config/format structure.
- * Called with the async opsq lock.
+ * Called with the async ops lock.
*/
static int
__async_get_format(WT_CONNECTION_IMPL *conn, const char *uri,
@@ -235,10 +235,7 @@ __wt_async_create(WT_CONNECTION_IMPL *conn, const char *cfg[])
WT_RET(__wt_calloc(session, 1, sizeof(WT_ASYNC), &conn->async));
async = conn->async;
STAILQ_INIT(&async->formatqh);
- STAILQ_INIT(&async->opqh);
WT_RET(__wt_spin_init(session, &async->ops_lock, "ops"));
- WT_RET(__wt_spin_init(session, &async->opsq_lock, "ops queue"));
- WT_RET(__wt_cond_alloc(session, "async op", 0, &async->ops_cond));
WT_RET(__wt_cond_alloc(session, "async flush", 0, &async->flush_cond));
WT_RET(__wt_async_op_init(conn));
@@ -283,17 +280,12 @@ __wt_async_destroy(WT_CONNECTION_IMPL *conn)
if (!conn->async_cfg)
return (0);
-
for (i = 0; i < conn->async_workers; i++)
if (async->worker_tids[i] != 0) {
-#if 0
- WT_TRET(__wt_cond_signal(session, async->ops_cond));
-#endif
WT_TRET(__wt_thread_join(
session, async->worker_tids[i]));
async->worker_tids[i] = 0;
}
- WT_TRET(__wt_cond_destroy(session, &async->ops_cond));
WT_TRET(__wt_cond_destroy(session, &async->flush_cond));
/* Close the server threads' sessions. */
@@ -316,7 +308,6 @@ __wt_async_destroy(WT_CONNECTION_IMPL *conn)
}
__wt_free(session, async->async_ops);
__wt_spin_destroy(session, &async->ops_lock);
- __wt_spin_destroy(session, &async->opsq_lock);
__wt_free(session, conn->async);
return (ret);
@@ -347,18 +338,14 @@ __wt_async_flush(WT_CONNECTION_IMPL *conn)
* that the flush is complete on their side. Then we
* clear the flush flags and return.
*/
- __wt_spin_lock(session, &async->opsq_lock);
- while (FLD_ISSET(async->opsq_flush, WT_ASYNC_FLUSH_IN_PROGRESS)) {
+ while (FLD_ISSET(async->opsq_flush, WT_ASYNC_FLUSH_IN_PROGRESS))
/*
* We're racing an in-progress flush. We need to wait
* our turn to start our own. We need to convoy the
* racing calls because a later call may be waiting for
* specific enqueued ops to be complete before this returns.
*/
- __wt_spin_unlock(session, &async->opsq_lock);
__wt_sleep(0, 100000);
- __wt_spin_lock(session, &async->opsq_lock);
- }
/*
* We're the owner of this flush operation. Set the
@@ -371,20 +358,17 @@ __wt_async_flush(WT_CONNECTION_IMPL *conn)
WT_ASSERT(conn->default_session,
async->flush_op.state == WT_ASYNCOP_FREE);
async->flush_op.state = WT_ASYNCOP_READY;
- WT_ERR(__wt_async_op_enqueue(conn, &async->flush_op, 1));
- while (!FLD_ISSET(async->opsq_flush, WT_ASYNC_FLUSH_COMPLETE)) {
- __wt_spin_unlock(session, &async->opsq_lock);
+ WT_ERR(__wt_async_op_enqueue(conn, &async->flush_op));
+ while (!FLD_ISSET(async->opsq_flush, WT_ASYNC_FLUSH_COMPLETE))
WT_ERR_TIMEDOUT_OK(
__wt_cond_wait(NULL, async->flush_cond, 100000));
- __wt_spin_lock(session, &async->opsq_lock);
- }
/*
* Flush is done. Clear the flags.
*/
async->flush_op.state = WT_ASYNCOP_FREE;
FLD_CLR(async->opsq_flush,
(WT_ASYNC_FLUSH_COMPLETE | WT_ASYNC_FLUSH_IN_PROGRESS));
-err: __wt_spin_unlock(session, &async->opsq_lock);
+err:
return (ret);
}
diff --git a/src/include/async.h b/src/include/async.h
index 55e93b6a550..a8e364b9a81 100644
--- a/src/include/async.h
+++ b/src/include/async.h
@@ -44,7 +44,6 @@ struct __wt_async_format {
struct __wt_async_op_impl {
WT_ASYNC_OP iface;
- STAILQ_ENTRY(__wt_async_op_impl) q; /* Work queue links. */
WT_ASYNC_CALLBACK *cb;
uint32_t internal_id; /* Array position id. */
@@ -68,20 +67,25 @@ struct __wt_async {
uint32_t ops_index; /* Active slot index */
uint64_t op_id; /* Unique ID counter */
/*
- * Everything relating to the work queue and flushing is
- * protected by the opsq_lock.
+ * We need to have two head pointers. One that is allocated to
+ * enqueue threads and one that indicates ready slots to the
+ * consumers. We only need one tail pointer to coordinate
+ * consuming threads against each other. We don't need to worry
+ * about the head catching up with the tail because we know the
+ * queue is large enough for all possible work.
*/
- WT_SPINLOCK opsq_lock; /* Locked: work queue */
+ WT_ASYNC_OP_IMPL **async_queue; /* Async ops work queue */
+ uint64_t alloc_head; /* Next slot to enqueue */
+ uint64_t head; /* Head visible to worker */
+ uint64_t tail; /* Worker consumed tail */
+
STAILQ_HEAD(__wt_async_format_qh, __wt_async_format) formatqh;
- STAILQ_HEAD(__wt_async_qh, __wt_async_op_impl) opqh;
int cur_queue; /* Currently enqueued */
int max_queue; /* Maximum enqueued */
#define WT_ASYNC_FLUSH_COMPLETE 0x0001 /* Notify flush caller */
#define WT_ASYNC_FLUSH_IN_PROGRESS 0x0002 /* Prevent more callers */
#define WT_ASYNC_FLUSHING 0x0004 /* Notify workers */
uint32_t opsq_flush; /* Queue flush state */
- /* Notify any waiting threads when work is enqueued. */
- WT_CONDVAR *ops_cond;
/* Notify any waiting threads when flushing is done. */
WT_CONDVAR *flush_cond;
WT_ASYNC_OP_IMPL flush_op; /* Special flush op */
diff --git a/src/include/extern.h b/src/include/extern.h
index 67b1e94b7a4..4254f8e24d3 100644
--- a/src/include/extern.h
+++ b/src/include/extern.h
@@ -1,8 +1,7 @@
/* DO NOT EDIT: automatically built by dist/s_prototypes. */
extern int __wt_async_op_enqueue(WT_CONNECTION_IMPL *conn,
- WT_ASYNC_OP_IMPL *op,
- int locked);
+ WT_ASYNC_OP_IMPL *op);
extern int __wt_async_op_init(WT_CONNECTION_IMPL *conn);
extern void *__wt_async_worker(void *arg);
extern int __wt_block_addr_to_buffer(WT_BLOCK *block,