summary refs log tree commit diff
path: root/src/third_party/wiredtiger/src/conn/conn_tiered.c
diff options
context:
space:
mode:
Diffstat (limited to 'src/third_party/wiredtiger/src/conn/conn_tiered.c')
-rw-r--r-- src/third_party/wiredtiger/src/conn/conn_tiered.c | 93
1 file changed, 82 insertions(+), 11 deletions(-)
diff --git a/src/third_party/wiredtiger/src/conn/conn_tiered.c b/src/third_party/wiredtiger/src/conn/conn_tiered.c
index ecdf10d34c3..b331875855b 100644
--- a/src/third_party/wiredtiger/src/conn/conn_tiered.c
+++ b/src/third_party/wiredtiger/src/conn/conn_tiered.c
@@ -20,25 +20,68 @@
#endif
/*
+ * __flush_tier_wait --
+ * Wait for all previous work units queued to be processed.
+ */
+static void
+__flush_tier_wait(WT_SESSION_IMPL *session)
+{
+ WT_CONNECTION_IMPL *conn;
+ int yield_count;
+
+ conn = S2C(session);
+ yield_count = 0;
+ /*
+ * The internal thread needs the schema lock to perform its operations and flush tier also
+ * acquires the schema lock. We cannot be waiting in this function while holding that lock or no
+ * work will get done.
+ */
+ WT_ASSERT(session, !FLD_ISSET(session->lock_flags, WT_SESSION_LOCKED_SCHEMA));
+
+ /*
+ * It may be worthwhile looking at the add and decrement values and make choices of whether to
+ * yield or wait based on how much of the workload has been performed. Flushing operations could
+ * take a long time so yielding may not be effective.
+ *
+ * TODO: We should consider a maximum wait value as a configuration setting. If we add one, then
+ * this function returns an int and this loop would check how much time we've waited and break
+ * out with EBUSY.
+ */
+ while (!WT_FLUSH_STATE_DONE(conn->flush_state)) {
+ if (++yield_count < WT_THOUSAND)
+ __wt_yield();
+ else
+ __wt_cond_wait(session, conn->flush_cond, 200, NULL);
+ }
+}
+
+/*
* __flush_tier_once --
* Perform one iteration of tiered storage maintenance.
*/
static int
-__flush_tier_once(WT_SESSION_IMPL *session, bool force)
+__flush_tier_once(WT_SESSION_IMPL *session, uint32_t flags)
{
WT_CURSOR *cursor;
WT_DECL_RET;
const char *key, *value;
- WT_UNUSED(force);
+ WT_UNUSED(flags);
__wt_verbose(session, WT_VERB_TIERED, "%s", "FLUSH_TIER_ONCE: Called");
+
+ cursor = NULL;
/*
- * - See if there is any "merging" work to do to prepare and create an object that is
+ * For supporting splits and merge:
+ * - See if there is any merging work to do to prepare and create an object that is
* suitable for placing onto tiered storage.
* - Do the work to create said objects.
* - Move the objects.
*/
- cursor = NULL;
+ S2C(session)->flush_state = 0;
+
+ /*
+ * XXX: Is it sufficient to walk the metadata cursor? If it is, why doesn't checkpoint do that?
+ */
WT_RET(__wt_metadata_cursor(session, &cursor));
while (cursor->next(cursor) == 0) {
cursor->get_key(cursor, &key);
@@ -46,6 +89,7 @@ __flush_tier_once(WT_SESSION_IMPL *session, bool force)
/* For now just switch tiers which just does metadata manipulation. */
if (WT_PREFIX_MATCH(key, "tiered:")) {
__wt_verbose(session, WT_VERB_TIERED, "FLUSH_TIER_ONCE: %s %s", key, value);
+ /* Is this instantiating every handle even if it is not opened or in use? */
WT_ERR(__wt_session_get_dhandle(session, key, NULL, NULL, WT_DHANDLE_EXCLUSIVE));
/*
* When we call wt_tiered_switch the session->dhandle points to the tiered: entry and
@@ -270,13 +314,13 @@ __tier_storage_copy(WT_SESSION_IMPL *session)
/*
* We are responsible for freeing the work unit when we're done with it.
*/
- __wt_free(session, entry);
+ __wt_tiered_work_free(session, entry);
entry = NULL;
}
err:
if (entry != NULL)
- __wt_free(session, entry);
+ __wt_tiered_work_free(session, entry);
return (ret);
}
@@ -307,21 +351,39 @@ __wt_flush_tier(WT_SESSION_IMPL *session, const char *config)
{
WT_CONFIG_ITEM cval;
WT_DECL_RET;
+ uint32_t flags;
const char *cfg[3];
- bool force;
WT_STAT_CONN_INCR(session, flush_tier);
if (FLD_ISSET(S2C(session)->server_flags, WT_CONN_SERVER_TIERED_MGR))
WT_RET_MSG(
session, EINVAL, "Cannot call flush_tier when storage manager thread is configured");
+ flags = 0;
cfg[0] = WT_CONFIG_BASE(session, WT_SESSION_flush_tier);
cfg[1] = (char *)config;
cfg[2] = NULL;
WT_RET(__wt_config_gets(session, cfg, "force", &cval));
- force = cval.val != 0;
+ if (cval.val)
+ LF_SET(WT_FLUSH_TIER_FORCE);
+ WT_RET(__wt_config_gets_def(session, cfg, "sync", 0, &cval));
+ if (WT_STRING_MATCH("off", cval.str, cval.len))
+ LF_SET(WT_FLUSH_TIER_OFF);
+ else if (WT_STRING_MATCH("on", cval.str, cval.len))
+ LF_SET(WT_FLUSH_TIER_ON);
+
+ /*
+ * We cannot perform another flush tier until any earlier ones are done. Often threads will wait
+ * after the flush tier based on the sync setting so this check will be fast. But if sync is
+ * turned off then any following call must wait and will do so here. We have to wait while not
+ * holding the schema lock.
+ */
+ __flush_tier_wait(session);
+
+ WT_WITH_SCHEMA_LOCK(session, ret = __flush_tier_once(session, flags));
- WT_WITH_SCHEMA_LOCK(session, ret = __flush_tier_once(session, force));
+ if (ret == 0 && LF_ISSET(WT_FLUSH_TIER_ON))
+ __flush_tier_wait(session);
return (ret);
}
@@ -471,8 +533,10 @@ __tiered_mgr_server(void *arg)
/*
* Here is where we do work. Work we expect to do:
*/
- WT_WITH_SCHEMA_LOCK(session, ret = __flush_tier_once(session, false));
+ WT_WITH_SCHEMA_LOCK(session, ret = __flush_tier_once(session, 0));
WT_ERR(ret);
+ if (ret == 0)
+ __flush_tier_wait(session);
WT_ERR(__tier_storage_remove(session, false));
}
@@ -523,6 +587,7 @@ __wt_tiered_storage_create(WT_SESSION_IMPL *session, const char *cfg[])
WT_RET(__tiered_manager_config(session, cfg, &start));
/* Start the internal thread. */
+ WT_ERR(__wt_cond_alloc(session, "flush tier", &conn->flush_cond));
WT_ERR(__wt_cond_alloc(session, "storage server", &conn->tiered_cond));
FLD_SET(conn->server_flags, WT_CONN_SERVER_TIERED);
@@ -559,14 +624,17 @@ __wt_tiered_storage_destroy(WT_SESSION_IMPL *session)
conn = S2C(session);
/* Stop the internal server thread. */
+ if (conn->flush_cond != NULL)
+ __wt_cond_signal(session, conn->flush_cond);
FLD_CLR(conn->server_flags, WT_CONN_SERVER_TIERED | WT_CONN_SERVER_TIERED_MGR);
if (conn->tiered_tid_set) {
+ WT_ASSERT(session, conn->tiered_cond != NULL);
__wt_cond_signal(session, conn->tiered_cond);
WT_TRET(__wt_thread_join(session, &conn->tiered_tid));
conn->tiered_tid_set = false;
while ((entry = TAILQ_FIRST(&conn->tieredqh)) != NULL) {
TAILQ_REMOVE(&conn->tieredqh, entry, q);
- __wt_free(session, entry);
+ __wt_tiered_work_free(session, entry);
}
}
__wt_cond_destroy(session, &conn->tiered_cond);
@@ -577,11 +645,14 @@ __wt_tiered_storage_destroy(WT_SESSION_IMPL *session)
/* Stop the storage manager thread. */
if (conn->tiered_mgr_tid_set) {
+ WT_ASSERT(session, conn->tiered_mgr_cond != NULL);
__wt_cond_signal(session, conn->tiered_mgr_cond);
WT_TRET(__wt_thread_join(session, &conn->tiered_mgr_tid));
conn->tiered_mgr_tid_set = false;
}
__wt_cond_destroy(session, &conn->tiered_mgr_cond);
+ /* This condition variable is last because any internal thread could be using it. */
+ __wt_cond_destroy(session, &conn->flush_cond);
if (conn->tiered_mgr_session != NULL) {
WT_TRET(__wt_session_close_internal(conn->tiered_mgr_session));