diff options
author | Luke Chen <luke.chen@mongodb.com> | 2021-05-25 16:05:17 +1000 |
---|---|---|
committer | Evergreen Agent <no-reply@evergreen.mongodb.com> | 2021-05-25 06:28:10 +0000 |
commit | 882833bcbaf88c73e16a46d25ef350238bc4e096 (patch) | |
tree | b76f2b87a5220b37487d6fcb4f38ac57302119d2 /src | |
parent | f7f4835efcb4632d5c12a95608ad9605598986b5 (diff) | |
download | mongo-882833bcbaf88c73e16a46d25ef350238bc4e096.tar.gz |
Import wiredtiger: f0d322252d4becd1b4d183c0a2d62bb904b5b3a6 from branch mongodb-5.0
ref: 2b482755b5..f0d322252d
for: 5.0.0
WT-7498 Implement tiered storage internal thread operations
Diffstat (limited to 'src')
17 files changed, 470 insertions, 95 deletions
diff --git a/src/third_party/wiredtiger/dist/filelist b/src/third_party/wiredtiger/dist/filelist index ea101a60e15..5bdbe137fa2 100644 --- a/src/third_party/wiredtiger/dist/filelist +++ b/src/third_party/wiredtiger/dist/filelist @@ -212,6 +212,7 @@ src/support/timestamp.c src/support/update_vector.c src/tiered/tiered_config.c src/tiered/tiered_handle.c +src/tiered/tiered_work.c src/txn/txn.c src/txn/txn_ckpt.c src/txn/txn_ext.c diff --git a/src/third_party/wiredtiger/dist/stat_data.py b/src/third_party/wiredtiger/dist/stat_data.py index 89a5578b362..e1bf0ebcc4e 100644 --- a/src/third_party/wiredtiger/dist/stat_data.py +++ b/src/third_party/wiredtiger/dist/stat_data.py @@ -865,6 +865,8 @@ conn_dsrc_stats = [ ########################################## StorageStat('tiered_object_size', 'tiered storage object size', 'no_clear,no_scale,size'), StorageStat('tiered_retention', 'tiered storage local retention time (secs)', 'no_clear,no_scale,size'), + StorageStat('tiered_work_units_created', 'tiered operations scheduled'), + StorageStat('tiered_work_units_dequeued', 'tiered operations dequeued and processed'), ########################################## # Transaction statistics diff --git a/src/third_party/wiredtiger/import.data b/src/third_party/wiredtiger/import.data index e400c9ff703..6a9d82ba451 100644 --- a/src/third_party/wiredtiger/import.data +++ b/src/third_party/wiredtiger/import.data @@ -2,5 +2,5 @@ "vendor": "wiredtiger", "github": "wiredtiger/wiredtiger.git", "branch": "mongodb-5.0", - "commit": "2b482755b5a91d2c9bc7276069f3693d2d8d8bfb" + "commit": "f0d322252d4becd1b4d183c0a2d62bb904b5b3a6" } diff --git a/src/third_party/wiredtiger/src/conn/conn_handle.c b/src/third_party/wiredtiger/src/conn/conn_handle.c index fb670dae9df..1e64aa61846 100644 --- a/src/third_party/wiredtiger/src/conn/conn_handle.c +++ b/src/third_party/wiredtiger/src/conn/conn_handle.c @@ -28,6 +28,7 @@ __wt_connection_init(WT_CONNECTION_IMPL *conn) TAILQ_INIT(&conn->encryptqh); /* Encryptor list */ TAILQ_INIT(&conn->extractorqh); /* Extractor list */ TAILQ_INIT(&conn->storagesrcqh); /* Storage source list */ + TAILQ_INIT(&conn->tieredqh); /* Tiered work unit list */ TAILQ_INIT(&conn->lsmqh); /* WT_LSM_TREE list */ @@ -54,6 +55,7 @@ __wt_connection_init(WT_CONNECTION_IMPL *conn) WT_RET(__wt_spin_init(session, &conn->reconfig_lock, "reconfigure")); WT_SPIN_INIT_SESSION_TRACKED(session, &conn->schema_lock, schema); WT_RET(__wt_spin_init(session, &conn->storage_lock, "tiered storage")); + WT_RET(__wt_spin_init(session, &conn->tiered_lock, "tiered work unit list")); WT_RET(__wt_spin_init(session, &conn->turtle_lock, "turtle file")); /* Read-write locks */ @@ -120,6 +122,7 @@ __wt_connection_destroy(WT_CONNECTION_IMPL *conn) __wt_spin_destroy(session, &conn->schema_lock); __wt_spin_destroy(session, &conn->storage_lock); __wt_rwlock_destroy(session, &conn->table_lock); + __wt_spin_destroy(session, &conn->tiered_lock); __wt_spin_destroy(session, &conn->turtle_lock); /* Free LSM serialization resources. */ diff --git a/src/third_party/wiredtiger/src/conn/conn_tiered.c b/src/third_party/wiredtiger/src/conn/conn_tiered.c index c2c2b2428d5..25871cce3c0 100644 --- a/src/third_party/wiredtiger/src/conn/conn_tiered.c +++ b/src/third_party/wiredtiger/src/conn/conn_tiered.c @@ -120,24 +120,151 @@ err: } /* - * __tier_storage_copy -- - * Perform one iteration of copying newly flushed objects to the shared storage. + * __tier_flush_meta -- + * Perform one iteration of altering the metadata after a flush. This is in its own function so + * that we can hold the schema lock while doing the metadata tracking. */ static int -__tier_storage_copy(WT_SESSION_IMPL *session) +__tier_flush_meta( + WT_SESSION_IMPL *session, WT_TIERED *tiered, const char *local_uri, const char *obj_uri) { + WT_DATA_HANDLE *dhandle; + WT_DECL_ITEM(buf); + WT_DECL_RET; + uint64_t now; + char *newconfig, *obj_value; + const char *cfg[3] = {NULL, NULL, NULL}; + bool tracking; + + tracking = false; + WT_RET(__wt_scr_alloc(session, 512, &buf)); + dhandle = &tiered->iface; + + newconfig = NULL; + WT_ERR(__wt_meta_track_on(session)); + tracking = true; + + WT_ERR(__wt_session_get_dhandle(session, dhandle->name, NULL, NULL, WT_DHANDLE_EXCLUSIVE)); /* - * Walk the work queue and copy file:<name> to shared storage object:<name>. Walk a tiered - * table's tiers array and copy it to any tier that allows WT_TIERS_OP_FLUSH. + * Once the flush call succeeds we want to first remove the file: entry from the metadata and + * then update the object: metadata to indicate the flush is complete. */ - /* XXX: We don't want to call this here, it is just to quiet the compiler that this function - * can return NULL. So it is a placeholder until we have real content here. + WT_ERR(__wt_metadata_remove(session, local_uri)); + WT_ERR(__wt_metadata_search(session, obj_uri, &obj_value)); + __wt_seconds(session, &now); + WT_ERR(__wt_buf_fmt(session, buf, "flush=%" PRIu64, now)); + cfg[0] = obj_value; + cfg[1] = buf->mem; + WT_ERR(__wt_config_collapse(session, cfg, &newconfig)); + WT_ERR(__wt_metadata_update(session, obj_uri, newconfig)); + WT_ERR(__wt_meta_track_off(session, true, ret != 0)); + tracking = false; + +err: + __wt_free(session, newconfig); + WT_TRET(__wt_session_release_dhandle(session)); + __wt_scr_free(session, &buf); + if (tracking) + WT_TRET(__wt_meta_track_off(session, true, ret != 0)); + return (ret); +} + +/* + * __wt_tier_do_flush -- + * Perform one iteration of copying newly flushed objects to the shared storage. + */ +int +__wt_tier_do_flush( + WT_SESSION_IMPL *session, WT_TIERED *tiered, const char *local_uri, const char *obj_uri) +{ + WT_DECL_RET; + WT_FILE_SYSTEM *bucket_fs; + WT_STORAGE_SOURCE *storage_source; + const char *local_name, *obj_name; + + storage_source = tiered->bstorage->storage_source; + bucket_fs = tiered->bstorage->file_system; + + local_name = local_uri; + WT_PREFIX_SKIP_REQUIRED(session, local_name, "file:"); + obj_name = obj_uri; + WT_PREFIX_SKIP_REQUIRED(session, obj_name, "object:"); + + /* This call make take a while, and may fail due to network timeout. */ + WT_RET(storage_source->ss_flush( + storage_source, &session->iface, bucket_fs, local_name, obj_name, NULL)); + + WT_WITH_CHECKPOINT_LOCK(session, + WT_WITH_SCHEMA_LOCK(session, ret = __tier_flush_meta(session, tiered, local_uri, obj_uri))); + WT_RET(ret); + + /* + * We may need a way to cleanup flushes for those not completed (after a crash), or failed (due + * to previous network outage). */ - WT_RET(__tier_storage_remove_local(session, NULL, 0)); + WT_RET(storage_source->ss_flush_finish( + storage_source, &session->iface, bucket_fs, local_name, obj_name, NULL)); return (0); } /* + * __wt_tier_flush -- + * Given an ID generate the URI names and call the flush code. + */ +int +__wt_tier_flush(WT_SESSION_IMPL *session, WT_TIERED *tiered, uint64_t id) +{ + WT_DECL_RET; + const char *local_uri, *obj_uri; + + local_uri = obj_uri = NULL; + WT_ERR(__wt_tiered_name(session, &tiered->iface, id, WT_TIERED_NAME_LOCAL, &local_uri)); + WT_ERR(__wt_tiered_name(session, &tiered->iface, id, WT_TIERED_NAME_OBJECT, &obj_uri)); + WT_ERR(__wt_tier_do_flush(session, tiered, local_uri, obj_uri)); + +err: + __wt_free(session, local_uri); + __wt_free(session, obj_uri); + return (ret); +} + +/* + * __tier_storage_copy -- + * Perform one iteration of copying newly flushed objects to the shared storage. + */ +static int +__tier_storage_copy(WT_SESSION_IMPL *session) +{ + WT_DECL_RET; + WT_TIERED_WORK_UNIT *entry; + + entry = NULL; + for (;;) { + /* + * We probably need some kind of flush generation so that we don't process flush items for + * tables that are added during an in-progress flush_tier. This thread could run due to a + * condition timeout rather than a signal. Checking that generation number would be part of + * calling __wt_tiered_get_flush so that we don't pull it off the queue until we're sure we + * want to process it. + */ + __wt_tiered_get_flush(session, &entry); + if (entry == NULL) + break; + WT_ERR(__wt_tier_flush(session, entry->tiered, entry->id)); + /* + * We are responsible for freeing the work unit when we're done with it. + */ + __wt_free(session, entry); + entry = NULL; + } + +err: + if (entry != NULL) + __wt_free(session, entry); + return (ret); +} + +/* * __tier_storage_remove -- * Perform one iteration of tiered storage local tier removal. */ @@ -238,6 +365,8 @@ __tiered_server(void *arg) WT_DECL_RET; WT_ITEM path, tmp; WT_SESSION_IMPL *session; + uint64_t cond_time, time_start, time_stop, timediff; + bool signalled; session = arg; conn = S2C(session); @@ -245,21 +374,31 @@ __tiered_server(void *arg) WT_CLEAR(path); WT_CLEAR(tmp); + /* Condition timeout is in microseconds. */ + cond_time = WT_MINUTE * WT_MILLION; + time_start = __wt_clock(session); + signalled = false; for (;;) { /* Wait until the next event. */ - __wt_cond_wait(session, conn->tiered_cond, WT_MINUTE, __tiered_server_run_chk); + __wt_cond_wait_signal( + session, conn->tiered_cond, cond_time, __tiered_server_run_chk, &signalled); /* Check if we're quitting or being reconfigured. */ if (!__tiered_server_run_chk(session)) break; + time_stop = __wt_clock(session); + timediff = WT_CLOCKDIFF_SEC(time_stop, time_start); /* * Here is where we do work. Work we expect to do: * - Copy any files that need moving from a flush tier call. * - Remove any cached objects that are aged out. */ - WT_ERR(__tier_storage_copy(session)); - WT_ERR(__tier_storage_remove(session, false)); + if (timediff >= WT_MINUTE || signalled) { + WT_ERR(__tier_storage_copy(session)); + WT_ERR(__tier_storage_remove(session, false)); + } + time_start = time_stop; } if (0) { @@ -340,7 +479,7 @@ __tiered_mgr_start(WT_CONNECTION_IMPL *conn) FLD_SET(conn->server_flags, WT_CONN_SERVER_TIERED_MGR); WT_RET(__wt_open_internal_session( - conn, "storage-mgr-server", true, 0, 0, &conn->tiered_mgr_session)); + conn, "storage-mgr-server", false, 0, 0, &conn->tiered_mgr_session)); session = conn->tiered_mgr_session; WT_RET(__wt_cond_alloc(session, "storage server", &conn->tiered_mgr_cond)); @@ -372,12 +511,12 @@ __wt_tiered_storage_create(WT_SESSION_IMPL *session, const char *cfg[], bool rec WT_RET(__tiered_manager_config(session, cfg, &start)); /* Start the internal thread. */ + WT_ERR(__wt_cond_alloc(session, "storage server", &conn->tiered_cond)); FLD_SET(conn->server_flags, WT_CONN_SERVER_TIERED); WT_ERR(__wt_open_internal_session(conn, "storage-server", true, 0, 0, &conn->tiered_session)); session = conn->tiered_session; - - WT_ERR(__wt_cond_alloc(session, "storage server", &conn->tiered_cond)); + WT_ERR(__wt_txn_reconfigure(session, "isolation=read-uncommitted")); /* Start the thread. */ WT_ERR(__wt_thread_create(session, &conn->tiered_tid, __tiered_server, session)); @@ -389,6 +528,7 @@ __wt_tiered_storage_create(WT_SESSION_IMPL *session, const char *cfg[], bool rec if (0) { err: + FLD_CLR(conn->server_flags, WT_CONN_SERVER_TIERED); WT_TRET(__wt_tiered_storage_destroy(session)); } return (ret); @@ -403,6 +543,7 @@ __wt_tiered_storage_destroy(WT_SESSION_IMPL *session) { WT_CONNECTION_IMPL *conn; WT_DECL_RET; + WT_TIERED_WORK_UNIT *entry; conn = S2C(session); @@ -412,6 +553,10 @@ __wt_tiered_storage_destroy(WT_SESSION_IMPL *session) __wt_cond_signal(session, conn->tiered_cond); WT_TRET(__wt_thread_join(session, &conn->tiered_tid)); conn->tiered_tid_set = false; + while ((entry = TAILQ_FIRST(&conn->tieredqh)) != NULL) { + TAILQ_REMOVE(&conn->tieredqh, entry, q); + __wt_free(session, entry); + } } __wt_cond_destroy(session, &conn->tiered_cond); if (conn->tiered_session != NULL) { diff --git a/src/third_party/wiredtiger/src/include/connection.h b/src/third_party/wiredtiger/src/include/connection.h index 4d78b695ec8..b13ab2b911e 100644 --- a/src/third_party/wiredtiger/src/include/connection.h +++ b/src/third_party/wiredtiger/src/include/connection.h @@ -229,6 +229,7 @@ struct __wt_connection_impl { WT_SPINLOCK reconfig_lock; /* Single thread reconfigure */ WT_SPINLOCK schema_lock; /* Schema operation spinlock */ WT_RWLOCK table_lock; /* Table list lock */ + WT_SPINLOCK tiered_lock; /* Tiered work queue spinlock */ WT_SPINLOCK turtle_lock; /* Turtle file spinlock */ WT_RWLOCK dhandle_lock; /* Data handle list lock */ @@ -278,13 +279,15 @@ struct __wt_connection_impl { TAILQ_HEAD(__wt_dhhash, __wt_data_handle) * dhhash; /* Locked: data handle list */ TAILQ_HEAD(__wt_dhandle_qh, __wt_data_handle) dhqh; - /* Locked: LSM handle list. */ - TAILQ_HEAD(__wt_lsm_qh, __wt_lsm_tree) lsmqh; + /* Locked: dynamic library handle list */ + TAILQ_HEAD(__wt_dlh_qh, __wt_dlh) dlhqh; /* Locked: file list */ TAILQ_HEAD(__wt_fhhash, __wt_fh) * fhhash; TAILQ_HEAD(__wt_fh_qh, __wt_fh) fhqh; - /* Locked: library list */ - TAILQ_HEAD(__wt_dlh_qh, __wt_dlh) dlhqh; + /* Locked: LSM handle list. */ + TAILQ_HEAD(__wt_lsm_qh, __wt_lsm_tree) lsmqh; + /* Locked: Tiered system work queue. */ + TAILQ_HEAD(__wt_tiered_qh, __wt_tiered_work_unit) tieredqh; WT_SPINLOCK block_lock; /* Locked: block manager list */ TAILQ_HEAD(__wt_blockhash, __wt_block) * blockhash; @@ -413,6 +416,7 @@ struct __wt_connection_impl { const char *stat_stamp; /* Statistics log entry timestamp */ uint64_t stat_usecs; /* Statistics log period */ + uint64_t tiered_retention; /* Earliest time to check to remove local overlap copies */ WT_SESSION_IMPL *tiered_session; /* Tiered thread session */ wt_thread_t tiered_tid; /* Tiered thread */ bool tiered_tid_set; /* Tiered thread set */ diff --git a/src/third_party/wiredtiger/src/include/extern.h b/src/third_party/wiredtiger/src/include/extern.h index 3d0eb0554fd..8068e402dfd 100644 --- a/src/third_party/wiredtiger/src/include/extern.h +++ b/src/third_party/wiredtiger/src/include/extern.h @@ -1452,6 +1452,10 @@ extern int __wt_thread_group_destroy(WT_SESSION_IMPL *session, WT_THREAD_GROUP * extern int __wt_thread_group_resize(WT_SESSION_IMPL *session, WT_THREAD_GROUP *group, uint32_t new_min, uint32_t new_max, uint32_t flags) WT_GCC_FUNC_DECL_ATTRIBUTE((warn_unused_result)); +extern int __wt_tier_do_flush(WT_SESSION_IMPL *session, WT_TIERED *tiered, const char *local_uri, + const char *obj_uri) WT_GCC_FUNC_DECL_ATTRIBUTE((warn_unused_result)); +extern int __wt_tier_flush(WT_SESSION_IMPL *session, WT_TIERED *tiered, uint64_t id) + WT_GCC_FUNC_DECL_ATTRIBUTE((warn_unused_result)); extern int __wt_tiered_bucket_config(WT_SESSION_IMPL *session, const char *cfg[], WT_BUCKET_STORAGE **bstoragep) WT_GCC_FUNC_DECL_ATTRIBUTE((warn_unused_result)); extern int __wt_tiered_close(WT_SESSION_IMPL *session, WT_TIERED *tiered) @@ -1462,6 +1466,12 @@ extern int __wt_tiered_name(WT_SESSION_IMPL *session, WT_DATA_HANDLE *dhandle, u uint32_t flags, const char **retp) WT_GCC_FUNC_DECL_ATTRIBUTE((warn_unused_result)); extern int __wt_tiered_open(WT_SESSION_IMPL *session, const char *cfg[]) WT_GCC_FUNC_DECL_ATTRIBUTE((warn_unused_result)); +extern int __wt_tiered_put_drop_local(WT_SESSION_IMPL *session, WT_TIERED *tiered, uint64_t id) + WT_GCC_FUNC_DECL_ATTRIBUTE((warn_unused_result)); +extern int __wt_tiered_put_drop_shared(WT_SESSION_IMPL *session, WT_TIERED *tiered, uint64_t id) + WT_GCC_FUNC_DECL_ATTRIBUTE((warn_unused_result)); +extern int __wt_tiered_put_flush(WT_SESSION_IMPL *session, WT_TIERED *tiered) + WT_GCC_FUNC_DECL_ATTRIBUTE((warn_unused_result)); extern int __wt_tiered_storage_create(WT_SESSION_IMPL *session, const char *cfg[], bool reconfig) WT_GCC_FUNC_DECL_ATTRIBUTE((warn_unused_result)); extern int __wt_tiered_storage_destroy(WT_SESSION_IMPL *session) @@ -1807,6 +1817,13 @@ extern void __wt_stat_session_init_single(WT_SESSION_STATS *stats); extern void __wt_thread_group_start_one( WT_SESSION_IMPL *session, WT_THREAD_GROUP *group, bool is_locked); extern void __wt_thread_group_stop_one(WT_SESSION_IMPL *session, WT_THREAD_GROUP *group); +extern void __wt_tiered_get_drop_local( + WT_SESSION_IMPL *session, uint64_t now, WT_TIERED_WORK_UNIT **entryp); +extern void __wt_tiered_get_drop_shared(WT_SESSION_IMPL *session, WT_TIERED_WORK_UNIT **entryp); +extern void __wt_tiered_get_flush(WT_SESSION_IMPL *session, WT_TIERED_WORK_UNIT **entryp); +extern void __wt_tiered_pop_work( + WT_SESSION_IMPL *session, uint32_t type, uint64_t maxval, WT_TIERED_WORK_UNIT **entryp); +extern void __wt_tiered_push_work(WT_SESSION_IMPL *session, WT_TIERED_WORK_UNIT *entry); extern void __wt_timestamp_to_hex_string(wt_timestamp_t ts, char *hex_timestamp); extern void __wt_txn_bump_snapshot(WT_SESSION_IMPL *session); extern void __wt_txn_clear_durable_timestamp(WT_SESSION_IMPL *session); diff --git a/src/third_party/wiredtiger/src/include/stat.h b/src/third_party/wiredtiger/src/include/stat.h index 540c79187b4..3f4bd5150e3 100644 --- a/src/third_party/wiredtiger/src/include/stat.h +++ b/src/third_party/wiredtiger/src/include/stat.h @@ -777,6 +777,8 @@ struct __wt_connection_stats { int64_t rec_time_window_durable_stop_ts; int64_t rec_time_window_stop_ts; int64_t rec_time_window_stop_txn; + int64_t tiered_work_units_dequeued; + int64_t tiered_work_units_created; int64_t tiered_retention; int64_t tiered_object_size; int64_t txn_read_race_prepare_update; @@ -996,6 +998,8 @@ struct __wt_dsrc_stats { int64_t rec_time_window_durable_stop_ts; int64_t rec_time_window_stop_ts; int64_t rec_time_window_stop_txn; + int64_t tiered_work_units_dequeued; + int64_t tiered_work_units_created; int64_t tiered_retention; int64_t tiered_object_size; int64_t txn_read_race_prepare_update; diff --git a/src/third_party/wiredtiger/src/include/tiered.h b/src/third_party/wiredtiger/src/include/tiered.h index 6182215d235..8fa2fec35a7 100644 --- a/src/third_party/wiredtiger/src/include/tiered.h +++ b/src/third_party/wiredtiger/src/include/tiered.h @@ -47,6 +47,32 @@ struct __wt_tiered_manager { /* AUTOMATIC FLAG VALUE GENERATION STOP */ /* + * Different types of work units for tiered trees. + */ +/* AUTOMATIC FLAG VALUE GENERATION START */ +#define WT_TIERED_WORK_DROP_LOCAL 0x1u /* Drop object from local storage. */ +#define WT_TIERED_WORK_DROP_SHARED 0x2u /* Drop object from tier. */ +#define WT_TIERED_WORK_FLUSH 0x4u /* Flush object to tier. */ +/* AUTOMATIC FLAG VALUE GENERATION STOP */ + +/* + * WT_TIERED_WORK_UNIT -- + * A definition of maintenance that a tiered tree needs done. + */ +struct __wt_tiered_work_unit { + TAILQ_ENTRY(__wt_tiered_work_unit) q; /* Worker unit queue */ + uint32_t type; /* Type of operation */ + uint64_t op_val; /* A value for the operation */ + WT_TIERED *tiered; /* Tiered tree */ + uint64_t id; /* Id of the object */ +/* AUTOMATIC FLAG VALUE GENERATION START */ +#define WT_TIERED_WORK_FORCE 0x1u /* Force operation */ +#define WT_TIERED_WORK_FREE 0x2u /* Free data after operation */ + /* AUTOMATIC FLAG VALUE GENERATION STOP */ + uint32_t flags; /* Flags for operation */ +}; + +/* * WT_TIERED_TIERS -- * Information we need to keep about each tier such as its data handle and name. * We define operations that each tier can accept. The local tier should be able to accept diff --git a/src/third_party/wiredtiger/src/include/wiredtiger.in b/src/third_party/wiredtiger/src/include/wiredtiger.in index 8c626235ec9..c0e37648b07 100644 --- a/src/third_party/wiredtiger/src/include/wiredtiger.in +++ b/src/third_party/wiredtiger/src/include/wiredtiger.in @@ -6067,35 +6067,39 @@ extern int wiredtiger_extension_terminate(WT_CONNECTION *connection); #define WT_STAT_CONN_REC_TIME_WINDOW_STOP_TS 1462 /*! reconciliation: records written including a stop transaction ID */ #define WT_STAT_CONN_REC_TIME_WINDOW_STOP_TXN 1463 +/*! session: tiered operations dequeued and processed */ +#define WT_STAT_CONN_TIERED_WORK_UNITS_DEQUEUED 1464 +/*! session: tiered operations scheduled */ +#define WT_STAT_CONN_TIERED_WORK_UNITS_CREATED 1465 /*! session: tiered storage local retention time (secs) */ -#define WT_STAT_CONN_TIERED_RETENTION 1464 +#define WT_STAT_CONN_TIERED_RETENTION 1466 /*! session: tiered storage object size */ -#define WT_STAT_CONN_TIERED_OBJECT_SIZE 1465 +#define WT_STAT_CONN_TIERED_OBJECT_SIZE 1467 /*! transaction: race to read prepared update retry */ -#define WT_STAT_CONN_TXN_READ_RACE_PREPARE_UPDATE 1466 +#define WT_STAT_CONN_TXN_READ_RACE_PREPARE_UPDATE 1468 /*! * transaction: rollback to stable history store records with stop * timestamps older than newer records */ -#define WT_STAT_CONN_TXN_RTS_HS_STOP_OLDER_THAN_NEWER_START 1467 +#define WT_STAT_CONN_TXN_RTS_HS_STOP_OLDER_THAN_NEWER_START 1469 /*! transaction: rollback to stable inconsistent checkpoint */ -#define WT_STAT_CONN_TXN_RTS_INCONSISTENT_CKPT 1468 +#define WT_STAT_CONN_TXN_RTS_INCONSISTENT_CKPT 1470 /*! transaction: rollback to stable keys removed */ -#define WT_STAT_CONN_TXN_RTS_KEYS_REMOVED 1469 +#define WT_STAT_CONN_TXN_RTS_KEYS_REMOVED 1471 /*! transaction: rollback to stable keys restored */ -#define WT_STAT_CONN_TXN_RTS_KEYS_RESTORED 1470 +#define WT_STAT_CONN_TXN_RTS_KEYS_RESTORED 1472 /*! transaction: rollback to stable restored tombstones from history store */ -#define WT_STAT_CONN_TXN_RTS_HS_RESTORE_TOMBSTONES 1471 +#define WT_STAT_CONN_TXN_RTS_HS_RESTORE_TOMBSTONES 1473 /*! transaction: rollback to stable restored updates from history store */ -#define WT_STAT_CONN_TXN_RTS_HS_RESTORE_UPDATES 1472 +#define WT_STAT_CONN_TXN_RTS_HS_RESTORE_UPDATES 1474 /*! transaction: rollback to stable sweeping history store keys */ -#define WT_STAT_CONN_TXN_RTS_SWEEP_HS_KEYS 1473 +#define WT_STAT_CONN_TXN_RTS_SWEEP_HS_KEYS 1475 /*! transaction: rollback to stable updates removed from history store */ -#define WT_STAT_CONN_TXN_RTS_HS_REMOVED 1474 +#define WT_STAT_CONN_TXN_RTS_HS_REMOVED 1476 /*! transaction: transaction checkpoints due to obsolete pages */ -#define WT_STAT_CONN_TXN_CHECKPOINT_OBSOLETE_APPLIED 1475 +#define WT_STAT_CONN_TXN_CHECKPOINT_OBSOLETE_APPLIED 1477 /*! transaction: update conflicts */ -#define WT_STAT_CONN_TXN_UPDATE_CONFLICT 1476 +#define WT_STAT_CONN_TXN_UPDATE_CONFLICT 1478 /*! * @} @@ -6691,35 +6695,39 @@ extern int wiredtiger_extension_terminate(WT_CONNECTION *connection); #define WT_STAT_DSRC_REC_TIME_WINDOW_STOP_TS 2197 /*! reconciliation: records written including a stop transaction ID */ #define WT_STAT_DSRC_REC_TIME_WINDOW_STOP_TXN 2198 +/*! session: tiered operations dequeued and processed */ +#define WT_STAT_DSRC_TIERED_WORK_UNITS_DEQUEUED 2199 +/*! session: tiered operations scheduled */ +#define WT_STAT_DSRC_TIERED_WORK_UNITS_CREATED 2200 /*! session: tiered storage local retention time (secs) */ -#define WT_STAT_DSRC_TIERED_RETENTION 2199 +#define WT_STAT_DSRC_TIERED_RETENTION 2201 /*! session: tiered storage object size */ -#define WT_STAT_DSRC_TIERED_OBJECT_SIZE 2200 +#define WT_STAT_DSRC_TIERED_OBJECT_SIZE 2202 /*! transaction: race to read prepared update retry */ -#define WT_STAT_DSRC_TXN_READ_RACE_PREPARE_UPDATE 2201 +#define WT_STAT_DSRC_TXN_READ_RACE_PREPARE_UPDATE 2203 /*! * transaction: rollback to stable history store records with stop * timestamps older than newer records */ -#define WT_STAT_DSRC_TXN_RTS_HS_STOP_OLDER_THAN_NEWER_START 2202 +#define WT_STAT_DSRC_TXN_RTS_HS_STOP_OLDER_THAN_NEWER_START 2204 /*! transaction: rollback to stable inconsistent checkpoint */ -#define WT_STAT_DSRC_TXN_RTS_INCONSISTENT_CKPT 2203 +#define WT_STAT_DSRC_TXN_RTS_INCONSISTENT_CKPT 2205 /*! transaction: rollback to stable keys removed */ -#define WT_STAT_DSRC_TXN_RTS_KEYS_REMOVED 2204 +#define WT_STAT_DSRC_TXN_RTS_KEYS_REMOVED 2206 /*! transaction: rollback to stable keys restored */ -#define WT_STAT_DSRC_TXN_RTS_KEYS_RESTORED 2205 +#define WT_STAT_DSRC_TXN_RTS_KEYS_RESTORED 2207 /*! transaction: rollback to stable restored tombstones from history store */ -#define WT_STAT_DSRC_TXN_RTS_HS_RESTORE_TOMBSTONES 2206 +#define WT_STAT_DSRC_TXN_RTS_HS_RESTORE_TOMBSTONES 2208 /*! transaction: rollback to stable restored updates from history store */ -#define WT_STAT_DSRC_TXN_RTS_HS_RESTORE_UPDATES 2207 +#define WT_STAT_DSRC_TXN_RTS_HS_RESTORE_UPDATES 2209 /*! transaction: rollback to stable sweeping history store keys */ -#define WT_STAT_DSRC_TXN_RTS_SWEEP_HS_KEYS 2208 +#define WT_STAT_DSRC_TXN_RTS_SWEEP_HS_KEYS 2210 /*! transaction: rollback to stable updates removed from history store */ -#define WT_STAT_DSRC_TXN_RTS_HS_REMOVED 2209 +#define WT_STAT_DSRC_TXN_RTS_HS_REMOVED 2211 /*! transaction: transaction checkpoints due to obsolete pages */ -#define WT_STAT_DSRC_TXN_CHECKPOINT_OBSOLETE_APPLIED 2210 +#define WT_STAT_DSRC_TXN_CHECKPOINT_OBSOLETE_APPLIED 2212 /*! transaction: update conflicts */ -#define WT_STAT_DSRC_TXN_UPDATE_CONFLICT 2211 +#define WT_STAT_DSRC_TXN_UPDATE_CONFLICT 2213 /*! * @} diff --git a/src/third_party/wiredtiger/src/include/wt_internal.h b/src/third_party/wiredtiger/src/include/wt_internal.h index 0afc5000662..4f896a73525 100644 --- a/src/third_party/wiredtiger/src/include/wt_internal.h +++ b/src/third_party/wiredtiger/src/include/wt_internal.h @@ -325,6 +325,8 @@ struct __wt_tiered_tiers; typedef struct __wt_tiered_tiers WT_TIERED_TIERS; struct __wt_tiered_tree; typedef struct __wt_tiered_tree WT_TIERED_TREE; +struct __wt_tiered_work_unit; +typedef struct __wt_tiered_work_unit WT_TIERED_WORK_UNIT; struct __wt_time_aggregate; typedef struct __wt_time_aggregate WT_TIME_AGGREGATE; struct __wt_time_window; diff --git a/src/third_party/wiredtiger/src/schema/schema_alter.c b/src/third_party/wiredtiger/src/schema/schema_alter.c index 930fdedba8b..3bc35ddcd89 100644 --- a/src/third_party/wiredtiger/src/schema/schema_alter.c +++ b/src/third_party/wiredtiger/src/schema/schema_alter.c @@ -79,6 +79,19 @@ __alter_file(WT_SESSION_IMPL *session, const char *newcfg[]) } /* + * __alter_object -- + * Alter a tiered object. There are no object dhandles. + */ +static int +__alter_object(WT_SESSION_IMPL *session, const char *uri, const char *newcfg[]) +{ + if (!WT_PREFIX_MATCH(uri, "object:")) + return (__wt_unexpected_object_type(session, uri, "object:")); + + return (__alter_apply(session, uri, newcfg, WT_CONFIG_BASE(session, object_meta))); +} + +/* * __alter_tree -- * Alter an index or colgroup reference. */ @@ -218,6 +231,8 @@ __schema_alter(WT_SESSION_IMPL *session, const char *uri, const char *newcfg[]) return (__alter_tree(session, uri, newcfg)); if (WT_PREFIX_MATCH(uri, "lsm:")) return (__wt_lsm_tree_worker(session, uri, __alter_file, NULL, newcfg, flags)); + if (WT_PREFIX_MATCH(uri, "object:")) + return (__alter_object(session, uri, newcfg)); if (WT_PREFIX_MATCH(uri, "table:")) return (__alter_table(session, uri, newcfg, exclusive_refreshed)); if (WT_PREFIX_MATCH(uri, "tiered:")) diff --git a/src/third_party/wiredtiger/src/support/err.c b/src/third_party/wiredtiger/src/support/err.c index f4c20e02746..81212ff65da 100644 --- a/src/third_party/wiredtiger/src/support/err.c +++ b/src/third_party/wiredtiger/src/support/err.c @@ -573,8 +573,9 @@ __wt_bad_object_type(WT_SESSION_IMPL *session, const char *uri) WT_GCC_FUNC_ATTR if (WT_PREFIX_MATCH(uri, "backup:") || WT_PREFIX_MATCH(uri, "colgroup:") || WT_PREFIX_MATCH(uri, "config:") || WT_PREFIX_MATCH(uri, "file:") || WT_PREFIX_MATCH(uri, "index:") || WT_PREFIX_MATCH(uri, "log:") || - WT_PREFIX_MATCH(uri, "lsm:") || WT_PREFIX_MATCH(uri, "statistics:") || - WT_PREFIX_MATCH(uri, "table:") || WT_PREFIX_MATCH(uri, "tiered:")) + WT_PREFIX_MATCH(uri, "lsm:") || WT_PREFIX_MATCH(uri, "object:") || + WT_PREFIX_MATCH(uri, "statistics:") || WT_PREFIX_MATCH(uri, "table:") || + WT_PREFIX_MATCH(uri, "tiered:")) return (__wt_object_unsupported(session, uri)); WT_RET_MSG(session, ENOTSUP, "unknown object type: %s", uri); diff --git a/src/third_party/wiredtiger/src/support/stat.c b/src/third_party/wiredtiger/src/support/stat.c index fb9f7870a54..647d87bf9fb 100644 --- a/src/third_party/wiredtiger/src/support/stat.c +++ b/src/third_party/wiredtiger/src/support/stat.c @@ -207,6 +207,8 @@ static const char *const __stats_dsrc_desc[] = { "reconciliation: records written including a stop durable timestamp", "reconciliation: records written including a stop timestamp", "reconciliation: records written including a stop transaction ID", + "session: tiered operations dequeued and processed", + "session: tiered operations scheduled", "session: tiered storage local retention time (secs)", "session: tiered storage object size", "transaction: race to read prepared update retry", @@ -460,6 +462,8 @@ __wt_stat_dsrc_clear_single(WT_DSRC_STATS *stats) stats->rec_time_window_durable_stop_ts = 0; stats->rec_time_window_stop_ts = 0; stats->rec_time_window_stop_txn = 0; + stats->tiered_work_units_dequeued = 0; + stats->tiered_work_units_created = 0; /* not clearing tiered_retention */ /* not clearing tiered_object_size */ stats->txn_read_race_prepare_update = 0; @@ -699,6 +703,8 @@ __wt_stat_dsrc_aggregate_single(WT_DSRC_STATS *from, WT_DSRC_STATS *to) to->rec_time_window_durable_stop_ts += from->rec_time_window_durable_stop_ts; to->rec_time_window_stop_ts += from->rec_time_window_stop_ts; to->rec_time_window_stop_txn += from->rec_time_window_stop_txn; + to->tiered_work_units_dequeued += from->tiered_work_units_dequeued; + to->tiered_work_units_created += from->tiered_work_units_created; to->tiered_retention += from->tiered_retention; to->tiered_object_size += from->tiered_object_size; to->txn_read_race_prepare_update += from->txn_read_race_prepare_update; @@ -945,6 +951,8 @@ __wt_stat_dsrc_aggregate(WT_DSRC_STATS **from, WT_DSRC_STATS *to) to->rec_time_window_durable_stop_ts += WT_STAT_READ(from, rec_time_window_durable_stop_ts); to->rec_time_window_stop_ts += WT_STAT_READ(from, rec_time_window_stop_ts); to->rec_time_window_stop_txn += WT_STAT_READ(from, rec_time_window_stop_txn); + to->tiered_work_units_dequeued += WT_STAT_READ(from, tiered_work_units_dequeued); + to->tiered_work_units_created += WT_STAT_READ(from, tiered_work_units_created); to->tiered_retention += WT_STAT_READ(from, tiered_retention); to->tiered_object_size += WT_STAT_READ(from, tiered_object_size); to->txn_read_race_prepare_update += WT_STAT_READ(from, txn_read_race_prepare_update); @@ -1436,6 +1444,8 @@ static const char *const __stats_connection_desc[] = { "reconciliation: records written including a stop durable timestamp", "reconciliation: records written including a stop timestamp", "reconciliation: records written including a stop transaction ID", + "session: tiered operations dequeued and processed", + "session: tiered operations scheduled", "session: tiered storage local retention time (secs)", "session: tiered storage object size", "transaction: race to read prepared update retry", @@ -1954,6 +1964,8 @@ __wt_stat_connection_clear_single(WT_CONNECTION_STATS *stats) stats->rec_time_window_durable_stop_ts = 0; stats->rec_time_window_stop_ts = 0; stats->rec_time_window_stop_txn = 0; + stats->tiered_work_units_dequeued = 0; + stats->tiered_work_units_created = 0; /* not clearing tiered_retention */ /* not clearing tiered_object_size */ stats->txn_read_race_prepare_update = 0; @@ -2483,6 +2495,8 @@ __wt_stat_connection_aggregate(WT_CONNECTION_STATS **from, WT_CONNECTION_STATS * to->rec_time_window_durable_stop_ts += WT_STAT_READ(from, rec_time_window_durable_stop_ts); to->rec_time_window_stop_ts += WT_STAT_READ(from, rec_time_window_stop_ts); to->rec_time_window_stop_txn += WT_STAT_READ(from, rec_time_window_stop_txn); + to->tiered_work_units_dequeued += WT_STAT_READ(from, tiered_work_units_dequeued); + to->tiered_work_units_created += WT_STAT_READ(from, tiered_work_units_created); to->tiered_retention += WT_STAT_READ(from, tiered_retention); to->tiered_object_size += WT_STAT_READ(from, tiered_object_size); to->txn_read_race_prepare_update += WT_STAT_READ(from, txn_read_race_prepare_update); diff --git a/src/third_party/wiredtiger/src/tiered/tiered_handle.c b/src/third_party/wiredtiger/src/tiered/tiered_handle.c index 363a9c97140..11ecd0a4941 100644 --- a/src/third_party/wiredtiger/src/tiered/tiered_handle.c +++ b/src/third_party/wiredtiger/src/tiered/tiered_handle.c @@ -138,7 +138,7 @@ __tiered_create_object(WT_SESSION_IMPL *session, WT_TIERED *tiered) orig_name = tiered->tiers[WT_TIERED_INDEX_LOCAL].name; /* * If we have an existing local file in the tier, alter the table to indicate this one is now - * readonly. + * readonly. We are already holding the schema lock so we can call alter. */ if (orig_name != NULL) { cfg[0] = "readonly=true"; @@ -161,13 +161,6 @@ __tiered_create_object(WT_SESSION_IMPL *session, WT_TIERED *tiered) /* Create the new shared object. */ WT_ERR(__wt_schema_create(session, name, config)); -#if 0 - /* - * If we get here we have successfully created the object. It is ready to be fully flushed to - * the cloud. Push a work element to let the internal thread do that here. - */ -#endif - err: __wt_free(session, config); __wt_free(session, name); @@ -313,10 +306,6 @@ static int __tiered_switch(WT_SESSION_IMPL *session, const char *config) { WT_DECL_RET; -#if 0 - WT_FILE_SYSTEM *fs; - WT_STORAGE_SOURCE *storage_source; -#endif WT_TIERED *tiered; bool need_object, need_tree, tracking; @@ -357,42 +346,22 @@ __tiered_switch(WT_SESSION_IMPL *session, const char *config) WT_RET(__wt_meta_track_on(session)); tracking = true; - /* Create the object: entry in the metadata. */ - if (need_object) - WT_ERR(__tiered_create_object(session, tiered)); - if (need_tree) WT_ERR(__tiered_create_tier_tree(session, tiered)); + /* Create the object: entry in the metadata. */ + if (need_object) { + WT_ERR(__tiered_create_object(session, tiered)); +#if 1 + WT_ERR(__wt_tiered_put_flush(session, tiered)); +#else + WT_ERR(__wt_tier_flush(session, tiered, tiered->current_id)); +#endif + } + /* We always need to create a local object. */ WT_ERR(__tiered_create_local(session, tiered)); -#if 0 - /* - * We expect this part to be done asynchronously in its own thread. First flush the contents of - * the data file to the new cloud object. - */ - storage_source = tiered->bstorage->storage_source; - fs = tiered->bucket_storage->file_system; - WT_ASSERT(session, storage_source != NULL); - - /* This call make take a while, and may fail due to network timeout. */ - WT_ERR(storage_source->ss_flush(storage_source, &session->iface, - fs, old_filename, object_name, NULL)); - - /* - * The metadata for the old local object will be initialized with "flush=0". When the flush call - * completes, it can be marked as "flush=1". When that's done, we can finish the flush. The - * flush finish call moves the file from the home directory to the extension's cache. Then the - * extension will own it. - * - * We may need a way to restart flushes for those not completed (after a crash), or failed (due - * to previous network outage). - */ - WT_ERR(storage_source->ss_flush_finish(storage_source, &session->iface, - fs, old_filename, object_name, NULL)); -#endif - /* Update the tiered: metadata to new object number and tiered array. */ WT_ERR(__tiered_update_metadata(session, tiered, config)); tracking = false; @@ -485,7 +454,10 @@ __tiered_open(WT_SESSION_IMPL *session, const char *cfg[]) WT_DECL_ITEM(tmp); WT_DECL_RET; WT_TIERED *tiered; +#if 1 + WT_TIERED_WORK_UNIT *entry; uint32_t unused; +#endif char *metaconf; const char *obj_cfg[] = {WT_CONFIG_BASE(session, object_meta), NULL, NULL}; const char **tiered_cfg, *config; @@ -549,10 +521,17 @@ __tiered_open(WT_SESSION_IMPL *session, const char *cfg[]) __wt_free(session, dhandle->cfg[1]); dhandle->cfg[1] = metaconf; } +#if 1 if (0) { /* Temp code to keep s_all happy. */ FLD_SET(unused, WT_TIERED_OBJ_LOCAL | WT_TIERED_TREE_UNUSED); + FLD_SET(unused, WT_TIERED_WORK_FORCE | WT_TIERED_WORK_FREE); + WT_ERR(__wt_tiered_put_drop_local(session, tiered, tiered->current_id)); + WT_ERR(__wt_tiered_put_drop_shared(session, tiered, tiered->current_id)); + __wt_tiered_get_drop_local(session, 0, &entry); + __wt_tiered_get_drop_shared(session, &entry); } +#endif if (0) { err: diff --git a/src/third_party/wiredtiger/src/tiered/tiered_work.c b/src/third_party/wiredtiger/src/tiered/tiered_work.c new file mode 100644 index 00000000000..728a7a0b3b2 --- /dev/null +++ b/src/third_party/wiredtiger/src/tiered/tiered_work.c @@ -0,0 +1,151 @@ +/*- + * Copyright (c) 2014-present MongoDB, Inc. + * Copyright (c) 2008-2014 WiredTiger, Inc. + * All rights reserved. + * + * See the file LICENSE for redistribution information. + */ + +#include "wt_internal.h" + +/* + * __wt_tiered_push_work -- + * Push a work unit to the queue. Assumes it is passed an already filled out structure. + */ +void +__wt_tiered_push_work(WT_SESSION_IMPL *session, WT_TIERED_WORK_UNIT *entry) +{ + WT_CONNECTION_IMPL *conn; + + conn = S2C(session); + __wt_spin_lock(session, &conn->tiered_lock); + TAILQ_INSERT_TAIL(&conn->tieredqh, entry, q); + WT_STAT_CONN_INCR(session, tiered_work_units_created); + __wt_spin_unlock(session, &conn->tiered_lock); + __wt_cond_signal(session, conn->tiered_cond); + return; +} + +/* + * __wt_tiered_pop_work -- + * Pop a work unit of the given type from the queue. If a maximum value is given, only return a + * work unit that is less than the maximum value. The caller is responsible for freeing the + * returned work unit structure. + */ +void +__wt_tiered_pop_work( + WT_SESSION_IMPL *session, uint32_t type, uint64_t maxval, WT_TIERED_WORK_UNIT **entryp) +{ + WT_CONNECTION_IMPL *conn; + WT_TIERED_WORK_UNIT *entry; + + *entryp = entry = NULL; + + conn = S2C(session); + if (TAILQ_EMPTY(&conn->tieredqh)) + return; + __wt_spin_lock(session, &conn->tiered_lock); + + TAILQ_FOREACH (entry, &conn->tieredqh, q) { + if (FLD_ISSET(type, entry->type) && (maxval == 0 || entry->op_val < maxval)) { + TAILQ_REMOVE(&conn->tieredqh, entry, q); + WT_STAT_CONN_INCR(session, tiered_work_units_dequeued); + break; + } + } + *entryp = entry; + __wt_spin_unlock(session, &conn->tiered_lock); + return; +} + +/* + * __wt_tiered_get_flush -- + * Get the first flush work unit from the queue. The id information cannot change between our + * caller and here. The caller is responsible for freeing the work unit. + */ +void +__wt_tiered_get_flush(WT_SESSION_IMPL *session, WT_TIERED_WORK_UNIT **entryp) +{ + __wt_tiered_pop_work(session, WT_TIERED_WORK_FLUSH, 0, entryp); + return; +} + +/* + * __wt_tiered_get_drop_local -- + * Get a drop local work unit if it is less than the time given. The caller is responsible for + * freeing the work unit. + */ +void +__wt_tiered_get_drop_local(WT_SESSION_IMPL *session, uint64_t now, WT_TIERED_WORK_UNIT **entryp) +{ + __wt_tiered_pop_work(session, WT_TIERED_WORK_DROP_LOCAL, now, entryp); + return; +} + +/* + * __wt_tiered_get_drop_shared -- + * Get a drop shared work unit. The caller is responsible for freeing the work unit. + */ +void +__wt_tiered_get_drop_shared(WT_SESSION_IMPL *session, WT_TIERED_WORK_UNIT **entryp) +{ + __wt_tiered_pop_work(session, WT_TIERED_WORK_DROP_SHARED, 0, entryp); + return; +} + +/* + * __wt_tiered_put_drop_local -- + * Add a drop local work unit for the given ID to the queue. + */ +int +__wt_tiered_put_drop_local(WT_SESSION_IMPL *session, WT_TIERED *tiered, uint64_t id) +{ + WT_TIERED_WORK_UNIT *entry; + uint64_t now; + + WT_RET(__wt_calloc_one(session, &entry)); + entry->type = WT_TIERED_WORK_DROP_LOCAL; + entry->id = id; + WT_ASSERT(session, tiered->bstorage != NULL); + __wt_seconds(session, &now); + /* Put a work unit in the queue with the time this object expires. */ + entry->op_val = now + tiered->bstorage->retain_secs; + entry->tiered = tiered; + __wt_tiered_push_work(session, entry); + return (0); +} + +/* + * __wt_tiered_put_drop_shared -- + * Add a drop shared work unit for the given ID to the queue. + */ +int +__wt_tiered_put_drop_shared(WT_SESSION_IMPL *session, WT_TIERED *tiered, uint64_t id) +{ + WT_TIERED_WORK_UNIT *entry; + + WT_RET(__wt_calloc_one(session, &entry)); + entry->type = WT_TIERED_WORK_DROP_SHARED; + entry->id = id; + entry->tiered = tiered; + __wt_tiered_push_work(session, entry); + return (0); +} + +/* + * __wt_tiered_put_flush -- + * Add a flush work unit to the queue. We're single threaded so the tiered structure's id + * information cannot change between our caller and here. + */ +int +__wt_tiered_put_flush(WT_SESSION_IMPL *session, WT_TIERED *tiered) +{ + WT_TIERED_WORK_UNIT *entry; + + WT_RET(__wt_calloc_one(session, &entry)); + entry->type = WT_TIERED_WORK_FLUSH; + entry->id = tiered->current_id; + entry->tiered = tiered; + __wt_tiered_push_work(session, entry); + return (0); +} diff --git a/src/third_party/wiredtiger/test/suite/test_tiered04.py b/src/third_party/wiredtiger/test/suite/test_tiered04.py index 75d4fac1b19..e442fc23f14 100755 --- a/src/third_party/wiredtiger/test/suite/test_tiered04.py +++ b/src/third_party/wiredtiger/test/suite/test_tiered04.py @@ -35,7 +35,7 @@ StorageSource = wiredtiger.StorageSource # easy access to constants class test_tiered04(wttest.WiredTigerTestCase): # If the 'uri' changes all the other names must change with it. - fileuri = 'file:test_tiered04-0000000001.wtobj' + fileuri_base = 'file:test_tiered04-000000000' objuri = 'object:test_tiered04-0000000001.wtobj' tiereduri = "tiered:test_tiered04" uri = "table:test_tiered04" @@ -118,12 +118,15 @@ class test_tiered04(wttest.WiredTigerTestCase): self.pr("flush tier again") self.session.flush_tier(None) calls = self.get_stat(stat.conn.flush_tier, None) - self.assertEqual(calls, 2) + flush = 2 + self.assertEqual(calls, flush) obj = self.get_stat(stat.conn.tiered_object_size, None) self.assertEqual(obj, self.object_sys_val) + # As we flush each object, we are currently removing the file: object. So N + 1 exists. + fileuri = self.fileuri_base + str(flush + 1) + '.wtobj' self.check_metadata(self.tiereduri, intl_page) - self.check_metadata(self.fileuri, intl_page) + self.check_metadata(fileuri, intl_page) self.check_metadata(self.objuri, intl_page) #self.pr("verify stats") @@ -155,11 +158,11 @@ class test_tiered04(wttest.WiredTigerTestCase): config = 'tiered_storage=(local_retention=%d)' % new self.pr("reconfigure") self.conn.reconfigure(config) - self.session.flush_tier(None) retain = self.get_stat(stat.conn.tiered_retention, None) - calls = self.get_stat(stat.conn.flush_tier, None) self.assertEqual(retain, new) - self.assertEqual(calls, 5) + #self.session.flush_tier(None) + #calls = self.get_stat(stat.conn.flush_tier, None) + #self.assertEqual(calls, 5) if __name__ == '__main__': wttest.run() |