diff options
author | Susan LoVerso <sue@wiredtiger.com> | 2014-05-13 12:34:49 -0400 |
---|---|---|
committer | Susan LoVerso <sue@wiredtiger.com> | 2014-05-13 12:34:49 -0400 |
commit | f17eaaac762bb3ba50a0d789ffde131c7cf654ad (patch) | |
tree | 446dd33bd63c89251517fb427a203373324620a8 | |
parent | 4a2583d492a2a923a895b897293689fae656a1e6 (diff) | |
download | mongo-f17eaaac762bb3ba50a0d789ffde131c7cf654ad.tar.gz |
Allow async to be reconfigured. #1002
-rw-r--r-- | bench/wtperf/wtperf.c | 13 | ||||
-rw-r--r-- | dist/flags.py | 2 | ||||
-rw-r--r-- | src/async/async_api.c | 168 | ||||
-rw-r--r-- | src/async/async_worker.c | 7 | ||||
-rw-r--r-- | src/conn/conn_api.c | 1 | ||||
-rw-r--r-- | src/include/extern.h | 1 | ||||
-rw-r--r-- | src/include/flags.h | 30 |
7 files changed, 188 insertions, 34 deletions
diff --git a/bench/wtperf/wtperf.c b/bench/wtperf/wtperf.c index 2a4bf2cc618..d56b5452ad8 100644 --- a/bench/wtperf/wtperf.c +++ b/bench/wtperf/wtperf.c @@ -1323,6 +1323,19 @@ retry: if ((ret = cfg->conn->async_new_op(cfg->conn, lprintf(cfg, ret, 0, "Re-opening the connection failed"); return (ret); } + /* + * If we started async threads only for the purposes of compact, + * then turn it off before starting the workload so that those extra + * threads looking for work that will never arrive don't affect + * performance. + */ + if (cfg->compact && cfg->use_asyncops == 0) { + if ((ret = cfg->conn->reconfigure( + cfg->conn, "async=(enabled=false)")) != 0) { + lprintf(cfg, ret, 0, "Reconfigure async off failed"); + return (ret); + } + } return (0); } diff --git a/dist/flags.py b/dist/flags.py index f52ab2fc756..d065bb5dfbf 100644 --- a/dist/flags.py +++ b/dist/flags.py @@ -89,6 +89,7 @@ flags = { 'CONN_LSM_MERGE', 'CONN_PANIC', 'CONN_SERVER_RUN', + 'CONN_SERVER_ASYNC', 'CONN_SERVER_CHECKPOINT', 'CONN_SERVER_STATISTICS', 'CONN_WAS_BACKUP', @@ -102,6 +103,7 @@ flags = { 'SESSION_NO_SCHEMA_LOCK', 'SESSION_SALVAGE_CORRUPT_OK', 'SESSION_SCHEMA_LOCKED', + 'SESSION_SERVER_ASYNC', ], } diff --git a/src/async/async_api.c b/src/async/async_api.c index c99e9b3846c..47446247c26 100644 --- a/src/async/async_api.c +++ b/src/async/async_api.c @@ -175,30 +175,33 @@ err: * Parse and setup the async API options. */ static int -__async_config(WT_SESSION_IMPL *session, const char **cfg, int *runp) +__async_config(WT_SESSION_IMPL *session, + WT_CONNECTION_IMPL *conn, const char **cfg, int *runp) { WT_CONFIG_ITEM cval; - WT_CONNECTION_IMPL *conn; WT_DECL_RET; - conn = S2C(session); - /* * The async configuration is off by default. */ - WT_RET(__wt_config_gets(session, cfg, "async.enabled", &cval)); - *runp = cval.val != 0; - if (*runp == 0) - return (0); - - WT_RET(__wt_config_gets(session, cfg, "async.ops_max", &cval)); - conn->async_size = (uint32_t)cval.val; + if ((ret = __wt_config_gets( + session, cfg, "async.enabled", &cval)) == 0) { + *runp = cval.val != 0; + if (*runp == 0) + return (0); + } - WT_RET(__wt_config_gets(session, cfg, "async.threads", &cval)); - conn->async_workers = (uint32_t)cval.val; + if ((ret = __wt_config_gets( + session, cfg, "async.ops_max", &cval)) == 0) + conn->async_size = (uint32_t)cval.val; - /* Sanity check that api_data.py is in sync with async.h */ - WT_ASSERT(session, conn->async_workers <= WT_ASYNC_MAX_WORKERS); + if ((ret = __wt_config_gets( + session, cfg, "async.threads", &cval)) == 0) { + conn->async_workers = (uint32_t)cval.val; + /* Sanity check that api_data.py is in sync with async.h */ + WT_ASSERT(session, conn->async_workers <= WT_ASYNC_MAX_WORKERS); + } + WT_RET_NOTFOUND_OK(ret); ret = 0; return (ret); @@ -222,6 +225,7 @@ __wt_async_stats_update(WT_SESSION_IMPL *session) stats = &conn->stats; WT_STAT_SET(stats, async_cur_queue, async->cur_queue); WT_STAT_SET(stats, async_max_queue, async->max_queue); + F_SET(conn, WT_CONN_SERVER_ASYNC); } /* @@ -239,7 +243,7 @@ __wt_async_create(WT_CONNECTION_IMPL *conn, const char *cfg[]) session = conn->default_session; /* Handle configuration. */ - WT_RET(__async_config(session, cfg, &run)); + WT_RET(__async_config(session, conn, cfg, &run)); /* If async is not configured, we're done. */ if (!run) @@ -259,13 +263,19 @@ __wt_async_create(WT_CONNECTION_IMPL *conn, const char *cfg[]) /* * Start up the worker threads. */ + F_SET(conn, WT_CONN_SERVER_ASYNC); for (i = 0; i < conn->async_workers; i++) { /* - * Each worker has its own session. + * Each worker has its own session. We set both a general + * server flag in the connection and an individual flag + * in the session. The user may reconfigure the number of + * workers and we may want to selectively stop some workers + * while leaving the rest running. */ WT_RET(__wt_open_session( conn, 1, NULL, NULL, &async->worker_sessions[i])); async->worker_sessions[i]->name = "async-worker"; + F_SET(async->worker_sessions[i], WT_SESSION_SERVER_ASYNC); } for (i = 0; i < conn->async_workers; i++) { /* @@ -279,6 +289,126 @@ __wt_async_create(WT_CONNECTION_IMPL *conn, const char *cfg[]) } /* + * __wt_async_reconfig -- + * Start the async subsystem and worker threads. + */ +int +__wt_async_reconfig(WT_CONNECTION_IMPL *conn, const char *cfg[]) +{ + WT_ASYNC *async; + WT_CONNECTION_IMPL tmp_conn; + WT_DECL_RET; + WT_SESSION *wt_session; + WT_SESSION_IMPL *session; + int run; + uint32_t i; + + session = conn->default_session; + async = conn->async; + + /* Handle configuration. */ + WT_RET(__async_config(session, &tmp_conn, cfg, &run)); + + /* + * There are some restrictions on the live reconfiguration of async. + * Unlike other subsystems where we simply destroy anything existing + * and restart with the new configuration, async is not so easy. + * If the user is just changing the number of workers, we want to + * allow the existing op handles and other information to remain in + * existence. So we must handle various combinations of changes + * individually. + * + * One restriction is that if async is currently on, the user cannot + * change the number of async op handles available. The user can try + * but we do nothing with it. However we must allow the ops_max config + * string so that a user can completely start async via reconfigure. + */ + + /* + * Easy cases: + * 1. If async is on and the user wants it off, shut it down. + * 2. If async is off, and the user wants it on, start it. + * 3. If not a toggle and async is off, we're done. + */ + if (conn->async_cfg > 0 && !run) { + /* Case 1 */ + WT_TRET(__wt_async_flush(conn)); + ret = __wt_async_destroy(conn); + conn->async_cfg = 0; + return (ret); + } else if (conn->async_cfg == 0 && run) + /* Case 2 */ + return (__wt_async_create(conn, cfg)); + else if (conn->async_cfg == 0) + /* Case 3 */ + return (0); + + /* + * Running async worker modification cases: + * 4. If number of workers didn't change, we're done. + * 5. If more workers, start new ones. + * 6. If fewer workers, kill some. + */ + if (conn->async_workers == tmp_conn.async_workers) + /* No change in the number of workers. */ + return (0); + if (conn->async_workers < tmp_conn.async_workers) { + /* Case 5 */ + /* + * The worker_sessions array is allocated for the maximum + * allowed number of workers, so starting more is easy. + */ + for (i = conn->async_workers; i < tmp_conn.async_workers; i++) { + /* + * Each worker has its own session. + */ + WT_RET(__wt_open_session( + conn, 1, NULL, NULL, &async->worker_sessions[i])); + async->worker_sessions[i]->name = "async-worker"; + F_SET(async->worker_sessions[i], + WT_SESSION_SERVER_ASYNC); + } + for (i = conn->async_workers; i < tmp_conn.async_workers; i++) { + /* + * Start the threads. + */ + WT_RET(__wt_thread_create(session, + &async->worker_tids[i], __wt_async_worker, + async->worker_sessions[i])); + } + conn->async_workers = tmp_conn.async_workers; + } + if (conn->async_workers > tmp_conn.async_workers) { + /* Case 6 */ + /* + * Stopping an individual async worker is the most complex case. + * We clear the session async flag on the targeted worker thread + * so that only that thread stops, and the others keep running. + */ + for (i = conn->async_workers - 1; + i >= tmp_conn.async_workers; i--) { + /* + * Join any worker we're stopping. + * After the thread is stopped, close its session. + */ + WT_ASSERT(session, async->worker_tids[i] != 0); + WT_ASSERT(session, async->worker_sessions[i] != NULL); + F_CLR(async->worker_sessions[i], + WT_SESSION_SERVER_ASYNC); + WT_TRET(__wt_thread_join( + session, async->worker_tids[i])); + async->worker_tids[i] = 0; + wt_session = &async->worker_sessions[i]->iface; + WT_TRET(wt_session->close(wt_session, NULL)); + async->worker_sessions[i] = NULL; + } + conn->async_workers = tmp_conn.async_workers; + } + + return (0); +} + +/* * __wt_async_destroy -- * Destroy the async worker threads and async subsystem. */ @@ -298,6 +428,8 @@ __wt_async_destroy(WT_CONNECTION_IMPL *conn) if (!conn->async_cfg) return (0); + + F_CLR(conn, WT_CONN_SERVER_ASYNC); for (i = 0; i < conn->async_workers; i++) if (async->worker_tids[i] != 0) { WT_TRET(__wt_thread_join( @@ -447,10 +579,10 @@ __wt_async_new_op(WT_CONNECTION_IMPL *conn, const char *uri, WT_ASYNC_OP_IMPL *op; WT_DECL_RET; + *opp = NULL; if (!conn->async_cfg) return (0); - *opp = NULL; op = NULL; WT_ERR(__async_new_op_alloc(conn, uri, config, &op)); WT_ERR(__async_runtime_config(op, cfg)); diff --git a/src/async/async_worker.c b/src/async/async_worker.c index 812d695d527..26236c72dc4 100644 --- a/src/async/async_worker.c +++ b/src/async/async_worker.c @@ -52,7 +52,9 @@ retry: sleep_usec = WT_MIN(sleep_usec * 2, MAX_ASYNC_SLEEP_USECS); } - if (!F_ISSET(conn, WT_CONN_SERVER_RUN)) + if (!F_ISSET(session, WT_SESSION_SERVER_ASYNC)) + return (0); + if (!F_ISSET(conn, WT_CONN_SERVER_ASYNC)) return (0); if (F_ISSET(conn, WT_CONN_PANIC)) return (__wt_panic(session)); @@ -299,7 +301,8 @@ __wt_async_worker(void *arg) worker.num_cursors = 0; STAILQ_INIT(&worker.cursorqh); - while (F_ISSET(conn, WT_CONN_SERVER_RUN)) { + while (F_ISSET(conn, WT_CONN_SERVER_ASYNC) && + F_ISSET(session, WT_SESSION_SERVER_ASYNC)) { WT_ERR(__async_op_dequeue(conn, session, &op)); if (op != NULL && op != &async->flush_op) { /* diff --git a/src/conn/conn_api.c b/src/conn/conn_api.c index 2aa628283e6..66b733c2579 100644 --- a/src/conn/conn_api.c +++ b/src/conn/conn_api.c @@ -604,6 +604,7 @@ __conn_reconfigure(WT_CONNECTION *wt_conn, const char *config) WT_ERR(__wt_conn_cache_pool_config(session, cfg)); WT_ERR(__wt_cache_config(conn, raw_cfg)); + WT_ERR(__wt_async_reconfig(conn, raw_cfg)); WT_ERR(__conn_statistics_config(session, raw_cfg)); WT_ERR(__wt_conn_verbose_config(session, raw_cfg)); WT_ERR(__wt_checkpoint_server_create(conn, cfg)); diff --git a/src/include/extern.h b/src/include/extern.h index 446100a3937..1aa577ef4f2 100644 --- a/src/include/extern.h +++ b/src/include/extern.h @@ -2,6 +2,7 @@ extern void __wt_async_stats_update(WT_SESSION_IMPL *session); extern int __wt_async_create(WT_CONNECTION_IMPL *conn, const char *cfg[]); +extern int __wt_async_reconfig(WT_CONNECTION_IMPL *conn, const char *cfg[]); extern int __wt_async_destroy(WT_CONNECTION_IMPL *conn); extern int __wt_async_flush(WT_CONNECTION_IMPL *conn); extern int __wt_async_new_op(WT_CONNECTION_IMPL *conn, diff --git a/src/include/flags.h b/src/include/flags.h index 457871751e8..b242bab4e86 100644 --- a/src/include/flags.h +++ b/src/include/flags.h @@ -3,12 +3,13 @@ * flags section: BEGIN */ #define WT_CACHE_POOL_RUN 0x00000001 -#define WT_CONN_CACHE_POOL 0x00000200 -#define WT_CONN_CKPT_SYNC 0x00000100 -#define WT_CONN_EVICTION_RUN 0x00000080 -#define WT_CONN_LEAK_MEMORY 0x00000040 -#define WT_CONN_LSM_MERGE 0x00000020 -#define WT_CONN_PANIC 0x00000010 +#define WT_CONN_CACHE_POOL 0x00000400 +#define WT_CONN_CKPT_SYNC 0x00000200 +#define WT_CONN_EVICTION_RUN 0x00000100 +#define WT_CONN_LEAK_MEMORY 0x00000080 +#define WT_CONN_LSM_MERGE 0x00000040 +#define WT_CONN_PANIC 0x00000020 +#define WT_CONN_SERVER_ASYNC 0x00000010 #define WT_CONN_SERVER_CHECKPOINT 0x00000008 #define WT_CONN_SERVER_RUN 0x00000004 #define WT_CONN_SERVER_STATISTICS 0x00000002 @@ -32,14 +33,15 @@ #define WT_READ_SKIP_LEAF 0x00000004 #define WT_READ_TRUNCATE 0x00000002 #define WT_READ_WONT_NEED 0x00000001 -#define WT_SESSION_INTERNAL 0x00000080 -#define WT_SESSION_LOGGING_INMEM 0x00000040 -#define WT_SESSION_NO_CACHE 0x00000020 -#define WT_SESSION_NO_CACHE_CHECK 0x00000010 -#define WT_SESSION_NO_LOGGING 0x00000008 -#define WT_SESSION_NO_SCHEMA_LOCK 0x00000004 -#define WT_SESSION_SALVAGE_CORRUPT_OK 0x00000002 -#define WT_SESSION_SCHEMA_LOCKED 0x00000001 +#define WT_SESSION_INTERNAL 0x00000100 +#define WT_SESSION_LOGGING_INMEM 0x00000080 +#define WT_SESSION_NO_CACHE 0x00000040 +#define WT_SESSION_NO_CACHE_CHECK 0x00000020 +#define WT_SESSION_NO_LOGGING 0x00000010 +#define WT_SESSION_NO_SCHEMA_LOCK 0x00000008 +#define WT_SESSION_SALVAGE_CORRUPT_OK 0x00000004 +#define WT_SESSION_SCHEMA_LOCKED 0x00000002 +#define WT_SESSION_SERVER_ASYNC 0x00000001 #define WT_SKIP_UPDATE_ERR 0x00000002 #define WT_SKIP_UPDATE_RESTORE 0x00000001 #define WT_SYNC_CHECKPOINT 0x00000008 |