diff options
author | Luke Chen <luke.chen@mongodb.com> | 2019-09-09 14:45:50 +1000 |
---|---|---|
committer | Luke Chen <luke.chen@mongodb.com> | 2019-09-09 14:48:15 +1000 |
commit | d1c52b7aafac44e16dd62ce94663eab1aa16f921 (patch) | |
tree | 2aea018150b745b0fc7ef3dc626c4ef4164d2704 /src/third_party/wiredtiger/src/async | |
parent | b326fd656716e95418e563ff12368a3015994b5e (diff) | |
download | mongo-d1c52b7aafac44e16dd62ce94663eab1aa16f921.tar.gz |
Import wiredtiger: c600bde20363629405082a3ea985b70dfb00850e from branch mongodb-4.2
ref: 280c572c80..c600bde203
for: 4.2.1
WT-4535 Enhance wt command line utility to use read-only mode
WT-4658 Apply Clang Format
WT-4810 Adding WT_ERR_ASSERT and WT_RET_ASSERT macros
WT-4831 Add option to python tests to not fail if stdout is not empty
WT-4884 Test for recovery correctness with modify operations
WT-4935 Add a perf test to find out the wiredtiger_calc_modify overhead
WT-4966 Fix valgrind detected memory leak from test/csuite/import/smoke.sh
WT-5043 Add debugging to aid in test/format hang
WT-5046 Prepared transactions aren't properly cleared from global table with WT_CONN_LOG_DEBUG_MODE enabled
WT-5062 Adjust the record size to consume less size
WT-5063 Return proper error message for cursor modify operation for not supported cursor types
WT-5074 Fix "make check" on exotic architectures
WT-5076 Cut WiredTiger 3.2.1 release
WT-5080 New wtperf workload modify_distribute option
WT-5085 Fix spacing in wtperf output
WT-5087 Add time tracking at start of reconciliation
WT-5088 Refine clearing and setting of debug timer
WT-5100 Update test format to disable readonly mode in utility
Diffstat (limited to 'src/third_party/wiredtiger/src/async')
-rw-r--r-- | src/third_party/wiredtiger/src/async/async_api.c | 991 | ||||
-rw-r--r-- | src/third_party/wiredtiger/src/async/async_op.c | 441 | ||||
-rw-r--r-- | src/third_party/wiredtiger/src/async/async_worker.c | 538 |
3 files changed, 959 insertions, 1011 deletions
diff --git a/src/third_party/wiredtiger/src/async/async_api.c b/src/third_party/wiredtiger/src/async/async_api.c index 0ef85b8cd28..81b23b238e7 100644 --- a/src/third_party/wiredtiger/src/async/async_api.c +++ b/src/third_party/wiredtiger/src/async/async_api.c @@ -10,602 +10,581 @@ /* * __async_get_format -- - * Find or allocate the uri/config/format structure. + * Find or allocate the uri/config/format structure. */ static int -__async_get_format(WT_CONNECTION_IMPL *conn, const char *uri, - const char *config, WT_ASYNC_OP_IMPL *op) +__async_get_format( + WT_CONNECTION_IMPL *conn, const char *uri, const char *config, WT_ASYNC_OP_IMPL *op) { - WT_ASYNC *async; - WT_ASYNC_FORMAT *af; - WT_CURSOR *c; - WT_DECL_RET; - WT_SESSION *wt_session; - WT_SESSION_IMPL *session; - uint64_t cfg_hash, uri_hash; - - async = conn->async; - c = NULL; - op->format = NULL; - - if (uri != NULL) - uri_hash = __wt_hash_city64(uri, strlen(uri)); - else - uri_hash = 0; - if (config != NULL) - cfg_hash = __wt_hash_city64(config, strlen(config)); - else - cfg_hash = 0; - - /* - * We don't need to hold a lock around this walk. The list is - * permanent and always valid. We might race an insert and there - * is a possibility a duplicate entry might be inserted, but - * that is not harmful. - */ - TAILQ_FOREACH(af, &async->formatqh, q) { - if (af->uri_hash == uri_hash && af->cfg_hash == cfg_hash) - goto setup; - } - /* - * We didn't find one in the cache. Allocate and initialize one. - * Insert it at the head expecting LRU usage. We need a real session - * for the cursor. 
- */ - WT_RET(__wt_open_internal_session( - conn, "async-cursor", true, 0, &session)); - __wt_spin_lock(session, &async->ops_lock); - WT_ERR(__wt_calloc_one(session, &af)); - WT_ERR(__wt_strdup(session, uri, &af->uri)); - WT_ERR(__wt_strdup(session, config, &af->config)); - af->uri_hash = uri_hash; - af->cfg_hash = cfg_hash; - /* - * Get the key_format and value_format for this URI and store - * it in the structure so that async->set_key/value work. - */ - wt_session = &session->iface; - WT_ERR(wt_session->open_cursor(wt_session, uri, NULL, NULL, &c)); - WT_ERR(__wt_strdup(session, c->key_format, &af->key_format)); - WT_ERR(__wt_strdup(session, c->value_format, &af->value_format)); - WT_ERR(c->close(c)); - c = NULL; - - TAILQ_INSERT_HEAD(&async->formatqh, af, q); - __wt_spin_unlock(session, &async->ops_lock); - WT_ERR(wt_session->close(wt_session, NULL)); - -setup: op->format = af; - /* - * Copy the pointers for the formats. Items in the async format - * queue remain there until the connection is closed. We must - * initialize the format fields in the async_op, which are publicly - * visible, and its internal cursor used by internal key/value - * functions. - */ - op->iface.c.key_format = op->iface.key_format = af->key_format; - op->iface.c.value_format = op->iface.value_format = af->value_format; - return (0); + WT_ASYNC *async; + WT_ASYNC_FORMAT *af; + WT_CURSOR *c; + WT_DECL_RET; + WT_SESSION *wt_session; + WT_SESSION_IMPL *session; + uint64_t cfg_hash, uri_hash; + + async = conn->async; + c = NULL; + op->format = NULL; + + if (uri != NULL) + uri_hash = __wt_hash_city64(uri, strlen(uri)); + else + uri_hash = 0; + if (config != NULL) + cfg_hash = __wt_hash_city64(config, strlen(config)); + else + cfg_hash = 0; + + /* + * We don't need to hold a lock around this walk. The list is permanent and always valid. We + * might race an insert and there is a possibility a duplicate entry might be inserted, but that + * is not harmful. 
+ */ + TAILQ_FOREACH (af, &async->formatqh, q) { + if (af->uri_hash == uri_hash && af->cfg_hash == cfg_hash) + goto setup; + } + /* + * We didn't find one in the cache. Allocate and initialize one. Insert it at the head expecting + * LRU usage. We need a real session for the cursor. + */ + WT_RET(__wt_open_internal_session(conn, "async-cursor", true, 0, &session)); + __wt_spin_lock(session, &async->ops_lock); + WT_ERR(__wt_calloc_one(session, &af)); + WT_ERR(__wt_strdup(session, uri, &af->uri)); + WT_ERR(__wt_strdup(session, config, &af->config)); + af->uri_hash = uri_hash; + af->cfg_hash = cfg_hash; + /* + * Get the key_format and value_format for this URI and store it in the structure so that + * async->set_key/value work. + */ + wt_session = &session->iface; + WT_ERR(wt_session->open_cursor(wt_session, uri, NULL, NULL, &c)); + WT_ERR(__wt_strdup(session, c->key_format, &af->key_format)); + WT_ERR(__wt_strdup(session, c->value_format, &af->value_format)); + WT_ERR(c->close(c)); + c = NULL; + + TAILQ_INSERT_HEAD(&async->formatqh, af, q); + __wt_spin_unlock(session, &async->ops_lock); + WT_ERR(wt_session->close(wt_session, NULL)); + +setup: + op->format = af; + /* + * Copy the pointers for the formats. Items in the async format queue remain there until the + * connection is closed. We must initialize the format fields in the async_op, which are + * publicly visible, and its internal cursor used by internal key/value functions. 
+ */ + op->iface.c.key_format = op->iface.key_format = af->key_format; + op->iface.c.value_format = op->iface.value_format = af->value_format; + return (0); err: - if (c != NULL) - WT_TRET(c->close(c)); - __wt_free(session, af->uri); - __wt_free(session, af->config); - __wt_free(session, af->key_format); - __wt_free(session, af->value_format); - __wt_free(session, af); - return (ret); + if (c != NULL) + WT_TRET(c->close(c)); + __wt_free(session, af->uri); + __wt_free(session, af->config); + __wt_free(session, af->key_format); + __wt_free(session, af->value_format); + __wt_free(session, af); + return (ret); } /* * __async_new_op_alloc -- - * Find and allocate the next available async op handle. + * Find and allocate the next available async op handle. */ static int -__async_new_op_alloc(WT_SESSION_IMPL *session, const char *uri, - const char *config, WT_ASYNC_OP_IMPL **opp) +__async_new_op_alloc( + WT_SESSION_IMPL *session, const char *uri, const char *config, WT_ASYNC_OP_IMPL **opp) { - WT_ASYNC *async; - WT_ASYNC_OP_IMPL *op; - WT_CONNECTION_IMPL *conn; - uint32_t i, save_i, view; + WT_ASYNC *async; + WT_ASYNC_OP_IMPL *op; + WT_CONNECTION_IMPL *conn; + uint32_t i, save_i, view; - *opp = NULL; + *opp = NULL; - conn = S2C(session); - async = conn->async; - WT_STAT_CONN_INCR(session, async_op_alloc); + conn = S2C(session); + async = conn->async; + WT_STAT_CONN_INCR(session, async_op_alloc); retry: - op = NULL; - WT_ORDERED_READ(save_i, async->ops_index); - /* - * Look after the last one allocated for a free one. We'd expect - * ops to be freed mostly FIFO so we should quickly find one. - */ - for (view = 1, i = save_i; i < conn->async_size; i++, view++) { - op = &async->async_ops[i]; - if (op->state == WT_ASYNCOP_FREE) - break; - } - - /* - * Loop around back to the beginning if we need to. 
- */ - if (op == NULL || op->state != WT_ASYNCOP_FREE) - for (i = 0; i < save_i; i++, view++) { - op = &async->async_ops[i]; - if (op->state == WT_ASYNCOP_FREE) - break; - } - - /* - * We still haven't found one. Return an error. - */ - if (op == NULL || op->state != WT_ASYNCOP_FREE) { - WT_STAT_CONN_INCR(session, async_full); - return (__wt_set_return(session, EBUSY)); - } - /* - * Set the state of this op handle as READY for the user to use. - * If we can set the state then the op entry is ours. - * Start the next search at the next entry after this one. - */ - if (!__wt_atomic_cas32(&op->state, WT_ASYNCOP_FREE, WT_ASYNCOP_READY)) { - WT_STAT_CONN_INCR(session, async_alloc_race); - goto retry; - } - WT_STAT_CONN_INCRV(session, async_alloc_view, view); - WT_RET(__async_get_format(conn, uri, config, op)); - op->unique_id = __wt_atomic_add64(&async->op_id, 1); - op->optype = WT_AOP_NONE; - async->ops_index = (i + 1) % conn->async_size; - *opp = op; - return (0); + op = NULL; + WT_ORDERED_READ(save_i, async->ops_index); + /* + * Look after the last one allocated for a free one. We'd expect ops to be freed mostly FIFO so + * we should quickly find one. + */ + for (view = 1, i = save_i; i < conn->async_size; i++, view++) { + op = &async->async_ops[i]; + if (op->state == WT_ASYNCOP_FREE) + break; + } + + /* + * Loop around back to the beginning if we need to. + */ + if (op == NULL || op->state != WT_ASYNCOP_FREE) + for (i = 0; i < save_i; i++, view++) { + op = &async->async_ops[i]; + if (op->state == WT_ASYNCOP_FREE) + break; + } + + /* + * We still haven't found one. Return an error. + */ + if (op == NULL || op->state != WT_ASYNCOP_FREE) { + WT_STAT_CONN_INCR(session, async_full); + return (__wt_set_return(session, EBUSY)); + } + /* + * Set the state of this op handle as READY for the user to use. If we can set the state then + * the op entry is ours. Start the next search at the next entry after this one. 
+ */ + if (!__wt_atomic_cas32(&op->state, WT_ASYNCOP_FREE, WT_ASYNCOP_READY)) { + WT_STAT_CONN_INCR(session, async_alloc_race); + goto retry; + } + WT_STAT_CONN_INCRV(session, async_alloc_view, view); + WT_RET(__async_get_format(conn, uri, config, op)); + op->unique_id = __wt_atomic_add64(&async->op_id, 1); + op->optype = WT_AOP_NONE; + async->ops_index = (i + 1) % conn->async_size; + *opp = op; + return (0); } /* * __async_config -- - * Parse and setup the async API options. + * Parse and setup the async API options. */ static int -__async_config(WT_SESSION_IMPL *session, - WT_CONNECTION_IMPL *conn, const char **cfg, bool *runp) +__async_config(WT_SESSION_IMPL *session, WT_CONNECTION_IMPL *conn, const char **cfg, bool *runp) { - WT_CONFIG_ITEM cval; - - /* - * The async configuration is off by default. - */ - WT_RET(__wt_config_gets(session, cfg, "async.enabled", &cval)); - *runp = cval.val != 0; - - /* - * Even if async is turned off, we want to parse and store the default - * values so that reconfigure can just enable them. - * - * Bound the minimum maximum operations at 10. - */ - WT_RET(__wt_config_gets(session, cfg, "async.ops_max", &cval)); - conn->async_size = (uint32_t)WT_MAX(cval.val, 10); - - WT_RET(__wt_config_gets(session, cfg, "async.threads", &cval)); - conn->async_workers = (uint32_t)cval.val; - /* Sanity check that api_data.py is in sync with async.h */ - WT_ASSERT(session, conn->async_workers <= WT_ASYNC_MAX_WORKERS); - - return (0); + WT_CONFIG_ITEM cval; + + /* + * The async configuration is off by default. + */ + WT_RET(__wt_config_gets(session, cfg, "async.enabled", &cval)); + *runp = cval.val != 0; + + /* + * Even if async is turned off, we want to parse and store the default + * values so that reconfigure can just enable them. + * + * Bound the minimum maximum operations at 10. 
+ */ + WT_RET(__wt_config_gets(session, cfg, "async.ops_max", &cval)); + conn->async_size = (uint32_t)WT_MAX(cval.val, 10); + + WT_RET(__wt_config_gets(session, cfg, "async.threads", &cval)); + conn->async_workers = (uint32_t)cval.val; + /* Sanity check that api_data.py is in sync with async.h */ + WT_ASSERT(session, conn->async_workers <= WT_ASYNC_MAX_WORKERS); + + return (0); } /* * __wt_async_stats_update -- - * Update the async stats for return to the application. + * Update the async stats for return to the application. */ void __wt_async_stats_update(WT_SESSION_IMPL *session) { - WT_ASYNC *async; - WT_CONNECTION_IMPL *conn; - WT_CONNECTION_STATS **stats; - - conn = S2C(session); - async = conn->async; - if (async == NULL) - return; - stats = conn->stats; - WT_STAT_SET(session, stats, async_cur_queue, async->cur_queue); - WT_STAT_SET(session, stats, async_max_queue, async->max_queue); + WT_ASYNC *async; + WT_CONNECTION_IMPL *conn; + WT_CONNECTION_STATS **stats; + + conn = S2C(session); + async = conn->async; + if (async == NULL) + return; + stats = conn->stats; + WT_STAT_SET(session, stats, async_cur_queue, async->cur_queue); + WT_STAT_SET(session, stats, async_max_queue, async->max_queue); } /* * __async_start -- - * Start the async subsystem. All configuration processing has - * already been done by the caller. + * Start the async subsystem. All configuration processing has already been done by the caller. */ static int __async_start(WT_SESSION_IMPL *session) { - WT_ASYNC *async; - WT_CONNECTION_IMPL *conn; - uint32_t i, session_flags; - - conn = S2C(session); - conn->async_cfg = true; - /* - * Async is on, allocate the WT_ASYNC structure and initialize the ops. 
- */ - WT_RET(__wt_calloc_one(session, &conn->async)); - async = conn->async; - TAILQ_INIT(&async->formatqh); - WT_RET(__wt_spin_init(session, &async->ops_lock, "ops")); - WT_RET(__wt_cond_alloc(session, "async flush", &async->flush_cond)); - WT_RET(__wt_async_op_init(session)); - - /* - * Start up the worker threads. - */ - F_SET(conn, WT_CONN_SERVER_ASYNC); - for (i = 0; i < conn->async_workers; i++) { - /* - * Each worker has its own session. We set both a general - * server flag in the connection and an individual flag - * in the session. The user may reconfigure the number of - * workers and we may want to selectively stop some workers - * while leaving the rest running. - */ - session_flags = WT_SESSION_SERVER_ASYNC; - WT_RET(__wt_open_internal_session(conn, "async-worker", - true, session_flags, &async->worker_sessions[i])); - } - for (i = 0; i < conn->async_workers; i++) { - /* - * Start the threads. - */ - WT_RET(__wt_thread_create(session, &async->worker_tids[i], - __wt_async_worker, async->worker_sessions[i])); - } - __wt_async_stats_update(session); - return (0); + WT_ASYNC *async; + WT_CONNECTION_IMPL *conn; + uint32_t i, session_flags; + + conn = S2C(session); + conn->async_cfg = true; + /* + * Async is on, allocate the WT_ASYNC structure and initialize the ops. + */ + WT_RET(__wt_calloc_one(session, &conn->async)); + async = conn->async; + TAILQ_INIT(&async->formatqh); + WT_RET(__wt_spin_init(session, &async->ops_lock, "ops")); + WT_RET(__wt_cond_alloc(session, "async flush", &async->flush_cond)); + WT_RET(__wt_async_op_init(session)); + + /* + * Start up the worker threads. + */ + F_SET(conn, WT_CONN_SERVER_ASYNC); + for (i = 0; i < conn->async_workers; i++) { + /* + * Each worker has its own session. We set both a general server flag in the connection and + * an individual flag in the session. The user may reconfigure the number of workers and we + * may want to selectively stop some workers while leaving the rest running. 
+ */ + session_flags = WT_SESSION_SERVER_ASYNC; + WT_RET(__wt_open_internal_session( + conn, "async-worker", true, session_flags, &async->worker_sessions[i])); + } + for (i = 0; i < conn->async_workers; i++) { + /* + * Start the threads. + */ + WT_RET(__wt_thread_create( + session, &async->worker_tids[i], __wt_async_worker, async->worker_sessions[i])); + } + __wt_async_stats_update(session); + return (0); } /* * __wt_async_create -- - * Start the async subsystem and worker threads. + * Start the async subsystem and worker threads. */ int __wt_async_create(WT_SESSION_IMPL *session, const char *cfg[]) { - WT_CONNECTION_IMPL *conn; - bool run; + WT_CONNECTION_IMPL *conn; + bool run; - conn = S2C(session); + conn = S2C(session); - /* Handle configuration. */ - run = false; - WT_RET(__async_config(session, conn, cfg, &run)); + /* Handle configuration. */ + run = false; + WT_RET(__async_config(session, conn, cfg, &run)); - /* If async is not configured, we're done. */ - if (!run) - return (0); - return (__async_start(session)); + /* If async is not configured, we're done. */ + if (!run) + return (0); + return (__async_start(session)); } /* * __wt_async_reconfig -- - * Start the async subsystem and worker threads. + * Start the async subsystem and worker threads. */ int __wt_async_reconfig(WT_SESSION_IMPL *session, const char *cfg[]) { - WT_ASYNC *async; - WT_CONNECTION_IMPL *conn, tmp_conn; - WT_DECL_RET; - WT_SESSION *wt_session; - uint32_t i, session_flags; - bool run; - - conn = S2C(session); - async = conn->async; - memset(&tmp_conn, 0, sizeof(tmp_conn)); - tmp_conn.async_cfg = conn->async_cfg; - tmp_conn.async_workers = conn->async_workers; - tmp_conn.async_size = conn->async_size; - - /* Handle configuration. */ - run = conn->async_cfg; - WT_RET(__async_config(session, &tmp_conn, cfg, &run)); - - /* - * There are some restrictions on the live reconfiguration of async. 
- * Unlike other subsystems where we simply destroy anything existing - * and restart with the new configuration, async is not so easy. - * If the user is just changing the number of workers, we want to - * allow the existing op handles and other information to remain in - * existence. So we must handle various combinations of changes - * individually. - * - * One restriction is that if async is currently on, the user cannot - * change the number of async op handles available. The user can try - * but we do nothing with it. However we must allow the ops_max config - * string so that a user can completely start async via reconfigure. - */ - - /* - * Easy cases: - * 1. If async is on and the user wants it off, shut it down. - * 2. If async is off, and the user wants it on, start it. - * 3. If not a toggle and async is off, we're done. - */ - if (conn->async_cfg && !run) { /* Case 1 */ - WT_TRET(__wt_async_flush(session)); - ret = __wt_async_destroy(session); - conn->async_cfg = false; - return (ret); - } - if (!conn->async_cfg && run) /* Case 2 */ - return (__wt_async_create(session, cfg)); - if (!conn->async_cfg) /* Case 3 */ - return (0); - - /* - * Running async worker modification cases: - * 4. If number of workers didn't change, we're done. - * 5. If more workers, start new ones. - * 6. If fewer workers, kill some. - */ - if (conn->async_workers == tmp_conn.async_workers) - /* No change in the number of workers. */ - return (0); - if (conn->async_workers < tmp_conn.async_workers) { - /* Case 5 */ - /* - * The worker_sessions array is allocated for the maximum - * allowed number of workers, so starting more is easy. - */ - for (i = conn->async_workers; i < tmp_conn.async_workers; i++) { - /* - * Each worker has its own session. 
- */ - session_flags = WT_SESSION_SERVER_ASYNC; - WT_RET(__wt_open_internal_session(conn, "async-worker", - true, session_flags, &async->worker_sessions[i])); - } - for (i = conn->async_workers; i < tmp_conn.async_workers; i++) { - /* - * Start the threads. - */ - WT_RET(__wt_thread_create(session, - &async->worker_tids[i], __wt_async_worker, - async->worker_sessions[i])); - } - conn->async_workers = tmp_conn.async_workers; - } - if (conn->async_workers > tmp_conn.async_workers) { - /* Case 6 */ - /* - * Stopping an individual async worker is the most complex case. - * We clear the session async flag on the targeted worker thread - * so that only that thread stops, and the others keep running. - */ - for (i = conn->async_workers - 1; - i >= tmp_conn.async_workers; i--) { - /* - * Join any worker we're stopping. - * After the thread is stopped, close its session. - */ - WT_ASSERT(session, async->worker_tids[i].created); - WT_ASSERT(session, async->worker_sessions[i] != NULL); - F_CLR(async->worker_sessions[i], - WT_SESSION_SERVER_ASYNC); - WT_TRET(__wt_thread_join( - session, &async->worker_tids[i])); - wt_session = &async->worker_sessions[i]->iface; - WT_TRET(wt_session->close(wt_session, NULL)); - async->worker_sessions[i] = NULL; - } - conn->async_workers = tmp_conn.async_workers; - } - - return (0); + WT_ASYNC *async; + WT_CONNECTION_IMPL *conn, tmp_conn; + WT_DECL_RET; + WT_SESSION *wt_session; + uint32_t i, session_flags; + bool run; + + conn = S2C(session); + async = conn->async; + memset(&tmp_conn, 0, sizeof(tmp_conn)); + tmp_conn.async_cfg = conn->async_cfg; + tmp_conn.async_workers = conn->async_workers; + tmp_conn.async_size = conn->async_size; + + /* Handle configuration. */ + run = conn->async_cfg; + WT_RET(__async_config(session, &tmp_conn, cfg, &run)); + + /* + * There are some restrictions on the live reconfiguration of async. 
+ * Unlike other subsystems where we simply destroy anything existing + * and restart with the new configuration, async is not so easy. + * If the user is just changing the number of workers, we want to + * allow the existing op handles and other information to remain in + * existence. So we must handle various combinations of changes + * individually. + * + * One restriction is that if async is currently on, the user cannot + * change the number of async op handles available. The user can try + * but we do nothing with it. However we must allow the ops_max config + * string so that a user can completely start async via reconfigure. + */ + + /* + * Easy cases: + * 1. If async is on and the user wants it off, shut it down. + * 2. If async is off, and the user wants it on, start it. + * 3. If not a toggle and async is off, we're done. + */ + if (conn->async_cfg && !run) { /* Case 1 */ + WT_TRET(__wt_async_flush(session)); + ret = __wt_async_destroy(session); + conn->async_cfg = false; + return (ret); + } + if (!conn->async_cfg && run) /* Case 2 */ + return (__wt_async_create(session, cfg)); + if (!conn->async_cfg) /* Case 3 */ + return (0); + + /* + * Running async worker modification cases: + * 4. If number of workers didn't change, we're done. + * 5. If more workers, start new ones. + * 6. If fewer workers, kill some. + */ + if (conn->async_workers == tmp_conn.async_workers) + /* No change in the number of workers. */ + return (0); + if (conn->async_workers < tmp_conn.async_workers) { + /* Case 5 */ + /* + * The worker_sessions array is allocated for the maximum allowed number of workers, so + * starting more is easy. + */ + for (i = conn->async_workers; i < tmp_conn.async_workers; i++) { + /* + * Each worker has its own session. 
+ */ + session_flags = WT_SESSION_SERVER_ASYNC; + WT_RET(__wt_open_internal_session( + conn, "async-worker", true, session_flags, &async->worker_sessions[i])); + } + for (i = conn->async_workers; i < tmp_conn.async_workers; i++) { + /* + * Start the threads. + */ + WT_RET(__wt_thread_create( + session, &async->worker_tids[i], __wt_async_worker, async->worker_sessions[i])); + } + conn->async_workers = tmp_conn.async_workers; + } + if (conn->async_workers > tmp_conn.async_workers) { + /* Case 6 */ + /* + * Stopping an individual async worker is the most complex case. We clear the session async + * flag on the targeted worker thread so that only that thread stops, and the others keep + * running. + */ + for (i = conn->async_workers - 1; i >= tmp_conn.async_workers; i--) { + /* + * Join any worker we're stopping. After the thread is stopped, close its session. + */ + WT_ASSERT(session, async->worker_tids[i].created); + WT_ASSERT(session, async->worker_sessions[i] != NULL); + F_CLR(async->worker_sessions[i], WT_SESSION_SERVER_ASYNC); + WT_TRET(__wt_thread_join(session, &async->worker_tids[i])); + wt_session = &async->worker_sessions[i]->iface; + WT_TRET(wt_session->close(wt_session, NULL)); + async->worker_sessions[i] = NULL; + } + conn->async_workers = tmp_conn.async_workers; + } + + return (0); } /* * __wt_async_destroy -- - * Destroy the async worker threads and async subsystem. + * Destroy the async worker threads and async subsystem. */ int __wt_async_destroy(WT_SESSION_IMPL *session) { - WT_ASYNC *async; - WT_ASYNC_FORMAT *af; - WT_ASYNC_OP *op; - WT_CONNECTION_IMPL *conn; - WT_DECL_RET; - WT_SESSION *wt_session; - uint32_t i; - - conn = S2C(session); - async = conn->async; - - if (!conn->async_cfg) - return (0); - - F_CLR(conn, WT_CONN_SERVER_ASYNC); - for (i = 0; i < conn->async_workers; i++) - WT_TRET(__wt_thread_join(session, &async->worker_tids[i])); - __wt_cond_destroy(session, &async->flush_cond); - - /* Close the server threads' sessions. 
*/ - for (i = 0; i < conn->async_workers; i++) - if (async->worker_sessions[i] != NULL) { - wt_session = &async->worker_sessions[i]->iface; - WT_TRET(wt_session->close(wt_session, NULL)); - async->worker_sessions[i] = NULL; - } - /* Free any op key/value buffers. */ - for (i = 0; i < conn->async_size; i++) { - op = (WT_ASYNC_OP *)&async->async_ops[i]; - if (op->c.key.data != NULL) - __wt_buf_free(session, &op->c.key); - if (op->c.value.data != NULL) - __wt_buf_free(session, &op->c.value); - } - - /* Free format resources */ - while ((af = TAILQ_FIRST(&async->formatqh)) != NULL) { - TAILQ_REMOVE(&async->formatqh, af, q); - __wt_free(session, af->uri); - __wt_free(session, af->config); - __wt_free(session, af->key_format); - __wt_free(session, af->value_format); - __wt_free(session, af); - } - __wt_free(session, async->async_queue); - __wt_free(session, async->async_ops); - __wt_spin_destroy(session, &async->ops_lock); - __wt_free(session, conn->async); - - return (ret); + WT_ASYNC *async; + WT_ASYNC_FORMAT *af; + WT_ASYNC_OP *op; + WT_CONNECTION_IMPL *conn; + WT_DECL_RET; + WT_SESSION *wt_session; + uint32_t i; + + conn = S2C(session); + async = conn->async; + + if (!conn->async_cfg) + return (0); + + F_CLR(conn, WT_CONN_SERVER_ASYNC); + for (i = 0; i < conn->async_workers; i++) + WT_TRET(__wt_thread_join(session, &async->worker_tids[i])); + __wt_cond_destroy(session, &async->flush_cond); + + /* Close the server threads' sessions. */ + for (i = 0; i < conn->async_workers; i++) + if (async->worker_sessions[i] != NULL) { + wt_session = &async->worker_sessions[i]->iface; + WT_TRET(wt_session->close(wt_session, NULL)); + async->worker_sessions[i] = NULL; + } + /* Free any op key/value buffers. 
*/ + for (i = 0; i < conn->async_size; i++) { + op = (WT_ASYNC_OP *)&async->async_ops[i]; + if (op->c.key.data != NULL) + __wt_buf_free(session, &op->c.key); + if (op->c.value.data != NULL) + __wt_buf_free(session, &op->c.value); + } + + /* Free format resources */ + while ((af = TAILQ_FIRST(&async->formatqh)) != NULL) { + TAILQ_REMOVE(&async->formatqh, af, q); + __wt_free(session, af->uri); + __wt_free(session, af->config); + __wt_free(session, af->key_format); + __wt_free(session, af->value_format); + __wt_free(session, af); + } + __wt_free(session, async->async_queue); + __wt_free(session, async->async_ops); + __wt_spin_destroy(session, &async->ops_lock); + __wt_free(session, conn->async); + + return (ret); } /* * __wt_async_flush -- - * Implementation of the WT_CONN->async_flush method. + * Implementation of the WT_CONN->async_flush method. */ int __wt_async_flush(WT_SESSION_IMPL *session) { - WT_ASYNC *async; - WT_CONNECTION_IMPL *conn; - uint32_t i, workers; - - conn = S2C(session); - if (!conn->async_cfg) - return (0); - - async = conn->async; - /* - * Only add a flush operation if there are workers who can process - * it. Otherwise we will wait forever. - */ - workers = 0; - for (i = 0; i < conn->async_workers; ++i) - if (async->worker_tids[i].created) - ++workers; - if (workers == 0) - return (0); - - WT_STAT_CONN_INCR(session, async_flush); - /* - * We have to do several things. First we have to prevent - * other callers from racing with us so that only one - * flush is happening at a time. Next we have to wait for - * the worker threads to notice the flush and indicate - * that the flush is complete on their side. Then we - * clear the flush flags and return. - */ + WT_ASYNC *async; + WT_CONNECTION_IMPL *conn; + uint32_t i, workers; + + conn = S2C(session); + if (!conn->async_cfg) + return (0); + + async = conn->async; + /* + * Only add a flush operation if there are workers who can process it. Otherwise we will wait + * forever. 
+ */ + workers = 0; + for (i = 0; i < conn->async_workers; ++i) + if (async->worker_tids[i].created) + ++workers; + if (workers == 0) + return (0); + + WT_STAT_CONN_INCR(session, async_flush); +/* + * We have to do several things. First we have to prevent other callers from racing with us so that + * only one flush is happening at a time. Next we have to wait for the worker threads to notice the + * flush and indicate that the flush is complete on their side. Then we clear the flush flags and + * return. + */ retry: - while (async->flush_state != WT_ASYNC_FLUSH_NONE) - /* - * We're racing an in-progress flush. We need to wait - * our turn to start our own. We need to convoy the - * racing calls because a later call may be waiting for - * specific enqueued ops to be complete before this returns. - */ - __wt_sleep(0, 100000); - - if (!__wt_atomic_cas32(&async->flush_state, WT_ASYNC_FLUSH_NONE, - WT_ASYNC_FLUSH_IN_PROGRESS)) - goto retry; - /* - * We're the owner of this flush operation. Set the - * WT_ASYNC_FLUSH_IN_PROGRESS to block other callers. - * We're also preventing all worker threads from taking - * things off the work queue with the lock. - */ - async->flush_count = 0; - (void)__wt_atomic_add64(&async->flush_gen, 1); - WT_ASSERT(session, async->flush_op.state == WT_ASYNCOP_FREE); - async->flush_op.state = WT_ASYNCOP_READY; - WT_RET(__wt_async_op_enqueue(session, &async->flush_op)); - while (async->flush_state != WT_ASYNC_FLUSH_COMPLETE) - __wt_cond_wait(session, async->flush_cond, 100000, NULL); - /* - * Flush is done. Clear the flags. - */ - async->flush_op.state = WT_ASYNCOP_FREE; - WT_PUBLISH(async->flush_state, WT_ASYNC_FLUSH_NONE); - return (0); + while (async->flush_state != WT_ASYNC_FLUSH_NONE) + /* + * We're racing an in-progress flush. We need to wait our turn to start our own. We need to + * convoy the racing calls because a later call may be waiting for specific enqueued ops to + * be complete before this returns. 
+ */ + __wt_sleep(0, 100000); + + if (!__wt_atomic_cas32(&async->flush_state, WT_ASYNC_FLUSH_NONE, WT_ASYNC_FLUSH_IN_PROGRESS)) + goto retry; + /* + * We're the owner of this flush operation. Set the WT_ASYNC_FLUSH_IN_PROGRESS to block other + * callers. We're also preventing all worker threads from taking things off the work queue with + * the lock. + */ + async->flush_count = 0; + (void)__wt_atomic_add64(&async->flush_gen, 1); + WT_ASSERT(session, async->flush_op.state == WT_ASYNCOP_FREE); + async->flush_op.state = WT_ASYNCOP_READY; + WT_RET(__wt_async_op_enqueue(session, &async->flush_op)); + while (async->flush_state != WT_ASYNC_FLUSH_COMPLETE) + __wt_cond_wait(session, async->flush_cond, 100000, NULL); + /* + * Flush is done. Clear the flags. + */ + async->flush_op.state = WT_ASYNCOP_FREE; + WT_PUBLISH(async->flush_state, WT_ASYNC_FLUSH_NONE); + return (0); } /* * __async_runtime_config -- - * Configure runtime fields at allocation. + * Configure runtime fields at allocation. */ static int __async_runtime_config(WT_ASYNC_OP_IMPL *op, const char *cfg[]) { - WT_ASYNC_OP *asyncop; - WT_CONFIG_ITEM cval; - WT_SESSION_IMPL *session; - - session = O2S(op); - asyncop = (WT_ASYNC_OP *)op; - WT_RET(__wt_config_gets_def(session, cfg, "append", 0, &cval)); - if (cval.val) - F_SET(&asyncop->c, WT_CURSTD_APPEND); - else - F_CLR(&asyncop->c, WT_CURSTD_APPEND); - WT_RET(__wt_config_gets_def(session, cfg, "overwrite", 1, &cval)); - if (cval.val) - F_SET(&asyncop->c, WT_CURSTD_OVERWRITE); - else - F_CLR(&asyncop->c, WT_CURSTD_OVERWRITE); - WT_RET(__wt_config_gets_def(session, cfg, "raw", 0, &cval)); - if (cval.val) - F_SET(&asyncop->c, WT_CURSTD_RAW); - else - F_CLR(&asyncop->c, WT_CURSTD_RAW); - return (0); - + WT_ASYNC_OP *asyncop; + WT_CONFIG_ITEM cval; + WT_SESSION_IMPL *session; + + session = O2S(op); + asyncop = (WT_ASYNC_OP *)op; + WT_RET(__wt_config_gets_def(session, cfg, "append", 0, &cval)); + if (cval.val) + F_SET(&asyncop->c, WT_CURSTD_APPEND); + else + 
F_CLR(&asyncop->c, WT_CURSTD_APPEND); + WT_RET(__wt_config_gets_def(session, cfg, "overwrite", 1, &cval)); + if (cval.val) + F_SET(&asyncop->c, WT_CURSTD_OVERWRITE); + else + F_CLR(&asyncop->c, WT_CURSTD_OVERWRITE); + WT_RET(__wt_config_gets_def(session, cfg, "raw", 0, &cval)); + if (cval.val) + F_SET(&asyncop->c, WT_CURSTD_RAW); + else + F_CLR(&asyncop->c, WT_CURSTD_RAW); + return (0); } /* * __wt_async_new_op -- - * Implementation of the WT_CONN->async_new_op method. + * Implementation of the WT_CONN->async_new_op method. */ int -__wt_async_new_op(WT_SESSION_IMPL *session, const char *uri, - const char *config, WT_ASYNC_CALLBACK *cb, WT_ASYNC_OP_IMPL **opp) +__wt_async_new_op(WT_SESSION_IMPL *session, const char *uri, const char *config, + WT_ASYNC_CALLBACK *cb, WT_ASYNC_OP_IMPL **opp) { - WT_ASYNC_OP_IMPL *op; - WT_CONNECTION_IMPL *conn; - WT_DECL_RET; - const char *cfg[] = { S2C(session)->cfg, NULL, NULL }; - - *opp = NULL; - - conn = S2C(session); - if (!conn->async_cfg) - WT_RET(__wt_async_create(session, cfg)); - if (!conn->async_cfg) - WT_RET_MSG( - session, ENOTSUP, "Asynchronous operations not configured"); - - op = NULL; - WT_ERR(__async_new_op_alloc(session, uri, config, &op)); - cfg[1] = config; - WT_ERR(__async_runtime_config(op, cfg)); - op->cb = cb; - *opp = op; - return (0); + WT_ASYNC_OP_IMPL *op; + WT_CONNECTION_IMPL *conn; + WT_DECL_RET; + const char *cfg[] = {S2C(session)->cfg, NULL, NULL}; + + *opp = NULL; + + conn = S2C(session); + if (!conn->async_cfg) + WT_RET(__wt_async_create(session, cfg)); + if (!conn->async_cfg) + WT_RET_MSG(session, ENOTSUP, "Asynchronous operations not configured"); + + op = NULL; + WT_ERR(__async_new_op_alloc(session, uri, config, &op)); + cfg[1] = config; + WT_ERR(__async_runtime_config(op, cfg)); + op->cb = cb; + *opp = op; + return (0); err: - /* - * If we get an error after allocating op, set its state to free. 
- */ - if (op != NULL) - op->state = WT_ASYNCOP_FREE; - return (ret); + /* + * If we get an error after allocating op, set its state to free. + */ + if (op != NULL) + op->state = WT_ASYNCOP_FREE; + return (ret); } diff --git a/src/third_party/wiredtiger/src/async/async_op.c b/src/third_party/wiredtiger/src/async/async_op.c index 41cabe0297a..5ba9af81055 100644 --- a/src/third_party/wiredtiger/src/async/async_op.c +++ b/src/third_party/wiredtiger/src/async/async_op.c @@ -10,346 +10,345 @@ /* * __async_get_key -- - * WT_ASYNC_OP->get_key implementation for op handles. + * WT_ASYNC_OP->get_key implementation for op handles. */ static int __async_get_key(WT_ASYNC_OP *asyncop, ...) { - WT_DECL_RET; - va_list ap; + WT_DECL_RET; + va_list ap; - va_start(ap, asyncop); - ret = __wt_cursor_get_keyv(&asyncop->c, asyncop->c.flags, ap); - va_end(ap); - return (ret); + va_start(ap, asyncop); + ret = __wt_cursor_get_keyv(&asyncop->c, asyncop->c.flags, ap); + va_end(ap); + return (ret); } /* * __async_set_key -- - * WT_ASYNC_OP->set_key implementation for op handles. + * WT_ASYNC_OP->set_key implementation for op handles. */ static void __async_set_key(WT_ASYNC_OP *asyncop, ...) { - WT_CURSOR *c; - va_list ap; - - c = &asyncop->c; - va_start(ap, asyncop); - __wt_cursor_set_keyv(c, c->flags, ap); - if (!WT_DATA_IN_ITEM(&c->key) && !WT_CURSOR_RECNO(c)) - c->saved_err = __wt_buf_set( - O2S((WT_ASYNC_OP_IMPL *)asyncop), - &c->key, c->key.data, c->key.size); - va_end(ap); + WT_CURSOR *c; + va_list ap; + + c = &asyncop->c; + va_start(ap, asyncop); + __wt_cursor_set_keyv(c, c->flags, ap); + if (!WT_DATA_IN_ITEM(&c->key) && !WT_CURSOR_RECNO(c)) + c->saved_err = + __wt_buf_set(O2S((WT_ASYNC_OP_IMPL *)asyncop), &c->key, c->key.data, c->key.size); + va_end(ap); } /* * __async_get_value -- - * WT_ASYNC_OP->get_value implementation for op handles. + * WT_ASYNC_OP->get_value implementation for op handles. */ static int __async_get_value(WT_ASYNC_OP *asyncop, ...) 
{ - WT_DECL_RET; - va_list ap; + WT_DECL_RET; + va_list ap; - va_start(ap, asyncop); - ret = __wt_cursor_get_valuev(&asyncop->c, ap); - va_end(ap); - return (ret); + va_start(ap, asyncop); + ret = __wt_cursor_get_valuev(&asyncop->c, ap); + va_end(ap); + return (ret); } /* * __async_set_value -- - * WT_ASYNC_OP->set_value implementation for op handles. + * WT_ASYNC_OP->set_value implementation for op handles. */ static void __async_set_value(WT_ASYNC_OP *asyncop, ...) { - WT_CURSOR *c; - va_list ap; - - c = &asyncop->c; - va_start(ap, asyncop); - __wt_cursor_set_valuev(c, ap); - /* Copy the data, if it is pointing at data elsewhere. */ - if (!WT_DATA_IN_ITEM(&c->value)) - c->saved_err = __wt_buf_set( - O2S((WT_ASYNC_OP_IMPL *)asyncop), - &c->value, c->value.data, c->value.size); - va_end(ap); + WT_CURSOR *c; + va_list ap; + + c = &asyncop->c; + va_start(ap, asyncop); + __wt_cursor_set_valuev(c, ap); + /* Copy the data, if it is pointing at data elsewhere. */ + if (!WT_DATA_IN_ITEM(&c->value)) + c->saved_err = + __wt_buf_set(O2S((WT_ASYNC_OP_IMPL *)asyncop), &c->value, c->value.data, c->value.size); + va_end(ap); } /* * __async_op_wrap -- - * Common wrapper for all async operations. + * Common wrapper for all async operations. */ static int __async_op_wrap(WT_ASYNC_OP_IMPL *op, WT_ASYNC_OPTYPE type) { - op->optype = type; - return (__wt_async_op_enqueue(O2S(op), op)); + op->optype = type; + return (__wt_async_op_enqueue(O2S(op), op)); } /* * __async_search -- - * WT_ASYNC_OP->search implementation for op handles. + * WT_ASYNC_OP->search implementation for op handles. 
*/ static int __async_search(WT_ASYNC_OP *asyncop) { - WT_ASYNC_OP_IMPL *op; - WT_DECL_RET; - WT_SESSION_IMPL *session; - - op = (WT_ASYNC_OP_IMPL *)asyncop; - ASYNCOP_API_CALL(O2C(op), session, search); - WT_STAT_CONN_INCR(O2S(op), async_op_search); - WT_ERR(__async_op_wrap(op, WT_AOP_SEARCH)); -err: API_END_RET(session, ret); + WT_ASYNC_OP_IMPL *op; + WT_DECL_RET; + WT_SESSION_IMPL *session; + + op = (WT_ASYNC_OP_IMPL *)asyncop; + ASYNCOP_API_CALL(O2C(op), session, search); + WT_STAT_CONN_INCR(O2S(op), async_op_search); + WT_ERR(__async_op_wrap(op, WT_AOP_SEARCH)); +err: + API_END_RET(session, ret); } /* * __async_insert -- - * WT_ASYNC_OP->insert implementation for op handles. + * WT_ASYNC_OP->insert implementation for op handles. */ static int __async_insert(WT_ASYNC_OP *asyncop) { - WT_ASYNC_OP_IMPL *op; - WT_DECL_RET; - WT_SESSION_IMPL *session; - - op = (WT_ASYNC_OP_IMPL *)asyncop; - ASYNCOP_API_CALL(O2C(op), session, insert); - WT_STAT_CONN_INCR(O2S(op), async_op_insert); - WT_ERR(__async_op_wrap(op, WT_AOP_INSERT)); -err: API_END_RET(session, ret); + WT_ASYNC_OP_IMPL *op; + WT_DECL_RET; + WT_SESSION_IMPL *session; + + op = (WT_ASYNC_OP_IMPL *)asyncop; + ASYNCOP_API_CALL(O2C(op), session, insert); + WT_STAT_CONN_INCR(O2S(op), async_op_insert); + WT_ERR(__async_op_wrap(op, WT_AOP_INSERT)); +err: + API_END_RET(session, ret); } /* * __async_update -- - * WT_ASYNC_OP->update implementation for op handles. + * WT_ASYNC_OP->update implementation for op handles. 
*/ static int __async_update(WT_ASYNC_OP *asyncop) { - WT_ASYNC_OP_IMPL *op; - WT_DECL_RET; - WT_SESSION_IMPL *session; - - op = (WT_ASYNC_OP_IMPL *)asyncop; - ASYNCOP_API_CALL(O2C(op), session, update); - WT_STAT_CONN_INCR(O2S(op), async_op_update); - WT_ERR(__async_op_wrap(op, WT_AOP_UPDATE)); -err: API_END_RET(session, ret); + WT_ASYNC_OP_IMPL *op; + WT_DECL_RET; + WT_SESSION_IMPL *session; + + op = (WT_ASYNC_OP_IMPL *)asyncop; + ASYNCOP_API_CALL(O2C(op), session, update); + WT_STAT_CONN_INCR(O2S(op), async_op_update); + WT_ERR(__async_op_wrap(op, WT_AOP_UPDATE)); +err: + API_END_RET(session, ret); } /* * __async_remove -- - * WT_ASYNC_OP->remove implementation for op handles. + * WT_ASYNC_OP->remove implementation for op handles. */ static int __async_remove(WT_ASYNC_OP *asyncop) { - WT_ASYNC_OP_IMPL *op; - WT_DECL_RET; - WT_SESSION_IMPL *session; - - op = (WT_ASYNC_OP_IMPL *)asyncop; - ASYNCOP_API_CALL(O2C(op), session, remove); - WT_STAT_CONN_INCR(O2S(op), async_op_remove); - WT_ERR(__async_op_wrap(op, WT_AOP_REMOVE)); -err: API_END_RET(session, ret); + WT_ASYNC_OP_IMPL *op; + WT_DECL_RET; + WT_SESSION_IMPL *session; + + op = (WT_ASYNC_OP_IMPL *)asyncop; + ASYNCOP_API_CALL(O2C(op), session, remove); + WT_STAT_CONN_INCR(O2S(op), async_op_remove); + WT_ERR(__async_op_wrap(op, WT_AOP_REMOVE)); +err: + API_END_RET(session, ret); } /* * __async_compact -- - * WT_ASYNC_OP->compact implementation for op handles. + * WT_ASYNC_OP->compact implementation for op handles. 
*/ static int __async_compact(WT_ASYNC_OP *asyncop) { - WT_ASYNC_OP_IMPL *op; - WT_DECL_RET; - WT_SESSION_IMPL *session; - - op = (WT_ASYNC_OP_IMPL *)asyncop; - ASYNCOP_API_CALL(O2C(op), session, compact); - WT_STAT_CONN_INCR(O2S(op), async_op_compact); - WT_ERR(__async_op_wrap(op, WT_AOP_COMPACT)); -err: API_END_RET(session, ret); + WT_ASYNC_OP_IMPL *op; + WT_DECL_RET; + WT_SESSION_IMPL *session; + + op = (WT_ASYNC_OP_IMPL *)asyncop; + ASYNCOP_API_CALL(O2C(op), session, compact); + WT_STAT_CONN_INCR(O2S(op), async_op_compact); + WT_ERR(__async_op_wrap(op, WT_AOP_COMPACT)); +err: + API_END_RET(session, ret); } /* * __async_get_id -- - * WT_ASYNC_OP->get_id implementation for op handles. + * WT_ASYNC_OP->get_id implementation for op handles. */ static uint64_t __async_get_id(WT_ASYNC_OP *asyncop) { - return (((WT_ASYNC_OP_IMPL *)asyncop)->unique_id); + return (((WT_ASYNC_OP_IMPL *)asyncop)->unique_id); } /* * __async_get_type -- - * WT_ASYNC_OP->get_type implementation for op handles. + * WT_ASYNC_OP->get_type implementation for op handles. */ static WT_ASYNC_OPTYPE __async_get_type(WT_ASYNC_OP *asyncop) { - return (((WT_ASYNC_OP_IMPL *)asyncop)->optype); + return (((WT_ASYNC_OP_IMPL *)asyncop)->optype); } /* * __async_op_init -- - * Initialize all the op handle fields. + * Initialize all the op handle fields. 
*/ static void __async_op_init(WT_CONNECTION_IMPL *conn, WT_ASYNC_OP_IMPL *op, uint32_t id) { - WT_ASYNC_OP *asyncop; - - asyncop = (WT_ASYNC_OP *)op; - asyncop->connection = (WT_CONNECTION *)conn; - asyncop->key_format = asyncop->value_format = NULL; - asyncop->c.key_format = asyncop->c.value_format = NULL; - asyncop->get_key = __async_get_key; - asyncop->get_value = __async_get_value; - asyncop->set_key = __async_set_key; - asyncop->set_value = __async_set_value; - asyncop->search = __async_search; - asyncop->insert = __async_insert; - asyncop->update = __async_update; - asyncop->remove = __async_remove; - asyncop->compact = __async_compact; - asyncop->get_id = __async_get_id; - asyncop->get_type = __async_get_type; - /* - * The cursor needs to have the get/set key/value functions initialized. - * It also needs the key/value related fields set up. - */ - asyncop->c.get_key = __wt_cursor_get_key; - asyncop->c.set_key = __wt_cursor_set_key; - asyncop->c.get_value = __wt_cursor_get_value; - asyncop->c.set_value = __wt_cursor_set_value; - asyncop->c.recno = WT_RECNO_OOB; - memset(asyncop->c.raw_recno_buf, 0, sizeof(asyncop->c.raw_recno_buf)); - memset(&asyncop->c.key, 0, sizeof(asyncop->c.key)); - memset(&asyncop->c.value, 0, sizeof(asyncop->c.value)); - asyncop->c.session = (WT_SESSION *)conn->default_session; - asyncop->c.saved_err = 0; - asyncop->c.flags = 0; - - op->internal_id = id; - op->state = WT_ASYNCOP_FREE; + WT_ASYNC_OP *asyncop; + + asyncop = (WT_ASYNC_OP *)op; + asyncop->connection = (WT_CONNECTION *)conn; + asyncop->key_format = asyncop->value_format = NULL; + asyncop->c.key_format = asyncop->c.value_format = NULL; + asyncop->get_key = __async_get_key; + asyncop->get_value = __async_get_value; + asyncop->set_key = __async_set_key; + asyncop->set_value = __async_set_value; + asyncop->search = __async_search; + asyncop->insert = __async_insert; + asyncop->update = __async_update; + asyncop->remove = __async_remove; + asyncop->compact = __async_compact; + 
asyncop->get_id = __async_get_id; + asyncop->get_type = __async_get_type; + /* + * The cursor needs to have the get/set key/value functions initialized. It also needs the + * key/value related fields set up. + */ + asyncop->c.get_key = __wt_cursor_get_key; + asyncop->c.set_key = __wt_cursor_set_key; + asyncop->c.get_value = __wt_cursor_get_value; + asyncop->c.set_value = __wt_cursor_set_value; + asyncop->c.recno = WT_RECNO_OOB; + memset(asyncop->c.raw_recno_buf, 0, sizeof(asyncop->c.raw_recno_buf)); + memset(&asyncop->c.key, 0, sizeof(asyncop->c.key)); + memset(&asyncop->c.value, 0, sizeof(asyncop->c.value)); + asyncop->c.session = (WT_SESSION *)conn->default_session; + asyncop->c.saved_err = 0; + asyncop->c.flags = 0; + + op->internal_id = id; + op->state = WT_ASYNCOP_FREE; } /* * __wt_async_op_enqueue -- - * Enqueue an operation onto the work queue. + * Enqueue an operation onto the work queue. */ int __wt_async_op_enqueue(WT_SESSION_IMPL *session, WT_ASYNC_OP_IMPL *op) { - WT_ASYNC *async; - WT_CONNECTION_IMPL *conn; - uint64_t cur_head, cur_tail, my_alloc, my_slot; -#ifdef HAVE_DIAGNOSTIC - WT_ASYNC_OP_IMPL *my_op; + WT_ASYNC *async; + WT_CONNECTION_IMPL *conn; + uint64_t cur_head, cur_tail, my_alloc, my_slot; +#ifdef HAVE_DIAGNOSTIC + WT_ASYNC_OP_IMPL *my_op; #endif - conn = S2C(session); - async = conn->async; - - /* - * If an application re-uses a WT_ASYNC_OP, we end up here with an - * invalid object. - */ - if (op->state != WT_ASYNCOP_READY) - WT_RET_MSG(session, EINVAL, - "application error: WT_ASYNC_OP already in use"); - - /* - * Enqueue op at the tail of the work queue. - * We get our slot in the ring buffer to use. - */ - my_alloc = __wt_atomic_add64(&async->alloc_head, 1); - my_slot = my_alloc % async->async_qsize; - - /* - * Make sure we haven't wrapped around the queue. - * If so, wait for the tail to advance off this slot. 
- */ - WT_ORDERED_READ(cur_tail, async->tail_slot); - while (cur_tail == my_slot) { - __wt_yield(); - WT_ORDERED_READ(cur_tail, async->tail_slot); - } - -#ifdef HAVE_DIAGNOSTIC - WT_ORDERED_READ(my_op, async->async_queue[my_slot]); - if (my_op != NULL) - return (__wt_panic(session)); + conn = S2C(session); + async = conn->async; + + /* + * If an application re-uses a WT_ASYNC_OP, we end up here with an invalid object. + */ + if (op->state != WT_ASYNCOP_READY) + WT_RET_MSG(session, EINVAL, "application error: WT_ASYNC_OP already in use"); + + /* + * Enqueue op at the tail of the work queue. We get our slot in the ring buffer to use. + */ + my_alloc = __wt_atomic_add64(&async->alloc_head, 1); + my_slot = my_alloc % async->async_qsize; + + /* + * Make sure we haven't wrapped around the queue. If so, wait for the tail to advance off this + * slot. + */ + WT_ORDERED_READ(cur_tail, async->tail_slot); + while (cur_tail == my_slot) { + __wt_yield(); + WT_ORDERED_READ(cur_tail, async->tail_slot); + } + +#ifdef HAVE_DIAGNOSTIC + WT_ORDERED_READ(my_op, async->async_queue[my_slot]); + if (my_op != NULL) + return (__wt_panic(session)); #endif - WT_PUBLISH(async->async_queue[my_slot], op); - op->state = WT_ASYNCOP_ENQUEUED; - if (__wt_atomic_add32(&async->cur_queue, 1) > async->max_queue) - WT_PUBLISH(async->max_queue, async->cur_queue); - /* - * Multiple threads may be adding ops to the queue. We need to wait - * our turn to make our slot visible to workers. - */ - WT_ORDERED_READ(cur_head, async->head); - while (cur_head != (my_alloc - 1)) { - __wt_yield(); - WT_ORDERED_READ(cur_head, async->head); - } - WT_PUBLISH(async->head, my_alloc); - return (0); + WT_PUBLISH(async->async_queue[my_slot], op); + op->state = WT_ASYNCOP_ENQUEUED; + if (__wt_atomic_add32(&async->cur_queue, 1) > async->max_queue) + WT_PUBLISH(async->max_queue, async->cur_queue); + /* + * Multiple threads may be adding ops to the queue. We need to wait our turn to make our slot + * visible to workers. 
+ */ + WT_ORDERED_READ(cur_head, async->head); + while (cur_head != (my_alloc - 1)) { + __wt_yield(); + WT_ORDERED_READ(cur_head, async->head); + } + WT_PUBLISH(async->head, my_alloc); + return (0); } /* * __wt_async_op_init -- - * Initialize all the op handles. + * Initialize all the op handles. */ int __wt_async_op_init(WT_SESSION_IMPL *session) { - WT_ASYNC *async; - WT_ASYNC_OP_IMPL *op; - WT_CONNECTION_IMPL *conn; - WT_DECL_RET; - uint32_t i; - - conn = S2C(session); - async = conn->async; - - /* - * Initialize the flush op structure. - */ - __async_op_init(conn, &async->flush_op, OPS_INVALID_INDEX); - - /* - * Allocate and initialize the work queue. This is sized so that - * the ring buffer is known to be big enough such that the head - * can never overlap the tail. Include extra for the flush op. - */ - async->async_qsize = conn->async_size + 2; - WT_RET(__wt_calloc_def( - session, async->async_qsize, &async->async_queue)); - /* - * Allocate and initialize all the user ops. - */ - WT_ERR(__wt_calloc_def(session, conn->async_size, &async->async_ops)); - for (i = 0; i < conn->async_size; i++) { - op = &async->async_ops[i]; - __async_op_init(conn, op, i); - } - return (0); - -err: __wt_free(session, async->async_ops); - __wt_free(session, async->async_queue); - return (ret); + WT_ASYNC *async; + WT_ASYNC_OP_IMPL *op; + WT_CONNECTION_IMPL *conn; + WT_DECL_RET; + uint32_t i; + + conn = S2C(session); + async = conn->async; + + /* + * Initialize the flush op structure. + */ + __async_op_init(conn, &async->flush_op, OPS_INVALID_INDEX); + + /* + * Allocate and initialize the work queue. This is sized so that the ring buffer is known to be + * big enough such that the head can never overlap the tail. Include extra for the flush op. + */ + async->async_qsize = conn->async_size + 2; + WT_RET(__wt_calloc_def(session, async->async_qsize, &async->async_queue)); + /* + * Allocate and initialize all the user ops. 
+ */ + WT_ERR(__wt_calloc_def(session, conn->async_size, &async->async_ops)); + for (i = 0; i < conn->async_size; i++) { + op = &async->async_ops[i]; + __async_op_init(conn, op, i); + } + return (0); + +err: + __wt_free(session, async->async_ops); + __wt_free(session, async->async_queue); + return (ret); } diff --git a/src/third_party/wiredtiger/src/async/async_worker.c b/src/third_party/wiredtiger/src/async/async_worker.c index abb32c5ecd2..8fdd4cba4b4 100644 --- a/src/third_party/wiredtiger/src/async/async_worker.c +++ b/src/third_party/wiredtiger/src/async/async_worker.c @@ -10,344 +10,314 @@ /* * __async_op_dequeue -- - * Wait for work to be available. Then atomically take it off - * the work queue. + * Wait for work to be available. Then atomically take it off the work queue. */ static int -__async_op_dequeue(WT_CONNECTION_IMPL *conn, WT_SESSION_IMPL *session, - WT_ASYNC_OP_IMPL **op) +__async_op_dequeue(WT_CONNECTION_IMPL *conn, WT_SESSION_IMPL *session, WT_ASYNC_OP_IMPL **op) { - WT_ASYNC *async; - uint64_t cur_tail, last_consume, my_consume, my_slot, prev_slot; - uint64_t sleep_usec; - uint32_t tries; + WT_ASYNC *async; + uint64_t cur_tail, last_consume, my_consume, my_slot, prev_slot; + uint64_t sleep_usec; + uint32_t tries; - *op = NULL; + *op = NULL; - async = conn->async; - /* - * Wait for work to do. Work is available when async->head moves. - * Then grab the slot containing the work. If we lose, try again. - */ + async = conn->async; +/* + * Wait for work to do. Work is available when async->head moves. Then grab the slot containing the + * work. If we lose, try again. + */ retry: - tries = 0; - sleep_usec = 100; - WT_ORDERED_READ(last_consume, async->alloc_tail); - /* - * We stay in this loop until there is work to do. 
- */ - while (last_consume == async->head && - async->flush_state != WT_ASYNC_FLUSHING) { - WT_STAT_CONN_INCR(session, async_nowork); - if (++tries < MAX_ASYNC_YIELD) - /* - * Initially when we find no work, allow other - * threads to run. - */ - __wt_yield(); - else { - /* - * If we haven't found work in a while, start sleeping - * to wait for work to arrive instead of spinning. - */ - __wt_sleep(0, sleep_usec); - sleep_usec = WT_MIN(sleep_usec * 2, - MAX_ASYNC_SLEEP_USECS); - } - if (!F_ISSET(session, WT_SESSION_SERVER_ASYNC)) - return (0); - if (!F_ISSET(conn, WT_CONN_SERVER_ASYNC)) - return (0); - WT_ORDERED_READ(last_consume, async->alloc_tail); - } - if (async->flush_state == WT_ASYNC_FLUSHING) - return (0); - /* - * Try to increment the tail to claim this slot. If we lose - * a race, try again. - */ - my_consume = last_consume + 1; - if (!__wt_atomic_cas64(&async->alloc_tail, last_consume, my_consume)) - goto retry; - /* - * This item of work is ours to process. Clear it out of the - * queue and return. - */ - my_slot = my_consume % async->async_qsize; - prev_slot = last_consume % async->async_qsize; - *op = async->async_queue[my_slot]; - async->async_queue[my_slot] = NULL; + tries = 0; + sleep_usec = 100; + WT_ORDERED_READ(last_consume, async->alloc_tail); + /* + * We stay in this loop until there is work to do. + */ + while (last_consume == async->head && async->flush_state != WT_ASYNC_FLUSHING) { + WT_STAT_CONN_INCR(session, async_nowork); + if (++tries < MAX_ASYNC_YIELD) + /* + * Initially when we find no work, allow other threads to run. + */ + __wt_yield(); + else { + /* + * If we haven't found work in a while, start sleeping to wait for work to arrive + * instead of spinning. 
+ */ + __wt_sleep(0, sleep_usec); + sleep_usec = WT_MIN(sleep_usec * 2, MAX_ASYNC_SLEEP_USECS); + } + if (!F_ISSET(session, WT_SESSION_SERVER_ASYNC)) + return (0); + if (!F_ISSET(conn, WT_CONN_SERVER_ASYNC)) + return (0); + WT_ORDERED_READ(last_consume, async->alloc_tail); + } + if (async->flush_state == WT_ASYNC_FLUSHING) + return (0); + /* + * Try to increment the tail to claim this slot. If we lose a race, try again. + */ + my_consume = last_consume + 1; + if (!__wt_atomic_cas64(&async->alloc_tail, last_consume, my_consume)) + goto retry; + /* + * This item of work is ours to process. Clear it out of the queue and return. + */ + my_slot = my_consume % async->async_qsize; + prev_slot = last_consume % async->async_qsize; + *op = async->async_queue[my_slot]; + async->async_queue[my_slot] = NULL; - WT_ASSERT(session, async->cur_queue > 0); - WT_ASSERT(session, *op != NULL); - WT_ASSERT(session, (*op)->state == WT_ASYNCOP_ENQUEUED); - (void)__wt_atomic_sub32(&async->cur_queue, 1); - (*op)->state = WT_ASYNCOP_WORKING; + WT_ASSERT(session, async->cur_queue > 0); + WT_ASSERT(session, *op != NULL); + WT_ASSERT(session, (*op)->state == WT_ASYNCOP_ENQUEUED); + (void)__wt_atomic_sub32(&async->cur_queue, 1); + (*op)->state = WT_ASYNCOP_WORKING; - if (*op == &async->flush_op) - /* - * We're the worker to take the flush op off the queue. - */ - WT_PUBLISH(async->flush_state, WT_ASYNC_FLUSHING); - WT_ORDERED_READ(cur_tail, async->tail_slot); - while (cur_tail != prev_slot) { - __wt_yield(); - WT_ORDERED_READ(cur_tail, async->tail_slot); - } - WT_PUBLISH(async->tail_slot, my_slot); - return (0); + if (*op == &async->flush_op) + /* + * We're the worker to take the flush op off the queue. 
+ */ + WT_PUBLISH(async->flush_state, WT_ASYNC_FLUSHING); + WT_ORDERED_READ(cur_tail, async->tail_slot); + while (cur_tail != prev_slot) { + __wt_yield(); + WT_ORDERED_READ(cur_tail, async->tail_slot); + } + WT_PUBLISH(async->tail_slot, my_slot); + return (0); } /* * __async_flush_wait -- - * Wait for the final worker to finish flushing. + * Wait for the final worker to finish flushing. */ static void __async_flush_wait(WT_SESSION_IMPL *session, WT_ASYNC *async, uint64_t my_gen) { - while (async->flush_state == WT_ASYNC_FLUSHING && - async->flush_gen == my_gen) { - __wt_cond_wait(session, async->flush_cond, 10000, NULL); - WT_BARRIER(); - } + while (async->flush_state == WT_ASYNC_FLUSHING && async->flush_gen == my_gen) { + __wt_cond_wait(session, async->flush_cond, 10000, NULL); + WT_BARRIER(); + } } /* * __async_worker_cursor -- - * Return a cursor for the worker thread to use for its op. - * The worker thread caches cursors. So first search for one - * with the same config/uri signature. Otherwise open a new - * cursor and cache it. + * Return a cursor for the worker thread to use for its op. The worker thread caches cursors. So + * first search for one with the same config/uri signature. Otherwise open a new cursor and + * cache it. */ static int -__async_worker_cursor(WT_SESSION_IMPL *session, WT_ASYNC_OP_IMPL *op, - WT_ASYNC_WORKER_STATE *worker, WT_CURSOR **cursorp) +__async_worker_cursor(WT_SESSION_IMPL *session, WT_ASYNC_OP_IMPL *op, WT_ASYNC_WORKER_STATE *worker, + WT_CURSOR **cursorp) { - WT_ASYNC_CURSOR *ac; - WT_CURSOR *c; - WT_DECL_RET; - WT_SESSION *wt_session; + WT_ASYNC_CURSOR *ac; + WT_CURSOR *c; + WT_DECL_RET; + WT_SESSION *wt_session; - *cursorp = NULL; + *cursorp = NULL; - wt_session = (WT_SESSION *)session; - /* - * Compact doesn't need a cursor. 
- */ - if (op->optype == WT_AOP_COMPACT) - return (0); - WT_ASSERT(session, op->format != NULL); - TAILQ_FOREACH(ac, &worker->cursorqh, q) { - if (op->format->cfg_hash == ac->cfg_hash && - op->format->uri_hash == ac->uri_hash) { - /* - * If one of our cached cursors has a matching - * signature, use it and we're done. - */ - *cursorp = ac->c; - return (0); - } - } - /* - * We didn't find one in our cache. Open one and cache it. - * Insert it at the head expecting LRU usage. - */ - WT_RET(__wt_calloc_one(session, &ac)); - WT_ERR(wt_session->open_cursor( - wt_session, op->format->uri, NULL, op->format->config, &c)); - ac->cfg_hash = op->format->cfg_hash; - ac->uri_hash = op->format->uri_hash; - ac->c = c; - TAILQ_INSERT_HEAD(&worker->cursorqh, ac, q); - worker->num_cursors++; - *cursorp = c; - return (0); + wt_session = (WT_SESSION *)session; + /* + * Compact doesn't need a cursor. + */ + if (op->optype == WT_AOP_COMPACT) + return (0); + WT_ASSERT(session, op->format != NULL); + TAILQ_FOREACH (ac, &worker->cursorqh, q) { + if (op->format->cfg_hash == ac->cfg_hash && op->format->uri_hash == ac->uri_hash) { + /* + * If one of our cached cursors has a matching signature, use it and we're done. + */ + *cursorp = ac->c; + return (0); + } + } + /* + * We didn't find one in our cache. Open one and cache it. Insert it at the head expecting LRU + * usage. + */ + WT_RET(__wt_calloc_one(session, &ac)); + WT_ERR(wt_session->open_cursor(wt_session, op->format->uri, NULL, op->format->config, &c)); + ac->cfg_hash = op->format->cfg_hash; + ac->uri_hash = op->format->uri_hash; + ac->c = c; + TAILQ_INSERT_HEAD(&worker->cursorqh, ac, q); + worker->num_cursors++; + *cursorp = c; + return (0); -err: __wt_free(session, ac); - return (ret); +err: + __wt_free(session, ac); + return (ret); } /* * __async_worker_execop -- - * A worker thread executes an individual op with a cursor. + * A worker thread executes an individual op with a cursor. 
*/ static int -__async_worker_execop(WT_SESSION_IMPL *session, WT_ASYNC_OP_IMPL *op, - WT_CURSOR *cursor) +__async_worker_execop(WT_SESSION_IMPL *session, WT_ASYNC_OP_IMPL *op, WT_CURSOR *cursor) { - WT_ASYNC_OP *asyncop; - WT_ITEM val; - WT_SESSION *wt_session; + WT_ASYNC_OP *asyncop; + WT_ITEM val; + WT_SESSION *wt_session; - asyncop = (WT_ASYNC_OP *)op; - /* - * Set the key of our local cursor from the async op handle. - * If needed, also set the value. - */ - if (op->optype != WT_AOP_COMPACT) { - WT_RET(__wt_cursor_get_raw_key(&asyncop->c, &val)); - __wt_cursor_set_raw_key(cursor, &val); - if (op->optype == WT_AOP_INSERT || - op->optype == WT_AOP_UPDATE) { - WT_RET(__wt_cursor_get_raw_value(&asyncop->c, &val)); - __wt_cursor_set_raw_value(cursor, &val); - } - } - switch (op->optype) { - case WT_AOP_COMPACT: - wt_session = &session->iface; - WT_RET(wt_session->compact(wt_session, - op->format->uri, op->format->config)); - break; - case WT_AOP_INSERT: - WT_RET(cursor->insert(cursor)); - break; - case WT_AOP_UPDATE: - WT_RET(cursor->update(cursor)); - break; - case WT_AOP_REMOVE: - WT_RET(cursor->remove(cursor)); - break; - case WT_AOP_SEARCH: - WT_RET(cursor->search(cursor)); - /* - * Get the value from the cursor and put it into - * the op for op->get_value. - */ - WT_RET(__wt_cursor_get_raw_value(cursor, &val)); - __wt_cursor_set_raw_value(&asyncop->c, &val); - break; - case WT_AOP_NONE: - WT_RET_MSG(session, EINVAL, - "Unknown async optype %d", (int)op->optype); - } - return (0); + asyncop = (WT_ASYNC_OP *)op; + /* + * Set the key of our local cursor from the async op handle. If needed, also set the value. 
+ */ + if (op->optype != WT_AOP_COMPACT) { + WT_RET(__wt_cursor_get_raw_key(&asyncop->c, &val)); + __wt_cursor_set_raw_key(cursor, &val); + if (op->optype == WT_AOP_INSERT || op->optype == WT_AOP_UPDATE) { + WT_RET(__wt_cursor_get_raw_value(&asyncop->c, &val)); + __wt_cursor_set_raw_value(cursor, &val); + } + } + switch (op->optype) { + case WT_AOP_COMPACT: + wt_session = &session->iface; + WT_RET(wt_session->compact(wt_session, op->format->uri, op->format->config)); + break; + case WT_AOP_INSERT: + WT_RET(cursor->insert(cursor)); + break; + case WT_AOP_UPDATE: + WT_RET(cursor->update(cursor)); + break; + case WT_AOP_REMOVE: + WT_RET(cursor->remove(cursor)); + break; + case WT_AOP_SEARCH: + WT_RET(cursor->search(cursor)); + /* + * Get the value from the cursor and put it into the op for op->get_value. + */ + WT_RET(__wt_cursor_get_raw_value(cursor, &val)); + __wt_cursor_set_raw_value(&asyncop->c, &val); + break; + case WT_AOP_NONE: + WT_RET_MSG(session, EINVAL, "Unknown async optype %d", (int)op->optype); + } + return (0); } /* * __async_worker_op -- - * A worker thread handles an individual op. + * A worker thread handles an individual op. */ static int -__async_worker_op(WT_SESSION_IMPL *session, WT_ASYNC_OP_IMPL *op, - WT_ASYNC_WORKER_STATE *worker) +__async_worker_op(WT_SESSION_IMPL *session, WT_ASYNC_OP_IMPL *op, WT_ASYNC_WORKER_STATE *worker) { - WT_ASYNC_OP *asyncop; - WT_CURSOR *cursor; - WT_DECL_RET; - WT_SESSION *wt_session; - int cb_ret; + WT_ASYNC_OP *asyncop; + WT_CURSOR *cursor; + WT_DECL_RET; + WT_SESSION *wt_session; + int cb_ret; - asyncop = (WT_ASYNC_OP *)op; + asyncop = (WT_ASYNC_OP *)op; - cb_ret = 0; + cb_ret = 0; - wt_session = &session->iface; - if (op->optype != WT_AOP_COMPACT) - WT_RET(wt_session->begin_transaction(wt_session, NULL)); - WT_ASSERT(session, op->state == WT_ASYNCOP_WORKING); - WT_RET(__async_worker_cursor(session, op, worker, &cursor)); - /* - * Perform op and invoke the callback. 
- */ - ret = __async_worker_execop(session, op, cursor); - if (op->cb != NULL && op->cb->notify != NULL) - cb_ret = op->cb->notify(op->cb, asyncop, ret, 0); + wt_session = &session->iface; + if (op->optype != WT_AOP_COMPACT) + WT_RET(wt_session->begin_transaction(wt_session, NULL)); + WT_ASSERT(session, op->state == WT_ASYNCOP_WORKING); + WT_RET(__async_worker_cursor(session, op, worker, &cursor)); + /* + * Perform op and invoke the callback. + */ + ret = __async_worker_execop(session, op, cursor); + if (op->cb != NULL && op->cb->notify != NULL) + cb_ret = op->cb->notify(op->cb, asyncop, ret, 0); - /* - * If the operation succeeded and the user callback returned - * zero then commit. Otherwise rollback. - */ - if (op->optype != WT_AOP_COMPACT) { - if ((ret == 0 || ret == WT_NOTFOUND) && cb_ret == 0) - WT_TRET(wt_session->commit_transaction( - wt_session, NULL)); - else - WT_TRET(wt_session->rollback_transaction( - wt_session, NULL)); - F_CLR(&asyncop->c, WT_CURSTD_KEY_SET | WT_CURSTD_VALUE_SET); - WT_TRET(cursor->reset(cursor)); - } - /* - * After the callback returns, and the transaction resolved release - * the op back to the free pool. We do this regardless of - * success or failure. - */ - WT_PUBLISH(op->state, WT_ASYNCOP_FREE); - return (ret); + /* + * If the operation succeeded and the user callback returned zero then commit. Otherwise + * rollback. + */ + if (op->optype != WT_AOP_COMPACT) { + if ((ret == 0 || ret == WT_NOTFOUND) && cb_ret == 0) + WT_TRET(wt_session->commit_transaction(wt_session, NULL)); + else + WT_TRET(wt_session->rollback_transaction(wt_session, NULL)); + F_CLR(&asyncop->c, WT_CURSTD_KEY_SET | WT_CURSTD_VALUE_SET); + WT_TRET(cursor->reset(cursor)); + } + /* + * After the callback returns, and the transaction resolved release the op back to the free + * pool. We do this regardless of success or failure. + */ + WT_PUBLISH(op->state, WT_ASYNCOP_FREE); + return (ret); } /* * __wt_async_worker -- - * The async worker threads. 
/*
 * __wt_async_worker --
 *     The async worker threads. Each worker loops dequeuing ops and executing them until the
 *     connection or session async-server flag is cleared, participating in flush rendezvous
 *     along the way; on exit it closes and frees its cached cursors.
 */
