summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorLuke Chen <luke.chen@mongodb.com>2021-05-25 16:05:17 +1000
committerEvergreen Agent <no-reply@evergreen.mongodb.com>2021-05-25 06:28:10 +0000
commit882833bcbaf88c73e16a46d25ef350238bc4e096 (patch)
treeb76f2b87a5220b37487d6fcb4f38ac57302119d2
parentf7f4835efcb4632d5c12a95608ad9605598986b5 (diff)
downloadmongo-882833bcbaf88c73e16a46d25ef350238bc4e096.tar.gz
Import wiredtiger: f0d322252d4becd1b4d183c0a2d62bb904b5b3a6 from branch mongodb-5.0
ref: 2b482755b5..f0d322252d for: 5.0.0 WT-7498 Implement tiered storage internal thread operations
-rw-r--r--src/third_party/wiredtiger/dist/filelist1
-rw-r--r--src/third_party/wiredtiger/dist/stat_data.py2
-rw-r--r--src/third_party/wiredtiger/import.data2
-rw-r--r--src/third_party/wiredtiger/src/conn/conn_handle.c3
-rw-r--r--src/third_party/wiredtiger/src/conn/conn_tiered.c173
-rw-r--r--src/third_party/wiredtiger/src/include/connection.h12
-rw-r--r--src/third_party/wiredtiger/src/include/extern.h17
-rw-r--r--src/third_party/wiredtiger/src/include/stat.h4
-rw-r--r--src/third_party/wiredtiger/src/include/tiered.h26
-rw-r--r--src/third_party/wiredtiger/src/include/wiredtiger.in60
-rw-r--r--src/third_party/wiredtiger/src/include/wt_internal.h2
-rw-r--r--src/third_party/wiredtiger/src/schema/schema_alter.c15
-rw-r--r--src/third_party/wiredtiger/src/support/err.c5
-rw-r--r--src/third_party/wiredtiger/src/support/stat.c14
-rw-r--r--src/third_party/wiredtiger/src/tiered/tiered_handle.c63
-rw-r--r--src/third_party/wiredtiger/src/tiered/tiered_work.c151
-rwxr-xr-xsrc/third_party/wiredtiger/test/suite/test_tiered04.py15
17 files changed, 470 insertions, 95 deletions
diff --git a/src/third_party/wiredtiger/dist/filelist b/src/third_party/wiredtiger/dist/filelist
index ea101a60e15..5bdbe137fa2 100644
--- a/src/third_party/wiredtiger/dist/filelist
+++ b/src/third_party/wiredtiger/dist/filelist
@@ -212,6 +212,7 @@ src/support/timestamp.c
src/support/update_vector.c
src/tiered/tiered_config.c
src/tiered/tiered_handle.c
+src/tiered/tiered_work.c
src/txn/txn.c
src/txn/txn_ckpt.c
src/txn/txn_ext.c
diff --git a/src/third_party/wiredtiger/dist/stat_data.py b/src/third_party/wiredtiger/dist/stat_data.py
index 89a5578b362..e1bf0ebcc4e 100644
--- a/src/third_party/wiredtiger/dist/stat_data.py
+++ b/src/third_party/wiredtiger/dist/stat_data.py
@@ -865,6 +865,8 @@ conn_dsrc_stats = [
##########################################
StorageStat('tiered_object_size', 'tiered storage object size', 'no_clear,no_scale,size'),
StorageStat('tiered_retention', 'tiered storage local retention time (secs)', 'no_clear,no_scale,size'),
+ StorageStat('tiered_work_units_created', 'tiered operations scheduled'),
+ StorageStat('tiered_work_units_dequeued', 'tiered operations dequeued and processed'),
##########################################
# Transaction statistics
diff --git a/src/third_party/wiredtiger/import.data b/src/third_party/wiredtiger/import.data
index e400c9ff703..6a9d82ba451 100644
--- a/src/third_party/wiredtiger/import.data
+++ b/src/third_party/wiredtiger/import.data
@@ -2,5 +2,5 @@
"vendor": "wiredtiger",
"github": "wiredtiger/wiredtiger.git",
"branch": "mongodb-5.0",
- "commit": "2b482755b5a91d2c9bc7276069f3693d2d8d8bfb"
+ "commit": "f0d322252d4becd1b4d183c0a2d62bb904b5b3a6"
}
diff --git a/src/third_party/wiredtiger/src/conn/conn_handle.c b/src/third_party/wiredtiger/src/conn/conn_handle.c
index fb670dae9df..1e64aa61846 100644
--- a/src/third_party/wiredtiger/src/conn/conn_handle.c
+++ b/src/third_party/wiredtiger/src/conn/conn_handle.c
@@ -28,6 +28,7 @@ __wt_connection_init(WT_CONNECTION_IMPL *conn)
TAILQ_INIT(&conn->encryptqh); /* Encryptor list */
TAILQ_INIT(&conn->extractorqh); /* Extractor list */
TAILQ_INIT(&conn->storagesrcqh); /* Storage source list */
+ TAILQ_INIT(&conn->tieredqh); /* Tiered work unit list */
TAILQ_INIT(&conn->lsmqh); /* WT_LSM_TREE list */
@@ -54,6 +55,7 @@ __wt_connection_init(WT_CONNECTION_IMPL *conn)
WT_RET(__wt_spin_init(session, &conn->reconfig_lock, "reconfigure"));
WT_SPIN_INIT_SESSION_TRACKED(session, &conn->schema_lock, schema);
WT_RET(__wt_spin_init(session, &conn->storage_lock, "tiered storage"));
+ WT_RET(__wt_spin_init(session, &conn->tiered_lock, "tiered work unit list"));
WT_RET(__wt_spin_init(session, &conn->turtle_lock, "turtle file"));
/* Read-write locks */
@@ -120,6 +122,7 @@ __wt_connection_destroy(WT_CONNECTION_IMPL *conn)
__wt_spin_destroy(session, &conn->schema_lock);
__wt_spin_destroy(session, &conn->storage_lock);
__wt_rwlock_destroy(session, &conn->table_lock);
+ __wt_spin_destroy(session, &conn->tiered_lock);
__wt_spin_destroy(session, &conn->turtle_lock);
/* Free LSM serialization resources. */
diff --git a/src/third_party/wiredtiger/src/conn/conn_tiered.c b/src/third_party/wiredtiger/src/conn/conn_tiered.c
index c2c2b2428d5..25871cce3c0 100644
--- a/src/third_party/wiredtiger/src/conn/conn_tiered.c
+++ b/src/third_party/wiredtiger/src/conn/conn_tiered.c
@@ -120,24 +120,151 @@ err:
}
/*
- * __tier_storage_copy --
- * Perform one iteration of copying newly flushed objects to the shared storage.
+ * __tier_flush_meta --
+ * Perform one iteration of altering the metadata after a flush. This is in its own function so
+ * that we can hold the schema lock while doing the metadata tracking.
*/
static int
-__tier_storage_copy(WT_SESSION_IMPL *session)
+__tier_flush_meta(
+ WT_SESSION_IMPL *session, WT_TIERED *tiered, const char *local_uri, const char *obj_uri)
{
+ WT_DATA_HANDLE *dhandle;
+ WT_DECL_ITEM(buf);
+ WT_DECL_RET;
+ uint64_t now;
+ char *newconfig, *obj_value;
+ const char *cfg[3] = {NULL, NULL, NULL};
+ bool tracking;
+
+ tracking = false;
+ WT_RET(__wt_scr_alloc(session, 512, &buf));
+ dhandle = &tiered->iface;
+
+ newconfig = NULL;
+ WT_ERR(__wt_meta_track_on(session));
+ tracking = true;
+
+ WT_ERR(__wt_session_get_dhandle(session, dhandle->name, NULL, NULL, WT_DHANDLE_EXCLUSIVE));
/*
- * Walk the work queue and copy file:<name> to shared storage object:<name>. Walk a tiered
- * table's tiers array and copy it to any tier that allows WT_TIERS_OP_FLUSH.
+ * Once the flush call succeeds we want to first remove the file: entry from the metadata and
+ * then update the object: metadata to indicate the flush is complete.
*/
- /* XXX: We don't want to call this here, it is just to quiet the compiler that this function
- * can return NULL. So it is a placeholder until we have real content here.
+ WT_ERR(__wt_metadata_remove(session, local_uri));
+ WT_ERR(__wt_metadata_search(session, obj_uri, &obj_value));
+ __wt_seconds(session, &now);
+ WT_ERR(__wt_buf_fmt(session, buf, "flush=%" PRIu64, now));
+ cfg[0] = obj_value;
+ cfg[1] = buf->mem;
+ WT_ERR(__wt_config_collapse(session, cfg, &newconfig));
+ WT_ERR(__wt_metadata_update(session, obj_uri, newconfig));
+ WT_ERR(__wt_meta_track_off(session, true, ret != 0));
+ tracking = false;
+
+err:
+ __wt_free(session, newconfig);
+ WT_TRET(__wt_session_release_dhandle(session));
+ __wt_scr_free(session, &buf);
+ if (tracking)
+ WT_TRET(__wt_meta_track_off(session, true, ret != 0));
+ return (ret);
+}
+
+/*
+ * __wt_tier_do_flush --
+ * Perform one iteration of copying newly flushed objects to the shared storage.
+ */
+int
+__wt_tier_do_flush(
+ WT_SESSION_IMPL *session, WT_TIERED *tiered, const char *local_uri, const char *obj_uri)
+{
+ WT_DECL_RET;
+ WT_FILE_SYSTEM *bucket_fs;
+ WT_STORAGE_SOURCE *storage_source;
+ const char *local_name, *obj_name;
+
+ storage_source = tiered->bstorage->storage_source;
+ bucket_fs = tiered->bstorage->file_system;
+
+ local_name = local_uri;
+ WT_PREFIX_SKIP_REQUIRED(session, local_name, "file:");
+ obj_name = obj_uri;
+ WT_PREFIX_SKIP_REQUIRED(session, obj_name, "object:");
+
+ /* This call make take a while, and may fail due to network timeout. */
+ WT_RET(storage_source->ss_flush(
+ storage_source, &session->iface, bucket_fs, local_name, obj_name, NULL));
+
+ WT_WITH_CHECKPOINT_LOCK(session,
+ WT_WITH_SCHEMA_LOCK(session, ret = __tier_flush_meta(session, tiered, local_uri, obj_uri)));
+ WT_RET(ret);
+
+ /*
+ * We may need a way to cleanup flushes for those not completed (after a crash), or failed (due
+ * to previous network outage).
*/
- WT_RET(__tier_storage_remove_local(session, NULL, 0));
+ WT_RET(storage_source->ss_flush_finish(
+ storage_source, &session->iface, bucket_fs, local_name, obj_name, NULL));
return (0);
}
/*
+ * __wt_tier_flush --
+ * Given an ID generate the URI names and call the flush code.
+ */
+int
+__wt_tier_flush(WT_SESSION_IMPL *session, WT_TIERED *tiered, uint64_t id)
+{
+ WT_DECL_RET;
+ const char *local_uri, *obj_uri;
+
+ local_uri = obj_uri = NULL;
+ WT_ERR(__wt_tiered_name(session, &tiered->iface, id, WT_TIERED_NAME_LOCAL, &local_uri));
+ WT_ERR(__wt_tiered_name(session, &tiered->iface, id, WT_TIERED_NAME_OBJECT, &obj_uri));
+ WT_ERR(__wt_tier_do_flush(session, tiered, local_uri, obj_uri));
+
+err:
+ __wt_free(session, local_uri);
+ __wt_free(session, obj_uri);
+ return (ret);
+}
+
+/*
+ * __tier_storage_copy --
+ * Perform one iteration of copying newly flushed objects to the shared storage.
+ */
+static int
+__tier_storage_copy(WT_SESSION_IMPL *session)
+{
+ WT_DECL_RET;
+ WT_TIERED_WORK_UNIT *entry;
+
+ entry = NULL;
+ for (;;) {
+ /*
+ * We probably need some kind of flush generation so that we don't process flush items for
+ * tables that are added during an in-progress flush_tier. This thread could run due to a
+ * condition timeout rather than a signal. Checking that generation number would be part of
+ * calling __wt_tiered_get_flush so that we don't pull it off the queue until we're sure we
+ * want to process it.
+ */
+ __wt_tiered_get_flush(session, &entry);
+ if (entry == NULL)
+ break;
+ WT_ERR(__wt_tier_flush(session, entry->tiered, entry->id));
+ /*
+ * We are responsible for freeing the work unit when we're done with it.
+ */
+ __wt_free(session, entry);
+ entry = NULL;
+ }
+
+err:
+ if (entry != NULL)
+ __wt_free(session, entry);
+ return (ret);
+}
+
+/*
* __tier_storage_remove --
* Perform one iteration of tiered storage local tier removal.
*/
@@ -238,6 +365,8 @@ __tiered_server(void *arg)
WT_DECL_RET;
WT_ITEM path, tmp;
WT_SESSION_IMPL *session;
+ uint64_t cond_time, time_start, time_stop, timediff;
+ bool signalled;
session = arg;
conn = S2C(session);
@@ -245,21 +374,31 @@ __tiered_server(void *arg)
WT_CLEAR(path);
WT_CLEAR(tmp);
+ /* Condition timeout is in microseconds. */
+ cond_time = WT_MINUTE * WT_MILLION;
+ time_start = __wt_clock(session);
+ signalled = false;
for (;;) {
/* Wait until the next event. */
- __wt_cond_wait(session, conn->tiered_cond, WT_MINUTE, __tiered_server_run_chk);
+ __wt_cond_wait_signal(
+ session, conn->tiered_cond, cond_time, __tiered_server_run_chk, &signalled);
/* Check if we're quitting or being reconfigured. */
if (!__tiered_server_run_chk(session))
break;
+ time_stop = __wt_clock(session);
+ timediff = WT_CLOCKDIFF_SEC(time_stop, time_start);
/*
* Here is where we do work. Work we expect to do:
* - Copy any files that need moving from a flush tier call.
* - Remove any cached objects that are aged out.
*/
- WT_ERR(__tier_storage_copy(session));
- WT_ERR(__tier_storage_remove(session, false));
+ if (timediff >= WT_MINUTE || signalled) {
+ WT_ERR(__tier_storage_copy(session));
+ WT_ERR(__tier_storage_remove(session, false));
+ }
+ time_start = time_stop;
}
if (0) {
@@ -340,7 +479,7 @@ __tiered_mgr_start(WT_CONNECTION_IMPL *conn)
FLD_SET(conn->server_flags, WT_CONN_SERVER_TIERED_MGR);
WT_RET(__wt_open_internal_session(
- conn, "storage-mgr-server", true, 0, 0, &conn->tiered_mgr_session));
+ conn, "storage-mgr-server", false, 0, 0, &conn->tiered_mgr_session));
session = conn->tiered_mgr_session;
WT_RET(__wt_cond_alloc(session, "storage server", &conn->tiered_mgr_cond));
@@ -372,12 +511,12 @@ __wt_tiered_storage_create(WT_SESSION_IMPL *session, const char *cfg[], bool rec
WT_RET(__tiered_manager_config(session, cfg, &start));
/* Start the internal thread. */
+ WT_ERR(__wt_cond_alloc(session, "storage server", &conn->tiered_cond));
FLD_SET(conn->server_flags, WT_CONN_SERVER_TIERED);
WT_ERR(__wt_open_internal_session(conn, "storage-server", true, 0, 0, &conn->tiered_session));
session = conn->tiered_session;
-
- WT_ERR(__wt_cond_alloc(session, "storage server", &conn->tiered_cond));
+ WT_ERR(__wt_txn_reconfigure(session, "isolation=read-uncommitted"));
/* Start the thread. */
WT_ERR(__wt_thread_create(session, &conn->tiered_tid, __tiered_server, session));
@@ -389,6 +528,7 @@ __wt_tiered_storage_create(WT_SESSION_IMPL *session, const char *cfg[], bool rec
if (0) {
err:
+ FLD_CLR(conn->server_flags, WT_CONN_SERVER_TIERED);
WT_TRET(__wt_tiered_storage_destroy(session));
}
return (ret);
@@ -403,6 +543,7 @@ __wt_tiered_storage_destroy(WT_SESSION_IMPL *session)
{
WT_CONNECTION_IMPL *conn;
WT_DECL_RET;
+ WT_TIERED_WORK_UNIT *entry;
conn = S2C(session);
@@ -412,6 +553,10 @@ __wt_tiered_storage_destroy(WT_SESSION_IMPL *session)
__wt_cond_signal(session, conn->tiered_cond);
WT_TRET(__wt_thread_join(session, &conn->tiered_tid));
conn->tiered_tid_set = false;
+ while ((entry = TAILQ_FIRST(&conn->tieredqh)) != NULL) {
+ TAILQ_REMOVE(&conn->tieredqh, entry, q);
+ __wt_free(session, entry);
+ }
}
__wt_cond_destroy(session, &conn->tiered_cond);
if (conn->tiered_session != NULL) {
diff --git a/src/third_party/wiredtiger/src/include/connection.h b/src/third_party/wiredtiger/src/include/connection.h
index 4d78b695ec8..b13ab2b911e 100644
--- a/src/third_party/wiredtiger/src/include/connection.h
+++ b/src/third_party/wiredtiger/src/include/connection.h
@@ -229,6 +229,7 @@ struct __wt_connection_impl {
WT_SPINLOCK reconfig_lock; /* Single thread reconfigure */
WT_SPINLOCK schema_lock; /* Schema operation spinlock */
WT_RWLOCK table_lock; /* Table list lock */
+ WT_SPINLOCK tiered_lock; /* Tiered work queue spinlock */
WT_SPINLOCK turtle_lock; /* Turtle file spinlock */
WT_RWLOCK dhandle_lock; /* Data handle list lock */
@@ -278,13 +279,15 @@ struct __wt_connection_impl {
TAILQ_HEAD(__wt_dhhash, __wt_data_handle) * dhhash;
/* Locked: data handle list */
TAILQ_HEAD(__wt_dhandle_qh, __wt_data_handle) dhqh;
- /* Locked: LSM handle list. */
- TAILQ_HEAD(__wt_lsm_qh, __wt_lsm_tree) lsmqh;
+ /* Locked: dynamic library handle list */
+ TAILQ_HEAD(__wt_dlh_qh, __wt_dlh) dlhqh;
/* Locked: file list */
TAILQ_HEAD(__wt_fhhash, __wt_fh) * fhhash;
TAILQ_HEAD(__wt_fh_qh, __wt_fh) fhqh;
- /* Locked: library list */
- TAILQ_HEAD(__wt_dlh_qh, __wt_dlh) dlhqh;
+ /* Locked: LSM handle list. */
+ TAILQ_HEAD(__wt_lsm_qh, __wt_lsm_tree) lsmqh;
+ /* Locked: Tiered system work queue. */
+ TAILQ_HEAD(__wt_tiered_qh, __wt_tiered_work_unit) tieredqh;
WT_SPINLOCK block_lock; /* Locked: block manager list */
TAILQ_HEAD(__wt_blockhash, __wt_block) * blockhash;
@@ -413,6 +416,7 @@ struct __wt_connection_impl {
const char *stat_stamp; /* Statistics log entry timestamp */
uint64_t stat_usecs; /* Statistics log period */
+ uint64_t tiered_retention; /* Earliest time to check to remove local overlap copies */
WT_SESSION_IMPL *tiered_session; /* Tiered thread session */
wt_thread_t tiered_tid; /* Tiered thread */
bool tiered_tid_set; /* Tiered thread set */
diff --git a/src/third_party/wiredtiger/src/include/extern.h b/src/third_party/wiredtiger/src/include/extern.h
index 3d0eb0554fd..8068e402dfd 100644
--- a/src/third_party/wiredtiger/src/include/extern.h
+++ b/src/third_party/wiredtiger/src/include/extern.h
@@ -1452,6 +1452,10 @@ extern int __wt_thread_group_destroy(WT_SESSION_IMPL *session, WT_THREAD_GROUP *
extern int __wt_thread_group_resize(WT_SESSION_IMPL *session, WT_THREAD_GROUP *group,
uint32_t new_min, uint32_t new_max, uint32_t flags)
WT_GCC_FUNC_DECL_ATTRIBUTE((warn_unused_result));
+extern int __wt_tier_do_flush(WT_SESSION_IMPL *session, WT_TIERED *tiered, const char *local_uri,
+ const char *obj_uri) WT_GCC_FUNC_DECL_ATTRIBUTE((warn_unused_result));
+extern int __wt_tier_flush(WT_SESSION_IMPL *session, WT_TIERED *tiered, uint64_t id)
+ WT_GCC_FUNC_DECL_ATTRIBUTE((warn_unused_result));
extern int __wt_tiered_bucket_config(WT_SESSION_IMPL *session, const char *cfg[],
WT_BUCKET_STORAGE **bstoragep) WT_GCC_FUNC_DECL_ATTRIBUTE((warn_unused_result));
extern int __wt_tiered_close(WT_SESSION_IMPL *session, WT_TIERED *tiered)
@@ -1462,6 +1466,12 @@ extern int __wt_tiered_name(WT_SESSION_IMPL *session, WT_DATA_HANDLE *dhandle, u
uint32_t flags, const char **retp) WT_GCC_FUNC_DECL_ATTRIBUTE((warn_unused_result));
extern int __wt_tiered_open(WT_SESSION_IMPL *session, const char *cfg[])
WT_GCC_FUNC_DECL_ATTRIBUTE((warn_unused_result));
+extern int __wt_tiered_put_drop_local(WT_SESSION_IMPL *session, WT_TIERED *tiered, uint64_t id)
+ WT_GCC_FUNC_DECL_ATTRIBUTE((warn_unused_result));
+extern int __wt_tiered_put_drop_shared(WT_SESSION_IMPL *session, WT_TIERED *tiered, uint64_t id)
+ WT_GCC_FUNC_DECL_ATTRIBUTE((warn_unused_result));
+extern int __wt_tiered_put_flush(WT_SESSION_IMPL *session, WT_TIERED *tiered)
+ WT_GCC_FUNC_DECL_ATTRIBUTE((warn_unused_result));
extern int __wt_tiered_storage_create(WT_SESSION_IMPL *session, const char *cfg[], bool reconfig)
WT_GCC_FUNC_DECL_ATTRIBUTE((warn_unused_result));
extern int __wt_tiered_storage_destroy(WT_SESSION_IMPL *session)
@@ -1807,6 +1817,13 @@ extern void __wt_stat_session_init_single(WT_SESSION_STATS *stats);
extern void __wt_thread_group_start_one(
WT_SESSION_IMPL *session, WT_THREAD_GROUP *group, bool is_locked);
extern void __wt_thread_group_stop_one(WT_SESSION_IMPL *session, WT_THREAD_GROUP *group);
+extern void __wt_tiered_get_drop_local(
+ WT_SESSION_IMPL *session, uint64_t now, WT_TIERED_WORK_UNIT **entryp);
+extern void __wt_tiered_get_drop_shared(WT_SESSION_IMPL *session, WT_TIERED_WORK_UNIT **entryp);
+extern void __wt_tiered_get_flush(WT_SESSION_IMPL *session, WT_TIERED_WORK_UNIT **entryp);
+extern void __wt_tiered_pop_work(
+ WT_SESSION_IMPL *session, uint32_t type, uint64_t maxval, WT_TIERED_WORK_UNIT **entryp);
+extern void __wt_tiered_push_work(WT_SESSION_IMPL *session, WT_TIERED_WORK_UNIT *entry);
extern void __wt_timestamp_to_hex_string(wt_timestamp_t ts, char *hex_timestamp);
extern void __wt_txn_bump_snapshot(WT_SESSION_IMPL *session);
extern void __wt_txn_clear_durable_timestamp(WT_SESSION_IMPL *session);
diff --git a/src/third_party/wiredtiger/src/include/stat.h b/src/third_party/wiredtiger/src/include/stat.h
index 540c79187b4..3f4bd5150e3 100644
--- a/src/third_party/wiredtiger/src/include/stat.h
+++ b/src/third_party/wiredtiger/src/include/stat.h
@@ -777,6 +777,8 @@ struct __wt_connection_stats {
int64_t rec_time_window_durable_stop_ts;
int64_t rec_time_window_stop_ts;
int64_t rec_time_window_stop_txn;
+ int64_t tiered_work_units_dequeued;
+ int64_t tiered_work_units_created;
int64_t tiered_retention;
int64_t tiered_object_size;
int64_t txn_read_race_prepare_update;
@@ -996,6 +998,8 @@ struct __wt_dsrc_stats {
int64_t rec_time_window_durable_stop_ts;
int64_t rec_time_window_stop_ts;
int64_t rec_time_window_stop_txn;
+ int64_t tiered_work_units_dequeued;
+ int64_t tiered_work_units_created;
int64_t tiered_retention;
int64_t tiered_object_size;
int64_t txn_read_race_prepare_update;
diff --git a/src/third_party/wiredtiger/src/include/tiered.h b/src/third_party/wiredtiger/src/include/tiered.h
index 6182215d235..8fa2fec35a7 100644
--- a/src/third_party/wiredtiger/src/include/tiered.h
+++ b/src/third_party/wiredtiger/src/include/tiered.h
@@ -47,6 +47,32 @@ struct __wt_tiered_manager {
/* AUTOMATIC FLAG VALUE GENERATION STOP */
/*
+ * Different types of work units for tiered trees.
+ */
+/* AUTOMATIC FLAG VALUE GENERATION START */
+#define WT_TIERED_WORK_DROP_LOCAL 0x1u /* Drop object from local storage. */
+#define WT_TIERED_WORK_DROP_SHARED 0x2u /* Drop object from tier. */
+#define WT_TIERED_WORK_FLUSH 0x4u /* Flush object to tier. */
+/* AUTOMATIC FLAG VALUE GENERATION STOP */
+
+/*
+ * WT_TIERED_WORK_UNIT --
+ * A definition of maintenance that a tiered tree needs done.
+ */
+struct __wt_tiered_work_unit {
+ TAILQ_ENTRY(__wt_tiered_work_unit) q; /* Worker unit queue */
+ uint32_t type; /* Type of operation */
+ uint64_t op_val; /* A value for the operation */
+ WT_TIERED *tiered; /* Tiered tree */
+ uint64_t id; /* Id of the object */
+/* AUTOMATIC FLAG VALUE GENERATION START */
+#define WT_TIERED_WORK_FORCE 0x1u /* Force operation */
+#define WT_TIERED_WORK_FREE 0x2u /* Free data after operation */
+ /* AUTOMATIC FLAG VALUE GENERATION STOP */
+ uint32_t flags; /* Flags for operation */
+};
+
+/*
* WT_TIERED_TIERS --
* Information we need to keep about each tier such as its data handle and name.
* We define operations that each tier can accept. The local tier should be able to accept
diff --git a/src/third_party/wiredtiger/src/include/wiredtiger.in b/src/third_party/wiredtiger/src/include/wiredtiger.in
index 8c626235ec9..c0e37648b07 100644
--- a/src/third_party/wiredtiger/src/include/wiredtiger.in
+++ b/src/third_party/wiredtiger/src/include/wiredtiger.in
@@ -6067,35 +6067,39 @@ extern int wiredtiger_extension_terminate(WT_CONNECTION *connection);
#define WT_STAT_CONN_REC_TIME_WINDOW_STOP_TS 1462
/*! reconciliation: records written including a stop transaction ID */
#define WT_STAT_CONN_REC_TIME_WINDOW_STOP_TXN 1463
+/*! session: tiered operations dequeued and processed */
+#define WT_STAT_CONN_TIERED_WORK_UNITS_DEQUEUED 1464
+/*! session: tiered operations scheduled */
+#define WT_STAT_CONN_TIERED_WORK_UNITS_CREATED 1465
/*! session: tiered storage local retention time (secs) */
-#define WT_STAT_CONN_TIERED_RETENTION 1464
+#define WT_STAT_CONN_TIERED_RETENTION 1466
/*! session: tiered storage object size */
-#define WT_STAT_CONN_TIERED_OBJECT_SIZE 1465
+#define WT_STAT_CONN_TIERED_OBJECT_SIZE 1467
/*! transaction: race to read prepared update retry */
-#define WT_STAT_CONN_TXN_READ_RACE_PREPARE_UPDATE 1466
+#define WT_STAT_CONN_TXN_READ_RACE_PREPARE_UPDATE 1468
/*!
* transaction: rollback to stable history store records with stop
* timestamps older than newer records
*/
-#define WT_STAT_CONN_TXN_RTS_HS_STOP_OLDER_THAN_NEWER_START 1467
+#define WT_STAT_CONN_TXN_RTS_HS_STOP_OLDER_THAN_NEWER_START 1469
/*! transaction: rollback to stable inconsistent checkpoint */
-#define WT_STAT_CONN_TXN_RTS_INCONSISTENT_CKPT 1468
+#define WT_STAT_CONN_TXN_RTS_INCONSISTENT_CKPT 1470
/*! transaction: rollback to stable keys removed */
-#define WT_STAT_CONN_TXN_RTS_KEYS_REMOVED 1469
+#define WT_STAT_CONN_TXN_RTS_KEYS_REMOVED 1471
/*! transaction: rollback to stable keys restored */
-#define WT_STAT_CONN_TXN_RTS_KEYS_RESTORED 1470
+#define WT_STAT_CONN_TXN_RTS_KEYS_RESTORED 1472
/*! transaction: rollback to stable restored tombstones from history store */
-#define WT_STAT_CONN_TXN_RTS_HS_RESTORE_TOMBSTONES 1471
+#define WT_STAT_CONN_TXN_RTS_HS_RESTORE_TOMBSTONES 1473
/*! transaction: rollback to stable restored updates from history store */
-#define WT_STAT_CONN_TXN_RTS_HS_RESTORE_UPDATES 1472
+#define WT_STAT_CONN_TXN_RTS_HS_RESTORE_UPDATES 1474
/*! transaction: rollback to stable sweeping history store keys */
-#define WT_STAT_CONN_TXN_RTS_SWEEP_HS_KEYS 1473
+#define WT_STAT_CONN_TXN_RTS_SWEEP_HS_KEYS 1475
/*! transaction: rollback to stable updates removed from history store */
-#define WT_STAT_CONN_TXN_RTS_HS_REMOVED 1474
+#define WT_STAT_CONN_TXN_RTS_HS_REMOVED 1476
/*! transaction: transaction checkpoints due to obsolete pages */
-#define WT_STAT_CONN_TXN_CHECKPOINT_OBSOLETE_APPLIED 1475
+#define WT_STAT_CONN_TXN_CHECKPOINT_OBSOLETE_APPLIED 1477
/*! transaction: update conflicts */
-#define WT_STAT_CONN_TXN_UPDATE_CONFLICT 1476
+#define WT_STAT_CONN_TXN_UPDATE_CONFLICT 1478
/*!
* @}
@@ -6691,35 +6695,39 @@ extern int wiredtiger_extension_terminate(WT_CONNECTION *connection);
#define WT_STAT_DSRC_REC_TIME_WINDOW_STOP_TS 2197
/*! reconciliation: records written including a stop transaction ID */
#define WT_STAT_DSRC_REC_TIME_WINDOW_STOP_TXN 2198
+/*! session: tiered operations dequeued and processed */
+#define WT_STAT_DSRC_TIERED_WORK_UNITS_DEQUEUED 2199
+/*! session: tiered operations scheduled */
+#define WT_STAT_DSRC_TIERED_WORK_UNITS_CREATED 2200
/*! session: tiered storage local retention time (secs) */
-#define WT_STAT_DSRC_TIERED_RETENTION 2199
+#define WT_STAT_DSRC_TIERED_RETENTION 2201
/*! session: tiered storage object size */
-#define WT_STAT_DSRC_TIERED_OBJECT_SIZE 2200
+#define WT_STAT_DSRC_TIERED_OBJECT_SIZE 2202
/*! transaction: race to read prepared update retry */
-#define WT_STAT_DSRC_TXN_READ_RACE_PREPARE_UPDATE 2201
+#define WT_STAT_DSRC_TXN_READ_RACE_PREPARE_UPDATE 2203
/*!
* transaction: rollback to stable history store records with stop
* timestamps older than newer records
*/
-#define WT_STAT_DSRC_TXN_RTS_HS_STOP_OLDER_THAN_NEWER_START 2202
+#define WT_STAT_DSRC_TXN_RTS_HS_STOP_OLDER_THAN_NEWER_START 2204
/*! transaction: rollback to stable inconsistent checkpoint */
-#define WT_STAT_DSRC_TXN_RTS_INCONSISTENT_CKPT 2203
+#define WT_STAT_DSRC_TXN_RTS_INCONSISTENT_CKPT 2205
/*! transaction: rollback to stable keys removed */
-#define WT_STAT_DSRC_TXN_RTS_KEYS_REMOVED 2204
+#define WT_STAT_DSRC_TXN_RTS_KEYS_REMOVED 2206
/*! transaction: rollback to stable keys restored */
-#define WT_STAT_DSRC_TXN_RTS_KEYS_RESTORED 2205
+#define WT_STAT_DSRC_TXN_RTS_KEYS_RESTORED 2207
/*! transaction: rollback to stable restored tombstones from history store */
-#define WT_STAT_DSRC_TXN_RTS_HS_RESTORE_TOMBSTONES 2206
+#define WT_STAT_DSRC_TXN_RTS_HS_RESTORE_TOMBSTONES 2208
/*! transaction: rollback to stable restored updates from history store */
-#define WT_STAT_DSRC_TXN_RTS_HS_RESTORE_UPDATES 2207
+#define WT_STAT_DSRC_TXN_RTS_HS_RESTORE_UPDATES 2209
/*! transaction: rollback to stable sweeping history store keys */
-#define WT_STAT_DSRC_TXN_RTS_SWEEP_HS_KEYS 2208
+#define WT_STAT_DSRC_TXN_RTS_SWEEP_HS_KEYS 2210
/*! transaction: rollback to stable updates removed from history store */
-#define WT_STAT_DSRC_TXN_RTS_HS_REMOVED 2209
+#define WT_STAT_DSRC_TXN_RTS_HS_REMOVED 2211
/*! transaction: transaction checkpoints due to obsolete pages */
-#define WT_STAT_DSRC_TXN_CHECKPOINT_OBSOLETE_APPLIED 2210
+#define WT_STAT_DSRC_TXN_CHECKPOINT_OBSOLETE_APPLIED 2212
/*! transaction: update conflicts */
-#define WT_STAT_DSRC_TXN_UPDATE_CONFLICT 2211
+#define WT_STAT_DSRC_TXN_UPDATE_CONFLICT 2213
/*!
* @}
diff --git a/src/third_party/wiredtiger/src/include/wt_internal.h b/src/third_party/wiredtiger/src/include/wt_internal.h
index 0afc5000662..4f896a73525 100644
--- a/src/third_party/wiredtiger/src/include/wt_internal.h
+++ b/src/third_party/wiredtiger/src/include/wt_internal.h
@@ -325,6 +325,8 @@ struct __wt_tiered_tiers;
typedef struct __wt_tiered_tiers WT_TIERED_TIERS;
struct __wt_tiered_tree;
typedef struct __wt_tiered_tree WT_TIERED_TREE;
+struct __wt_tiered_work_unit;
+typedef struct __wt_tiered_work_unit WT_TIERED_WORK_UNIT;
struct __wt_time_aggregate;
typedef struct __wt_time_aggregate WT_TIME_AGGREGATE;
struct __wt_time_window;
diff --git a/src/third_party/wiredtiger/src/schema/schema_alter.c b/src/third_party/wiredtiger/src/schema/schema_alter.c
index 930fdedba8b..3bc35ddcd89 100644
--- a/src/third_party/wiredtiger/src/schema/schema_alter.c
+++ b/src/third_party/wiredtiger/src/schema/schema_alter.c
@@ -79,6 +79,19 @@ __alter_file(WT_SESSION_IMPL *session, const char *newcfg[])
}
/*
+ * __alter_object --
+ * Alter a tiered object. There are no object dhandles.
+ */
+static int
+__alter_object(WT_SESSION_IMPL *session, const char *uri, const char *newcfg[])
+{
+ if (!WT_PREFIX_MATCH(uri, "object:"))
+ return (__wt_unexpected_object_type(session, uri, "object:"));
+
+ return (__alter_apply(session, uri, newcfg, WT_CONFIG_BASE(session, object_meta)));
+}
+
+/*
* __alter_tree --
* Alter an index or colgroup reference.
*/
@@ -218,6 +231,8 @@ __schema_alter(WT_SESSION_IMPL *session, const char *uri, const char *newcfg[])
return (__alter_tree(session, uri, newcfg));
if (WT_PREFIX_MATCH(uri, "lsm:"))
return (__wt_lsm_tree_worker(session, uri, __alter_file, NULL, newcfg, flags));
+ if (WT_PREFIX_MATCH(uri, "object:"))
+ return (__alter_object(session, uri, newcfg));
if (WT_PREFIX_MATCH(uri, "table:"))
return (__alter_table(session, uri, newcfg, exclusive_refreshed));
if (WT_PREFIX_MATCH(uri, "tiered:"))
diff --git a/src/third_party/wiredtiger/src/support/err.c b/src/third_party/wiredtiger/src/support/err.c
index f4c20e02746..81212ff65da 100644
--- a/src/third_party/wiredtiger/src/support/err.c
+++ b/src/third_party/wiredtiger/src/support/err.c
@@ -573,8 +573,9 @@ __wt_bad_object_type(WT_SESSION_IMPL *session, const char *uri) WT_GCC_FUNC_ATTR
if (WT_PREFIX_MATCH(uri, "backup:") || WT_PREFIX_MATCH(uri, "colgroup:") ||
WT_PREFIX_MATCH(uri, "config:") || WT_PREFIX_MATCH(uri, "file:") ||
WT_PREFIX_MATCH(uri, "index:") || WT_PREFIX_MATCH(uri, "log:") ||
- WT_PREFIX_MATCH(uri, "lsm:") || WT_PREFIX_MATCH(uri, "statistics:") ||
- WT_PREFIX_MATCH(uri, "table:") || WT_PREFIX_MATCH(uri, "tiered:"))
+ WT_PREFIX_MATCH(uri, "lsm:") || WT_PREFIX_MATCH(uri, "object:") ||
+ WT_PREFIX_MATCH(uri, "statistics:") || WT_PREFIX_MATCH(uri, "table:") ||
+ WT_PREFIX_MATCH(uri, "tiered:"))
return (__wt_object_unsupported(session, uri));
WT_RET_MSG(session, ENOTSUP, "unknown object type: %s", uri);
diff --git a/src/third_party/wiredtiger/src/support/stat.c b/src/third_party/wiredtiger/src/support/stat.c
index fb9f7870a54..647d87bf9fb 100644
--- a/src/third_party/wiredtiger/src/support/stat.c
+++ b/src/third_party/wiredtiger/src/support/stat.c
@@ -207,6 +207,8 @@ static const char *const __stats_dsrc_desc[] = {
"reconciliation: records written including a stop durable timestamp",
"reconciliation: records written including a stop timestamp",
"reconciliation: records written including a stop transaction ID",
+ "session: tiered operations dequeued and processed",
+ "session: tiered operations scheduled",
"session: tiered storage local retention time (secs)",
"session: tiered storage object size",
"transaction: race to read prepared update retry",
@@ -460,6 +462,8 @@ __wt_stat_dsrc_clear_single(WT_DSRC_STATS *stats)
stats->rec_time_window_durable_stop_ts = 0;
stats->rec_time_window_stop_ts = 0;
stats->rec_time_window_stop_txn = 0;
+ stats->tiered_work_units_dequeued = 0;
+ stats->tiered_work_units_created = 0;
/* not clearing tiered_retention */
/* not clearing tiered_object_size */
stats->txn_read_race_prepare_update = 0;
@@ -699,6 +703,8 @@ __wt_stat_dsrc_aggregate_single(WT_DSRC_STATS *from, WT_DSRC_STATS *to)
to->rec_time_window_durable_stop_ts += from->rec_time_window_durable_stop_ts;
to->rec_time_window_stop_ts += from->rec_time_window_stop_ts;
to->rec_time_window_stop_txn += from->rec_time_window_stop_txn;
+ to->tiered_work_units_dequeued += from->tiered_work_units_dequeued;
+ to->tiered_work_units_created += from->tiered_work_units_created;
to->tiered_retention += from->tiered_retention;
to->tiered_object_size += from->tiered_object_size;
to->txn_read_race_prepare_update += from->txn_read_race_prepare_update;
@@ -945,6 +951,8 @@ __wt_stat_dsrc_aggregate(WT_DSRC_STATS **from, WT_DSRC_STATS *to)
to->rec_time_window_durable_stop_ts += WT_STAT_READ(from, rec_time_window_durable_stop_ts);
to->rec_time_window_stop_ts += WT_STAT_READ(from, rec_time_window_stop_ts);
to->rec_time_window_stop_txn += WT_STAT_READ(from, rec_time_window_stop_txn);
+ to->tiered_work_units_dequeued += WT_STAT_READ(from, tiered_work_units_dequeued);
+ to->tiered_work_units_created += WT_STAT_READ(from, tiered_work_units_created);
to->tiered_retention += WT_STAT_READ(from, tiered_retention);
to->tiered_object_size += WT_STAT_READ(from, tiered_object_size);
to->txn_read_race_prepare_update += WT_STAT_READ(from, txn_read_race_prepare_update);
@@ -1436,6 +1444,8 @@ static const char *const __stats_connection_desc[] = {
"reconciliation: records written including a stop durable timestamp",
"reconciliation: records written including a stop timestamp",
"reconciliation: records written including a stop transaction ID",
+ "session: tiered operations dequeued and processed",
+ "session: tiered operations scheduled",
"session: tiered storage local retention time (secs)",
"session: tiered storage object size",
"transaction: race to read prepared update retry",
@@ -1954,6 +1964,8 @@ __wt_stat_connection_clear_single(WT_CONNECTION_STATS *stats)
stats->rec_time_window_durable_stop_ts = 0;
stats->rec_time_window_stop_ts = 0;
stats->rec_time_window_stop_txn = 0;
+ stats->tiered_work_units_dequeued = 0;
+ stats->tiered_work_units_created = 0;
/* not clearing tiered_retention */
/* not clearing tiered_object_size */
stats->txn_read_race_prepare_update = 0;
@@ -2483,6 +2495,8 @@ __wt_stat_connection_aggregate(WT_CONNECTION_STATS **from, WT_CONNECTION_STATS *
to->rec_time_window_durable_stop_ts += WT_STAT_READ(from, rec_time_window_durable_stop_ts);
to->rec_time_window_stop_ts += WT_STAT_READ(from, rec_time_window_stop_ts);
to->rec_time_window_stop_txn += WT_STAT_READ(from, rec_time_window_stop_txn);
+ to->tiered_work_units_dequeued += WT_STAT_READ(from, tiered_work_units_dequeued);
+ to->tiered_work_units_created += WT_STAT_READ(from, tiered_work_units_created);
to->tiered_retention += WT_STAT_READ(from, tiered_retention);
to->tiered_object_size += WT_STAT_READ(from, tiered_object_size);
to->txn_read_race_prepare_update += WT_STAT_READ(from, txn_read_race_prepare_update);
diff --git a/src/third_party/wiredtiger/src/tiered/tiered_handle.c b/src/third_party/wiredtiger/src/tiered/tiered_handle.c
index 363a9c97140..11ecd0a4941 100644
--- a/src/third_party/wiredtiger/src/tiered/tiered_handle.c
+++ b/src/third_party/wiredtiger/src/tiered/tiered_handle.c
@@ -138,7 +138,7 @@ __tiered_create_object(WT_SESSION_IMPL *session, WT_TIERED *tiered)
orig_name = tiered->tiers[WT_TIERED_INDEX_LOCAL].name;
/*
* If we have an existing local file in the tier, alter the table to indicate this one is now
- * readonly.
+ * readonly. We are already holding the schema lock so we can call alter.
*/
if (orig_name != NULL) {
cfg[0] = "readonly=true";
@@ -161,13 +161,6 @@ __tiered_create_object(WT_SESSION_IMPL *session, WT_TIERED *tiered)
/* Create the new shared object. */
WT_ERR(__wt_schema_create(session, name, config));
-#if 0
- /*
- * If we get here we have successfully created the object. It is ready to be fully flushed to
- * the cloud. Push a work element to let the internal thread do that here.
- */
-#endif
-
err:
__wt_free(session, config);
__wt_free(session, name);
@@ -313,10 +306,6 @@ static int
__tiered_switch(WT_SESSION_IMPL *session, const char *config)
{
WT_DECL_RET;
-#if 0
- WT_FILE_SYSTEM *fs;
- WT_STORAGE_SOURCE *storage_source;
-#endif
WT_TIERED *tiered;
bool need_object, need_tree, tracking;
@@ -357,42 +346,22 @@ __tiered_switch(WT_SESSION_IMPL *session, const char *config)
WT_RET(__wt_meta_track_on(session));
tracking = true;
- /* Create the object: entry in the metadata. */
- if (need_object)
- WT_ERR(__tiered_create_object(session, tiered));
-
if (need_tree)
WT_ERR(__tiered_create_tier_tree(session, tiered));
+ /* Create the object: entry in the metadata. */
+ if (need_object) {
+ WT_ERR(__tiered_create_object(session, tiered));
+#if 1
+ WT_ERR(__wt_tiered_put_flush(session, tiered));
+#else
+ WT_ERR(__wt_tier_flush(session, tiered, tiered->current_id));
+#endif
+ }
+
/* We always need to create a local object. */
WT_ERR(__tiered_create_local(session, tiered));
-#if 0
- /*
- * We expect this part to be done asynchronously in its own thread. First flush the contents of
- * the data file to the new cloud object.
- */
- storage_source = tiered->bstorage->storage_source;
- fs = tiered->bucket_storage->file_system;
- WT_ASSERT(session, storage_source != NULL);
-
- /* This call make take a while, and may fail due to network timeout. */
- WT_ERR(storage_source->ss_flush(storage_source, &session->iface,
- fs, old_filename, object_name, NULL));
-
- /*
- * The metadata for the old local object will be initialized with "flush=0". When the flush call
- * completes, it can be marked as "flush=1". When that's done, we can finish the flush. The
- * flush finish call moves the file from the home directory to the extension's cache. Then the
- * extension will own it.
- *
- * We may need a way to restart flushes for those not completed (after a crash), or failed (due
- * to previous network outage).
- */
- WT_ERR(storage_source->ss_flush_finish(storage_source, &session->iface,
- fs, old_filename, object_name, NULL));
-#endif
-
/* Update the tiered: metadata to new object number and tiered array. */
WT_ERR(__tiered_update_metadata(session, tiered, config));
tracking = false;
@@ -485,7 +454,10 @@ __tiered_open(WT_SESSION_IMPL *session, const char *cfg[])
WT_DECL_ITEM(tmp);
WT_DECL_RET;
WT_TIERED *tiered;
+#if 1
+ WT_TIERED_WORK_UNIT *entry;
uint32_t unused;
+#endif
char *metaconf;
const char *obj_cfg[] = {WT_CONFIG_BASE(session, object_meta), NULL, NULL};
const char **tiered_cfg, *config;
@@ -549,10 +521,17 @@ __tiered_open(WT_SESSION_IMPL *session, const char *cfg[])
__wt_free(session, dhandle->cfg[1]);
dhandle->cfg[1] = metaconf;
}
+#if 1
if (0) {
/* Temp code to keep s_all happy. */
FLD_SET(unused, WT_TIERED_OBJ_LOCAL | WT_TIERED_TREE_UNUSED);
+ FLD_SET(unused, WT_TIERED_WORK_FORCE | WT_TIERED_WORK_FREE);
+ WT_ERR(__wt_tiered_put_drop_local(session, tiered, tiered->current_id));
+ WT_ERR(__wt_tiered_put_drop_shared(session, tiered, tiered->current_id));
+ __wt_tiered_get_drop_local(session, 0, &entry);
+ __wt_tiered_get_drop_shared(session, &entry);
}
+#endif
if (0) {
err:
diff --git a/src/third_party/wiredtiger/src/tiered/tiered_work.c b/src/third_party/wiredtiger/src/tiered/tiered_work.c
new file mode 100644
index 00000000000..728a7a0b3b2
--- /dev/null
+++ b/src/third_party/wiredtiger/src/tiered/tiered_work.c
@@ -0,0 +1,151 @@
+/*-
+ * Copyright (c) 2014-present MongoDB, Inc.
+ * Copyright (c) 2008-2014 WiredTiger, Inc.
+ * All rights reserved.
+ *
+ * See the file LICENSE for redistribution information.
+ */
+
+#include "wt_internal.h"
+
+/*
+ * __wt_tiered_push_work --
+ * Push a work unit to the queue. Assumes it is passed an already filled out structure.
+ */
+void
+__wt_tiered_push_work(WT_SESSION_IMPL *session, WT_TIERED_WORK_UNIT *entry)
+{
+ WT_CONNECTION_IMPL *conn;
+
+ conn = S2C(session);
+ __wt_spin_lock(session, &conn->tiered_lock);
+ TAILQ_INSERT_TAIL(&conn->tieredqh, entry, q);
+ WT_STAT_CONN_INCR(session, tiered_work_units_created);
+ __wt_spin_unlock(session, &conn->tiered_lock);
+ __wt_cond_signal(session, conn->tiered_cond);
+ return;
+}
+
+/*
+ * __wt_tiered_pop_work --
+ * Pop a work unit of the given type from the queue. If a maximum value is given, only return a
+ * work unit that is less than the maximum value. The caller is responsible for freeing the
+ * returned work unit structure.
+ */
+void
+__wt_tiered_pop_work(
+ WT_SESSION_IMPL *session, uint32_t type, uint64_t maxval, WT_TIERED_WORK_UNIT **entryp)
+{
+ WT_CONNECTION_IMPL *conn;
+ WT_TIERED_WORK_UNIT *entry;
+
+ *entryp = entry = NULL;
+
+ conn = S2C(session);
+ if (TAILQ_EMPTY(&conn->tieredqh))
+ return;
+ __wt_spin_lock(session, &conn->tiered_lock);
+
+ TAILQ_FOREACH (entry, &conn->tieredqh, q) {
+ if (FLD_ISSET(type, entry->type) && (maxval == 0 || entry->op_val < maxval)) {
+ TAILQ_REMOVE(&conn->tieredqh, entry, q);
+ WT_STAT_CONN_INCR(session, tiered_work_units_dequeued);
+ break;
+ }
+ }
+ *entryp = entry;
+ __wt_spin_unlock(session, &conn->tiered_lock);
+ return;
+}
+
+/*
+ * __wt_tiered_get_flush --
+ * Get the first flush work unit from the queue. The id information cannot change between our
+ * caller and here. The caller is responsible for freeing the work unit.
+ */
+void
+__wt_tiered_get_flush(WT_SESSION_IMPL *session, WT_TIERED_WORK_UNIT **entryp)
+{
+ __wt_tiered_pop_work(session, WT_TIERED_WORK_FLUSH, 0, entryp);
+ return;
+}
+
+/*
+ * __wt_tiered_get_drop_local --
+ * Get a drop local work unit if it is less than the time given. The caller is responsible for
+ * freeing the work unit.
+ */
+void
+__wt_tiered_get_drop_local(WT_SESSION_IMPL *session, uint64_t now, WT_TIERED_WORK_UNIT **entryp)
+{
+ __wt_tiered_pop_work(session, WT_TIERED_WORK_DROP_LOCAL, now, entryp);
+ return;
+}
+
+/*
+ * __wt_tiered_get_drop_shared --
+ * Get a drop shared work unit. The caller is responsible for freeing the work unit.
+ */
+void
+__wt_tiered_get_drop_shared(WT_SESSION_IMPL *session, WT_TIERED_WORK_UNIT **entryp)
+{
+ __wt_tiered_pop_work(session, WT_TIERED_WORK_DROP_SHARED, 0, entryp);
+ return;
+}
+
+/*
+ * __wt_tiered_put_drop_local --
+ * Add a drop local work unit for the given ID to the queue.
+ */
+int
+__wt_tiered_put_drop_local(WT_SESSION_IMPL *session, WT_TIERED *tiered, uint64_t id)
+{
+ WT_TIERED_WORK_UNIT *entry;
+ uint64_t now;
+
+ WT_RET(__wt_calloc_one(session, &entry));
+ entry->type = WT_TIERED_WORK_DROP_LOCAL;
+ entry->id = id;
+ WT_ASSERT(session, tiered->bstorage != NULL);
+ __wt_seconds(session, &now);
+ /* Put a work unit in the queue with the time this object expires. */
+ entry->op_val = now + tiered->bstorage->retain_secs;
+ entry->tiered = tiered;
+ __wt_tiered_push_work(session, entry);
+ return (0);
+}
+
+/*
+ * __wt_tiered_put_drop_shared --
+ * Add a drop shared work unit for the given ID to the queue.
+ */
+int
+__wt_tiered_put_drop_shared(WT_SESSION_IMPL *session, WT_TIERED *tiered, uint64_t id)
+{
+ WT_TIERED_WORK_UNIT *entry;
+
+ WT_RET(__wt_calloc_one(session, &entry));
+ entry->type = WT_TIERED_WORK_DROP_SHARED;
+ entry->id = id;
+ entry->tiered = tiered;
+ __wt_tiered_push_work(session, entry);
+ return (0);
+}
+
+/*
+ * __wt_tiered_put_flush --
+ * Add a flush work unit to the queue. We're single threaded so the tiered structure's id
+ * information cannot change between our caller and here.
+ */
+int
+__wt_tiered_put_flush(WT_SESSION_IMPL *session, WT_TIERED *tiered)
+{
+ WT_TIERED_WORK_UNIT *entry;
+
+ WT_RET(__wt_calloc_one(session, &entry));
+ entry->type = WT_TIERED_WORK_FLUSH;
+ entry->id = tiered->current_id;
+ entry->tiered = tiered;
+ __wt_tiered_push_work(session, entry);
+ return (0);
+}
diff --git a/src/third_party/wiredtiger/test/suite/test_tiered04.py b/src/third_party/wiredtiger/test/suite/test_tiered04.py
index 75d4fac1b19..e442fc23f14 100755
--- a/src/third_party/wiredtiger/test/suite/test_tiered04.py
+++ b/src/third_party/wiredtiger/test/suite/test_tiered04.py
@@ -35,7 +35,7 @@ StorageSource = wiredtiger.StorageSource # easy access to constants
class test_tiered04(wttest.WiredTigerTestCase):
# If the 'uri' changes all the other names must change with it.
- fileuri = 'file:test_tiered04-0000000001.wtobj'
+ fileuri_base = 'file:test_tiered04-000000000'
objuri = 'object:test_tiered04-0000000001.wtobj'
tiereduri = "tiered:test_tiered04"
uri = "table:test_tiered04"
@@ -118,12 +118,15 @@ class test_tiered04(wttest.WiredTigerTestCase):
self.pr("flush tier again")
self.session.flush_tier(None)
calls = self.get_stat(stat.conn.flush_tier, None)
- self.assertEqual(calls, 2)
+ flush = 2
+ self.assertEqual(calls, flush)
obj = self.get_stat(stat.conn.tiered_object_size, None)
self.assertEqual(obj, self.object_sys_val)
+ # As we flush each object, we are currently removing the file: object. So N + 1 exists.
+ fileuri = self.fileuri_base + str(flush + 1) + '.wtobj'
self.check_metadata(self.tiereduri, intl_page)
- self.check_metadata(self.fileuri, intl_page)
+ self.check_metadata(fileuri, intl_page)
self.check_metadata(self.objuri, intl_page)
#self.pr("verify stats")
@@ -155,11 +158,11 @@ class test_tiered04(wttest.WiredTigerTestCase):
config = 'tiered_storage=(local_retention=%d)' % new
self.pr("reconfigure")
self.conn.reconfigure(config)
- self.session.flush_tier(None)
retain = self.get_stat(stat.conn.tiered_retention, None)
- calls = self.get_stat(stat.conn.flush_tier, None)
self.assertEqual(retain, new)
- self.assertEqual(calls, 5)
+ #self.session.flush_tier(None)
+ #calls = self.get_stat(stat.conn.flush_tier, None)
+ #self.assertEqual(calls, 5)
if __name__ == '__main__':
wttest.run()