diff options
-rw-r--r-- | dist/flags.py | 4 | ||||
-rw-r--r-- | src/conn/conn_dhandle.c | 3 | ||||
-rw-r--r-- | src/conn/conn_log.c | 12 | ||||
-rw-r--r-- | src/conn/conn_open.c | 19 | ||||
-rw-r--r-- | src/include/extern.h | 1 | ||||
-rw-r--r-- | src/include/flags.h | 14 | ||||
-rw-r--r-- | src/include/lsm.h | 10 | ||||
-rw-r--r-- | src/lsm/lsm_manager.c | 54 | ||||
-rw-r--r-- | src/lsm/lsm_worker.c | 23 | ||||
-rw-r--r-- | src/os_posix/os_thread.c | 14 | ||||
-rw-r--r-- | src/os_win/os_thread.c | 14 | ||||
-rw-r--r-- | src/session/session_api.c | 4 |
12 files changed, 114 insertions, 58 deletions
diff --git a/dist/flags.py b/dist/flags.py index b20a7181532..64b5d789e72 100644 --- a/dist/flags.py +++ b/dist/flags.py @@ -96,19 +96,19 @@ flags = { 'CONN_CACHE_POOL', 'CONN_CKPT_SYNC', 'CONN_CLOSING', + 'CONN_CLOSING_NO_MORE_OPENS', 'CONN_EVICTION_RUN', 'CONN_IN_MEMORY', 'CONN_LAS_OPEN', 'CONN_LEAK_MEMORY', - 'CONN_LOG_SERVER_RUN', 'CONN_LSM_MERGE', 'CONN_PANIC', 'CONN_READONLY', 'CONN_RECOVERING', 'CONN_SERVER_ASYNC', 'CONN_SERVER_CHECKPOINT', + 'CONN_SERVER_LOG', 'CONN_SERVER_LSM', - 'CONN_SERVER_RUN', 'CONN_SERVER_STATISTICS', 'CONN_SERVER_SWEEP', 'CONN_WAS_BACKUP', diff --git a/src/conn/conn_dhandle.c b/src/conn/conn_dhandle.c index c5480897494..657cdebf7ee 100644 --- a/src/conn/conn_dhandle.c +++ b/src/conn/conn_dhandle.c @@ -314,7 +314,8 @@ __wt_conn_btree_open( F_ISSET(dhandle, WT_DHANDLE_EXCLUSIVE) && !LF_ISSET(WT_DHANDLE_LOCK_ONLY)); - WT_ASSERT(session, !F_ISSET(S2C(session), WT_CONN_CLOSING)); + WT_ASSERT(session, + !F_ISSET(S2C(session), WT_CONN_CLOSING_NO_MORE_OPENS)); /* * If the handle is already open, it has to be closed so it can be diff --git a/src/conn/conn_log.c b/src/conn/conn_log.c index c6dd795389d..b8b5bd2a908 100644 --- a/src/conn/conn_log.c +++ b/src/conn/conn_log.c @@ -341,7 +341,7 @@ __wt_log_truncate_files( conn = S2C(session); if (!FLD_ISSET(conn->log_flags, WT_CONN_LOG_ENABLED)) return (0); - if (F_ISSET(conn, WT_CONN_LOG_SERVER_RUN) && + if (F_ISSET(conn, WT_CONN_SERVER_LOG) && FLD_ISSET(conn->log_flags, WT_CONN_LOG_ARCHIVE)) WT_RET_MSG(session, EINVAL, "Attempt to archive manually while a server is running"); @@ -382,7 +382,7 @@ __log_file_server(void *arg) conn = S2C(session); log = conn->log; locked = false; - while (F_ISSET(conn, WT_CONN_LOG_SERVER_RUN)) { + while (F_ISSET(conn, WT_CONN_SERVER_LOG)) { /* * If there is a log file to close, make sure any outstanding * write operations have completed, then fsync and close it. @@ -708,7 +708,7 @@ __log_wrlsn_server(void *arg) log = conn->log; yield = 0; WT_INIT_LSN(&prev); - while (F_ISSET(conn, WT_CONN_LOG_SERVER_RUN)) { + while (F_ISSET(conn, WT_CONN_SERVER_LOG)) { /* * Write out any log record buffers if anything was done * since last time. Only call the function to walk the @@ -783,7 +783,7 @@ __log_server(void *arg) * takes to sync out an earlier file. */ did_work = true; - while (F_ISSET(conn, WT_CONN_LOG_SERVER_RUN)) { + while (F_ISSET(conn, WT_CONN_SERVER_LOG)) { /* * Slots depend on future activity. Force out buffered * writes in case we are idle. This cannot be part of the @@ -923,7 +923,7 @@ __wt_logmgr_open(WT_SESSION_IMPL *session) if (!FLD_ISSET(conn->log_flags, WT_CONN_LOG_ENABLED)) return (0); - F_SET(conn, WT_CONN_LOG_SERVER_RUN); + F_SET(conn, WT_CONN_SERVER_LOG); /* * Start the log close thread. It is not configurable. @@ -995,7 +995,7 @@ __wt_logmgr_destroy(WT_SESSION_IMPL *session) conn = S2C(session); - F_CLR(conn, WT_CONN_LOG_SERVER_RUN); + F_CLR(conn, WT_CONN_SERVER_LOG); if (!FLD_ISSET(conn->log_flags, WT_CONN_LOG_ENABLED)) { /* diff --git a/src/conn/conn_open.c b/src/conn/conn_open.c index 5b20377d437..eb3c79422a0 100644 --- a/src/conn/conn_open.c +++ b/src/conn/conn_open.c @@ -21,12 +21,6 @@ __wt_connection_open(WT_CONNECTION_IMPL *conn, const char *cfg[]) session = conn->default_session; WT_ASSERT(session, session->iface.connection == &conn->iface); - /* - * Tell internal server threads to run: this must be set before opening - * any sessions. - */ - F_SET(conn, WT_CONN_SERVER_RUN); - /* WT_SESSION_IMPL array. */ WT_RET(__wt_calloc(session, conn->session_size, sizeof(WT_SESSION_IMPL), &conn->sessions)); @@ -100,6 +94,10 @@ __wt_connection_close(WT_CONNECTION_IMPL *conn) __wt_yield(); } + /* Shut down the subsystems, ensuring workers see the state change. */ + F_SET(conn, WT_CONN_CLOSING); + WT_FULL_BARRIER(); + /* * Clear any pending async operations and shut down the async worker * threads and system before closing LSM. @@ -113,10 +111,15 @@ __wt_connection_close(WT_CONNECTION_IMPL *conn) * btree handles, so take care in ordering shutdown to make sure they * exit before files are closed. */ - F_CLR(conn, WT_CONN_SERVER_RUN); WT_TRET(__wt_lsm_manager_destroy(session)); - F_SET(conn, WT_CONN_CLOSING); + /* + * Once the async and LSM threads exit, we shouldn't be opening any + * more files. + */ + F_SET(conn, WT_CONN_CLOSING_NO_MORE_OPENS); + WT_FULL_BARRIER(); + WT_TRET(__wt_checkpoint_server_destroy(session)); WT_TRET(__wt_statlog_destroy(session, true)); WT_TRET(__wt_sweep_destroy(session)); diff --git a/src/include/extern.h b/src/include/extern.h index 2759ac1dec3..47b4e03a7b7 100644 --- a/src/include/extern.h +++ b/src/include/extern.h @@ -458,6 +458,7 @@ extern int __wt_lsm_work_bloom(WT_SESSION_IMPL *session, WT_LSM_TREE *lsm_tree) extern int __wt_lsm_checkpoint_chunk(WT_SESSION_IMPL *session, WT_LSM_TREE *lsm_tree, WT_LSM_CHUNK *chunk) WT_GCC_FUNC_DECL_ATTRIBUTE((warn_unused_result)) WT_GCC_FUNC_DECL_ATTRIBUTE((visibility("hidden"))); extern int __wt_lsm_free_chunks(WT_SESSION_IMPL *session, WT_LSM_TREE *lsm_tree) WT_GCC_FUNC_DECL_ATTRIBUTE((warn_unused_result)) WT_GCC_FUNC_DECL_ATTRIBUTE((visibility("hidden"))); extern int __wt_lsm_worker_start(WT_SESSION_IMPL *session, WT_LSM_WORKER_ARGS *args) WT_GCC_FUNC_DECL_ATTRIBUTE((warn_unused_result)) WT_GCC_FUNC_DECL_ATTRIBUTE((visibility("hidden"))); +extern int __wt_lsm_worker_stop(WT_SESSION_IMPL *session, WT_LSM_WORKER_ARGS *args) WT_GCC_FUNC_DECL_ATTRIBUTE((warn_unused_result)) WT_GCC_FUNC_DECL_ATTRIBUTE((visibility("hidden"))); extern int __wt_meta_apply_all(WT_SESSION_IMPL *session, int (*file_func)(WT_SESSION_IMPL *, const char *[]), int (*name_func)(WT_SESSION_IMPL *, const char *, bool *), const char *cfg[]) WT_GCC_FUNC_DECL_ATTRIBUTE((warn_unused_result)) WT_GCC_FUNC_DECL_ATTRIBUTE((visibility("hidden"))); extern int __wt_meta_checkpoint(WT_SESSION_IMPL *session, const char *fname, const char *checkpoint, WT_CKPT *ckpt) WT_GCC_FUNC_DECL_ATTRIBUTE((warn_unused_result)) WT_GCC_FUNC_DECL_ATTRIBUTE((visibility("hidden"))); extern int __wt_meta_checkpoint_last_name( WT_SESSION_IMPL *session, const char *fname, const char **namep) WT_GCC_FUNC_DECL_ATTRIBUTE((warn_unused_result)) WT_GCC_FUNC_DECL_ATTRIBUTE((visibility("hidden"))); diff --git a/src/include/flags.h b/src/include/flags.h index c1fff920e3b..f26a45c68f5 100644 --- a/src/include/flags.h +++ b/src/include/flags.h @@ -6,19 +6,19 @@ #define WT_CONN_CACHE_POOL 0x00000001 #define WT_CONN_CKPT_SYNC 0x00000002 #define WT_CONN_CLOSING 0x00000004 -#define WT_CONN_EVICTION_RUN 0x00000008 -#define WT_CONN_IN_MEMORY 0x00000010 -#define WT_CONN_LAS_OPEN 0x00000020 -#define WT_CONN_LEAK_MEMORY 0x00000040 -#define WT_CONN_LOG_SERVER_RUN 0x00000080 +#define WT_CONN_CLOSING_NO_MORE_OPENS 0x00000008 +#define WT_CONN_EVICTION_RUN 0x00000010 +#define WT_CONN_IN_MEMORY 0x00000020 +#define WT_CONN_LAS_OPEN 0x00000040 +#define WT_CONN_LEAK_MEMORY 0x00000080 #define WT_CONN_LSM_MERGE 0x00000100 #define WT_CONN_PANIC 0x00000200 #define WT_CONN_READONLY 0x00000400 #define WT_CONN_RECOVERING 0x00000800 #define WT_CONN_SERVER_ASYNC 0x00001000 #define WT_CONN_SERVER_CHECKPOINT 0x00002000 -#define WT_CONN_SERVER_LSM 0x00004000 -#define WT_CONN_SERVER_RUN 0x00008000 +#define WT_CONN_SERVER_LOG 0x00004000 +#define WT_CONN_SERVER_LSM 0x00008000 #define WT_CONN_SERVER_STATISTICS 0x00010000 #define WT_CONN_SERVER_SWEEP 0x00020000 #define WT_CONN_WAS_BACKUP 0x00040000 diff --git a/src/include/lsm.h b/src/include/lsm.h index 2bbb813bad2..e3f6897ef9d 100644 --- a/src/include/lsm.h +++ b/src/include/lsm.h @@ -23,11 +23,14 @@ struct __wt_lsm_worker_cookie { struct __wt_lsm_worker_args { WT_SESSION_IMPL *session; /* Session */ WT_CONDVAR *work_cond; /* Owned by the manager */ + wt_thread_t tid; /* Thread id */ + bool tid_set; /* Thread id set */ + u_int id; /* My manager slot id */ uint32_t type; /* Types of operations handled */ -#define WT_LSM_WORKER_RUN 0x01 - uint32_t flags; /* Worker flags */ + + volatile bool running; /* Worker is running */ }; /* @@ -162,6 +165,9 @@ struct __wt_lsm_manager { #define WT_LSM_MAX_WORKERS 20 #define WT_LSM_MIN_WORKERS 3 WT_LSM_WORKER_ARGS lsm_worker_cookies[WT_LSM_MAX_WORKERS]; + +#define WT_LSM_MANAGER_SHUTDOWN 0x01 /* Manager has shut down */ + uint32_t flags; }; /* diff --git a/src/lsm/lsm_manager.c b/src/lsm/lsm_manager.c index 6dc06146179..e33e119aa41 100644 --- a/src/lsm/lsm_manager.c +++ b/src/lsm/lsm_manager.c @@ -89,7 +89,6 @@ __lsm_general_worker_start(WT_SESSION_IMPL *session) if (manager->lsm_workers % 2 == 0) FLD_SET(worker_args->type, WT_LSM_WORK_MERGE); } - F_SET(worker_args, WT_LSM_WORKER_RUN); WT_RET(__wt_lsm_worker_start(session, worker_args)); } @@ -129,17 +128,13 @@ __lsm_stop_workers(WT_SESSION_IMPL *session) manager->lsm_workers--) { worker_args = &manager->lsm_worker_cookies[manager->lsm_workers - 1]; - /* - * Clear this worker's flag so it stops. - */ - F_CLR(worker_args, WT_LSM_WORKER_RUN); - WT_ASSERT(session, worker_args->tid != 0); - WT_RET(__wt_thread_join(session, worker_args->tid)); - worker_args->tid = 0; + WT_ASSERT(session, worker_args->tid_set); + + WT_RET(__wt_lsm_worker_stop(session, worker_args)); worker_args->type = 0; - worker_args->flags = 0; + /* - * We do not clear the session because they are allocated + * We do not clear the other fields because they are allocated * statically when the connection was opened. */ } @@ -237,12 +232,12 @@ __wt_lsm_manager_start(WT_SESSION_IMPL *session) manager->lsm_worker_cookies[i].session = worker_session; } + F_SET(conn, WT_CONN_SERVER_LSM); + /* Start the LSM manager thread. */ WT_ERR(__wt_thread_create(session, &manager->lsm_worker_cookies[0].tid, __lsm_worker_manager, &manager->lsm_worker_cookies[0])); - F_SET(conn, WT_CONN_SERVER_LSM); - if (0) { err: for (i = 0; (worker_session = @@ -289,13 +284,18 @@ __wt_lsm_manager_destroy(WT_SESSION_IMPL *session) manager = &conn->lsm_manager; removed = 0; + /* + * Clear the LSM server flag and flush to ensure running threads see + * the state change. + */ + F_CLR(conn, WT_CONN_SERVER_LSM); + WT_FULL_BARRIER(); + WT_ASSERT(session, !F_ISSET(conn, WT_CONN_READONLY) || manager->lsm_workers == 0); if (manager->lsm_workers > 0) { - /* - * Stop the main LSM manager thread first. - */ - while (F_ISSET(conn, WT_CONN_SERVER_LSM)) + /* Wait for the main LSM manager thread to finish. */ + while (!F_ISSET(manager, WT_LSM_MANAGER_SHUTDOWN)) __wt_yield(); /* Clean up open LSM handles. */ @@ -303,7 +303,6 @@ __wt_lsm_manager_destroy(WT_SESSION_IMPL *session) WT_TRET(__wt_thread_join( session, manager->lsm_worker_cookies[0].tid)); - manager->lsm_worker_cookies[0].tid = 0; /* Release memory from any operations left on the queue. */ while ((current = TAILQ_FIRST(&manager->switchqh)) != NULL) { @@ -342,7 +341,7 @@ __wt_lsm_manager_destroy(WT_SESSION_IMPL *session) /* * __lsm_manager_worker_shutdown -- - * Shutdown the LSM manager and worker threads. + * Shutdown the LSM worker threads. */ static int __lsm_manager_worker_shutdown(WT_SESSION_IMPL *session) @@ -354,14 +353,13 @@ __lsm_manager_worker_shutdown(WT_SESSION_IMPL *session) manager = &S2C(session)->lsm_manager; /* - * Wait for the rest of the LSM workers to shutdown. Stop at index + * Wait for the rest of the LSM workers to shutdown. Start at index * one - since we (the manager) are at index 0. */ for (i = 1; i < manager->lsm_workers; i++) { - WT_ASSERT(session, manager->lsm_worker_cookies[i].tid != 0); - __wt_cond_signal(session, manager->work_cond); - WT_TRET(__wt_thread_join( - session, manager->lsm_worker_cookies[i].tid)); + WT_ASSERT(session, manager->lsm_worker_cookies[i].tid_set); + WT_TRET(__wt_lsm_worker_stop( + session, &manager->lsm_worker_cookies[i])); } return (ret); } @@ -383,7 +381,7 @@ __lsm_manager_run_server(WT_SESSION_IMPL *session) conn = S2C(session); dhandle_locked = false; - while (F_ISSET(conn, WT_CONN_SERVER_RUN)) { + while (F_ISSET(conn, WT_CONN_SERVER_LSM)) { __wt_sleep(0, 10000); if (TAILQ_EMPTY(&conn->lsmqh)) continue; @@ -469,11 +467,13 @@ static WT_THREAD_RET __lsm_worker_manager(void *arg) { WT_DECL_RET; + WT_LSM_MANAGER *manager; WT_LSM_WORKER_ARGS *cookie; WT_SESSION_IMPL *session; cookie = (WT_LSM_WORKER_ARGS *)arg; session = cookie->session; + manager = &S2C(session)->lsm_manager; WT_ERR(__lsm_general_worker_start(session)); WT_ERR(__lsm_manager_run_server(session)); @@ -482,7 +482,11 @@ __lsm_worker_manager(void *arg) if (ret != 0) { err: WT_PANIC_MSG(session, ret, "LSM worker manager thread error"); } - F_CLR(S2C(session), WT_CONN_SERVER_LSM); + + /* Connection close waits on us to shutdown, let it know we're done. */ + F_SET(manager, WT_LSM_MANAGER_SHUTDOWN); + WT_FULL_BARRIER(); + return (WT_THREAD_RET_VALUE); } diff --git a/src/lsm/lsm_worker.c b/src/lsm/lsm_worker.c index ffa00c0a5e7..1cabbd4888d 100644 --- a/src/lsm/lsm_worker.c +++ b/src/lsm/lsm_worker.c @@ -21,7 +21,23 @@ __wt_lsm_worker_start(WT_SESSION_IMPL *session, WT_LSM_WORKER_ARGS *args) { __wt_verbose(session, WT_VERB_LSM_MANAGER, "Start LSM worker %u type %#" PRIx32, args->id, args->type); - return (__wt_thread_create(session, &args->tid, __lsm_worker, args)); + + args->running = true; + WT_RET(__wt_thread_create(session, &args->tid, __lsm_worker, args)); + args->tid_set = true; + return (0); +} + +/* + * __wt_lsm_worker_stop -- + * A wrapper around the LSM worker thread stop. + */ +int +__wt_lsm_worker_stop(WT_SESSION_IMPL *session, WT_LSM_WORKER_ARGS *args) +{ + args->running = false; + args->tid_set = false; + return (__wt_thread_join(session, args->tid)); } /* @@ -84,7 +100,6 @@ err: __wt_lsm_manager_free_work_unit(session, entry); static WT_THREAD_RET __lsm_worker(void *arg) { - WT_CONNECTION_IMPL *conn; WT_DECL_RET; WT_LSM_WORK_UNIT *entry; WT_LSM_WORKER_ARGS *cookie; @@ -93,11 +108,9 @@ __lsm_worker(void *arg) cookie = (WT_LSM_WORKER_ARGS *)arg; session = cookie->session; - conn = S2C(session); entry = NULL; - while (F_ISSET(conn, WT_CONN_SERVER_RUN) && - F_ISSET(cookie, WT_LSM_WORKER_RUN)) { + while (cookie->running) { progress = false; /* diff --git a/src/os_posix/os_thread.c b/src/os_posix/os_thread.c index 85d43f10a33..18e4c347436 100644 --- a/src/os_posix/os_thread.c +++ b/src/os_posix/os_thread.c @@ -18,6 +18,13 @@ __wt_thread_create(WT_SESSION_IMPL *session, { WT_DECL_RET; + /* + * Creating a thread isn't a memory barrier, but WiredTiger commonly + * sets flags and or state and then expects worker threads to start. + * Include a barrier to ensure safety in those cases. + */ + WT_FULL_BARRIER(); + /* Spawn a new thread of control. */ WT_SYSCALL_RETRY(pthread_create(tidret, NULL, func, arg), ret); if (ret == 0) @@ -34,6 +41,13 @@ __wt_thread_join(WT_SESSION_IMPL *session, wt_thread_t tid) { WT_DECL_RET; + /* + * Joining a thread isn't a memory barrier, but WiredTiger commonly + * sets flags and or state and then expects worker threads to halt. + * Include a barrier to ensure safety in those cases. + */ + WT_FULL_BARRIER(); + WT_SYSCALL(pthread_join(tid, NULL), ret); if (ret == 0) return (0); diff --git a/src/os_win/os_thread.c b/src/os_win/os_thread.c index 7442fb08a36..4c8f212bb4f 100644 --- a/src/os_win/os_thread.c +++ b/src/os_win/os_thread.c @@ -16,6 +16,13 @@ int __wt_thread_create(WT_SESSION_IMPL *session, wt_thread_t *tidret, WT_THREAD_CALLBACK(*func)(void *), void *arg) { + /* + * Creating a thread isn't a memory barrier, but WiredTiger commonly + * sets flags and or state and then expects worker threads to start. + * Include a barrier to ensure safety in those cases. + */ + WT_FULL_BARRIER(); + /* Spawn a new thread of control. */ *tidret = (HANDLE)_beginthreadex(NULL, 0, func, arg, 0, NULL); if (*tidret != 0) @@ -33,6 +40,13 @@ __wt_thread_join(WT_SESSION_IMPL *session, wt_thread_t tid) { DWORD windows_error; + /* + * Joining a thread isn't a memory barrier, but WiredTiger commonly + * sets flags and or state and then expects worker threads to halt. + * Include a barrier to ensure safety in those cases. + */ + WT_FULL_BARRIER(); + if ((windows_error = WaitForSingleObject(tid, INFINITE)) != WAIT_OBJECT_0) { if (windows_error == WAIT_FAILED) diff --git a/src/session/session_api.c b/src/session/session_api.c index 51233e5e224..b7daf0e2e02 100644 --- a/src/session/session_api.c +++ b/src/session/session_api.c @@ -1502,7 +1502,7 @@ __transaction_sync_run_chk(WT_SESSION_IMPL *session) conn = S2C(session); - return (FLD_ISSET(conn->flags, WT_CONN_LOG_SERVER_RUN)); + return (FLD_ISSET(conn->flags, WT_CONN_SERVER_LOG)); } /* @@ -1812,7 +1812,7 @@ __open_session(WT_CONNECTION_IMPL *conn, * closes the connection. This is particularly intended to catch * cases where server threads open sessions. */ - WT_ASSERT(session, F_ISSET(conn, WT_CONN_SERVER_RUN)); + WT_ASSERT(session, !F_ISSET(conn, WT_CONN_CLOSING)); /* Find the first inactive session slot. */ for (session_ret = conn->sessions, |