WT_THREAD_RET
__wt_async_worker(void *arg)
{
    WT_ASYNC *async;
    WT_ASYNC_CURSOR *ac;
    WT_ASYNC_OP_IMPL *op;
    WT_ASYNC_WORKER_STATE worker; /* Per-thread cursor cache; never shared across workers. */
    WT_CONNECTION_IMPL *conn;
    WT_DECL_RET;
    WT_SESSION_IMPL *session;
    uint64_t flush_gen;

    session = arg;
    conn = S2C(session);
    async = conn->async;

    worker.num_cursors = 0;
    TAILQ_INIT(&worker.cursorqh);
    while (F_ISSET(conn, WT_CONN_SERVER_ASYNC) && F_ISSET(session, WT_SESSION_SERVER_ASYNC)) {
        WT_ERR(__async_op_dequeue(conn, session, &op));
        if (op != NULL && op != &async->flush_op) {
            /*
             * Operation failure doesn't cause the worker thread to exit.
             */
            (void)__async_worker_op(session, op, &worker);
        } else if (async->flush_state == WT_ASYNC_FLUSHING) {
            /*
             * Worker flushing going on. Last worker to the party needs to clear the FLUSHING flag
             * and signal the cond. If FLUSHING is going on, we do not take anything off the queue.
             */
            /* Read the generation before counting in, so a later flush isn't mistaken for ours. */
            WT_ORDERED_READ(flush_gen, async->flush_gen);
            if (__wt_atomic_add32(&async->flush_count, 1) == conn->async_workers) {
                /*
                 * We're last. All workers accounted for so signal the condition and clear the
                 * FLUSHING flag to release the other worker threads. Set the FLUSH_COMPLETE flag so
                 * that the caller can return to the application.
                 */
                WT_PUBLISH(async->flush_state, WT_ASYNC_FLUSH_COMPLETE);
                __wt_cond_signal(session, async->flush_cond);
            } else
                /*
                 * We need to wait for the last worker to signal the condition.
                 */
                __async_flush_wait(session, async, flush_gen);
        }
    }

    /* Only reached via WT_ERR: a dequeue failure is fatal for the whole worker. */
    if (0) {
err:
        WT_PANIC_MSG(session, ret, "async worker error");
    }
    /*
     * Worker thread cleanup, close our cached cursors and free all the WT_ASYNC_CURSOR structures.
     */
    while ((ac = TAILQ_FIRST(&worker.cursorqh)) != NULL) {
        TAILQ_REMOVE(&worker.cursorqh, ac, q);
        WT_TRET(ac->c->close(ac->c));
        __wt_free(session, ac);
    }
    return (WT_THREAD_RET_VALUE);
}