summaryrefslogtreecommitdiff
path: root/src/third_party/wiredtiger/src/async/async_api.c
diff options
context:
space:
mode:
Diffstat (limited to 'src/third_party/wiredtiger/src/async/async_api.c')
-rw-r--r--src/third_party/wiredtiger/src/async/async_api.c583
1 files changed, 0 insertions, 583 deletions
diff --git a/src/third_party/wiredtiger/src/async/async_api.c b/src/third_party/wiredtiger/src/async/async_api.c
deleted file mode 100644
index 06f8291b08b..00000000000
--- a/src/third_party/wiredtiger/src/async/async_api.c
+++ /dev/null
@@ -1,583 +0,0 @@
-/*-
- * Copyright (c) 2014-2020 MongoDB, Inc.
- * Copyright (c) 2008-2014 WiredTiger, Inc.
- * All rights reserved.
- *
- * See the file LICENSE for redistribution information.
- */
-
-#include "wt_internal.h"
-
-/*
- * __async_get_format --
- * Find or allocate the uri/config/format structure.
- */
-static int
-__async_get_format(
- WT_CONNECTION_IMPL *conn, const char *uri, const char *config, WT_ASYNC_OP_IMPL *op)
-{
- WT_ASYNC *async;
- WT_ASYNC_FORMAT *af;
- WT_CURSOR *c;
- WT_DECL_RET;
- WT_SESSION *wt_session;
- WT_SESSION_IMPL *session;
- uint64_t cfg_hash, uri_hash;
-
- async = conn->async;
- c = NULL;
- op->format = NULL;
-
- if (uri != NULL)
- uri_hash = __wt_hash_city64(uri, strlen(uri));
- else
- uri_hash = 0;
- if (config != NULL)
- cfg_hash = __wt_hash_city64(config, strlen(config));
- else
- cfg_hash = 0;
-
- /*
- * We don't need to hold a lock around this walk. The list is permanent and always valid. We
- * might race an insert and there is a possibility a duplicate entry might be inserted, but that
- * is not harmful.
- */
- TAILQ_FOREACH (af, &async->formatqh, q) {
- if (af->uri_hash == uri_hash && af->cfg_hash == cfg_hash)
- goto setup;
- }
- /*
- * We didn't find one in the cache. Allocate and initialize one. Insert it at the head expecting
- * LRU usage. We need a real session for the cursor.
- */
- WT_RET(__wt_open_internal_session(conn, "async-cursor", true, 0, &session));
- __wt_spin_lock(session, &async->ops_lock);
- WT_ERR(__wt_calloc_one(session, &af));
- WT_ERR(__wt_strdup(session, uri, &af->uri));
- WT_ERR(__wt_strdup(session, config, &af->config));
- af->uri_hash = uri_hash;
- af->cfg_hash = cfg_hash;
- /*
- * Get the key_format and value_format for this URI and store it in the structure so that
- * async->set_key/value work.
- */
- wt_session = &session->iface;
- WT_ERR(wt_session->open_cursor(wt_session, uri, NULL, NULL, &c));
- WT_ERR(__wt_strdup(session, c->key_format, &af->key_format));
- WT_ERR(__wt_strdup(session, c->value_format, &af->value_format));
- WT_ERR(c->close(c));
- c = NULL;
-
- TAILQ_INSERT_HEAD(&async->formatqh, af, q);
- __wt_spin_unlock(session, &async->ops_lock);
- WT_ERR(__wt_session_close_internal(session));
-
-setup:
- op->format = af;
- /*
- * Copy the pointers for the formats. Items in the async format queue remain there until the
- * connection is closed. We must initialize the format fields in the async_op, which are
- * publicly visible, and its internal cursor used by internal key/value functions.
- */
- op->iface.c.key_format = op->iface.key_format = af->key_format;
- op->iface.c.value_format = op->iface.value_format = af->value_format;
- return (0);
-
-err:
- if (c != NULL)
- WT_TRET(c->close(c));
- __wt_free(session, af->uri);
- __wt_free(session, af->config);
- __wt_free(session, af->key_format);
- __wt_free(session, af->value_format);
- __wt_free(session, af);
- return (ret);
-}
-
-/*
- * __async_new_op_alloc --
- * Find and allocate the next available async op handle.
- */
-static int
-__async_new_op_alloc(
- WT_SESSION_IMPL *session, const char *uri, const char *config, WT_ASYNC_OP_IMPL **opp)
-{
- WT_ASYNC *async;
- WT_ASYNC_OP_IMPL *op;
- WT_CONNECTION_IMPL *conn;
- uint32_t i, save_i, view;
-
- *opp = NULL;
-
- conn = S2C(session);
- async = conn->async;
- WT_STAT_CONN_INCR(session, async_op_alloc);
-
-retry:
- op = NULL;
- WT_ORDERED_READ(save_i, async->ops_index);
- /*
- * Look after the last one allocated for a free one. We'd expect ops to be freed mostly FIFO so
- * we should quickly find one.
- */
- for (view = 1, i = save_i; i < conn->async_size; i++, view++) {
- op = &async->async_ops[i];
- if (op->state == WT_ASYNCOP_FREE)
- break;
- }
-
- /*
- * Loop around back to the beginning if we need to.
- */
- if (op == NULL || op->state != WT_ASYNCOP_FREE)
- for (i = 0; i < save_i; i++, view++) {
- op = &async->async_ops[i];
- if (op->state == WT_ASYNCOP_FREE)
- break;
- }
-
- /*
- * We still haven't found one. Return an error.
- */
- if (op == NULL || op->state != WT_ASYNCOP_FREE) {
- WT_STAT_CONN_INCR(session, async_full);
- return (__wt_set_return(session, EBUSY));
- }
- /*
- * Set the state of this op handle as READY for the user to use. If we can set the state then
- * the op entry is ours. Start the next search at the next entry after this one.
- */
- if (!__wt_atomic_cas32(&op->state, WT_ASYNCOP_FREE, WT_ASYNCOP_READY)) {
- WT_STAT_CONN_INCR(session, async_alloc_race);
- goto retry;
- }
- WT_STAT_CONN_INCRV(session, async_alloc_view, view);
- WT_RET(__async_get_format(conn, uri, config, op));
- op->unique_id = __wt_atomic_add64(&async->op_id, 1);
- op->optype = WT_AOP_NONE;
- async->ops_index = (i + 1) % conn->async_size;
- *opp = op;
- return (0);
-}
-
-/*
- * __async_config --
- * Parse and setup the async API options.
- */
-static int
-__async_config(WT_SESSION_IMPL *session, WT_CONNECTION_IMPL *conn, const char **cfg, bool *runp)
-{
- WT_CONFIG_ITEM cval;
-
- /*
- * The async configuration is off by default.
- */
- WT_RET(__wt_config_gets(session, cfg, "async.enabled", &cval));
- *runp = cval.val != 0;
-
- /*
- * Even if async is turned off, we want to parse and store the default values so that
- * reconfigure can just enable them.
- *
- * Bound the minimum maximum operations at 10.
- */
- WT_RET(__wt_config_gets(session, cfg, "async.ops_max", &cval));
- conn->async_size = (uint32_t)WT_MAX(cval.val, 10);
-
- WT_RET(__wt_config_gets(session, cfg, "async.threads", &cval));
- conn->async_workers = (uint32_t)cval.val;
- /* Sanity check that api_data.py is in sync with async.h */
- WT_ASSERT(session, conn->async_workers <= WT_ASYNC_MAX_WORKERS);
-
- return (0);
-}
-
-/*
- * __wt_async_stats_update --
- * Update the async stats for return to the application.
- */
-void
-__wt_async_stats_update(WT_SESSION_IMPL *session)
-{
- WT_ASYNC *async;
- WT_CONNECTION_IMPL *conn;
- WT_CONNECTION_STATS **stats;
-
- conn = S2C(session);
- async = conn->async;
- if (async == NULL)
- return;
- stats = conn->stats;
- WT_STAT_SET(session, stats, async_cur_queue, async->cur_queue);
- WT_STAT_SET(session, stats, async_max_queue, async->max_queue);
-}
-
-/*
- * __async_start --
- * Start the async subsystem. All configuration processing has already been done by the caller.
- */
-static int
-__async_start(WT_SESSION_IMPL *session)
-{
- WT_ASYNC *async;
- WT_CONNECTION_IMPL *conn;
- uint32_t i, session_flags;
-
- conn = S2C(session);
- conn->async_cfg = true;
- /*
- * Async is on, allocate the WT_ASYNC structure and initialize the ops.
- */
- WT_RET(__wt_calloc_one(session, &conn->async));
- async = conn->async;
- TAILQ_INIT(&async->formatqh);
- WT_RET(__wt_spin_init(session, &async->ops_lock, "ops"));
- WT_RET(__wt_cond_alloc(session, "async flush", &async->flush_cond));
- WT_RET(__wt_async_op_init(session));
-
- /*
- * Start up the worker threads.
- */
- F_SET(conn, WT_CONN_SERVER_ASYNC);
- for (i = 0; i < conn->async_workers; i++) {
- /*
- * Each worker has its own session. We set both a general server flag in the connection and
- * an individual flag in the session. The user may reconfigure the number of workers and we
- * may want to selectively stop some workers while leaving the rest running.
- */
- session_flags = WT_SESSION_SERVER_ASYNC;
- WT_RET(__wt_open_internal_session(
- conn, "async-worker", true, session_flags, &async->worker_sessions[i]));
- }
- for (i = 0; i < conn->async_workers; i++) {
- /*
- * Start the threads.
- */
- WT_RET(__wt_thread_create(
- session, &async->worker_tids[i], __wt_async_worker, async->worker_sessions[i]));
- }
- __wt_async_stats_update(session);
- return (0);
-}
-
-/*
- * __wt_async_create --
- * Start the async subsystem and worker threads.
- */
-int
-__wt_async_create(WT_SESSION_IMPL *session, const char *cfg[])
-{
- WT_CONNECTION_IMPL *conn;
- bool run;
-
- conn = S2C(session);
-
- /* Handle configuration. */
- run = false;
- WT_RET(__async_config(session, conn, cfg, &run));
-
- /* If async is not configured, we're done. */
- if (!run)
- return (0);
- return (__async_start(session));
-}
-
-/*
- * __wt_async_reconfig --
- * Start the async subsystem and worker threads.
- */
-int
-__wt_async_reconfig(WT_SESSION_IMPL *session, const char *cfg[])
-{
- WT_ASYNC *async;
- WT_CONNECTION_IMPL *conn, tmp_conn;
- WT_DECL_RET;
- uint32_t i, session_flags;
- bool run;
-
- conn = S2C(session);
- async = conn->async;
- memset(&tmp_conn, 0, sizeof(tmp_conn));
- tmp_conn.async_cfg = conn->async_cfg;
- tmp_conn.async_workers = conn->async_workers;
- tmp_conn.async_size = conn->async_size;
-
- /* Handle configuration. */
- run = conn->async_cfg;
- WT_RET(__async_config(session, &tmp_conn, cfg, &run));
-
- /*
- * There are some restrictions on the live reconfiguration of async. Unlike other subsystems
- * where we simply destroy anything existing and restart with the new configuration, async is
- * not so easy. If the user is just changing the number of workers, we want to allow the
- * existing op handles and other information to remain in existence. So we must handle various
- * combinations of changes individually.
- *
- * One restriction is that if async is currently on, the user cannot change the number of async
- * op handles available. The user can try but we do nothing with it. However we must allow the
- * ops_max config string so that a user can completely start async via reconfigure.
- */
-
- /*
- * Easy cases:
- * 1. If async is on and the user wants it off, shut it down.
- * 2. If async is off, and the user wants it on, start it.
- * 3. If not a toggle and async is off, we're done.
- */
- if (conn->async_cfg && !run) { /* Case 1 */
- WT_TRET(__wt_async_flush(session));
- ret = __wt_async_destroy(session);
- conn->async_cfg = false;
- return (ret);
- }
- if (!conn->async_cfg && run) /* Case 2 */
- return (__wt_async_create(session, cfg));
- if (!conn->async_cfg) /* Case 3 */
- return (0);
-
- /*
- * Running async worker modification cases:
- * 4. If number of workers didn't change, we're done.
- * 5. If more workers, start new ones.
- * 6. If fewer workers, kill some.
- */
- if (conn->async_workers == tmp_conn.async_workers)
- /* No change in the number of workers. */
- return (0);
- if (conn->async_workers < tmp_conn.async_workers) {
- /* Case 5 */
- /*
- * The worker_sessions array is allocated for the maximum allowed number of workers, so
- * starting more is easy.
- */
- for (i = conn->async_workers; i < tmp_conn.async_workers; i++) {
- /*
- * Each worker has its own session.
- */
- session_flags = WT_SESSION_SERVER_ASYNC;
- WT_RET(__wt_open_internal_session(
- conn, "async-worker", true, session_flags, &async->worker_sessions[i]));
- }
- for (i = conn->async_workers; i < tmp_conn.async_workers; i++) {
- /*
- * Start the threads.
- */
- WT_RET(__wt_thread_create(
- session, &async->worker_tids[i], __wt_async_worker, async->worker_sessions[i]));
- }
- conn->async_workers = tmp_conn.async_workers;
- }
- if (conn->async_workers > tmp_conn.async_workers) {
- /* Case 6 */
- /*
- * Stopping an individual async worker is the most complex case. We clear the session async
- * flag on the targeted worker thread so that only that thread stops, and the others keep
- * running.
- */
- for (i = conn->async_workers - 1; i >= tmp_conn.async_workers; i--) {
- /*
- * Join any worker we're stopping. After the thread is stopped, close its session.
- */
- WT_ASSERT(session, async->worker_tids[i].created);
- WT_ASSERT(session, async->worker_sessions[i] != NULL);
- F_CLR(async->worker_sessions[i], WT_SESSION_SERVER_ASYNC);
- WT_TRET(__wt_thread_join(session, &async->worker_tids[i]));
- WT_TRET(__wt_session_close_internal(async->worker_sessions[i]));
- async->worker_sessions[i] = NULL;
- }
- conn->async_workers = tmp_conn.async_workers;
- }
-
- return (0);
-}
-
-/*
- * __wt_async_destroy --
- * Destroy the async worker threads and async subsystem.
- */
-int
-__wt_async_destroy(WT_SESSION_IMPL *session)
-{
- WT_ASYNC *async;
- WT_ASYNC_FORMAT *af;
- WT_ASYNC_OP *op;
- WT_CONNECTION_IMPL *conn;
- WT_DECL_RET;
- uint32_t i;
-
- conn = S2C(session);
- async = conn->async;
-
- if (!conn->async_cfg)
- return (0);
-
- F_CLR(conn, WT_CONN_SERVER_ASYNC);
- for (i = 0; i < conn->async_workers; i++)
- WT_TRET(__wt_thread_join(session, &async->worker_tids[i]));
- __wt_cond_destroy(session, &async->flush_cond);
-
- /* Close the server threads' sessions. */
- for (i = 0; i < conn->async_workers; i++)
- if (async->worker_sessions[i] != NULL) {
- WT_TRET(__wt_session_close_internal(async->worker_sessions[i]));
- async->worker_sessions[i] = NULL;
- }
- /* Free any op key/value buffers. */
- for (i = 0; i < conn->async_size; i++) {
- op = (WT_ASYNC_OP *)&async->async_ops[i];
- if (op->c.key.data != NULL)
- __wt_buf_free(session, &op->c.key);
- if (op->c.value.data != NULL)
- __wt_buf_free(session, &op->c.value);
- }
-
- /* Free format resources */
- while ((af = TAILQ_FIRST(&async->formatqh)) != NULL) {
- TAILQ_REMOVE(&async->formatqh, af, q);
- __wt_free(session, af->uri);
- __wt_free(session, af->config);
- __wt_free(session, af->key_format);
- __wt_free(session, af->value_format);
- __wt_free(session, af);
- }
- __wt_free(session, async->async_queue);
- __wt_free(session, async->async_ops);
- __wt_spin_destroy(session, &async->ops_lock);
- __wt_free(session, conn->async);
-
- return (ret);
-}
-
-/*
- * __wt_async_flush --
- * Implementation of the WT_CONN->async_flush method.
- */
-int
-__wt_async_flush(WT_SESSION_IMPL *session)
-{
- WT_ASYNC *async;
- WT_CONNECTION_IMPL *conn;
- uint32_t i, workers;
-
- conn = S2C(session);
- if (!conn->async_cfg)
- return (0);
-
- async = conn->async;
- /*
- * Only add a flush operation if there are workers who can process it. Otherwise we will wait
- * forever.
- */
- workers = 0;
- for (i = 0; i < conn->async_workers; ++i)
- if (async->worker_tids[i].created)
- ++workers;
- if (workers == 0)
- return (0);
-
- WT_STAT_CONN_INCR(session, async_flush);
-/*
- * We have to do several things. First we have to prevent other callers from racing with us so that
- * only one flush is happening at a time. Next we have to wait for the worker threads to notice the
- * flush and indicate that the flush is complete on their side. Then we clear the flush flags and
- * return.
- */
-retry:
- while (async->flush_state != WT_ASYNC_FLUSH_NONE)
- /*
- * We're racing an in-progress flush. We need to wait our turn to start our own. We need to
- * convoy the racing calls because a later call may be waiting for specific enqueued ops to
- * be complete before this returns.
- */
- __wt_sleep(0, 100000);
-
- if (!__wt_atomic_cas32(&async->flush_state, WT_ASYNC_FLUSH_NONE, WT_ASYNC_FLUSH_IN_PROGRESS))
- goto retry;
- /*
- * We're the owner of this flush operation. Set the WT_ASYNC_FLUSH_IN_PROGRESS to block other
- * callers. We're also preventing all worker threads from taking things off the work queue with
- * the lock.
- */
- async->flush_count = 0;
- (void)__wt_atomic_add64(&async->flush_gen, 1);
- WT_ASSERT(session, async->flush_op.state == WT_ASYNCOP_FREE);
- async->flush_op.state = WT_ASYNCOP_READY;
- WT_RET(__wt_async_op_enqueue(session, &async->flush_op));
- while (async->flush_state != WT_ASYNC_FLUSH_COMPLETE)
- __wt_cond_wait(session, async->flush_cond, 100000, NULL);
- /*
- * Flush is done. Clear the flags.
- */
- async->flush_op.state = WT_ASYNCOP_FREE;
- WT_PUBLISH(async->flush_state, WT_ASYNC_FLUSH_NONE);
- return (0);
-}
-
-/*
- * __async_runtime_config --
- * Configure runtime fields at allocation.
- */
-static int
-__async_runtime_config(WT_ASYNC_OP_IMPL *op, const char *cfg[])
-{
- WT_ASYNC_OP *asyncop;
- WT_CONFIG_ITEM cval;
- WT_SESSION_IMPL *session;
-
- session = O2S(op);
- asyncop = (WT_ASYNC_OP *)op;
- WT_RET(__wt_config_gets_def(session, cfg, "append", 0, &cval));
- if (cval.val)
- F_SET(&asyncop->c, WT_CURSTD_APPEND);
- else
- F_CLR(&asyncop->c, WT_CURSTD_APPEND);
- WT_RET(__wt_config_gets_def(session, cfg, "overwrite", 1, &cval));
- if (cval.val)
- F_SET(&asyncop->c, WT_CURSTD_OVERWRITE);
- else
- F_CLR(&asyncop->c, WT_CURSTD_OVERWRITE);
- WT_RET(__wt_config_gets_def(session, cfg, "raw", 0, &cval));
- if (cval.val)
- F_SET(&asyncop->c, WT_CURSTD_RAW);
- else
- F_CLR(&asyncop->c, WT_CURSTD_RAW);
- return (0);
-}
-
-/*
- * __wt_async_new_op --
- * Implementation of the WT_CONN->async_new_op method.
- */
-int
-__wt_async_new_op(WT_SESSION_IMPL *session, const char *uri, const char *config,
- WT_ASYNC_CALLBACK *cb, WT_ASYNC_OP_IMPL **opp)
-{
- WT_ASYNC_OP_IMPL *op;
- WT_CONNECTION_IMPL *conn;
- WT_DECL_RET;
- const char *cfg[] = {S2C(session)->cfg, NULL, NULL};
-
- *opp = NULL;
-
- conn = S2C(session);
- if (!conn->async_cfg)
- WT_RET(__wt_async_create(session, cfg));
- if (!conn->async_cfg)
- WT_RET_MSG(session, ENOTSUP, "Asynchronous operations not configured");
-
- op = NULL;
- WT_ERR(__async_new_op_alloc(session, uri, config, &op));
- cfg[1] = config;
- WT_ERR(__async_runtime_config(op, cfg));
- op->cb = cb;
- *opp = op;
- return (0);
-
-err:
- /*
- * If we get an error after allocating op, set its state to free.
- */
- if (op != NULL)
- op->state = WT_ASYNCOP_FREE;
- return (ret);
-}