summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorSusan LoVerso <sue@wiredtiger.com>2014-05-13 12:34:49 -0400
committerSusan LoVerso <sue@wiredtiger.com>2014-05-13 12:34:49 -0400
commitf17eaaac762bb3ba50a0d789ffde131c7cf654ad (patch)
tree446dd33bd63c89251517fb427a203373324620a8
parent4a2583d492a2a923a895b897293689fae656a1e6 (diff)
downloadmongo-f17eaaac762bb3ba50a0d789ffde131c7cf654ad.tar.gz
Allow async to be reconfigured. #1002
-rw-r--r--bench/wtperf/wtperf.c13
-rw-r--r--dist/flags.py2
-rw-r--r--src/async/async_api.c168
-rw-r--r--src/async/async_worker.c7
-rw-r--r--src/conn/conn_api.c1
-rw-r--r--src/include/extern.h1
-rw-r--r--src/include/flags.h30
7 files changed, 188 insertions, 34 deletions
diff --git a/bench/wtperf/wtperf.c b/bench/wtperf/wtperf.c
index 2a4bf2cc618..d56b5452ad8 100644
--- a/bench/wtperf/wtperf.c
+++ b/bench/wtperf/wtperf.c
@@ -1323,6 +1323,19 @@ retry: if ((ret = cfg->conn->async_new_op(cfg->conn,
lprintf(cfg, ret, 0, "Re-opening the connection failed");
return (ret);
}
+ /*
+ * If we started async threads only for the purposes of compact,
+ * then turn it off before starting the workload so that those extra
+ * threads looking for work that will never arrive don't affect
+ * performance.
+ */
+ if (cfg->compact && cfg->use_asyncops == 0) {
+ if ((ret = cfg->conn->reconfigure(
+ cfg->conn, "async=(enabled=false)")) != 0) {
+ lprintf(cfg, ret, 0, "Reconfigure async off failed");
+ return (ret);
+ }
+ }
return (0);
}
diff --git a/dist/flags.py b/dist/flags.py
index f52ab2fc756..d065bb5dfbf 100644
--- a/dist/flags.py
+++ b/dist/flags.py
@@ -89,6 +89,7 @@ flags = {
'CONN_LSM_MERGE',
'CONN_PANIC',
'CONN_SERVER_RUN',
+ 'CONN_SERVER_ASYNC',
'CONN_SERVER_CHECKPOINT',
'CONN_SERVER_STATISTICS',
'CONN_WAS_BACKUP',
@@ -102,6 +103,7 @@ flags = {
'SESSION_NO_SCHEMA_LOCK',
'SESSION_SALVAGE_CORRUPT_OK',
'SESSION_SCHEMA_LOCKED',
+ 'SESSION_SERVER_ASYNC',
],
}
diff --git a/src/async/async_api.c b/src/async/async_api.c
index c99e9b3846c..47446247c26 100644
--- a/src/async/async_api.c
+++ b/src/async/async_api.c
@@ -175,30 +175,33 @@ err:
* Parse and setup the async API options.
*/
static int
-__async_config(WT_SESSION_IMPL *session, const char **cfg, int *runp)
+__async_config(WT_SESSION_IMPL *session,
+ WT_CONNECTION_IMPL *conn, const char **cfg, int *runp)
{
WT_CONFIG_ITEM cval;
- WT_CONNECTION_IMPL *conn;
WT_DECL_RET;
- conn = S2C(session);
-
/*
* The async configuration is off by default.
*/
- WT_RET(__wt_config_gets(session, cfg, "async.enabled", &cval));
- *runp = cval.val != 0;
- if (*runp == 0)
- return (0);
-
- WT_RET(__wt_config_gets(session, cfg, "async.ops_max", &cval));
- conn->async_size = (uint32_t)cval.val;
+ if ((ret = __wt_config_gets(
+ session, cfg, "async.enabled", &cval)) == 0) {
+ *runp = cval.val != 0;
+ if (*runp == 0)
+ return (0);
+ }
- WT_RET(__wt_config_gets(session, cfg, "async.threads", &cval));
- conn->async_workers = (uint32_t)cval.val;
+ if ((ret = __wt_config_gets(
+ session, cfg, "async.ops_max", &cval)) == 0)
+ conn->async_size = (uint32_t)cval.val;
- /* Sanity check that api_data.py is in sync with async.h */
- WT_ASSERT(session, conn->async_workers <= WT_ASYNC_MAX_WORKERS);
+ if ((ret = __wt_config_gets(
+ session, cfg, "async.threads", &cval)) == 0) {
+ conn->async_workers = (uint32_t)cval.val;
+ /* Sanity check that api_data.py is in sync with async.h */
+ WT_ASSERT(session, conn->async_workers <= WT_ASYNC_MAX_WORKERS);
+ }
+ WT_RET_NOTFOUND_OK(ret);
ret = 0;
return (ret);
@@ -222,6 +225,7 @@ __wt_async_stats_update(WT_SESSION_IMPL *session)
stats = &conn->stats;
WT_STAT_SET(stats, async_cur_queue, async->cur_queue);
WT_STAT_SET(stats, async_max_queue, async->max_queue);
+ F_SET(conn, WT_CONN_SERVER_ASYNC);
}
/*
@@ -239,7 +243,7 @@ __wt_async_create(WT_CONNECTION_IMPL *conn, const char *cfg[])
session = conn->default_session;
/* Handle configuration. */
- WT_RET(__async_config(session, cfg, &run));
+ WT_RET(__async_config(session, conn, cfg, &run));
/* If async is not configured, we're done. */
if (!run)
@@ -259,13 +263,19 @@ __wt_async_create(WT_CONNECTION_IMPL *conn, const char *cfg[])
/*
* Start up the worker threads.
*/
+ F_SET(conn, WT_CONN_SERVER_ASYNC);
for (i = 0; i < conn->async_workers; i++) {
/*
- * Each worker has its own session.
+ * Each worker has its own session. We set both a general
+ * server flag in the connection and an individual flag
+ * in the session. The user may reconfigure the number of
+ * workers and we may want to selectively stop some workers
+ * while leaving the rest running.
*/
WT_RET(__wt_open_session(
conn, 1, NULL, NULL, &async->worker_sessions[i]));
async->worker_sessions[i]->name = "async-worker";
+ F_SET(async->worker_sessions[i], WT_SESSION_SERVER_ASYNC);
}
for (i = 0; i < conn->async_workers; i++) {
/*
@@ -279,6 +289,126 @@ __wt_async_create(WT_CONNECTION_IMPL *conn, const char *cfg[])
}
/*
+ * __wt_async_reconfig --
+ * Start the async subsystem and worker threads.
+ */
+int
+__wt_async_reconfig(WT_CONNECTION_IMPL *conn, const char *cfg[])
+{
+ WT_ASYNC *async;
+ WT_CONNECTION_IMPL tmp_conn;
+ WT_DECL_RET;
+ WT_SESSION *wt_session;
+ WT_SESSION_IMPL *session;
+ int run;
+ uint32_t i;
+
+ session = conn->default_session;
+ async = conn->async;
+
+ /* Handle configuration. */
+ WT_RET(__async_config(session, &tmp_conn, cfg, &run));
+
+ /*
+ * There are some restrictions on the live reconfiguration of async.
+ * Unlike other subsystems where we simply destroy anything existing
+ * and restart with the new configuration, async is not so easy.
+ * If the user is just changing the number of workers, we want to
+ * allow the existing op handles and other information to remain in
+ * existence. So we must handle various combinations of changes
+ * individually.
+ *
+ * One restriction is that if async is currently on, the user cannot
+ * change the number of async op handles available. The user can try
+ * but we do nothing with it. However we must allow the ops_max config
+ * string so that a user can completely start async via reconfigure.
+ */
+
+ /*
+ * Easy cases:
+ * 1. If async is on and the user wants it off, shut it down.
+ * 2. If async is off, and the user wants it on, start it.
+ * 3. If not a toggle and async is off, we're done.
+ */
+ if (conn->async_cfg > 0 && !run) {
+ /* Case 1 */
+ WT_TRET(__wt_async_flush(conn));
+ ret = __wt_async_destroy(conn);
+ conn->async_cfg = 0;
+ return (ret);
+ } else if (conn->async_cfg == 0 && run)
+ /* Case 2 */
+ return (__wt_async_create(conn, cfg));
+ else if (conn->async_cfg == 0)
+ /* Case 3 */
+ return (0);
+
+ /*
+ * Running async worker modification cases:
+ * 4. If number of workers didn't change, we're done.
+ * 5. If more workers, start new ones.
+ * 6. If fewer workers, kill some.
+ */
+ if (conn->async_workers == tmp_conn.async_workers)
+ /* No change in the number of workers. */
+ return (0);
+ if (conn->async_workers < tmp_conn.async_workers) {
+ /* Case 5 */
+ /*
+ * The worker_sessions array is allocated for the maximum
+ * allowed number of workers, so starting more is easy.
+ */
+ for (i = conn->async_workers; i < tmp_conn.async_workers; i++) {
+ /*
+ * Each worker has its own session.
+ */
+ WT_RET(__wt_open_session(
+ conn, 1, NULL, NULL, &async->worker_sessions[i]));
+ async->worker_sessions[i]->name = "async-worker";
+ F_SET(async->worker_sessions[i],
+ WT_SESSION_SERVER_ASYNC);
+ }
+ for (i = conn->async_workers; i < tmp_conn.async_workers; i++) {
+ /*
+ * Start the threads.
+ */
+ WT_RET(__wt_thread_create(session,
+ &async->worker_tids[i], __wt_async_worker,
+ async->worker_sessions[i]));
+ }
+ conn->async_workers = tmp_conn.async_workers;
+ }
+ if (conn->async_workers > tmp_conn.async_workers) {
+ /* Case 6 */
+ /*
+ * Stopping an individual async worker is the most complex case.
+ * We clear the session async flag on the targeted worker thread
+ * so that only that thread stops, and the others keep running.
+ */
+ for (i = conn->async_workers - 1;
+ i >= tmp_conn.async_workers; i--) {
+ /*
+ * Join any worker we're stopping.
+ * After the thread is stopped, close its session.
+ */
+ WT_ASSERT(session, async->worker_tids[i] != 0);
+ WT_ASSERT(session, async->worker_sessions[i] != NULL);
+ F_CLR(async->worker_sessions[i],
+ WT_SESSION_SERVER_ASYNC);
+ WT_TRET(__wt_thread_join(
+ session, async->worker_tids[i]));
+ async->worker_tids[i] = 0;
+ wt_session = &async->worker_sessions[i]->iface;
+ WT_TRET(wt_session->close(wt_session, NULL));
+ async->worker_sessions[i] = NULL;
+ }
+ conn->async_workers = tmp_conn.async_workers;
+ }
+
+ return (0);
+}
+
+/*
* __wt_async_destroy --
* Destroy the async worker threads and async subsystem.
*/
@@ -298,6 +428,8 @@ __wt_async_destroy(WT_CONNECTION_IMPL *conn)
if (!conn->async_cfg)
return (0);
+
+ F_CLR(conn, WT_CONN_SERVER_ASYNC);
for (i = 0; i < conn->async_workers; i++)
if (async->worker_tids[i] != 0) {
WT_TRET(__wt_thread_join(
@@ -447,10 +579,10 @@ __wt_async_new_op(WT_CONNECTION_IMPL *conn, const char *uri,
WT_ASYNC_OP_IMPL *op;
WT_DECL_RET;
+ *opp = NULL;
if (!conn->async_cfg)
return (0);
- *opp = NULL;
op = NULL;
WT_ERR(__async_new_op_alloc(conn, uri, config, &op));
WT_ERR(__async_runtime_config(op, cfg));
diff --git a/src/async/async_worker.c b/src/async/async_worker.c
index 812d695d527..26236c72dc4 100644
--- a/src/async/async_worker.c
+++ b/src/async/async_worker.c
@@ -52,7 +52,9 @@ retry:
sleep_usec = WT_MIN(sleep_usec * 2,
MAX_ASYNC_SLEEP_USECS);
}
- if (!F_ISSET(conn, WT_CONN_SERVER_RUN))
+ if (!F_ISSET(session, WT_SESSION_SERVER_ASYNC))
+ return (0);
+ if (!F_ISSET(conn, WT_CONN_SERVER_ASYNC))
return (0);
if (F_ISSET(conn, WT_CONN_PANIC))
return (__wt_panic(session));
@@ -299,7 +301,8 @@ __wt_async_worker(void *arg)
worker.num_cursors = 0;
STAILQ_INIT(&worker.cursorqh);
- while (F_ISSET(conn, WT_CONN_SERVER_RUN)) {
+ while (F_ISSET(conn, WT_CONN_SERVER_ASYNC) &&
+ F_ISSET(session, WT_SESSION_SERVER_ASYNC)) {
WT_ERR(__async_op_dequeue(conn, session, &op));
if (op != NULL && op != &async->flush_op) {
/*
diff --git a/src/conn/conn_api.c b/src/conn/conn_api.c
index 2aa628283e6..66b733c2579 100644
--- a/src/conn/conn_api.c
+++ b/src/conn/conn_api.c
@@ -604,6 +604,7 @@ __conn_reconfigure(WT_CONNECTION *wt_conn, const char *config)
WT_ERR(__wt_conn_cache_pool_config(session, cfg));
WT_ERR(__wt_cache_config(conn, raw_cfg));
+ WT_ERR(__wt_async_reconfig(conn, raw_cfg));
WT_ERR(__conn_statistics_config(session, raw_cfg));
WT_ERR(__wt_conn_verbose_config(session, raw_cfg));
WT_ERR(__wt_checkpoint_server_create(conn, cfg));
diff --git a/src/include/extern.h b/src/include/extern.h
index 446100a3937..1aa577ef4f2 100644
--- a/src/include/extern.h
+++ b/src/include/extern.h
@@ -2,6 +2,7 @@
extern void __wt_async_stats_update(WT_SESSION_IMPL *session);
extern int __wt_async_create(WT_CONNECTION_IMPL *conn, const char *cfg[]);
+extern int __wt_async_reconfig(WT_CONNECTION_IMPL *conn, const char *cfg[]);
extern int __wt_async_destroy(WT_CONNECTION_IMPL *conn);
extern int __wt_async_flush(WT_CONNECTION_IMPL *conn);
extern int __wt_async_new_op(WT_CONNECTION_IMPL *conn,
diff --git a/src/include/flags.h b/src/include/flags.h
index 457871751e8..b242bab4e86 100644
--- a/src/include/flags.h
+++ b/src/include/flags.h
@@ -3,12 +3,13 @@
* flags section: BEGIN
*/
#define WT_CACHE_POOL_RUN 0x00000001
-#define WT_CONN_CACHE_POOL 0x00000200
-#define WT_CONN_CKPT_SYNC 0x00000100
-#define WT_CONN_EVICTION_RUN 0x00000080
-#define WT_CONN_LEAK_MEMORY 0x00000040
-#define WT_CONN_LSM_MERGE 0x00000020
-#define WT_CONN_PANIC 0x00000010
+#define WT_CONN_CACHE_POOL 0x00000400
+#define WT_CONN_CKPT_SYNC 0x00000200
+#define WT_CONN_EVICTION_RUN 0x00000100
+#define WT_CONN_LEAK_MEMORY 0x00000080
+#define WT_CONN_LSM_MERGE 0x00000040
+#define WT_CONN_PANIC 0x00000020
+#define WT_CONN_SERVER_ASYNC 0x00000010
#define WT_CONN_SERVER_CHECKPOINT 0x00000008
#define WT_CONN_SERVER_RUN 0x00000004
#define WT_CONN_SERVER_STATISTICS 0x00000002
@@ -32,14 +33,15 @@
#define WT_READ_SKIP_LEAF 0x00000004
#define WT_READ_TRUNCATE 0x00000002
#define WT_READ_WONT_NEED 0x00000001
-#define WT_SESSION_INTERNAL 0x00000080
-#define WT_SESSION_LOGGING_INMEM 0x00000040
-#define WT_SESSION_NO_CACHE 0x00000020
-#define WT_SESSION_NO_CACHE_CHECK 0x00000010
-#define WT_SESSION_NO_LOGGING 0x00000008
-#define WT_SESSION_NO_SCHEMA_LOCK 0x00000004
-#define WT_SESSION_SALVAGE_CORRUPT_OK 0x00000002
-#define WT_SESSION_SCHEMA_LOCKED 0x00000001
+#define WT_SESSION_INTERNAL 0x00000100
+#define WT_SESSION_LOGGING_INMEM 0x00000080
+#define WT_SESSION_NO_CACHE 0x00000040
+#define WT_SESSION_NO_CACHE_CHECK 0x00000020
+#define WT_SESSION_NO_LOGGING 0x00000010
+#define WT_SESSION_NO_SCHEMA_LOCK 0x00000008
+#define WT_SESSION_SALVAGE_CORRUPT_OK 0x00000004
+#define WT_SESSION_SCHEMA_LOCKED 0x00000002
+#define WT_SESSION_SERVER_ASYNC 0x00000001
#define WT_SKIP_UPDATE_ERR 0x00000002
#define WT_SKIP_UPDATE_RESTORE 0x00000001
#define WT_SYNC_CHECKPOINT 0x00000008