diff options
Diffstat (limited to 'src/third_party/wiredtiger/src/conn/conn_tiered.c')
-rw-r--r-- | src/third_party/wiredtiger/src/conn/conn_tiered.c | 93 |
1 file changed, 82 insertions, 11 deletions
diff --git a/src/third_party/wiredtiger/src/conn/conn_tiered.c b/src/third_party/wiredtiger/src/conn/conn_tiered.c index ecdf10d34c3..b331875855b 100644 --- a/src/third_party/wiredtiger/src/conn/conn_tiered.c +++ b/src/third_party/wiredtiger/src/conn/conn_tiered.c @@ -20,25 +20,68 @@ #endif /* + * __flush_tier_wait -- + * Wait for all previous work units queued to be processed. + */ +static void +__flush_tier_wait(WT_SESSION_IMPL *session) +{ + WT_CONNECTION_IMPL *conn; + int yield_count; + + conn = S2C(session); + yield_count = 0; + /* + * The internal thread needs the schema lock to perform its operations and flush tier also + * acquires the schema lock. We cannot be waiting in this function while holding that lock or no + * work will get done. + */ + WT_ASSERT(session, !FLD_ISSET(session->lock_flags, WT_SESSION_LOCKED_SCHEMA)); + + /* + * It may be worthwhile looking at the add and decrement values and make choices of whether to + * yield or wait based on how much of the workload has been performed. Flushing operations could + * take a long time so yielding may not be effective. + * + * TODO: We should consider a maximum wait value as a configuration setting. If we add one, then + * this function returns an int and this loop would check how much time we've waited and break + * out with EBUSY. + */ + while (!WT_FLUSH_STATE_DONE(conn->flush_state)) { + if (++yield_count < WT_THOUSAND) + __wt_yield(); + else + __wt_cond_wait(session, conn->flush_cond, 200, NULL); + } +} + +/* * __flush_tier_once -- * Perform one iteration of tiered storage maintenance. 
*/ static int -__flush_tier_once(WT_SESSION_IMPL *session, bool force) +__flush_tier_once(WT_SESSION_IMPL *session, uint32_t flags) { WT_CURSOR *cursor; WT_DECL_RET; const char *key, *value; - WT_UNUSED(force); + WT_UNUSED(flags); __wt_verbose(session, WT_VERB_TIERED, "%s", "FLUSH_TIER_ONCE: Called"); + + cursor = NULL; /* - * - See if there is any "merging" work to do to prepare and create an object that is + * For supporting splits and merge: + * - See if there is any merging work to do to prepare and create an object that is * suitable for placing onto tiered storage. * - Do the work to create said objects. * - Move the objects. */ - cursor = NULL; + S2C(session)->flush_state = 0; + + /* + * XXX: Is it sufficient to walk the metadata cursor? If it is, why doesn't checkpoint do that? + */ WT_RET(__wt_metadata_cursor(session, &cursor)); while (cursor->next(cursor) == 0) { cursor->get_key(cursor, &key); @@ -46,6 +89,7 @@ __flush_tier_once(WT_SESSION_IMPL *session, bool force) /* For now just switch tiers which just does metadata manipulation. */ if (WT_PREFIX_MATCH(key, "tiered:")) { __wt_verbose(session, WT_VERB_TIERED, "FLUSH_TIER_ONCE: %s %s", key, value); + /* Is this instantiating every handle even if it is not opened or in use? */ WT_ERR(__wt_session_get_dhandle(session, key, NULL, NULL, WT_DHANDLE_EXCLUSIVE)); /* * When we call wt_tiered_switch the session->dhandle points to the tiered: entry and @@ -270,13 +314,13 @@ __tier_storage_copy(WT_SESSION_IMPL *session) /* * We are responsible for freeing the work unit when we're done with it. 
*/ - __wt_free(session, entry); + __wt_tiered_work_free(session, entry); entry = NULL; } err: if (entry != NULL) - __wt_free(session, entry); + __wt_tiered_work_free(session, entry); return (ret); } @@ -307,21 +351,39 @@ __wt_flush_tier(WT_SESSION_IMPL *session, const char *config) { WT_CONFIG_ITEM cval; WT_DECL_RET; + uint32_t flags; const char *cfg[3]; - bool force; WT_STAT_CONN_INCR(session, flush_tier); if (FLD_ISSET(S2C(session)->server_flags, WT_CONN_SERVER_TIERED_MGR)) WT_RET_MSG( session, EINVAL, "Cannot call flush_tier when storage manager thread is configured"); + flags = 0; cfg[0] = WT_CONFIG_BASE(session, WT_SESSION_flush_tier); cfg[1] = (char *)config; cfg[2] = NULL; WT_RET(__wt_config_gets(session, cfg, "force", &cval)); - force = cval.val != 0; + if (cval.val) + LF_SET(WT_FLUSH_TIER_FORCE); + WT_RET(__wt_config_gets_def(session, cfg, "sync", 0, &cval)); + if (WT_STRING_MATCH("off", cval.str, cval.len)) + LF_SET(WT_FLUSH_TIER_OFF); + else if (WT_STRING_MATCH("on", cval.str, cval.len)) + LF_SET(WT_FLUSH_TIER_ON); + + /* + * We cannot perform another flush tier until any earlier ones are done. Often threads will wait + * after the flush tier based on the sync setting so this check will be fast. But if sync is + * turned off then any following call must wait and will do so here. We have to wait while not + * holding the schema lock. + */ + __flush_tier_wait(session); + + WT_WITH_SCHEMA_LOCK(session, ret = __flush_tier_once(session, flags)); - WT_WITH_SCHEMA_LOCK(session, ret = __flush_tier_once(session, force)); + if (ret == 0 && LF_ISSET(WT_FLUSH_TIER_ON)) + __flush_tier_wait(session); return (ret); } @@ -471,8 +533,10 @@ __tiered_mgr_server(void *arg) /* * Here is where we do work. 
Work we expect to do: */ - WT_WITH_SCHEMA_LOCK(session, ret = __flush_tier_once(session, false)); + WT_WITH_SCHEMA_LOCK(session, ret = __flush_tier_once(session, 0)); WT_ERR(ret); + if (ret == 0) + __flush_tier_wait(session); WT_ERR(__tier_storage_remove(session, false)); } @@ -523,6 +587,7 @@ __wt_tiered_storage_create(WT_SESSION_IMPL *session, const char *cfg[]) WT_RET(__tiered_manager_config(session, cfg, &start)); /* Start the internal thread. */ + WT_ERR(__wt_cond_alloc(session, "flush tier", &conn->flush_cond)); WT_ERR(__wt_cond_alloc(session, "storage server", &conn->tiered_cond)); FLD_SET(conn->server_flags, WT_CONN_SERVER_TIERED); @@ -559,14 +624,17 @@ __wt_tiered_storage_destroy(WT_SESSION_IMPL *session) conn = S2C(session); /* Stop the internal server thread. */ + if (conn->flush_cond != NULL) + __wt_cond_signal(session, conn->flush_cond); FLD_CLR(conn->server_flags, WT_CONN_SERVER_TIERED | WT_CONN_SERVER_TIERED_MGR); if (conn->tiered_tid_set) { + WT_ASSERT(session, conn->tiered_cond != NULL); __wt_cond_signal(session, conn->tiered_cond); WT_TRET(__wt_thread_join(session, &conn->tiered_tid)); conn->tiered_tid_set = false; while ((entry = TAILQ_FIRST(&conn->tieredqh)) != NULL) { TAILQ_REMOVE(&conn->tieredqh, entry, q); - __wt_free(session, entry); + __wt_tiered_work_free(session, entry); } } __wt_cond_destroy(session, &conn->tiered_cond); @@ -577,11 +645,14 @@ __wt_tiered_storage_destroy(WT_SESSION_IMPL *session) /* Stop the storage manager thread. */ if (conn->tiered_mgr_tid_set) { + WT_ASSERT(session, conn->tiered_mgr_cond != NULL); __wt_cond_signal(session, conn->tiered_mgr_cond); WT_TRET(__wt_thread_join(session, &conn->tiered_mgr_tid)); conn->tiered_mgr_tid_set = false; } __wt_cond_destroy(session, &conn->tiered_mgr_cond); + /* This condition variable is last because any internal thread could be using it. 
*/ + __wt_cond_destroy(session, &conn->flush_cond); if (conn->tiered_mgr_session != NULL) { WT_TRET(__wt_session_close_internal(conn->tiered_mgr_session)); |