summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--dist/flags.py4
-rw-r--r--src/conn/conn_dhandle.c3
-rw-r--r--src/conn/conn_log.c12
-rw-r--r--src/conn/conn_open.c19
-rw-r--r--src/include/extern.h1
-rw-r--r--src/include/flags.h14
-rw-r--r--src/include/lsm.h10
-rw-r--r--src/lsm/lsm_manager.c54
-rw-r--r--src/lsm/lsm_worker.c23
-rw-r--r--src/os_posix/os_thread.c14
-rw-r--r--src/os_win/os_thread.c14
-rw-r--r--src/session/session_api.c4
12 files changed, 114 insertions, 58 deletions
diff --git a/dist/flags.py b/dist/flags.py
index b20a7181532..64b5d789e72 100644
--- a/dist/flags.py
+++ b/dist/flags.py
@@ -96,19 +96,19 @@ flags = {
'CONN_CACHE_POOL',
'CONN_CKPT_SYNC',
'CONN_CLOSING',
+ 'CONN_CLOSING_NO_MORE_OPENS',
'CONN_EVICTION_RUN',
'CONN_IN_MEMORY',
'CONN_LAS_OPEN',
'CONN_LEAK_MEMORY',
- 'CONN_LOG_SERVER_RUN',
'CONN_LSM_MERGE',
'CONN_PANIC',
'CONN_READONLY',
'CONN_RECOVERING',
'CONN_SERVER_ASYNC',
'CONN_SERVER_CHECKPOINT',
+ 'CONN_SERVER_LOG',
'CONN_SERVER_LSM',
- 'CONN_SERVER_RUN',
'CONN_SERVER_STATISTICS',
'CONN_SERVER_SWEEP',
'CONN_WAS_BACKUP',
diff --git a/src/conn/conn_dhandle.c b/src/conn/conn_dhandle.c
index c5480897494..657cdebf7ee 100644
--- a/src/conn/conn_dhandle.c
+++ b/src/conn/conn_dhandle.c
@@ -314,7 +314,8 @@ __wt_conn_btree_open(
F_ISSET(dhandle, WT_DHANDLE_EXCLUSIVE) &&
!LF_ISSET(WT_DHANDLE_LOCK_ONLY));
- WT_ASSERT(session, !F_ISSET(S2C(session), WT_CONN_CLOSING));
+ WT_ASSERT(session,
+ !F_ISSET(S2C(session), WT_CONN_CLOSING_NO_MORE_OPENS));
/*
* If the handle is already open, it has to be closed so it can be
diff --git a/src/conn/conn_log.c b/src/conn/conn_log.c
index c6dd795389d..b8b5bd2a908 100644
--- a/src/conn/conn_log.c
+++ b/src/conn/conn_log.c
@@ -341,7 +341,7 @@ __wt_log_truncate_files(
conn = S2C(session);
if (!FLD_ISSET(conn->log_flags, WT_CONN_LOG_ENABLED))
return (0);
- if (F_ISSET(conn, WT_CONN_LOG_SERVER_RUN) &&
+ if (F_ISSET(conn, WT_CONN_SERVER_LOG) &&
FLD_ISSET(conn->log_flags, WT_CONN_LOG_ARCHIVE))
WT_RET_MSG(session, EINVAL,
"Attempt to archive manually while a server is running");
@@ -382,7 +382,7 @@ __log_file_server(void *arg)
conn = S2C(session);
log = conn->log;
locked = false;
- while (F_ISSET(conn, WT_CONN_LOG_SERVER_RUN)) {
+ while (F_ISSET(conn, WT_CONN_SERVER_LOG)) {
/*
* If there is a log file to close, make sure any outstanding
* write operations have completed, then fsync and close it.
@@ -708,7 +708,7 @@ __log_wrlsn_server(void *arg)
log = conn->log;
yield = 0;
WT_INIT_LSN(&prev);
- while (F_ISSET(conn, WT_CONN_LOG_SERVER_RUN)) {
+ while (F_ISSET(conn, WT_CONN_SERVER_LOG)) {
/*
* Write out any log record buffers if anything was done
* since last time. Only call the function to walk the
@@ -783,7 +783,7 @@ __log_server(void *arg)
* takes to sync out an earlier file.
*/
did_work = true;
- while (F_ISSET(conn, WT_CONN_LOG_SERVER_RUN)) {
+ while (F_ISSET(conn, WT_CONN_SERVER_LOG)) {
/*
* Slots depend on future activity. Force out buffered
* writes in case we are idle. This cannot be part of the
@@ -923,7 +923,7 @@ __wt_logmgr_open(WT_SESSION_IMPL *session)
if (!FLD_ISSET(conn->log_flags, WT_CONN_LOG_ENABLED))
return (0);
- F_SET(conn, WT_CONN_LOG_SERVER_RUN);
+ F_SET(conn, WT_CONN_SERVER_LOG);
/*
* Start the log close thread. It is not configurable.
@@ -995,7 +995,7 @@ __wt_logmgr_destroy(WT_SESSION_IMPL *session)
conn = S2C(session);
- F_CLR(conn, WT_CONN_LOG_SERVER_RUN);
+ F_CLR(conn, WT_CONN_SERVER_LOG);
if (!FLD_ISSET(conn->log_flags, WT_CONN_LOG_ENABLED)) {
/*
diff --git a/src/conn/conn_open.c b/src/conn/conn_open.c
index 5b20377d437..eb3c79422a0 100644
--- a/src/conn/conn_open.c
+++ b/src/conn/conn_open.c
@@ -21,12 +21,6 @@ __wt_connection_open(WT_CONNECTION_IMPL *conn, const char *cfg[])
session = conn->default_session;
WT_ASSERT(session, session->iface.connection == &conn->iface);
- /*
- * Tell internal server threads to run: this must be set before opening
- * any sessions.
- */
- F_SET(conn, WT_CONN_SERVER_RUN);
-
/* WT_SESSION_IMPL array. */
WT_RET(__wt_calloc(session,
conn->session_size, sizeof(WT_SESSION_IMPL), &conn->sessions));
@@ -100,6 +94,10 @@ __wt_connection_close(WT_CONNECTION_IMPL *conn)
__wt_yield();
}
+ /* Shut down the subsystems, ensuring workers see the state change. */
+ F_SET(conn, WT_CONN_CLOSING);
+ WT_FULL_BARRIER();
+
/*
* Clear any pending async operations and shut down the async worker
* threads and system before closing LSM.
@@ -113,10 +111,15 @@ __wt_connection_close(WT_CONNECTION_IMPL *conn)
* btree handles, so take care in ordering shutdown to make sure they
* exit before files are closed.
*/
- F_CLR(conn, WT_CONN_SERVER_RUN);
WT_TRET(__wt_lsm_manager_destroy(session));
- F_SET(conn, WT_CONN_CLOSING);
+ /*
+ * Once the async and LSM threads exit, we shouldn't be opening any
+ * more files.
+ */
+ F_SET(conn, WT_CONN_CLOSING_NO_MORE_OPENS);
+ WT_FULL_BARRIER();
+
WT_TRET(__wt_checkpoint_server_destroy(session));
WT_TRET(__wt_statlog_destroy(session, true));
WT_TRET(__wt_sweep_destroy(session));
diff --git a/src/include/extern.h b/src/include/extern.h
index 2759ac1dec3..47b4e03a7b7 100644
--- a/src/include/extern.h
+++ b/src/include/extern.h
@@ -458,6 +458,7 @@ extern int __wt_lsm_work_bloom(WT_SESSION_IMPL *session, WT_LSM_TREE *lsm_tree)
extern int __wt_lsm_checkpoint_chunk(WT_SESSION_IMPL *session, WT_LSM_TREE *lsm_tree, WT_LSM_CHUNK *chunk) WT_GCC_FUNC_DECL_ATTRIBUTE((warn_unused_result)) WT_GCC_FUNC_DECL_ATTRIBUTE((visibility("hidden")));
extern int __wt_lsm_free_chunks(WT_SESSION_IMPL *session, WT_LSM_TREE *lsm_tree) WT_GCC_FUNC_DECL_ATTRIBUTE((warn_unused_result)) WT_GCC_FUNC_DECL_ATTRIBUTE((visibility("hidden")));
extern int __wt_lsm_worker_start(WT_SESSION_IMPL *session, WT_LSM_WORKER_ARGS *args) WT_GCC_FUNC_DECL_ATTRIBUTE((warn_unused_result)) WT_GCC_FUNC_DECL_ATTRIBUTE((visibility("hidden")));
+extern int __wt_lsm_worker_stop(WT_SESSION_IMPL *session, WT_LSM_WORKER_ARGS *args) WT_GCC_FUNC_DECL_ATTRIBUTE((warn_unused_result)) WT_GCC_FUNC_DECL_ATTRIBUTE((visibility("hidden")));
extern int __wt_meta_apply_all(WT_SESSION_IMPL *session, int (*file_func)(WT_SESSION_IMPL *, const char *[]), int (*name_func)(WT_SESSION_IMPL *, const char *, bool *), const char *cfg[]) WT_GCC_FUNC_DECL_ATTRIBUTE((warn_unused_result)) WT_GCC_FUNC_DECL_ATTRIBUTE((visibility("hidden")));
extern int __wt_meta_checkpoint(WT_SESSION_IMPL *session, const char *fname, const char *checkpoint, WT_CKPT *ckpt) WT_GCC_FUNC_DECL_ATTRIBUTE((warn_unused_result)) WT_GCC_FUNC_DECL_ATTRIBUTE((visibility("hidden")));
extern int __wt_meta_checkpoint_last_name( WT_SESSION_IMPL *session, const char *fname, const char **namep) WT_GCC_FUNC_DECL_ATTRIBUTE((warn_unused_result)) WT_GCC_FUNC_DECL_ATTRIBUTE((visibility("hidden")));
diff --git a/src/include/flags.h b/src/include/flags.h
index c1fff920e3b..f26a45c68f5 100644
--- a/src/include/flags.h
+++ b/src/include/flags.h
@@ -6,19 +6,19 @@
#define WT_CONN_CACHE_POOL 0x00000001
#define WT_CONN_CKPT_SYNC 0x00000002
#define WT_CONN_CLOSING 0x00000004
-#define WT_CONN_EVICTION_RUN 0x00000008
-#define WT_CONN_IN_MEMORY 0x00000010
-#define WT_CONN_LAS_OPEN 0x00000020
-#define WT_CONN_LEAK_MEMORY 0x00000040
-#define WT_CONN_LOG_SERVER_RUN 0x00000080
+#define WT_CONN_CLOSING_NO_MORE_OPENS 0x00000008
+#define WT_CONN_EVICTION_RUN 0x00000010
+#define WT_CONN_IN_MEMORY 0x00000020
+#define WT_CONN_LAS_OPEN 0x00000040
+#define WT_CONN_LEAK_MEMORY 0x00000080
#define WT_CONN_LSM_MERGE 0x00000100
#define WT_CONN_PANIC 0x00000200
#define WT_CONN_READONLY 0x00000400
#define WT_CONN_RECOVERING 0x00000800
#define WT_CONN_SERVER_ASYNC 0x00001000
#define WT_CONN_SERVER_CHECKPOINT 0x00002000
-#define WT_CONN_SERVER_LSM 0x00004000
-#define WT_CONN_SERVER_RUN 0x00008000
+#define WT_CONN_SERVER_LOG 0x00004000
+#define WT_CONN_SERVER_LSM 0x00008000
#define WT_CONN_SERVER_STATISTICS 0x00010000
#define WT_CONN_SERVER_SWEEP 0x00020000
#define WT_CONN_WAS_BACKUP 0x00040000
diff --git a/src/include/lsm.h b/src/include/lsm.h
index 2bbb813bad2..e3f6897ef9d 100644
--- a/src/include/lsm.h
+++ b/src/include/lsm.h
@@ -23,11 +23,14 @@ struct __wt_lsm_worker_cookie {
struct __wt_lsm_worker_args {
WT_SESSION_IMPL *session; /* Session */
WT_CONDVAR *work_cond; /* Owned by the manager */
+
wt_thread_t tid; /* Thread id */
+ bool tid_set; /* Thread id set */
+
u_int id; /* My manager slot id */
uint32_t type; /* Types of operations handled */
-#define WT_LSM_WORKER_RUN 0x01
- uint32_t flags; /* Worker flags */
+
+ volatile bool running; /* Worker is running */
};
/*
@@ -162,6 +165,9 @@ struct __wt_lsm_manager {
#define WT_LSM_MAX_WORKERS 20
#define WT_LSM_MIN_WORKERS 3
WT_LSM_WORKER_ARGS lsm_worker_cookies[WT_LSM_MAX_WORKERS];
+
+#define WT_LSM_MANAGER_SHUTDOWN 0x01 /* Manager has shut down */
+ uint32_t flags;
};
/*
diff --git a/src/lsm/lsm_manager.c b/src/lsm/lsm_manager.c
index 6dc06146179..e33e119aa41 100644
--- a/src/lsm/lsm_manager.c
+++ b/src/lsm/lsm_manager.c
@@ -89,7 +89,6 @@ __lsm_general_worker_start(WT_SESSION_IMPL *session)
if (manager->lsm_workers % 2 == 0)
FLD_SET(worker_args->type, WT_LSM_WORK_MERGE);
}
- F_SET(worker_args, WT_LSM_WORKER_RUN);
WT_RET(__wt_lsm_worker_start(session, worker_args));
}
@@ -129,17 +128,13 @@ __lsm_stop_workers(WT_SESSION_IMPL *session)
manager->lsm_workers--) {
worker_args =
&manager->lsm_worker_cookies[manager->lsm_workers - 1];
- /*
- * Clear this worker's flag so it stops.
- */
- F_CLR(worker_args, WT_LSM_WORKER_RUN);
- WT_ASSERT(session, worker_args->tid != 0);
- WT_RET(__wt_thread_join(session, worker_args->tid));
- worker_args->tid = 0;
+ WT_ASSERT(session, worker_args->tid_set);
+
+ WT_RET(__wt_lsm_worker_stop(session, worker_args));
worker_args->type = 0;
- worker_args->flags = 0;
+
/*
- * We do not clear the session because they are allocated
+ * We do not clear the other fields because they are allocated
* statically when the connection was opened.
*/
}
@@ -237,12 +232,12 @@ __wt_lsm_manager_start(WT_SESSION_IMPL *session)
manager->lsm_worker_cookies[i].session = worker_session;
}
+ F_SET(conn, WT_CONN_SERVER_LSM);
+
/* Start the LSM manager thread. */
WT_ERR(__wt_thread_create(session, &manager->lsm_worker_cookies[0].tid,
__lsm_worker_manager, &manager->lsm_worker_cookies[0]));
- F_SET(conn, WT_CONN_SERVER_LSM);
-
if (0) {
err: for (i = 0;
(worker_session =
@@ -289,13 +284,18 @@ __wt_lsm_manager_destroy(WT_SESSION_IMPL *session)
manager = &conn->lsm_manager;
removed = 0;
+ /*
+ * Clear the LSM server flag and flush to ensure running threads see
+ * the state change.
+ */
+ F_CLR(conn, WT_CONN_SERVER_LSM);
+ WT_FULL_BARRIER();
+
WT_ASSERT(session, !F_ISSET(conn, WT_CONN_READONLY) ||
manager->lsm_workers == 0);
if (manager->lsm_workers > 0) {
- /*
- * Stop the main LSM manager thread first.
- */
- while (F_ISSET(conn, WT_CONN_SERVER_LSM))
+ /* Wait for the main LSM manager thread to finish. */
+ while (!F_ISSET(manager, WT_LSM_MANAGER_SHUTDOWN))
__wt_yield();
/* Clean up open LSM handles. */
@@ -303,7 +303,6 @@ __wt_lsm_manager_destroy(WT_SESSION_IMPL *session)
WT_TRET(__wt_thread_join(
session, manager->lsm_worker_cookies[0].tid));
- manager->lsm_worker_cookies[0].tid = 0;
/* Release memory from any operations left on the queue. */
while ((current = TAILQ_FIRST(&manager->switchqh)) != NULL) {
@@ -342,7 +341,7 @@ __wt_lsm_manager_destroy(WT_SESSION_IMPL *session)
/*
* __lsm_manager_worker_shutdown --
- * Shutdown the LSM manager and worker threads.
+ * Shutdown the LSM worker threads.
*/
static int
__lsm_manager_worker_shutdown(WT_SESSION_IMPL *session)
@@ -354,14 +353,13 @@ __lsm_manager_worker_shutdown(WT_SESSION_IMPL *session)
manager = &S2C(session)->lsm_manager;
/*
- * Wait for the rest of the LSM workers to shutdown. Stop at index
+ * Wait for the rest of the LSM workers to shutdown. Start at index
* one - since we (the manager) are at index 0.
*/
for (i = 1; i < manager->lsm_workers; i++) {
- WT_ASSERT(session, manager->lsm_worker_cookies[i].tid != 0);
- __wt_cond_signal(session, manager->work_cond);
- WT_TRET(__wt_thread_join(
- session, manager->lsm_worker_cookies[i].tid));
+ WT_ASSERT(session, manager->lsm_worker_cookies[i].tid_set);
+ WT_TRET(__wt_lsm_worker_stop(
+ session, &manager->lsm_worker_cookies[i]));
}
return (ret);
}
@@ -383,7 +381,7 @@ __lsm_manager_run_server(WT_SESSION_IMPL *session)
conn = S2C(session);
dhandle_locked = false;
- while (F_ISSET(conn, WT_CONN_SERVER_RUN)) {
+ while (F_ISSET(conn, WT_CONN_SERVER_LSM)) {
__wt_sleep(0, 10000);
if (TAILQ_EMPTY(&conn->lsmqh))
continue;
@@ -469,11 +467,13 @@ static WT_THREAD_RET
__lsm_worker_manager(void *arg)
{
WT_DECL_RET;
+ WT_LSM_MANAGER *manager;
WT_LSM_WORKER_ARGS *cookie;
WT_SESSION_IMPL *session;
cookie = (WT_LSM_WORKER_ARGS *)arg;
session = cookie->session;
+ manager = &S2C(session)->lsm_manager;
WT_ERR(__lsm_general_worker_start(session));
WT_ERR(__lsm_manager_run_server(session));
@@ -482,7 +482,11 @@ __lsm_worker_manager(void *arg)
if (ret != 0) {
err: WT_PANIC_MSG(session, ret, "LSM worker manager thread error");
}
- F_CLR(S2C(session), WT_CONN_SERVER_LSM);
+
+ /* Connection close waits on us to shutdown, let it know we're done. */
+ F_SET(manager, WT_LSM_MANAGER_SHUTDOWN);
+ WT_FULL_BARRIER();
+
return (WT_THREAD_RET_VALUE);
}
diff --git a/src/lsm/lsm_worker.c b/src/lsm/lsm_worker.c
index ffa00c0a5e7..1cabbd4888d 100644
--- a/src/lsm/lsm_worker.c
+++ b/src/lsm/lsm_worker.c
@@ -21,7 +21,23 @@ __wt_lsm_worker_start(WT_SESSION_IMPL *session, WT_LSM_WORKER_ARGS *args)
{
__wt_verbose(session, WT_VERB_LSM_MANAGER,
"Start LSM worker %u type %#" PRIx32, args->id, args->type);
- return (__wt_thread_create(session, &args->tid, __lsm_worker, args));
+
+ args->running = true;
+ WT_RET(__wt_thread_create(session, &args->tid, __lsm_worker, args));
+ args->tid_set = true;
+ return (0);
+}
+
+/*
+ * __wt_lsm_worker_stop --
+ * A wrapper around the LSM worker thread stop.
+ */
+int
+__wt_lsm_worker_stop(WT_SESSION_IMPL *session, WT_LSM_WORKER_ARGS *args)
+{
+ args->running = false;
+ args->tid_set = false;
+ return (__wt_thread_join(session, args->tid));
}
/*
@@ -84,7 +100,6 @@ err: __wt_lsm_manager_free_work_unit(session, entry);
static WT_THREAD_RET
__lsm_worker(void *arg)
{
- WT_CONNECTION_IMPL *conn;
WT_DECL_RET;
WT_LSM_WORK_UNIT *entry;
WT_LSM_WORKER_ARGS *cookie;
@@ -93,11 +108,9 @@ __lsm_worker(void *arg)
cookie = (WT_LSM_WORKER_ARGS *)arg;
session = cookie->session;
- conn = S2C(session);
entry = NULL;
- while (F_ISSET(conn, WT_CONN_SERVER_RUN) &&
- F_ISSET(cookie, WT_LSM_WORKER_RUN)) {
+ while (cookie->running) {
progress = false;
/*
diff --git a/src/os_posix/os_thread.c b/src/os_posix/os_thread.c
index 85d43f10a33..18e4c347436 100644
--- a/src/os_posix/os_thread.c
+++ b/src/os_posix/os_thread.c
@@ -18,6 +18,13 @@ __wt_thread_create(WT_SESSION_IMPL *session,
{
WT_DECL_RET;
+ /*
+ * Creating a thread isn't a memory barrier, but WiredTiger commonly
+ * sets flags and/or state and then expects worker threads to start.
+ * Include a barrier to ensure safety in those cases.
+ */
+ WT_FULL_BARRIER();
+
/* Spawn a new thread of control. */
WT_SYSCALL_RETRY(pthread_create(tidret, NULL, func, arg), ret);
if (ret == 0)
@@ -34,6 +41,13 @@ __wt_thread_join(WT_SESSION_IMPL *session, wt_thread_t tid)
{
WT_DECL_RET;
+ /*
+ * Joining a thread isn't a memory barrier, but WiredTiger commonly
+ * sets flags and/or state and then expects worker threads to halt.
+ * Include a barrier to ensure safety in those cases.
+ */
+ WT_FULL_BARRIER();
+
WT_SYSCALL(pthread_join(tid, NULL), ret);
if (ret == 0)
return (0);
diff --git a/src/os_win/os_thread.c b/src/os_win/os_thread.c
index 7442fb08a36..4c8f212bb4f 100644
--- a/src/os_win/os_thread.c
+++ b/src/os_win/os_thread.c
@@ -16,6 +16,13 @@ int
__wt_thread_create(WT_SESSION_IMPL *session,
wt_thread_t *tidret, WT_THREAD_CALLBACK(*func)(void *), void *arg)
{
+ /*
+ * Creating a thread isn't a memory barrier, but WiredTiger commonly
+ * sets flags and/or state and then expects worker threads to start.
+ * Include a barrier to ensure safety in those cases.
+ */
+ WT_FULL_BARRIER();
+
/* Spawn a new thread of control. */
*tidret = (HANDLE)_beginthreadex(NULL, 0, func, arg, 0, NULL);
if (*tidret != 0)
@@ -33,6 +40,13 @@ __wt_thread_join(WT_SESSION_IMPL *session, wt_thread_t tid)
{
DWORD windows_error;
+ /*
+ * Joining a thread isn't a memory barrier, but WiredTiger commonly
+ * sets flags and/or state and then expects worker threads to halt.
+ * Include a barrier to ensure safety in those cases.
+ */
+ WT_FULL_BARRIER();
+
if ((windows_error =
WaitForSingleObject(tid, INFINITE)) != WAIT_OBJECT_0) {
if (windows_error == WAIT_FAILED)
diff --git a/src/session/session_api.c b/src/session/session_api.c
index 51233e5e224..b7daf0e2e02 100644
--- a/src/session/session_api.c
+++ b/src/session/session_api.c
@@ -1502,7 +1502,7 @@ __transaction_sync_run_chk(WT_SESSION_IMPL *session)
conn = S2C(session);
- return (FLD_ISSET(conn->flags, WT_CONN_LOG_SERVER_RUN));
+ return (FLD_ISSET(conn->flags, WT_CONN_SERVER_LOG));
}
/*
@@ -1812,7 +1812,7 @@ __open_session(WT_CONNECTION_IMPL *conn,
* closes the connection. This is particularly intended to catch
* cases where server threads open sessions.
*/
- WT_ASSERT(session, F_ISSET(conn, WT_CONN_SERVER_RUN));
+ WT_ASSERT(session, !F_ISSET(conn, WT_CONN_CLOSING));
/* Find the first inactive session slot. */
for (session_ret = conn->sessions,