summaryrefslogtreecommitdiff
path: root/src/third_party/wiredtiger/src/async
diff options
context:
space:
mode:
author    Luke Chen <luke.chen@mongodb.com>  2019-09-09 14:45:50 +1000
committer Luke Chen <luke.chen@mongodb.com>  2019-09-09 14:48:15 +1000
commit   d1c52b7aafac44e16dd62ce94663eab1aa16f921 (patch)
tree     2aea018150b745b0fc7ef3dc626c4ef4164d2704 /src/third_party/wiredtiger/src/async
parent   b326fd656716e95418e563ff12368a3015994b5e (diff)
download mongo-d1c52b7aafac44e16dd62ce94663eab1aa16f921.tar.gz
Import wiredtiger: c600bde20363629405082a3ea985b70dfb00850e from branch mongodb-4.2
ref: 280c572c80..c600bde203 for: 4.2.1 WT-4535 Enhance wt command line utility to use read-only mode WT-4658 Apply Clang Format WT-4810 Adding WT_ERR_ASSERT and WT_RET_ASSERT macros WT-4831 Add option to python tests to not fail if stdout is not empty WT-4884 Test for recovery correctness with modify operations WT-4935 Add a perf test to find out the wiredtiger_calc_modify overhead WT-4966 Fix valgrind detected memory leak from test/csuite/import/smoke.sh WT-5043 Add debugging to aid in test/format hang WT-5046 Prepared transactions aren't properly cleared from global table with WT_CONN_LOG_DEBUG_MODE enabled WT-5062 Adjust the record size to consume less size WT-5063 Return proper error message for cursor modify operation for not supported cursor types WT-5074 Fix "make check" on exotic architectures WT-5076 Cut WiredTiger 3.2.1 release WT-5080 New wtperf workload modify_distribute option WT-5085 Fix spacing in wtperf output WT-5087 Add time tracking at start of reconciliation WT-5088 Refine clearing and setting of debug timer WT-5100 Update test format to disable readonly mode in utility
Diffstat (limited to 'src/third_party/wiredtiger/src/async')
-rw-r--r--  src/third_party/wiredtiger/src/async/async_api.c     991
-rw-r--r--  src/third_party/wiredtiger/src/async/async_op.c      441
-rw-r--r--  src/third_party/wiredtiger/src/async/async_worker.c  538
3 files changed, 959 insertions, 1011 deletions
diff --git a/src/third_party/wiredtiger/src/async/async_api.c b/src/third_party/wiredtiger/src/async/async_api.c
index 0ef85b8cd28..81b23b238e7 100644
--- a/src/third_party/wiredtiger/src/async/async_api.c
+++ b/src/third_party/wiredtiger/src/async/async_api.c
@@ -10,602 +10,581 @@
/*
* __async_get_format --
- * Find or allocate the uri/config/format structure.
+ * Find or allocate the uri/config/format structure.
*/
static int
-__async_get_format(WT_CONNECTION_IMPL *conn, const char *uri,
- const char *config, WT_ASYNC_OP_IMPL *op)
+__async_get_format(
+ WT_CONNECTION_IMPL *conn, const char *uri, const char *config, WT_ASYNC_OP_IMPL *op)
{
- WT_ASYNC *async;
- WT_ASYNC_FORMAT *af;
- WT_CURSOR *c;
- WT_DECL_RET;
- WT_SESSION *wt_session;
- WT_SESSION_IMPL *session;
- uint64_t cfg_hash, uri_hash;
-
- async = conn->async;
- c = NULL;
- op->format = NULL;
-
- if (uri != NULL)
- uri_hash = __wt_hash_city64(uri, strlen(uri));
- else
- uri_hash = 0;
- if (config != NULL)
- cfg_hash = __wt_hash_city64(config, strlen(config));
- else
- cfg_hash = 0;
-
- /*
- * We don't need to hold a lock around this walk. The list is
- * permanent and always valid. We might race an insert and there
- * is a possibility a duplicate entry might be inserted, but
- * that is not harmful.
- */
- TAILQ_FOREACH(af, &async->formatqh, q) {
- if (af->uri_hash == uri_hash && af->cfg_hash == cfg_hash)
- goto setup;
- }
- /*
- * We didn't find one in the cache. Allocate and initialize one.
- * Insert it at the head expecting LRU usage. We need a real session
- * for the cursor.
- */
- WT_RET(__wt_open_internal_session(
- conn, "async-cursor", true, 0, &session));
- __wt_spin_lock(session, &async->ops_lock);
- WT_ERR(__wt_calloc_one(session, &af));
- WT_ERR(__wt_strdup(session, uri, &af->uri));
- WT_ERR(__wt_strdup(session, config, &af->config));
- af->uri_hash = uri_hash;
- af->cfg_hash = cfg_hash;
- /*
- * Get the key_format and value_format for this URI and store
- * it in the structure so that async->set_key/value work.
- */
- wt_session = &session->iface;
- WT_ERR(wt_session->open_cursor(wt_session, uri, NULL, NULL, &c));
- WT_ERR(__wt_strdup(session, c->key_format, &af->key_format));
- WT_ERR(__wt_strdup(session, c->value_format, &af->value_format));
- WT_ERR(c->close(c));
- c = NULL;
-
- TAILQ_INSERT_HEAD(&async->formatqh, af, q);
- __wt_spin_unlock(session, &async->ops_lock);
- WT_ERR(wt_session->close(wt_session, NULL));
-
-setup: op->format = af;
- /*
- * Copy the pointers for the formats. Items in the async format
- * queue remain there until the connection is closed. We must
- * initialize the format fields in the async_op, which are publicly
- * visible, and its internal cursor used by internal key/value
- * functions.
- */
- op->iface.c.key_format = op->iface.key_format = af->key_format;
- op->iface.c.value_format = op->iface.value_format = af->value_format;
- return (0);
+ WT_ASYNC *async;
+ WT_ASYNC_FORMAT *af;
+ WT_CURSOR *c;
+ WT_DECL_RET;
+ WT_SESSION *wt_session;
+ WT_SESSION_IMPL *session;
+ uint64_t cfg_hash, uri_hash;
+
+ async = conn->async;
+ c = NULL;
+ op->format = NULL;
+
+ if (uri != NULL)
+ uri_hash = __wt_hash_city64(uri, strlen(uri));
+ else
+ uri_hash = 0;
+ if (config != NULL)
+ cfg_hash = __wt_hash_city64(config, strlen(config));
+ else
+ cfg_hash = 0;
+
+ /*
+ * We don't need to hold a lock around this walk. The list is permanent and always valid. We
+ * might race an insert and there is a possibility a duplicate entry might be inserted, but that
+ * is not harmful.
+ */
+ TAILQ_FOREACH (af, &async->formatqh, q) {
+ if (af->uri_hash == uri_hash && af->cfg_hash == cfg_hash)
+ goto setup;
+ }
+ /*
+ * We didn't find one in the cache. Allocate and initialize one. Insert it at the head expecting
+ * LRU usage. We need a real session for the cursor.
+ */
+ WT_RET(__wt_open_internal_session(conn, "async-cursor", true, 0, &session));
+ __wt_spin_lock(session, &async->ops_lock);
+ WT_ERR(__wt_calloc_one(session, &af));
+ WT_ERR(__wt_strdup(session, uri, &af->uri));
+ WT_ERR(__wt_strdup(session, config, &af->config));
+ af->uri_hash = uri_hash;
+ af->cfg_hash = cfg_hash;
+ /*
+ * Get the key_format and value_format for this URI and store it in the structure so that
+ * async->set_key/value work.
+ */
+ wt_session = &session->iface;
+ WT_ERR(wt_session->open_cursor(wt_session, uri, NULL, NULL, &c));
+ WT_ERR(__wt_strdup(session, c->key_format, &af->key_format));
+ WT_ERR(__wt_strdup(session, c->value_format, &af->value_format));
+ WT_ERR(c->close(c));
+ c = NULL;
+
+ TAILQ_INSERT_HEAD(&async->formatqh, af, q);
+ __wt_spin_unlock(session, &async->ops_lock);
+ WT_ERR(wt_session->close(wt_session, NULL));
+
+setup:
+ op->format = af;
+ /*
+ * Copy the pointers for the formats. Items in the async format queue remain there until the
+ * connection is closed. We must initialize the format fields in the async_op, which are
+ * publicly visible, and its internal cursor used by internal key/value functions.
+ */
+ op->iface.c.key_format = op->iface.key_format = af->key_format;
+ op->iface.c.value_format = op->iface.value_format = af->value_format;
+ return (0);
err:
- if (c != NULL)
- WT_TRET(c->close(c));
- __wt_free(session, af->uri);
- __wt_free(session, af->config);
- __wt_free(session, af->key_format);
- __wt_free(session, af->value_format);
- __wt_free(session, af);
- return (ret);
+ if (c != NULL)
+ WT_TRET(c->close(c));
+ __wt_free(session, af->uri);
+ __wt_free(session, af->config);
+ __wt_free(session, af->key_format);
+ __wt_free(session, af->value_format);
+ __wt_free(session, af);
+ return (ret);
}
/*
* __async_new_op_alloc --
- * Find and allocate the next available async op handle.
+ * Find and allocate the next available async op handle.
*/
static int
-__async_new_op_alloc(WT_SESSION_IMPL *session, const char *uri,
- const char *config, WT_ASYNC_OP_IMPL **opp)
+__async_new_op_alloc(
+ WT_SESSION_IMPL *session, const char *uri, const char *config, WT_ASYNC_OP_IMPL **opp)
{
- WT_ASYNC *async;
- WT_ASYNC_OP_IMPL *op;
- WT_CONNECTION_IMPL *conn;
- uint32_t i, save_i, view;
+ WT_ASYNC *async;
+ WT_ASYNC_OP_IMPL *op;
+ WT_CONNECTION_IMPL *conn;
+ uint32_t i, save_i, view;
- *opp = NULL;
+ *opp = NULL;
- conn = S2C(session);
- async = conn->async;
- WT_STAT_CONN_INCR(session, async_op_alloc);
+ conn = S2C(session);
+ async = conn->async;
+ WT_STAT_CONN_INCR(session, async_op_alloc);
retry:
- op = NULL;
- WT_ORDERED_READ(save_i, async->ops_index);
- /*
- * Look after the last one allocated for a free one. We'd expect
- * ops to be freed mostly FIFO so we should quickly find one.
- */
- for (view = 1, i = save_i; i < conn->async_size; i++, view++) {
- op = &async->async_ops[i];
- if (op->state == WT_ASYNCOP_FREE)
- break;
- }
-
- /*
- * Loop around back to the beginning if we need to.
- */
- if (op == NULL || op->state != WT_ASYNCOP_FREE)
- for (i = 0; i < save_i; i++, view++) {
- op = &async->async_ops[i];
- if (op->state == WT_ASYNCOP_FREE)
- break;
- }
-
- /*
- * We still haven't found one. Return an error.
- */
- if (op == NULL || op->state != WT_ASYNCOP_FREE) {
- WT_STAT_CONN_INCR(session, async_full);
- return (__wt_set_return(session, EBUSY));
- }
- /*
- * Set the state of this op handle as READY for the user to use.
- * If we can set the state then the op entry is ours.
- * Start the next search at the next entry after this one.
- */
- if (!__wt_atomic_cas32(&op->state, WT_ASYNCOP_FREE, WT_ASYNCOP_READY)) {
- WT_STAT_CONN_INCR(session, async_alloc_race);
- goto retry;
- }
- WT_STAT_CONN_INCRV(session, async_alloc_view, view);
- WT_RET(__async_get_format(conn, uri, config, op));
- op->unique_id = __wt_atomic_add64(&async->op_id, 1);
- op->optype = WT_AOP_NONE;
- async->ops_index = (i + 1) % conn->async_size;
- *opp = op;
- return (0);
+ op = NULL;
+ WT_ORDERED_READ(save_i, async->ops_index);
+ /*
+ * Look after the last one allocated for a free one. We'd expect ops to be freed mostly FIFO so
+ * we should quickly find one.
+ */
+ for (view = 1, i = save_i; i < conn->async_size; i++, view++) {
+ op = &async->async_ops[i];
+ if (op->state == WT_ASYNCOP_FREE)
+ break;
+ }
+
+ /*
+ * Loop around back to the beginning if we need to.
+ */
+ if (op == NULL || op->state != WT_ASYNCOP_FREE)
+ for (i = 0; i < save_i; i++, view++) {
+ op = &async->async_ops[i];
+ if (op->state == WT_ASYNCOP_FREE)
+ break;
+ }
+
+ /*
+ * We still haven't found one. Return an error.
+ */
+ if (op == NULL || op->state != WT_ASYNCOP_FREE) {
+ WT_STAT_CONN_INCR(session, async_full);
+ return (__wt_set_return(session, EBUSY));
+ }
+ /*
+ * Set the state of this op handle as READY for the user to use. If we can set the state then
+ * the op entry is ours. Start the next search at the next entry after this one.
+ */
+ if (!__wt_atomic_cas32(&op->state, WT_ASYNCOP_FREE, WT_ASYNCOP_READY)) {
+ WT_STAT_CONN_INCR(session, async_alloc_race);
+ goto retry;
+ }
+ WT_STAT_CONN_INCRV(session, async_alloc_view, view);
+ WT_RET(__async_get_format(conn, uri, config, op));
+ op->unique_id = __wt_atomic_add64(&async->op_id, 1);
+ op->optype = WT_AOP_NONE;
+ async->ops_index = (i + 1) % conn->async_size;
+ *opp = op;
+ return (0);
}
/*
* __async_config --
- * Parse and setup the async API options.
+ * Parse and setup the async API options.
*/
static int
-__async_config(WT_SESSION_IMPL *session,
- WT_CONNECTION_IMPL *conn, const char **cfg, bool *runp)
+__async_config(WT_SESSION_IMPL *session, WT_CONNECTION_IMPL *conn, const char **cfg, bool *runp)
{
- WT_CONFIG_ITEM cval;
-
- /*
- * The async configuration is off by default.
- */
- WT_RET(__wt_config_gets(session, cfg, "async.enabled", &cval));
- *runp = cval.val != 0;
-
- /*
- * Even if async is turned off, we want to parse and store the default
- * values so that reconfigure can just enable them.
- *
- * Bound the minimum maximum operations at 10.
- */
- WT_RET(__wt_config_gets(session, cfg, "async.ops_max", &cval));
- conn->async_size = (uint32_t)WT_MAX(cval.val, 10);
-
- WT_RET(__wt_config_gets(session, cfg, "async.threads", &cval));
- conn->async_workers = (uint32_t)cval.val;
- /* Sanity check that api_data.py is in sync with async.h */
- WT_ASSERT(session, conn->async_workers <= WT_ASYNC_MAX_WORKERS);
-
- return (0);
+ WT_CONFIG_ITEM cval;
+
+ /*
+ * The async configuration is off by default.
+ */
+ WT_RET(__wt_config_gets(session, cfg, "async.enabled", &cval));
+ *runp = cval.val != 0;
+
+ /*
+ * Even if async is turned off, we want to parse and store the default
+ * values so that reconfigure can just enable them.
+ *
+ * Bound the minimum maximum operations at 10.
+ */
+ WT_RET(__wt_config_gets(session, cfg, "async.ops_max", &cval));
+ conn->async_size = (uint32_t)WT_MAX(cval.val, 10);
+
+ WT_RET(__wt_config_gets(session, cfg, "async.threads", &cval));
+ conn->async_workers = (uint32_t)cval.val;
+ /* Sanity check that api_data.py is in sync with async.h */
+ WT_ASSERT(session, conn->async_workers <= WT_ASYNC_MAX_WORKERS);
+
+ return (0);
}
/*
* __wt_async_stats_update --
- * Update the async stats for return to the application.
+ * Update the async stats for return to the application.
*/
void
__wt_async_stats_update(WT_SESSION_IMPL *session)
{
- WT_ASYNC *async;
- WT_CONNECTION_IMPL *conn;
- WT_CONNECTION_STATS **stats;
-
- conn = S2C(session);
- async = conn->async;
- if (async == NULL)
- return;
- stats = conn->stats;
- WT_STAT_SET(session, stats, async_cur_queue, async->cur_queue);
- WT_STAT_SET(session, stats, async_max_queue, async->max_queue);
+ WT_ASYNC *async;
+ WT_CONNECTION_IMPL *conn;
+ WT_CONNECTION_STATS **stats;
+
+ conn = S2C(session);
+ async = conn->async;
+ if (async == NULL)
+ return;
+ stats = conn->stats;
+ WT_STAT_SET(session, stats, async_cur_queue, async->cur_queue);
+ WT_STAT_SET(session, stats, async_max_queue, async->max_queue);
}
/*
* __async_start --
- * Start the async subsystem. All configuration processing has
- * already been done by the caller.
+ * Start the async subsystem. All configuration processing has already been done by the caller.
*/
static int
__async_start(WT_SESSION_IMPL *session)
{
- WT_ASYNC *async;
- WT_CONNECTION_IMPL *conn;
- uint32_t i, session_flags;
-
- conn = S2C(session);
- conn->async_cfg = true;
- /*
- * Async is on, allocate the WT_ASYNC structure and initialize the ops.
- */
- WT_RET(__wt_calloc_one(session, &conn->async));
- async = conn->async;
- TAILQ_INIT(&async->formatqh);
- WT_RET(__wt_spin_init(session, &async->ops_lock, "ops"));
- WT_RET(__wt_cond_alloc(session, "async flush", &async->flush_cond));
- WT_RET(__wt_async_op_init(session));
-
- /*
- * Start up the worker threads.
- */
- F_SET(conn, WT_CONN_SERVER_ASYNC);
- for (i = 0; i < conn->async_workers; i++) {
- /*
- * Each worker has its own session. We set both a general
- * server flag in the connection and an individual flag
- * in the session. The user may reconfigure the number of
- * workers and we may want to selectively stop some workers
- * while leaving the rest running.
- */
- session_flags = WT_SESSION_SERVER_ASYNC;
- WT_RET(__wt_open_internal_session(conn, "async-worker",
- true, session_flags, &async->worker_sessions[i]));
- }
- for (i = 0; i < conn->async_workers; i++) {
- /*
- * Start the threads.
- */
- WT_RET(__wt_thread_create(session, &async->worker_tids[i],
- __wt_async_worker, async->worker_sessions[i]));
- }
- __wt_async_stats_update(session);
- return (0);
+ WT_ASYNC *async;
+ WT_CONNECTION_IMPL *conn;
+ uint32_t i, session_flags;
+
+ conn = S2C(session);
+ conn->async_cfg = true;
+ /*
+ * Async is on, allocate the WT_ASYNC structure and initialize the ops.
+ */
+ WT_RET(__wt_calloc_one(session, &conn->async));
+ async = conn->async;
+ TAILQ_INIT(&async->formatqh);
+ WT_RET(__wt_spin_init(session, &async->ops_lock, "ops"));
+ WT_RET(__wt_cond_alloc(session, "async flush", &async->flush_cond));
+ WT_RET(__wt_async_op_init(session));
+
+ /*
+ * Start up the worker threads.
+ */
+ F_SET(conn, WT_CONN_SERVER_ASYNC);
+ for (i = 0; i < conn->async_workers; i++) {
+ /*
+ * Each worker has its own session. We set both a general server flag in the connection and
+ * an individual flag in the session. The user may reconfigure the number of workers and we
+ * may want to selectively stop some workers while leaving the rest running.
+ */
+ session_flags = WT_SESSION_SERVER_ASYNC;
+ WT_RET(__wt_open_internal_session(
+ conn, "async-worker", true, session_flags, &async->worker_sessions[i]));
+ }
+ for (i = 0; i < conn->async_workers; i++) {
+ /*
+ * Start the threads.
+ */
+ WT_RET(__wt_thread_create(
+ session, &async->worker_tids[i], __wt_async_worker, async->worker_sessions[i]));
+ }
+ __wt_async_stats_update(session);
+ return (0);
}
/*
* __wt_async_create --
- * Start the async subsystem and worker threads.
+ * Start the async subsystem and worker threads.
*/
int
__wt_async_create(WT_SESSION_IMPL *session, const char *cfg[])
{
- WT_CONNECTION_IMPL *conn;
- bool run;
+ WT_CONNECTION_IMPL *conn;
+ bool run;
- conn = S2C(session);
+ conn = S2C(session);
- /* Handle configuration. */
- run = false;
- WT_RET(__async_config(session, conn, cfg, &run));
+ /* Handle configuration. */
+ run = false;
+ WT_RET(__async_config(session, conn, cfg, &run));
- /* If async is not configured, we're done. */
- if (!run)
- return (0);
- return (__async_start(session));
+ /* If async is not configured, we're done. */
+ if (!run)
+ return (0);
+ return (__async_start(session));
}
/*
* __wt_async_reconfig --
- * Start the async subsystem and worker threads.
+ * Start the async subsystem and worker threads.
*/
int
__wt_async_reconfig(WT_SESSION_IMPL *session, const char *cfg[])
{
- WT_ASYNC *async;
- WT_CONNECTION_IMPL *conn, tmp_conn;
- WT_DECL_RET;
- WT_SESSION *wt_session;
- uint32_t i, session_flags;
- bool run;
-
- conn = S2C(session);
- async = conn->async;
- memset(&tmp_conn, 0, sizeof(tmp_conn));
- tmp_conn.async_cfg = conn->async_cfg;
- tmp_conn.async_workers = conn->async_workers;
- tmp_conn.async_size = conn->async_size;
-
- /* Handle configuration. */
- run = conn->async_cfg;
- WT_RET(__async_config(session, &tmp_conn, cfg, &run));
-
- /*
- * There are some restrictions on the live reconfiguration of async.
- * Unlike other subsystems where we simply destroy anything existing
- * and restart with the new configuration, async is not so easy.
- * If the user is just changing the number of workers, we want to
- * allow the existing op handles and other information to remain in
- * existence. So we must handle various combinations of changes
- * individually.
- *
- * One restriction is that if async is currently on, the user cannot
- * change the number of async op handles available. The user can try
- * but we do nothing with it. However we must allow the ops_max config
- * string so that a user can completely start async via reconfigure.
- */
-
- /*
- * Easy cases:
- * 1. If async is on and the user wants it off, shut it down.
- * 2. If async is off, and the user wants it on, start it.
- * 3. If not a toggle and async is off, we're done.
- */
- if (conn->async_cfg && !run) { /* Case 1 */
- WT_TRET(__wt_async_flush(session));
- ret = __wt_async_destroy(session);
- conn->async_cfg = false;
- return (ret);
- }
- if (!conn->async_cfg && run) /* Case 2 */
- return (__wt_async_create(session, cfg));
- if (!conn->async_cfg) /* Case 3 */
- return (0);
-
- /*
- * Running async worker modification cases:
- * 4. If number of workers didn't change, we're done.
- * 5. If more workers, start new ones.
- * 6. If fewer workers, kill some.
- */
- if (conn->async_workers == tmp_conn.async_workers)
- /* No change in the number of workers. */
- return (0);
- if (conn->async_workers < tmp_conn.async_workers) {
- /* Case 5 */
- /*
- * The worker_sessions array is allocated for the maximum
- * allowed number of workers, so starting more is easy.
- */
- for (i = conn->async_workers; i < tmp_conn.async_workers; i++) {
- /*
- * Each worker has its own session.
- */
- session_flags = WT_SESSION_SERVER_ASYNC;
- WT_RET(__wt_open_internal_session(conn, "async-worker",
- true, session_flags, &async->worker_sessions[i]));
- }
- for (i = conn->async_workers; i < tmp_conn.async_workers; i++) {
- /*
- * Start the threads.
- */
- WT_RET(__wt_thread_create(session,
- &async->worker_tids[i], __wt_async_worker,
- async->worker_sessions[i]));
- }
- conn->async_workers = tmp_conn.async_workers;
- }
- if (conn->async_workers > tmp_conn.async_workers) {
- /* Case 6 */
- /*
- * Stopping an individual async worker is the most complex case.
- * We clear the session async flag on the targeted worker thread
- * so that only that thread stops, and the others keep running.
- */
- for (i = conn->async_workers - 1;
- i >= tmp_conn.async_workers; i--) {
- /*
- * Join any worker we're stopping.
- * After the thread is stopped, close its session.
- */
- WT_ASSERT(session, async->worker_tids[i].created);
- WT_ASSERT(session, async->worker_sessions[i] != NULL);
- F_CLR(async->worker_sessions[i],
- WT_SESSION_SERVER_ASYNC);
- WT_TRET(__wt_thread_join(
- session, &async->worker_tids[i]));
- wt_session = &async->worker_sessions[i]->iface;
- WT_TRET(wt_session->close(wt_session, NULL));
- async->worker_sessions[i] = NULL;
- }
- conn->async_workers = tmp_conn.async_workers;
- }
-
- return (0);
+ WT_ASYNC *async;
+ WT_CONNECTION_IMPL *conn, tmp_conn;
+ WT_DECL_RET;
+ WT_SESSION *wt_session;
+ uint32_t i, session_flags;
+ bool run;
+
+ conn = S2C(session);
+ async = conn->async;
+ memset(&tmp_conn, 0, sizeof(tmp_conn));
+ tmp_conn.async_cfg = conn->async_cfg;
+ tmp_conn.async_workers = conn->async_workers;
+ tmp_conn.async_size = conn->async_size;
+
+ /* Handle configuration. */
+ run = conn->async_cfg;
+ WT_RET(__async_config(session, &tmp_conn, cfg, &run));
+
+ /*
+ * There are some restrictions on the live reconfiguration of async.
+ * Unlike other subsystems where we simply destroy anything existing
+ * and restart with the new configuration, async is not so easy.
+ * If the user is just changing the number of workers, we want to
+ * allow the existing op handles and other information to remain in
+ * existence. So we must handle various combinations of changes
+ * individually.
+ *
+ * One restriction is that if async is currently on, the user cannot
+ * change the number of async op handles available. The user can try
+ * but we do nothing with it. However we must allow the ops_max config
+ * string so that a user can completely start async via reconfigure.
+ */
+
+ /*
+ * Easy cases:
+ * 1. If async is on and the user wants it off, shut it down.
+ * 2. If async is off, and the user wants it on, start it.
+ * 3. If not a toggle and async is off, we're done.
+ */
+ if (conn->async_cfg && !run) { /* Case 1 */
+ WT_TRET(__wt_async_flush(session));
+ ret = __wt_async_destroy(session);
+ conn->async_cfg = false;
+ return (ret);
+ }
+ if (!conn->async_cfg && run) /* Case 2 */
+ return (__wt_async_create(session, cfg));
+ if (!conn->async_cfg) /* Case 3 */
+ return (0);
+
+ /*
+ * Running async worker modification cases:
+ * 4. If number of workers didn't change, we're done.
+ * 5. If more workers, start new ones.
+ * 6. If fewer workers, kill some.
+ */
+ if (conn->async_workers == tmp_conn.async_workers)
+ /* No change in the number of workers. */
+ return (0);
+ if (conn->async_workers < tmp_conn.async_workers) {
+ /* Case 5 */
+ /*
+ * The worker_sessions array is allocated for the maximum allowed number of workers, so
+ * starting more is easy.
+ */
+ for (i = conn->async_workers; i < tmp_conn.async_workers; i++) {
+ /*
+ * Each worker has its own session.
+ */
+ session_flags = WT_SESSION_SERVER_ASYNC;
+ WT_RET(__wt_open_internal_session(
+ conn, "async-worker", true, session_flags, &async->worker_sessions[i]));
+ }
+ for (i = conn->async_workers; i < tmp_conn.async_workers; i++) {
+ /*
+ * Start the threads.
+ */
+ WT_RET(__wt_thread_create(
+ session, &async->worker_tids[i], __wt_async_worker, async->worker_sessions[i]));
+ }
+ conn->async_workers = tmp_conn.async_workers;
+ }
+ if (conn->async_workers > tmp_conn.async_workers) {
+ /* Case 6 */
+ /*
+ * Stopping an individual async worker is the most complex case. We clear the session async
+ * flag on the targeted worker thread so that only that thread stops, and the others keep
+ * running.
+ */
+ for (i = conn->async_workers - 1; i >= tmp_conn.async_workers; i--) {
+ /*
+ * Join any worker we're stopping. After the thread is stopped, close its session.
+ */
+ WT_ASSERT(session, async->worker_tids[i].created);
+ WT_ASSERT(session, async->worker_sessions[i] != NULL);
+ F_CLR(async->worker_sessions[i], WT_SESSION_SERVER_ASYNC);
+ WT_TRET(__wt_thread_join(session, &async->worker_tids[i]));
+ wt_session = &async->worker_sessions[i]->iface;
+ WT_TRET(wt_session->close(wt_session, NULL));
+ async->worker_sessions[i] = NULL;
+ }
+ conn->async_workers = tmp_conn.async_workers;
+ }
+
+ return (0);
}
/*
* __wt_async_destroy --
- * Destroy the async worker threads and async subsystem.
+ * Destroy the async worker threads and async subsystem.
*/
int
__wt_async_destroy(WT_SESSION_IMPL *session)
{
- WT_ASYNC *async;
- WT_ASYNC_FORMAT *af;
- WT_ASYNC_OP *op;
- WT_CONNECTION_IMPL *conn;
- WT_DECL_RET;
- WT_SESSION *wt_session;
- uint32_t i;
-
- conn = S2C(session);
- async = conn->async;
-
- if (!conn->async_cfg)
- return (0);
-
- F_CLR(conn, WT_CONN_SERVER_ASYNC);
- for (i = 0; i < conn->async_workers; i++)
- WT_TRET(__wt_thread_join(session, &async->worker_tids[i]));
- __wt_cond_destroy(session, &async->flush_cond);
-
- /* Close the server threads' sessions. */
- for (i = 0; i < conn->async_workers; i++)
- if (async->worker_sessions[i] != NULL) {
- wt_session = &async->worker_sessions[i]->iface;
- WT_TRET(wt_session->close(wt_session, NULL));
- async->worker_sessions[i] = NULL;
- }
- /* Free any op key/value buffers. */
- for (i = 0; i < conn->async_size; i++) {
- op = (WT_ASYNC_OP *)&async->async_ops[i];
- if (op->c.key.data != NULL)
- __wt_buf_free(session, &op->c.key);
- if (op->c.value.data != NULL)
- __wt_buf_free(session, &op->c.value);
- }
-
- /* Free format resources */
- while ((af = TAILQ_FIRST(&async->formatqh)) != NULL) {
- TAILQ_REMOVE(&async->formatqh, af, q);
- __wt_free(session, af->uri);
- __wt_free(session, af->config);
- __wt_free(session, af->key_format);
- __wt_free(session, af->value_format);
- __wt_free(session, af);
- }
- __wt_free(session, async->async_queue);
- __wt_free(session, async->async_ops);
- __wt_spin_destroy(session, &async->ops_lock);
- __wt_free(session, conn->async);
-
- return (ret);
+ WT_ASYNC *async;
+ WT_ASYNC_FORMAT *af;
+ WT_ASYNC_OP *op;
+ WT_CONNECTION_IMPL *conn;
+ WT_DECL_RET;
+ WT_SESSION *wt_session;
+ uint32_t i;
+
+ conn = S2C(session);
+ async = conn->async;
+
+ if (!conn->async_cfg)
+ return (0);
+
+ F_CLR(conn, WT_CONN_SERVER_ASYNC);
+ for (i = 0; i < conn->async_workers; i++)
+ WT_TRET(__wt_thread_join(session, &async->worker_tids[i]));
+ __wt_cond_destroy(session, &async->flush_cond);
+
+ /* Close the server threads' sessions. */
+ for (i = 0; i < conn->async_workers; i++)
+ if (async->worker_sessions[i] != NULL) {
+ wt_session = &async->worker_sessions[i]->iface;
+ WT_TRET(wt_session->close(wt_session, NULL));
+ async->worker_sessions[i] = NULL;
+ }
+ /* Free any op key/value buffers. */
+ for (i = 0; i < conn->async_size; i++) {
+ op = (WT_ASYNC_OP *)&async->async_ops[i];
+ if (op->c.key.data != NULL)
+ __wt_buf_free(session, &op->c.key);
+ if (op->c.value.data != NULL)
+ __wt_buf_free(session, &op->c.value);
+ }
+
+ /* Free format resources */
+ while ((af = TAILQ_FIRST(&async->formatqh)) != NULL) {
+ TAILQ_REMOVE(&async->formatqh, af, q);
+ __wt_free(session, af->uri);
+ __wt_free(session, af->config);
+ __wt_free(session, af->key_format);
+ __wt_free(session, af->value_format);
+ __wt_free(session, af);
+ }
+ __wt_free(session, async->async_queue);
+ __wt_free(session, async->async_ops);
+ __wt_spin_destroy(session, &async->ops_lock);
+ __wt_free(session, conn->async);
+
+ return (ret);
}
/*
* __wt_async_flush --
- * Implementation of the WT_CONN->async_flush method.
+ * Implementation of the WT_CONN->async_flush method.
*/
int
__wt_async_flush(WT_SESSION_IMPL *session)
{
- WT_ASYNC *async;
- WT_CONNECTION_IMPL *conn;
- uint32_t i, workers;
-
- conn = S2C(session);
- if (!conn->async_cfg)
- return (0);
-
- async = conn->async;
- /*
- * Only add a flush operation if there are workers who can process
- * it. Otherwise we will wait forever.
- */
- workers = 0;
- for (i = 0; i < conn->async_workers; ++i)
- if (async->worker_tids[i].created)
- ++workers;
- if (workers == 0)
- return (0);
-
- WT_STAT_CONN_INCR(session, async_flush);
- /*
- * We have to do several things. First we have to prevent
- * other callers from racing with us so that only one
- * flush is happening at a time. Next we have to wait for
- * the worker threads to notice the flush and indicate
- * that the flush is complete on their side. Then we
- * clear the flush flags and return.
- */
+ WT_ASYNC *async;
+ WT_CONNECTION_IMPL *conn;
+ uint32_t i, workers;
+
+ conn = S2C(session);
+ if (!conn->async_cfg)
+ return (0);
+
+ async = conn->async;
+ /*
+ * Only add a flush operation if there are workers who can process it. Otherwise we will wait
+ * forever.
+ */
+ workers = 0;
+ for (i = 0; i < conn->async_workers; ++i)
+ if (async->worker_tids[i].created)
+ ++workers;
+ if (workers == 0)
+ return (0);
+
+ WT_STAT_CONN_INCR(session, async_flush);
+/*
+ * We have to do several things. First we have to prevent other callers from racing with us so that
+ * only one flush is happening at a time. Next we have to wait for the worker threads to notice the
+ * flush and indicate that the flush is complete on their side. Then we clear the flush flags and
+ * return.
+ */
retry:
- while (async->flush_state != WT_ASYNC_FLUSH_NONE)
- /*
- * We're racing an in-progress flush. We need to wait
- * our turn to start our own. We need to convoy the
- * racing calls because a later call may be waiting for
- * specific enqueued ops to be complete before this returns.
- */
- __wt_sleep(0, 100000);
-
- if (!__wt_atomic_cas32(&async->flush_state, WT_ASYNC_FLUSH_NONE,
- WT_ASYNC_FLUSH_IN_PROGRESS))
- goto retry;
- /*
- * We're the owner of this flush operation. Set the
- * WT_ASYNC_FLUSH_IN_PROGRESS to block other callers.
- * We're also preventing all worker threads from taking
- * things off the work queue with the lock.
- */
- async->flush_count = 0;
- (void)__wt_atomic_add64(&async->flush_gen, 1);
- WT_ASSERT(session, async->flush_op.state == WT_ASYNCOP_FREE);
- async->flush_op.state = WT_ASYNCOP_READY;
- WT_RET(__wt_async_op_enqueue(session, &async->flush_op));
- while (async->flush_state != WT_ASYNC_FLUSH_COMPLETE)
- __wt_cond_wait(session, async->flush_cond, 100000, NULL);
- /*
- * Flush is done. Clear the flags.
- */
- async->flush_op.state = WT_ASYNCOP_FREE;
- WT_PUBLISH(async->flush_state, WT_ASYNC_FLUSH_NONE);
- return (0);
+ while (async->flush_state != WT_ASYNC_FLUSH_NONE)
+ /*
+ * We're racing an in-progress flush. We need to wait our turn to start our own. We need to
+ * convoy the racing calls because a later call may be waiting for specific enqueued ops to
+ * be complete before this returns.
+ */
+ __wt_sleep(0, 100000);
+
+ if (!__wt_atomic_cas32(&async->flush_state, WT_ASYNC_FLUSH_NONE, WT_ASYNC_FLUSH_IN_PROGRESS))
+ goto retry;
+ /*
+ * We're the owner of this flush operation. Set the WT_ASYNC_FLUSH_IN_PROGRESS to block other
+ * callers. We're also preventing all worker threads from taking things off the work queue with
+ * the lock.
+ */
+ async->flush_count = 0;
+ (void)__wt_atomic_add64(&async->flush_gen, 1);
+ WT_ASSERT(session, async->flush_op.state == WT_ASYNCOP_FREE);
+ async->flush_op.state = WT_ASYNCOP_READY;
+ WT_RET(__wt_async_op_enqueue(session, &async->flush_op));
+ while (async->flush_state != WT_ASYNC_FLUSH_COMPLETE)
+ __wt_cond_wait(session, async->flush_cond, 100000, NULL);
+ /*
+ * Flush is done. Clear the flags.
+ */
+ async->flush_op.state = WT_ASYNCOP_FREE;
+ WT_PUBLISH(async->flush_state, WT_ASYNC_FLUSH_NONE);
+ return (0);
}
/*
* __async_runtime_config --
- * Configure runtime fields at allocation.
+ * Configure runtime fields at allocation.
*/
static int
__async_runtime_config(WT_ASYNC_OP_IMPL *op, const char *cfg[])
{
- WT_ASYNC_OP *asyncop;
- WT_CONFIG_ITEM cval;
- WT_SESSION_IMPL *session;
-
- session = O2S(op);
- asyncop = (WT_ASYNC_OP *)op;
- WT_RET(__wt_config_gets_def(session, cfg, "append", 0, &cval));
- if (cval.val)
- F_SET(&asyncop->c, WT_CURSTD_APPEND);
- else
- F_CLR(&asyncop->c, WT_CURSTD_APPEND);
- WT_RET(__wt_config_gets_def(session, cfg, "overwrite", 1, &cval));
- if (cval.val)
- F_SET(&asyncop->c, WT_CURSTD_OVERWRITE);
- else
- F_CLR(&asyncop->c, WT_CURSTD_OVERWRITE);
- WT_RET(__wt_config_gets_def(session, cfg, "raw", 0, &cval));
- if (cval.val)
- F_SET(&asyncop->c, WT_CURSTD_RAW);
- else
- F_CLR(&asyncop->c, WT_CURSTD_RAW);
- return (0);
-
+ WT_ASYNC_OP *asyncop;
+ WT_CONFIG_ITEM cval;
+ WT_SESSION_IMPL *session;
+
+ session = O2S(op);
+ asyncop = (WT_ASYNC_OP *)op;
+ WT_RET(__wt_config_gets_def(session, cfg, "append", 0, &cval));
+ if (cval.val)
+ F_SET(&asyncop->c, WT_CURSTD_APPEND);
+ else
+ F_CLR(&asyncop->c, WT_CURSTD_APPEND);
+ WT_RET(__wt_config_gets_def(session, cfg, "overwrite", 1, &cval));
+ if (cval.val)
+ F_SET(&asyncop->c, WT_CURSTD_OVERWRITE);
+ else
+ F_CLR(&asyncop->c, WT_CURSTD_OVERWRITE);
+ WT_RET(__wt_config_gets_def(session, cfg, "raw", 0, &cval));
+ if (cval.val)
+ F_SET(&asyncop->c, WT_CURSTD_RAW);
+ else
+ F_CLR(&asyncop->c, WT_CURSTD_RAW);
+ return (0);
}
/*
* __wt_async_new_op --
- * Implementation of the WT_CONN->async_new_op method.
+ * Implementation of the WT_CONN->async_new_op method.
*/
int
-__wt_async_new_op(WT_SESSION_IMPL *session, const char *uri,
- const char *config, WT_ASYNC_CALLBACK *cb, WT_ASYNC_OP_IMPL **opp)
+__wt_async_new_op(WT_SESSION_IMPL *session, const char *uri, const char *config,
+ WT_ASYNC_CALLBACK *cb, WT_ASYNC_OP_IMPL **opp)
{
- WT_ASYNC_OP_IMPL *op;
- WT_CONNECTION_IMPL *conn;
- WT_DECL_RET;
- const char *cfg[] = { S2C(session)->cfg, NULL, NULL };
-
- *opp = NULL;
-
- conn = S2C(session);
- if (!conn->async_cfg)
- WT_RET(__wt_async_create(session, cfg));
- if (!conn->async_cfg)
- WT_RET_MSG(
- session, ENOTSUP, "Asynchronous operations not configured");
-
- op = NULL;
- WT_ERR(__async_new_op_alloc(session, uri, config, &op));
- cfg[1] = config;
- WT_ERR(__async_runtime_config(op, cfg));
- op->cb = cb;
- *opp = op;
- return (0);
+ WT_ASYNC_OP_IMPL *op;
+ WT_CONNECTION_IMPL *conn;
+ WT_DECL_RET;
+ const char *cfg[] = {S2C(session)->cfg, NULL, NULL};
+
+ *opp = NULL;
+
+ conn = S2C(session);
+ if (!conn->async_cfg)
+ WT_RET(__wt_async_create(session, cfg));
+ if (!conn->async_cfg)
+ WT_RET_MSG(session, ENOTSUP, "Asynchronous operations not configured");
+
+ op = NULL;
+ WT_ERR(__async_new_op_alloc(session, uri, config, &op));
+ cfg[1] = config;
+ WT_ERR(__async_runtime_config(op, cfg));
+ op->cb = cb;
+ *opp = op;
+ return (0);
err:
- /*
- * If we get an error after allocating op, set its state to free.
- */
- if (op != NULL)
- op->state = WT_ASYNCOP_FREE;
- return (ret);
+ /*
+ * If we get an error after allocating op, set its state to free.
+ */
+ if (op != NULL)
+ op->state = WT_ASYNCOP_FREE;
+ return (ret);
}
diff --git a/src/third_party/wiredtiger/src/async/async_op.c b/src/third_party/wiredtiger/src/async/async_op.c
index 41cabe0297a..5ba9af81055 100644
--- a/src/third_party/wiredtiger/src/async/async_op.c
+++ b/src/third_party/wiredtiger/src/async/async_op.c
@@ -10,346 +10,345 @@
/*
* __async_get_key --
- * WT_ASYNC_OP->get_key implementation for op handles.
+ * WT_ASYNC_OP->get_key implementation for op handles.
*/
static int
__async_get_key(WT_ASYNC_OP *asyncop, ...)
{
- WT_DECL_RET;
- va_list ap;
+ WT_DECL_RET;
+ va_list ap;
- va_start(ap, asyncop);
- ret = __wt_cursor_get_keyv(&asyncop->c, asyncop->c.flags, ap);
- va_end(ap);
- return (ret);
+ va_start(ap, asyncop);
+ ret = __wt_cursor_get_keyv(&asyncop->c, asyncop->c.flags, ap);
+ va_end(ap);
+ return (ret);
}
/*
* __async_set_key --
- * WT_ASYNC_OP->set_key implementation for op handles.
+ * WT_ASYNC_OP->set_key implementation for op handles.
*/
static void
__async_set_key(WT_ASYNC_OP *asyncop, ...)
{
- WT_CURSOR *c;
- va_list ap;
-
- c = &asyncop->c;
- va_start(ap, asyncop);
- __wt_cursor_set_keyv(c, c->flags, ap);
- if (!WT_DATA_IN_ITEM(&c->key) && !WT_CURSOR_RECNO(c))
- c->saved_err = __wt_buf_set(
- O2S((WT_ASYNC_OP_IMPL *)asyncop),
- &c->key, c->key.data, c->key.size);
- va_end(ap);
+ WT_CURSOR *c;
+ va_list ap;
+
+ c = &asyncop->c;
+ va_start(ap, asyncop);
+ __wt_cursor_set_keyv(c, c->flags, ap);
+ if (!WT_DATA_IN_ITEM(&c->key) && !WT_CURSOR_RECNO(c))
+ c->saved_err =
+ __wt_buf_set(O2S((WT_ASYNC_OP_IMPL *)asyncop), &c->key, c->key.data, c->key.size);
+ va_end(ap);
}
/*
* __async_get_value --
- * WT_ASYNC_OP->get_value implementation for op handles.
+ * WT_ASYNC_OP->get_value implementation for op handles.
*/
static int
__async_get_value(WT_ASYNC_OP *asyncop, ...)
{
- WT_DECL_RET;
- va_list ap;
+ WT_DECL_RET;
+ va_list ap;
- va_start(ap, asyncop);
- ret = __wt_cursor_get_valuev(&asyncop->c, ap);
- va_end(ap);
- return (ret);
+ va_start(ap, asyncop);
+ ret = __wt_cursor_get_valuev(&asyncop->c, ap);
+ va_end(ap);
+ return (ret);
}
/*
* __async_set_value --
- * WT_ASYNC_OP->set_value implementation for op handles.
+ * WT_ASYNC_OP->set_value implementation for op handles.
*/
static void
__async_set_value(WT_ASYNC_OP *asyncop, ...)
{
- WT_CURSOR *c;
- va_list ap;
-
- c = &asyncop->c;
- va_start(ap, asyncop);
- __wt_cursor_set_valuev(c, ap);
- /* Copy the data, if it is pointing at data elsewhere. */
- if (!WT_DATA_IN_ITEM(&c->value))
- c->saved_err = __wt_buf_set(
- O2S((WT_ASYNC_OP_IMPL *)asyncop),
- &c->value, c->value.data, c->value.size);
- va_end(ap);
+ WT_CURSOR *c;
+ va_list ap;
+
+ c = &asyncop->c;
+ va_start(ap, asyncop);
+ __wt_cursor_set_valuev(c, ap);
+ /* Copy the data, if it is pointing at data elsewhere. */
+ if (!WT_DATA_IN_ITEM(&c->value))
+ c->saved_err =
+ __wt_buf_set(O2S((WT_ASYNC_OP_IMPL *)asyncop), &c->value, c->value.data, c->value.size);
+ va_end(ap);
}
/*
* __async_op_wrap --
- * Common wrapper for all async operations.
+ * Common wrapper for all async operations.
*/
static int
__async_op_wrap(WT_ASYNC_OP_IMPL *op, WT_ASYNC_OPTYPE type)
{
- op->optype = type;
- return (__wt_async_op_enqueue(O2S(op), op));
+ op->optype = type;
+ return (__wt_async_op_enqueue(O2S(op), op));
}
/*
* __async_search --
- * WT_ASYNC_OP->search implementation for op handles.
+ * WT_ASYNC_OP->search implementation for op handles.
*/
static int
__async_search(WT_ASYNC_OP *asyncop)
{
- WT_ASYNC_OP_IMPL *op;
- WT_DECL_RET;
- WT_SESSION_IMPL *session;
-
- op = (WT_ASYNC_OP_IMPL *)asyncop;
- ASYNCOP_API_CALL(O2C(op), session, search);
- WT_STAT_CONN_INCR(O2S(op), async_op_search);
- WT_ERR(__async_op_wrap(op, WT_AOP_SEARCH));
-err: API_END_RET(session, ret);
+ WT_ASYNC_OP_IMPL *op;
+ WT_DECL_RET;
+ WT_SESSION_IMPL *session;
+
+ op = (WT_ASYNC_OP_IMPL *)asyncop;
+ ASYNCOP_API_CALL(O2C(op), session, search);
+ WT_STAT_CONN_INCR(O2S(op), async_op_search);
+ WT_ERR(__async_op_wrap(op, WT_AOP_SEARCH));
+err:
+ API_END_RET(session, ret);
}
/*
* __async_insert --
- * WT_ASYNC_OP->insert implementation for op handles.
+ * WT_ASYNC_OP->insert implementation for op handles.
*/
static int
__async_insert(WT_ASYNC_OP *asyncop)
{
- WT_ASYNC_OP_IMPL *op;
- WT_DECL_RET;
- WT_SESSION_IMPL *session;
-
- op = (WT_ASYNC_OP_IMPL *)asyncop;
- ASYNCOP_API_CALL(O2C(op), session, insert);
- WT_STAT_CONN_INCR(O2S(op), async_op_insert);
- WT_ERR(__async_op_wrap(op, WT_AOP_INSERT));
-err: API_END_RET(session, ret);
+ WT_ASYNC_OP_IMPL *op;
+ WT_DECL_RET;
+ WT_SESSION_IMPL *session;
+
+ op = (WT_ASYNC_OP_IMPL *)asyncop;
+ ASYNCOP_API_CALL(O2C(op), session, insert);
+ WT_STAT_CONN_INCR(O2S(op), async_op_insert);
+ WT_ERR(__async_op_wrap(op, WT_AOP_INSERT));
+err:
+ API_END_RET(session, ret);
}
/*
* __async_update --
- * WT_ASYNC_OP->update implementation for op handles.
+ * WT_ASYNC_OP->update implementation for op handles.
*/
static int
__async_update(WT_ASYNC_OP *asyncop)
{
- WT_ASYNC_OP_IMPL *op;
- WT_DECL_RET;
- WT_SESSION_IMPL *session;
-
- op = (WT_ASYNC_OP_IMPL *)asyncop;
- ASYNCOP_API_CALL(O2C(op), session, update);
- WT_STAT_CONN_INCR(O2S(op), async_op_update);
- WT_ERR(__async_op_wrap(op, WT_AOP_UPDATE));
-err: API_END_RET(session, ret);
+ WT_ASYNC_OP_IMPL *op;
+ WT_DECL_RET;
+ WT_SESSION_IMPL *session;
+
+ op = (WT_ASYNC_OP_IMPL *)asyncop;
+ ASYNCOP_API_CALL(O2C(op), session, update);
+ WT_STAT_CONN_INCR(O2S(op), async_op_update);
+ WT_ERR(__async_op_wrap(op, WT_AOP_UPDATE));
+err:
+ API_END_RET(session, ret);
}
/*
* __async_remove --
- * WT_ASYNC_OP->remove implementation for op handles.
+ * WT_ASYNC_OP->remove implementation for op handles.
*/
static int
__async_remove(WT_ASYNC_OP *asyncop)
{
- WT_ASYNC_OP_IMPL *op;
- WT_DECL_RET;
- WT_SESSION_IMPL *session;
-
- op = (WT_ASYNC_OP_IMPL *)asyncop;
- ASYNCOP_API_CALL(O2C(op), session, remove);
- WT_STAT_CONN_INCR(O2S(op), async_op_remove);
- WT_ERR(__async_op_wrap(op, WT_AOP_REMOVE));
-err: API_END_RET(session, ret);
+ WT_ASYNC_OP_IMPL *op;
+ WT_DECL_RET;
+ WT_SESSION_IMPL *session;
+
+ op = (WT_ASYNC_OP_IMPL *)asyncop;
+ ASYNCOP_API_CALL(O2C(op), session, remove);
+ WT_STAT_CONN_INCR(O2S(op), async_op_remove);
+ WT_ERR(__async_op_wrap(op, WT_AOP_REMOVE));
+err:
+ API_END_RET(session, ret);
}
/*
* __async_compact --
- * WT_ASYNC_OP->compact implementation for op handles.
+ * WT_ASYNC_OP->compact implementation for op handles.
*/
static int
__async_compact(WT_ASYNC_OP *asyncop)
{
- WT_ASYNC_OP_IMPL *op;
- WT_DECL_RET;
- WT_SESSION_IMPL *session;
-
- op = (WT_ASYNC_OP_IMPL *)asyncop;
- ASYNCOP_API_CALL(O2C(op), session, compact);
- WT_STAT_CONN_INCR(O2S(op), async_op_compact);
- WT_ERR(__async_op_wrap(op, WT_AOP_COMPACT));
-err: API_END_RET(session, ret);
+ WT_ASYNC_OP_IMPL *op;
+ WT_DECL_RET;
+ WT_SESSION_IMPL *session;
+
+ op = (WT_ASYNC_OP_IMPL *)asyncop;
+ ASYNCOP_API_CALL(O2C(op), session, compact);
+ WT_STAT_CONN_INCR(O2S(op), async_op_compact);
+ WT_ERR(__async_op_wrap(op, WT_AOP_COMPACT));
+err:
+ API_END_RET(session, ret);
}
/*
* __async_get_id --
- * WT_ASYNC_OP->get_id implementation for op handles.
+ * WT_ASYNC_OP->get_id implementation for op handles.
*/
static uint64_t
__async_get_id(WT_ASYNC_OP *asyncop)
{
- return (((WT_ASYNC_OP_IMPL *)asyncop)->unique_id);
+ return (((WT_ASYNC_OP_IMPL *)asyncop)->unique_id);
}
/*
* __async_get_type --
- * WT_ASYNC_OP->get_type implementation for op handles.
+ * WT_ASYNC_OP->get_type implementation for op handles.
*/
static WT_ASYNC_OPTYPE
__async_get_type(WT_ASYNC_OP *asyncop)
{
- return (((WT_ASYNC_OP_IMPL *)asyncop)->optype);
+ return (((WT_ASYNC_OP_IMPL *)asyncop)->optype);
}
/*
* __async_op_init --
- * Initialize all the op handle fields.
+ * Initialize all the op handle fields.
*/
static void
__async_op_init(WT_CONNECTION_IMPL *conn, WT_ASYNC_OP_IMPL *op, uint32_t id)
{
- WT_ASYNC_OP *asyncop;
-
- asyncop = (WT_ASYNC_OP *)op;
- asyncop->connection = (WT_CONNECTION *)conn;
- asyncop->key_format = asyncop->value_format = NULL;
- asyncop->c.key_format = asyncop->c.value_format = NULL;
- asyncop->get_key = __async_get_key;
- asyncop->get_value = __async_get_value;
- asyncop->set_key = __async_set_key;
- asyncop->set_value = __async_set_value;
- asyncop->search = __async_search;
- asyncop->insert = __async_insert;
- asyncop->update = __async_update;
- asyncop->remove = __async_remove;
- asyncop->compact = __async_compact;
- asyncop->get_id = __async_get_id;
- asyncop->get_type = __async_get_type;
- /*
- * The cursor needs to have the get/set key/value functions initialized.
- * It also needs the key/value related fields set up.
- */
- asyncop->c.get_key = __wt_cursor_get_key;
- asyncop->c.set_key = __wt_cursor_set_key;
- asyncop->c.get_value = __wt_cursor_get_value;
- asyncop->c.set_value = __wt_cursor_set_value;
- asyncop->c.recno = WT_RECNO_OOB;
- memset(asyncop->c.raw_recno_buf, 0, sizeof(asyncop->c.raw_recno_buf));
- memset(&asyncop->c.key, 0, sizeof(asyncop->c.key));
- memset(&asyncop->c.value, 0, sizeof(asyncop->c.value));
- asyncop->c.session = (WT_SESSION *)conn->default_session;
- asyncop->c.saved_err = 0;
- asyncop->c.flags = 0;
-
- op->internal_id = id;
- op->state = WT_ASYNCOP_FREE;
+ WT_ASYNC_OP *asyncop;
+
+ asyncop = (WT_ASYNC_OP *)op;
+ asyncop->connection = (WT_CONNECTION *)conn;
+ asyncop->key_format = asyncop->value_format = NULL;
+ asyncop->c.key_format = asyncop->c.value_format = NULL;
+ asyncop->get_key = __async_get_key;
+ asyncop->get_value = __async_get_value;
+ asyncop->set_key = __async_set_key;
+ asyncop->set_value = __async_set_value;
+ asyncop->search = __async_search;
+ asyncop->insert = __async_insert;
+ asyncop->update = __async_update;
+ asyncop->remove = __async_remove;
+ asyncop->compact = __async_compact;
+ asyncop->get_id = __async_get_id;
+ asyncop->get_type = __async_get_type;
+ /*
+ * The cursor needs to have the get/set key/value functions initialized. It also needs the
+ * key/value related fields set up.
+ */
+ asyncop->c.get_key = __wt_cursor_get_key;
+ asyncop->c.set_key = __wt_cursor_set_key;
+ asyncop->c.get_value = __wt_cursor_get_value;
+ asyncop->c.set_value = __wt_cursor_set_value;
+ asyncop->c.recno = WT_RECNO_OOB;
+ memset(asyncop->c.raw_recno_buf, 0, sizeof(asyncop->c.raw_recno_buf));
+ memset(&asyncop->c.key, 0, sizeof(asyncop->c.key));
+ memset(&asyncop->c.value, 0, sizeof(asyncop->c.value));
+ asyncop->c.session = (WT_SESSION *)conn->default_session;
+ asyncop->c.saved_err = 0;
+ asyncop->c.flags = 0;
+
+ op->internal_id = id;
+ op->state = WT_ASYNCOP_FREE;
}
/*
* __wt_async_op_enqueue --
- * Enqueue an operation onto the work queue.
+ * Enqueue an operation onto the work queue.
*/
int
__wt_async_op_enqueue(WT_SESSION_IMPL *session, WT_ASYNC_OP_IMPL *op)
{
- WT_ASYNC *async;
- WT_CONNECTION_IMPL *conn;
- uint64_t cur_head, cur_tail, my_alloc, my_slot;
-#ifdef HAVE_DIAGNOSTIC
- WT_ASYNC_OP_IMPL *my_op;
+ WT_ASYNC *async;
+ WT_CONNECTION_IMPL *conn;
+ uint64_t cur_head, cur_tail, my_alloc, my_slot;
+#ifdef HAVE_DIAGNOSTIC
+ WT_ASYNC_OP_IMPL *my_op;
#endif
- conn = S2C(session);
- async = conn->async;
-
- /*
- * If an application re-uses a WT_ASYNC_OP, we end up here with an
- * invalid object.
- */
- if (op->state != WT_ASYNCOP_READY)
- WT_RET_MSG(session, EINVAL,
- "application error: WT_ASYNC_OP already in use");
-
- /*
- * Enqueue op at the tail of the work queue.
- * We get our slot in the ring buffer to use.
- */
- my_alloc = __wt_atomic_add64(&async->alloc_head, 1);
- my_slot = my_alloc % async->async_qsize;
-
- /*
- * Make sure we haven't wrapped around the queue.
- * If so, wait for the tail to advance off this slot.
- */
- WT_ORDERED_READ(cur_tail, async->tail_slot);
- while (cur_tail == my_slot) {
- __wt_yield();
- WT_ORDERED_READ(cur_tail, async->tail_slot);
- }
-
-#ifdef HAVE_DIAGNOSTIC
- WT_ORDERED_READ(my_op, async->async_queue[my_slot]);
- if (my_op != NULL)
- return (__wt_panic(session));
+ conn = S2C(session);
+ async = conn->async;
+
+ /*
+ * If an application re-uses a WT_ASYNC_OP, we end up here with an invalid object.
+ */
+ if (op->state != WT_ASYNCOP_READY)
+ WT_RET_MSG(session, EINVAL, "application error: WT_ASYNC_OP already in use");
+
+ /*
+ * Enqueue op at the tail of the work queue. We get our slot in the ring buffer to use.
+ */
+ my_alloc = __wt_atomic_add64(&async->alloc_head, 1);
+ my_slot = my_alloc % async->async_qsize;
+
+ /*
+ * Make sure we haven't wrapped around the queue. If so, wait for the tail to advance off this
+ * slot.
+ */
+ WT_ORDERED_READ(cur_tail, async->tail_slot);
+ while (cur_tail == my_slot) {
+ __wt_yield();
+ WT_ORDERED_READ(cur_tail, async->tail_slot);
+ }
+
+#ifdef HAVE_DIAGNOSTIC
+ WT_ORDERED_READ(my_op, async->async_queue[my_slot]);
+ if (my_op != NULL)
+ return (__wt_panic(session));
#endif
- WT_PUBLISH(async->async_queue[my_slot], op);
- op->state = WT_ASYNCOP_ENQUEUED;
- if (__wt_atomic_add32(&async->cur_queue, 1) > async->max_queue)
- WT_PUBLISH(async->max_queue, async->cur_queue);
- /*
- * Multiple threads may be adding ops to the queue. We need to wait
- * our turn to make our slot visible to workers.
- */
- WT_ORDERED_READ(cur_head, async->head);
- while (cur_head != (my_alloc - 1)) {
- __wt_yield();
- WT_ORDERED_READ(cur_head, async->head);
- }
- WT_PUBLISH(async->head, my_alloc);
- return (0);
+ WT_PUBLISH(async->async_queue[my_slot], op);
+ op->state = WT_ASYNCOP_ENQUEUED;
+ if (__wt_atomic_add32(&async->cur_queue, 1) > async->max_queue)
+ WT_PUBLISH(async->max_queue, async->cur_queue);
+ /*
+ * Multiple threads may be adding ops to the queue. We need to wait our turn to make our slot
+ * visible to workers.
+ */
+ WT_ORDERED_READ(cur_head, async->head);
+ while (cur_head != (my_alloc - 1)) {
+ __wt_yield();
+ WT_ORDERED_READ(cur_head, async->head);
+ }
+ WT_PUBLISH(async->head, my_alloc);
+ return (0);
}
/*
* __wt_async_op_init --
- * Initialize all the op handles.
+ * Initialize all the op handles.
*/
int
__wt_async_op_init(WT_SESSION_IMPL *session)
{
- WT_ASYNC *async;
- WT_ASYNC_OP_IMPL *op;
- WT_CONNECTION_IMPL *conn;
- WT_DECL_RET;
- uint32_t i;
-
- conn = S2C(session);
- async = conn->async;
-
- /*
- * Initialize the flush op structure.
- */
- __async_op_init(conn, &async->flush_op, OPS_INVALID_INDEX);
-
- /*
- * Allocate and initialize the work queue. This is sized so that
- * the ring buffer is known to be big enough such that the head
- * can never overlap the tail. Include extra for the flush op.
- */
- async->async_qsize = conn->async_size + 2;
- WT_RET(__wt_calloc_def(
- session, async->async_qsize, &async->async_queue));
- /*
- * Allocate and initialize all the user ops.
- */
- WT_ERR(__wt_calloc_def(session, conn->async_size, &async->async_ops));
- for (i = 0; i < conn->async_size; i++) {
- op = &async->async_ops[i];
- __async_op_init(conn, op, i);
- }
- return (0);
-
-err: __wt_free(session, async->async_ops);
- __wt_free(session, async->async_queue);
- return (ret);
+ WT_ASYNC *async;
+ WT_ASYNC_OP_IMPL *op;
+ WT_CONNECTION_IMPL *conn;
+ WT_DECL_RET;
+ uint32_t i;
+
+ conn = S2C(session);
+ async = conn->async;
+
+ /*
+ * Initialize the flush op structure.
+ */
+ __async_op_init(conn, &async->flush_op, OPS_INVALID_INDEX);
+
+ /*
+ * Allocate and initialize the work queue. This is sized so that the ring buffer is known to be
+ * big enough such that the head can never overlap the tail. Include extra for the flush op.
+ */
+ async->async_qsize = conn->async_size + 2;
+ WT_RET(__wt_calloc_def(session, async->async_qsize, &async->async_queue));
+ /*
+ * Allocate and initialize all the user ops.
+ */
+ WT_ERR(__wt_calloc_def(session, conn->async_size, &async->async_ops));
+ for (i = 0; i < conn->async_size; i++) {
+ op = &async->async_ops[i];
+ __async_op_init(conn, op, i);
+ }
+ return (0);
+
+err:
+ __wt_free(session, async->async_ops);
+ __wt_free(session, async->async_queue);
+ return (ret);
}
diff --git a/src/third_party/wiredtiger/src/async/async_worker.c b/src/third_party/wiredtiger/src/async/async_worker.c
index abb32c5ecd2..8fdd4cba4b4 100644
--- a/src/third_party/wiredtiger/src/async/async_worker.c
+++ b/src/third_party/wiredtiger/src/async/async_worker.c
@@ -10,344 +10,314 @@
/*
* __async_op_dequeue --
- * Wait for work to be available. Then atomically take it off
- * the work queue.
+ * Wait for work to be available. Then atomically take it off the work queue.
*/
static int
-__async_op_dequeue(WT_CONNECTION_IMPL *conn, WT_SESSION_IMPL *session,
- WT_ASYNC_OP_IMPL **op)
+__async_op_dequeue(WT_CONNECTION_IMPL *conn, WT_SESSION_IMPL *session, WT_ASYNC_OP_IMPL **op)
{
- WT_ASYNC *async;
- uint64_t cur_tail, last_consume, my_consume, my_slot, prev_slot;
- uint64_t sleep_usec;
- uint32_t tries;
+ WT_ASYNC *async;
+ uint64_t cur_tail, last_consume, my_consume, my_slot, prev_slot;
+ uint64_t sleep_usec;
+ uint32_t tries;
- *op = NULL;
+ *op = NULL;
- async = conn->async;
- /*
- * Wait for work to do. Work is available when async->head moves.
- * Then grab the slot containing the work. If we lose, try again.
- */
+ async = conn->async;
+/*
+ * Wait for work to do. Work is available when async->head moves. Then grab the slot containing the
+ * work. If we lose, try again.
+ */
retry:
- tries = 0;
- sleep_usec = 100;
- WT_ORDERED_READ(last_consume, async->alloc_tail);
- /*
- * We stay in this loop until there is work to do.
- */
- while (last_consume == async->head &&
- async->flush_state != WT_ASYNC_FLUSHING) {
- WT_STAT_CONN_INCR(session, async_nowork);
- if (++tries < MAX_ASYNC_YIELD)
- /*
- * Initially when we find no work, allow other
- * threads to run.
- */
- __wt_yield();
- else {
- /*
- * If we haven't found work in a while, start sleeping
- * to wait for work to arrive instead of spinning.
- */
- __wt_sleep(0, sleep_usec);
- sleep_usec = WT_MIN(sleep_usec * 2,
- MAX_ASYNC_SLEEP_USECS);
- }
- if (!F_ISSET(session, WT_SESSION_SERVER_ASYNC))
- return (0);
- if (!F_ISSET(conn, WT_CONN_SERVER_ASYNC))
- return (0);
- WT_ORDERED_READ(last_consume, async->alloc_tail);
- }
- if (async->flush_state == WT_ASYNC_FLUSHING)
- return (0);
- /*
- * Try to increment the tail to claim this slot. If we lose
- * a race, try again.
- */
- my_consume = last_consume + 1;
- if (!__wt_atomic_cas64(&async->alloc_tail, last_consume, my_consume))
- goto retry;
- /*
- * This item of work is ours to process. Clear it out of the
- * queue and return.
- */
- my_slot = my_consume % async->async_qsize;
- prev_slot = last_consume % async->async_qsize;
- *op = async->async_queue[my_slot];
- async->async_queue[my_slot] = NULL;
+ tries = 0;
+ sleep_usec = 100;
+ WT_ORDERED_READ(last_consume, async->alloc_tail);
+ /*
+ * We stay in this loop until there is work to do.
+ */
+ while (last_consume == async->head && async->flush_state != WT_ASYNC_FLUSHING) {
+ WT_STAT_CONN_INCR(session, async_nowork);
+ if (++tries < MAX_ASYNC_YIELD)
+ /*
+ * Initially when we find no work, allow other threads to run.
+ */
+ __wt_yield();
+ else {
+ /*
+ * If we haven't found work in a while, start sleeping to wait for work to arrive
+ * instead of spinning.
+ */
+ __wt_sleep(0, sleep_usec);
+ sleep_usec = WT_MIN(sleep_usec * 2, MAX_ASYNC_SLEEP_USECS);
+ }
+ if (!F_ISSET(session, WT_SESSION_SERVER_ASYNC))
+ return (0);
+ if (!F_ISSET(conn, WT_CONN_SERVER_ASYNC))
+ return (0);
+ WT_ORDERED_READ(last_consume, async->alloc_tail);
+ }
+ if (async->flush_state == WT_ASYNC_FLUSHING)
+ return (0);
+ /*
+ * Try to increment the tail to claim this slot. If we lose a race, try again.
+ */
+ my_consume = last_consume + 1;
+ if (!__wt_atomic_cas64(&async->alloc_tail, last_consume, my_consume))
+ goto retry;
+ /*
+ * This item of work is ours to process. Clear it out of the queue and return.
+ */
+ my_slot = my_consume % async->async_qsize;
+ prev_slot = last_consume % async->async_qsize;
+ *op = async->async_queue[my_slot];
+ async->async_queue[my_slot] = NULL;
- WT_ASSERT(session, async->cur_queue > 0);
- WT_ASSERT(session, *op != NULL);
- WT_ASSERT(session, (*op)->state == WT_ASYNCOP_ENQUEUED);
- (void)__wt_atomic_sub32(&async->cur_queue, 1);
- (*op)->state = WT_ASYNCOP_WORKING;
+ WT_ASSERT(session, async->cur_queue > 0);
+ WT_ASSERT(session, *op != NULL);
+ WT_ASSERT(session, (*op)->state == WT_ASYNCOP_ENQUEUED);
+ (void)__wt_atomic_sub32(&async->cur_queue, 1);
+ (*op)->state = WT_ASYNCOP_WORKING;
- if (*op == &async->flush_op)
- /*
- * We're the worker to take the flush op off the queue.
- */
- WT_PUBLISH(async->flush_state, WT_ASYNC_FLUSHING);
- WT_ORDERED_READ(cur_tail, async->tail_slot);
- while (cur_tail != prev_slot) {
- __wt_yield();
- WT_ORDERED_READ(cur_tail, async->tail_slot);
- }
- WT_PUBLISH(async->tail_slot, my_slot);
- return (0);
+ if (*op == &async->flush_op)
+ /*
+ * We're the worker to take the flush op off the queue.
+ */
+ WT_PUBLISH(async->flush_state, WT_ASYNC_FLUSHING);
+ WT_ORDERED_READ(cur_tail, async->tail_slot);
+ while (cur_tail != prev_slot) {
+ __wt_yield();
+ WT_ORDERED_READ(cur_tail, async->tail_slot);
+ }
+ WT_PUBLISH(async->tail_slot, my_slot);
+ return (0);
}
/*
* __async_flush_wait --
- * Wait for the final worker to finish flushing.
+ * Wait for the final worker to finish flushing.
*/
static void
__async_flush_wait(WT_SESSION_IMPL *session, WT_ASYNC *async, uint64_t my_gen)
{
- while (async->flush_state == WT_ASYNC_FLUSHING &&
- async->flush_gen == my_gen) {
- __wt_cond_wait(session, async->flush_cond, 10000, NULL);
- WT_BARRIER();
- }
+ while (async->flush_state == WT_ASYNC_FLUSHING && async->flush_gen == my_gen) {
+ __wt_cond_wait(session, async->flush_cond, 10000, NULL);
+ WT_BARRIER();
+ }
}
/*
* __async_worker_cursor --
- * Return a cursor for the worker thread to use for its op.
- * The worker thread caches cursors. So first search for one
- * with the same config/uri signature. Otherwise open a new
- * cursor and cache it.
+ * Return a cursor for the worker thread to use for its op. The worker thread caches cursors. So
+ * first search for one with the same config/uri signature. Otherwise open a new cursor and
+ * cache it.
*/
static int
-__async_worker_cursor(WT_SESSION_IMPL *session, WT_ASYNC_OP_IMPL *op,
- WT_ASYNC_WORKER_STATE *worker, WT_CURSOR **cursorp)
+__async_worker_cursor(WT_SESSION_IMPL *session, WT_ASYNC_OP_IMPL *op, WT_ASYNC_WORKER_STATE *worker,
+ WT_CURSOR **cursorp)
{
- WT_ASYNC_CURSOR *ac;
- WT_CURSOR *c;
- WT_DECL_RET;
- WT_SESSION *wt_session;
+ WT_ASYNC_CURSOR *ac;
+ WT_CURSOR *c;
+ WT_DECL_RET;
+ WT_SESSION *wt_session;
- *cursorp = NULL;
+ *cursorp = NULL;
- wt_session = (WT_SESSION *)session;
- /*
- * Compact doesn't need a cursor.
- */
- if (op->optype == WT_AOP_COMPACT)
- return (0);
- WT_ASSERT(session, op->format != NULL);
- TAILQ_FOREACH(ac, &worker->cursorqh, q) {
- if (op->format->cfg_hash == ac->cfg_hash &&
- op->format->uri_hash == ac->uri_hash) {
- /*
- * If one of our cached cursors has a matching
- * signature, use it and we're done.
- */
- *cursorp = ac->c;
- return (0);
- }
- }
- /*
- * We didn't find one in our cache. Open one and cache it.
- * Insert it at the head expecting LRU usage.
- */
- WT_RET(__wt_calloc_one(session, &ac));
- WT_ERR(wt_session->open_cursor(
- wt_session, op->format->uri, NULL, op->format->config, &c));
- ac->cfg_hash = op->format->cfg_hash;
- ac->uri_hash = op->format->uri_hash;
- ac->c = c;
- TAILQ_INSERT_HEAD(&worker->cursorqh, ac, q);
- worker->num_cursors++;
- *cursorp = c;
- return (0);
+ wt_session = (WT_SESSION *)session;
+ /*
+ * Compact doesn't need a cursor.
+ */
+ if (op->optype == WT_AOP_COMPACT)
+ return (0);
+ WT_ASSERT(session, op->format != NULL);
+ TAILQ_FOREACH (ac, &worker->cursorqh, q) {
+ if (op->format->cfg_hash == ac->cfg_hash && op->format->uri_hash == ac->uri_hash) {
+ /*
+ * If one of our cached cursors has a matching signature, use it and we're done.
+ */
+ *cursorp = ac->c;
+ return (0);
+ }
+ }
+ /*
+ * We didn't find one in our cache. Open one and cache it. Insert it at the head expecting LRU
+ * usage.
+ */
+ WT_RET(__wt_calloc_one(session, &ac));
+ WT_ERR(wt_session->open_cursor(wt_session, op->format->uri, NULL, op->format->config, &c));
+ ac->cfg_hash = op->format->cfg_hash;
+ ac->uri_hash = op->format->uri_hash;
+ ac->c = c;
+ TAILQ_INSERT_HEAD(&worker->cursorqh, ac, q);
+ worker->num_cursors++;
+ *cursorp = c;
+ return (0);
-err: __wt_free(session, ac);
- return (ret);
+err:
+ __wt_free(session, ac);
+ return (ret);
}
/*
* __async_worker_execop --
- * A worker thread executes an individual op with a cursor.
+ * A worker thread executes an individual op with a cursor.
*/
static int
-__async_worker_execop(WT_SESSION_IMPL *session, WT_ASYNC_OP_IMPL *op,
- WT_CURSOR *cursor)
+__async_worker_execop(WT_SESSION_IMPL *session, WT_ASYNC_OP_IMPL *op, WT_CURSOR *cursor)
{
- WT_ASYNC_OP *asyncop;
- WT_ITEM val;
- WT_SESSION *wt_session;
+ WT_ASYNC_OP *asyncop;
+ WT_ITEM val;
+ WT_SESSION *wt_session;
- asyncop = (WT_ASYNC_OP *)op;
- /*
- * Set the key of our local cursor from the async op handle.
- * If needed, also set the value.
- */
- if (op->optype != WT_AOP_COMPACT) {
- WT_RET(__wt_cursor_get_raw_key(&asyncop->c, &val));
- __wt_cursor_set_raw_key(cursor, &val);
- if (op->optype == WT_AOP_INSERT ||
- op->optype == WT_AOP_UPDATE) {
- WT_RET(__wt_cursor_get_raw_value(&asyncop->c, &val));
- __wt_cursor_set_raw_value(cursor, &val);
- }
- }
- switch (op->optype) {
- case WT_AOP_COMPACT:
- wt_session = &session->iface;
- WT_RET(wt_session->compact(wt_session,
- op->format->uri, op->format->config));
- break;
- case WT_AOP_INSERT:
- WT_RET(cursor->insert(cursor));
- break;
- case WT_AOP_UPDATE:
- WT_RET(cursor->update(cursor));
- break;
- case WT_AOP_REMOVE:
- WT_RET(cursor->remove(cursor));
- break;
- case WT_AOP_SEARCH:
- WT_RET(cursor->search(cursor));
- /*
- * Get the value from the cursor and put it into
- * the op for op->get_value.
- */
- WT_RET(__wt_cursor_get_raw_value(cursor, &val));
- __wt_cursor_set_raw_value(&asyncop->c, &val);
- break;
- case WT_AOP_NONE:
- WT_RET_MSG(session, EINVAL,
- "Unknown async optype %d", (int)op->optype);
- }
- return (0);
+ asyncop = (WT_ASYNC_OP *)op;
+ /*
+ * Set the key of our local cursor from the async op handle. If needed, also set the value.
+ */
+ if (op->optype != WT_AOP_COMPACT) {
+ WT_RET(__wt_cursor_get_raw_key(&asyncop->c, &val));
+ __wt_cursor_set_raw_key(cursor, &val);
+ if (op->optype == WT_AOP_INSERT || op->optype == WT_AOP_UPDATE) {
+ WT_RET(__wt_cursor_get_raw_value(&asyncop->c, &val));
+ __wt_cursor_set_raw_value(cursor, &val);
+ }
+ }
+ switch (op->optype) {
+ case WT_AOP_COMPACT:
+ wt_session = &session->iface;
+ WT_RET(wt_session->compact(wt_session, op->format->uri, op->format->config));
+ break;
+ case WT_AOP_INSERT:
+ WT_RET(cursor->insert(cursor));
+ break;
+ case WT_AOP_UPDATE:
+ WT_RET(cursor->update(cursor));
+ break;
+ case WT_AOP_REMOVE:
+ WT_RET(cursor->remove(cursor));
+ break;
+ case WT_AOP_SEARCH:
+ WT_RET(cursor->search(cursor));
+ /*
+ * Get the value from the cursor and put it into the op for op->get_value.
+ */
+ WT_RET(__wt_cursor_get_raw_value(cursor, &val));
+ __wt_cursor_set_raw_value(&asyncop->c, &val);
+ break;
+ case WT_AOP_NONE:
+ WT_RET_MSG(session, EINVAL, "Unknown async optype %d", (int)op->optype);
+ }
+ return (0);
}
/*
* __async_worker_op --
- * A worker thread handles an individual op.
+ * A worker thread handles an individual op.
*/
static int
-__async_worker_op(WT_SESSION_IMPL *session, WT_ASYNC_OP_IMPL *op,
- WT_ASYNC_WORKER_STATE *worker)
+__async_worker_op(WT_SESSION_IMPL *session, WT_ASYNC_OP_IMPL *op, WT_ASYNC_WORKER_STATE *worker)
{
- WT_ASYNC_OP *asyncop;
- WT_CURSOR *cursor;
- WT_DECL_RET;
- WT_SESSION *wt_session;
- int cb_ret;
+ WT_ASYNC_OP *asyncop;
+ WT_CURSOR *cursor;
+ WT_DECL_RET;
+ WT_SESSION *wt_session;
+ int cb_ret;
- asyncop = (WT_ASYNC_OP *)op;
+ asyncop = (WT_ASYNC_OP *)op;
- cb_ret = 0;
+ cb_ret = 0;
- wt_session = &session->iface;
- if (op->optype != WT_AOP_COMPACT)
- WT_RET(wt_session->begin_transaction(wt_session, NULL));
- WT_ASSERT(session, op->state == WT_ASYNCOP_WORKING);
- WT_RET(__async_worker_cursor(session, op, worker, &cursor));
- /*
- * Perform op and invoke the callback.
- */
- ret = __async_worker_execop(session, op, cursor);
- if (op->cb != NULL && op->cb->notify != NULL)
- cb_ret = op->cb->notify(op->cb, asyncop, ret, 0);
+ wt_session = &session->iface;
+ if (op->optype != WT_AOP_COMPACT)
+ WT_RET(wt_session->begin_transaction(wt_session, NULL));
+ WT_ASSERT(session, op->state == WT_ASYNCOP_WORKING);
+ WT_RET(__async_worker_cursor(session, op, worker, &cursor));
+ /*
+ * Perform op and invoke the callback.
+ */
+ ret = __async_worker_execop(session, op, cursor);
+ if (op->cb != NULL && op->cb->notify != NULL)
+ cb_ret = op->cb->notify(op->cb, asyncop, ret, 0);
- /*
- * If the operation succeeded and the user callback returned
- * zero then commit. Otherwise rollback.
- */
- if (op->optype != WT_AOP_COMPACT) {
- if ((ret == 0 || ret == WT_NOTFOUND) && cb_ret == 0)
- WT_TRET(wt_session->commit_transaction(
- wt_session, NULL));
- else
- WT_TRET(wt_session->rollback_transaction(
- wt_session, NULL));
- F_CLR(&asyncop->c, WT_CURSTD_KEY_SET | WT_CURSTD_VALUE_SET);
- WT_TRET(cursor->reset(cursor));
- }
- /*
- * After the callback returns, and the transaction resolved release
- * the op back to the free pool. We do this regardless of
- * success or failure.
- */
- WT_PUBLISH(op->state, WT_ASYNCOP_FREE);
- return (ret);
+ /*
+ * If the operation succeeded and the user callback returned zero then commit. Otherwise
+ * rollback.
+ */
+ if (op->optype != WT_AOP_COMPACT) {
+ if ((ret == 0 || ret == WT_NOTFOUND) && cb_ret == 0)
+ WT_TRET(wt_session->commit_transaction(wt_session, NULL));
+ else
+ WT_TRET(wt_session->rollback_transaction(wt_session, NULL));
+ F_CLR(&asyncop->c, WT_CURSTD_KEY_SET | WT_CURSTD_VALUE_SET);
+ WT_TRET(cursor->reset(cursor));
+ }
+ /*
+ * After the callback returns, and the transaction resolved release the op back to the free
+ * pool. We do this regardless of success or failure.
+ */
+ WT_PUBLISH(op->state, WT_ASYNCOP_FREE);
+ return (ret);
}
/*
* __wt_async_worker --
- * The async worker threads.
+ * The async worker threads.
*/
WT_THREAD_RET
__wt_async_worker(void *arg)
{
- WT_ASYNC *async;
- WT_ASYNC_CURSOR *ac;
- WT_ASYNC_OP_IMPL *op;
- WT_ASYNC_WORKER_STATE worker;
- WT_CONNECTION_IMPL *conn;
- WT_DECL_RET;
- WT_SESSION_IMPL *session;
- uint64_t flush_gen;
+ WT_ASYNC *async;
+ WT_ASYNC_CURSOR *ac;
+ WT_ASYNC_OP_IMPL *op;
+ WT_ASYNC_WORKER_STATE worker;
+ WT_CONNECTION_IMPL *conn;
+ WT_DECL_RET;
+ WT_SESSION_IMPL *session;
+ uint64_t flush_gen;
- session = arg;
- conn = S2C(session);
- async = conn->async;
+ session = arg;
+ conn = S2C(session);
+ async = conn->async;
- worker.num_cursors = 0;
- TAILQ_INIT(&worker.cursorqh);
- while (F_ISSET(conn, WT_CONN_SERVER_ASYNC) &&
- F_ISSET(session, WT_SESSION_SERVER_ASYNC)) {
- WT_ERR(__async_op_dequeue(conn, session, &op));
- if (op != NULL && op != &async->flush_op) {
- /*
- * Operation failure doesn't cause the worker thread to
- * exit.
- */
- (void)__async_worker_op(session, op, &worker);
- } else if (async->flush_state == WT_ASYNC_FLUSHING) {
- /*
- * Worker flushing going on. Last worker to the party
- * needs to clear the FLUSHING flag and signal the cond.
- * If FLUSHING is going on, we do not take anything off
- * the queue.
- */
- WT_ORDERED_READ(flush_gen, async->flush_gen);
- if (__wt_atomic_add32(&async->flush_count, 1) ==
- conn->async_workers) {
- /*
- * We're last. All workers accounted for so
- * signal the condition and clear the FLUSHING
- * flag to release the other worker threads.
- * Set the FLUSH_COMPLETE flag so that the
- * caller can return to the application.
- */
- WT_PUBLISH(async->flush_state,
- WT_ASYNC_FLUSH_COMPLETE);
- __wt_cond_signal(session, async->flush_cond);
- } else
- /*
- * We need to wait for the last worker to
- * signal the condition.
- */
- __async_flush_wait(session, async, flush_gen);
- }
- }
+ worker.num_cursors = 0;
+ TAILQ_INIT(&worker.cursorqh);
+ while (F_ISSET(conn, WT_CONN_SERVER_ASYNC) && F_ISSET(session, WT_SESSION_SERVER_ASYNC)) {
+ WT_ERR(__async_op_dequeue(conn, session, &op));
+ if (op != NULL && op != &async->flush_op) {
+ /*
+ * Operation failure doesn't cause the worker thread to exit.
+ */
+ (void)__async_worker_op(session, op, &worker);
+ } else if (async->flush_state == WT_ASYNC_FLUSHING) {
+ /*
+ * Worker flushing going on. Last worker to the party needs to clear the FLUSHING flag
+ * and signal the cond. If FLUSHING is going on, we do not take anything off the queue.
+ */
+ WT_ORDERED_READ(flush_gen, async->flush_gen);
+ if (__wt_atomic_add32(&async->flush_count, 1) == conn->async_workers) {
+ /*
+ * We're last. All workers accounted for so signal the condition and clear the
+ * FLUSHING flag to release the other worker threads. Set the FLUSH_COMPLETE flag so
+ * that the caller can return to the application.
+ */
+ WT_PUBLISH(async->flush_state, WT_ASYNC_FLUSH_COMPLETE);
+ __wt_cond_signal(session, async->flush_cond);
+ } else
+ /*
+ * We need to wait for the last worker to signal the condition.
+ */
+ __async_flush_wait(session, async, flush_gen);
+ }
+ }
- if (0) {
-err: WT_PANIC_MSG(session, ret, "async worker error");
- }
- /*
- * Worker thread cleanup, close our cached cursors and free all the
- * WT_ASYNC_CURSOR structures.
- */
- while ((ac = TAILQ_FIRST(&worker.cursorqh)) != NULL) {
- TAILQ_REMOVE(&worker.cursorqh, ac, q);
- WT_TRET(ac->c->close(ac->c));
- __wt_free(session, ac);
- }
- return (WT_THREAD_RET_VALUE);
+ if (0) {
+err:
+ WT_PANIC_MSG(session, ret, "async worker error");
+ }
+ /*
+ * Worker thread cleanup, close our cached cursors and free all the WT_ASYNC_CURSOR structures.
+ */
+ while ((ac = TAILQ_FIRST(&worker.cursorqh)) != NULL) {
+ TAILQ_REMOVE(&worker.cursorqh, ac, q);
+ WT_TRET(ac->c->close(ac->c));
+ __wt_free(session, ac);
+ }
+ return (WT_THREAD_RET_VALUE);
}