diff options
-rw-r--r-- | src/third_party/wiredtiger/dist/api_data.py | 2 | ||||
-rw-r--r-- | src/third_party/wiredtiger/import.data | 2 | ||||
-rw-r--r-- | src/third_party/wiredtiger/src/config/config_def.c | 10 | ||||
-rw-r--r-- | src/third_party/wiredtiger/src/conn/conn_api.c | 3 | ||||
-rw-r--r-- | src/third_party/wiredtiger/src/conn/conn_tiered.c | 111 | ||||
-rw-r--r-- | src/third_party/wiredtiger/src/include/connection.h | 1 | ||||
-rw-r--r-- | src/third_party/wiredtiger/src/include/extern.h | 7 | ||||
-rw-r--r-- | src/third_party/wiredtiger/src/include/tiered.h | 7 | ||||
-rw-r--r-- | src/third_party/wiredtiger/src/tiered/tiered_work.c | 29 | ||||
-rwxr-xr-x | src/third_party/wiredtiger/test/suite/test_tiered09.py | 6 | ||||
-rwxr-xr-x | src/third_party/wiredtiger/test/suite/test_tiered10.py | 4 | ||||
-rwxr-xr-x | src/third_party/wiredtiger/test/suite/test_tiered12.py | 104 |
12 files changed, 238 insertions, 48 deletions
diff --git a/src/third_party/wiredtiger/dist/api_data.py b/src/third_party/wiredtiger/dist/api_data.py index ab6656652ec..247ceefd2e4 100644 --- a/src/third_party/wiredtiger/dist/api_data.py +++ b/src/third_party/wiredtiger/dist/api_data.py @@ -867,7 +867,7 @@ connection_runtime_config = [ 'compact_slow', 'failpoint_history_store_delete_key_from_ts', 'history_store_checkpoint_delay', 'history_store_search', 'history_store_sweep_race', 'prepare_checkpoint_delay', 'split_1', 'split_2', 'split_3', 'split_4', 'split_5', - 'split_6', 'split_7']), + 'split_6', 'split_7', 'tiered_flush_finish']), Config('verbose', '[]', r''' enable messages for various subsystems and operations. Options are given as a list, where each message type can optionally define an associated verbosity level, such as diff --git a/src/third_party/wiredtiger/import.data b/src/third_party/wiredtiger/import.data index aa0b330a96d..05466397192 100644 --- a/src/third_party/wiredtiger/import.data +++ b/src/third_party/wiredtiger/import.data @@ -2,5 +2,5 @@ "vendor": "wiredtiger", "github": "wiredtiger/wiredtiger.git", "branch": "mongodb-5.2", - "commit": "05cc96924e7ac750f7c4625cdb86b5dc0f6821ab" + "commit": "83e4d3a21b49584a77004ee586ee58a6c428b5c6" } diff --git a/src/third_party/wiredtiger/src/config/config_def.c b/src/third_party/wiredtiger/src/config/config_def.c index 4f8ffae0d47..d5f2db566f3 100644 --- a/src/third_party/wiredtiger/src/config/config_def.c +++ b/src/third_party/wiredtiger/src/config/config_def.c @@ -164,7 +164,7 @@ static const WT_CONFIG_CHECK confchk_WT_CONNECTION_reconfigure[] = { "\"history_store_checkpoint_delay\",\"history_store_search\"," "\"history_store_sweep_race\",\"prepare_checkpoint_delay\"," "\"split_1\",\"split_2\",\"split_3\",\"split_4\",\"split_5\"," - "\"split_6\",\"split_7\"]", + "\"split_6\",\"split_7\",\"tiered_flush_finish\"]", NULL, 0}, {"verbose", "list", NULL, "choices=[\"api\",\"backup\",\"block\",\"block_cache\"," @@ -904,7 +904,7 @@ static const WT_CONFIG_CHECK confchk_wiredtiger_open[] = { "\"history_store_checkpoint_delay\",\"history_store_search\"," "\"history_store_sweep_race\",\"prepare_checkpoint_delay\"," "\"split_1\",\"split_2\",\"split_3\",\"split_4\",\"split_5\"," - "\"split_6\",\"split_7\"]", + "\"split_6\",\"split_7\",\"tiered_flush_finish\"]", NULL, 0}, {"transaction_sync", "category", NULL, NULL, confchk_wiredtiger_open_transaction_sync_subconfigs, 2}, @@ -987,7 +987,7 @@ static const WT_CONFIG_CHECK confchk_wiredtiger_open_all[] = { "\"history_store_checkpoint_delay\",\"history_store_search\"," "\"history_store_sweep_race\",\"prepare_checkpoint_delay\"," "\"split_1\",\"split_2\",\"split_3\",\"split_4\",\"split_5\"," - "\"split_6\",\"split_7\"]", + "\"split_6\",\"split_7\",\"tiered_flush_finish\"]", NULL, 0}, {"transaction_sync", "category", NULL, NULL, confchk_wiredtiger_open_transaction_sync_subconfigs, 2}, @@ -1067,7 +1067,7 @@ static const WT_CONFIG_CHECK confchk_wiredtiger_open_basecfg[] = { "\"history_store_checkpoint_delay\",\"history_store_search\"," "\"history_store_sweep_race\",\"prepare_checkpoint_delay\"," "\"split_1\",\"split_2\",\"split_3\",\"split_4\",\"split_5\"," - "\"split_6\",\"split_7\"]", + "\"split_6\",\"split_7\",\"tiered_flush_finish\"]", NULL, 0}, {"transaction_sync", "category", NULL, NULL, confchk_wiredtiger_open_transaction_sync_subconfigs, 2}, @@ -1145,7 +1145,7 @@ static const WT_CONFIG_CHECK confchk_wiredtiger_open_usercfg[] = { "\"history_store_checkpoint_delay\",\"history_store_search\"," "\"history_store_sweep_race\",\"prepare_checkpoint_delay\"," "\"split_1\",\"split_2\",\"split_3\",\"split_4\",\"split_5\"," - "\"split_6\",\"split_7\"]", + "\"split_6\",\"split_7\",\"tiered_flush_finish\"]", NULL, 0}, {"transaction_sync", "category", NULL, NULL, confchk_wiredtiger_open_transaction_sync_subconfigs, 2}, diff --git a/src/third_party/wiredtiger/src/conn/conn_api.c b/src/third_party/wiredtiger/src/conn/conn_api.c index f36bbc8cb2d..36d38fef91d 100644 --- a/src/third_party/wiredtiger/src/conn/conn_api.c +++ b/src/third_party/wiredtiger/src/conn/conn_api.c @@ -2242,7 +2242,8 @@ __wt_timing_stress_config(WT_SESSION_IMPL *session, const char *cfg[]) {"split_1", WT_TIMING_STRESS_SPLIT_1}, {"split_2", WT_TIMING_STRESS_SPLIT_2}, {"split_3", WT_TIMING_STRESS_SPLIT_3}, {"split_4", WT_TIMING_STRESS_SPLIT_4}, {"split_5", WT_TIMING_STRESS_SPLIT_5}, {"split_6", WT_TIMING_STRESS_SPLIT_6}, - {"split_7", WT_TIMING_STRESS_SPLIT_7}, {NULL, 0}}; + {"split_7", WT_TIMING_STRESS_SPLIT_7}, + {"tiered_flush_finish", WT_TIMING_STRESS_TIERED_FLUSH_FINISH}, {NULL, 0}}; WT_CONFIG_ITEM cval, sval; WT_CONNECTION_IMPL *conn; WT_DECL_RET; diff --git a/src/third_party/wiredtiger/src/conn/conn_tiered.c b/src/third_party/wiredtiger/src/conn/conn_tiered.c index bff95e0420b..345704f74c3 100644 --- a/src/third_party/wiredtiger/src/conn/conn_tiered.c +++ b/src/third_party/wiredtiger/src/conn/conn_tiered.c @@ -291,12 +291,13 @@ err: } /* - * __wt_tier_do_flush -- - * Perform one iteration of copying newly flushed objects to the shared storage. + * __tier_do_operation -- + * Perform one iteration of copying newly flushed objects to shared storage or post-flush + * processing. */ -int -__wt_tier_do_flush(WT_SESSION_IMPL *session, WT_TIERED *tiered, uint32_t id, const char *local_uri, - const char *obj_uri) +static int +__tier_do_operation(WT_SESSION_IMPL *session, WT_TIERED *tiered, uint32_t id, const char *local_uri, + const char *obj_uri, uint32_t op) { WT_CONFIG_ITEM pfx; WT_DECL_RET; @@ -306,6 +307,7 @@ __wt_tier_do_flush(WT_SESSION_IMPL *session, WT_TIERED *tiered, uint32_t id, con char *tmp; const char *cfg[2], *local_name, *obj_name; + WT_ASSERT(session, (op == WT_TIERED_WORK_FLUSH || op == WT_TIERED_WORK_FLUSH_FINISH)); tmp = NULL; storage_source = tiered->bstorage->storage_source; bucket_fs = tiered->bstorage->file_system; @@ -322,36 +324,45 @@ __wt_tier_do_flush(WT_SESSION_IMPL *session, WT_TIERED *tiered, uint32_t id, con WT_RET(__wt_calloc_def(session, len, &tmp)); WT_ERR(__wt_snprintf(tmp, len, "%.*s%s", (int)pfx.len, pfx.str, obj_name)); - /* This call make take a while, and may fail due to network timeout. */ - WT_ERR( - storage_source->ss_flush(storage_source, &session->iface, bucket_fs, local_name, tmp, NULL)); + if (op == WT_TIERED_WORK_FLUSH_FINISH) + WT_ERR(storage_source->ss_flush_finish( + storage_source, &session->iface, bucket_fs, local_name, tmp, NULL)); + else { + /* WT_TIERED_WORK_FLUSH */ + /* This call make take a while, and may fail due to network timeout. */ + WT_ERR(storage_source->ss_flush( + storage_source, &session->iface, bucket_fs, local_name, tmp, NULL)); + + WT_WITH_CHECKPOINT_LOCK(session, + WT_WITH_SCHEMA_LOCK( + session, ret = __tier_flush_meta(session, tiered, local_uri, obj_uri))); + WT_ERR(ret); - WT_WITH_CHECKPOINT_LOCK(session, - WT_WITH_SCHEMA_LOCK(session, ret = __tier_flush_meta(session, tiered, local_uri, obj_uri))); - WT_ERR(ret); + /* + * After successful flushing, push a work unit to perform whatever post-processing the + * shared storage wants to do for this object. Note that this work unit is unrelated to the + * drop local work unit below. They do not need to be in any order and do not interfere with + * each other. + */ + WT_ERR(__wt_tiered_put_flush_finish(session, tiered, id)); + /* + * After successful flushing, push a work unit to drop the local object in the future. The + * object will be removed locally after the local retention period expires. + */ + WT_ERR(__wt_tiered_put_drop_local(session, tiered, id)); + } - /* - * We may need a way to cleanup flushes for those not completed (after a crash), or failed (due - * to previous network outage). - */ - WT_ERR(storage_source->ss_flush_finish( - storage_source, &session->iface, bucket_fs, local_name, tmp, NULL)); - /* - * After successful flushing, push a work unit to drop the local object in the future. The - * object will be removed locally after the local retention period expires. - */ - WT_ERR(__wt_tiered_put_drop_local(session, tiered, id)); err: __wt_free(session, tmp); return (ret); } /* - * __wt_tier_flush -- - * Given an ID generate the URI names and call the flush code. + * __tier_operation -- + * Given an ID generate the URI names and call the operation code to flush or finish. */ -int -__wt_tier_flush(WT_SESSION_IMPL *session, WT_TIERED *tiered, uint32_t id) +static int +__tier_operation(WT_SESSION_IMPL *session, WT_TIERED *tiered, uint32_t id, uint32_t op) { WT_DECL_RET; const char *local_uri, *obj_uri; @@ -359,7 +370,7 @@ __wt_tier_flush(WT_SESSION_IMPL *session, WT_TIERED *tiered, uint32_t id) local_uri = obj_uri = NULL; WT_ERR(__wt_tiered_name(session, &tiered->iface, id, WT_TIERED_NAME_LOCAL, &local_uri)); WT_ERR(__wt_tiered_name(session, &tiered->iface, id, WT_TIERED_NAME_OBJECT, &obj_uri)); - WT_ERR(__wt_tier_do_flush(session, tiered, id, local_uri, obj_uri)); + WT_ERR(__tier_do_operation(session, tiered, id, local_uri, obj_uri, op)); err: __wt_free(session, local_uri); @@ -368,6 +379,48 @@ err: } /* + * __tier_storage_finish -- + * Perform one iteration of shared storage post-flush work. This is separated from copying the + * objects to shared storage to allow the flush_tier call to return after only the necessary + * work has completed. + */ +static int +__tier_storage_finish(WT_SESSION_IMPL *session) +{ + WT_DECL_RET; + WT_TIERED_WORK_UNIT *entry; + + entry = NULL; + /* + * Sleep a known period of time so that tests using the timing stress flag can have an idea when + * to check for the cache operation to complete. Sleep one second before processing the work + * queue of cache work units. + */ + if (FLD_ISSET(S2C(session)->timing_stress_flags, WT_TIMING_STRESS_TIERED_FLUSH_FINISH)) + __wt_sleep(1, 0); + for (;;) { + /* Check if we're quitting or being reconfigured. */ + if (!__tiered_server_run_chk(session)) + break; + + __wt_tiered_get_flush_finish(session, &entry); + if (entry == NULL) + break; + WT_ERR(__tier_operation(session, entry->tiered, entry->id, WT_TIERED_WORK_FLUSH_FINISH)); + /* + * We are responsible for freeing the work unit when we're done with it. + */ + __wt_tiered_work_free(session, entry); + entry = NULL; + } + +err: + if (entry != NULL) + __wt_tiered_work_free(session, entry); + return (ret); +} + +/* * __tier_storage_copy -- * Perform one iteration of copying newly flushed objects to the shared storage. */ @@ -393,7 +446,7 @@ __tier_storage_copy(WT_SESSION_IMPL *session) __wt_tiered_get_flush(session, &entry); if (entry == NULL) break; - WT_ERR(__wt_tier_flush(session, entry->tiered, entry->id)); + WT_ERR(__tier_operation(session, entry->tiered, entry->id, WT_TIERED_WORK_FLUSH)); /* * We are responsible for freeing the work unit when we're done with it. */ @@ -572,10 +625,12 @@ __tiered_server(void *arg) /* * Here is where we do work. Work we expect to do: * - Copy any files that need moving from a flush tier call. + * - Perform any shared storage processing after flushing. * - Remove any cached objects that are aged out. */ if (timediff >= WT_MINUTE || signalled) { WT_ERR(__tier_storage_copy(session)); + WT_ERR(__tier_storage_finish(session)); WT_ERR(__tier_storage_remove(session, false)); } time_start = time_stop; diff --git a/src/third_party/wiredtiger/src/include/connection.h b/src/third_party/wiredtiger/src/include/connection.h index bbd4db1a9ae..9ca75eeaed3 100644 --- a/src/third_party/wiredtiger/src/include/connection.h +++ b/src/third_party/wiredtiger/src/include/connection.h @@ -592,6 +592,7 @@ struct __wt_connection_impl { #define WT_TIMING_STRESS_SPLIT_5 0x04000u #define WT_TIMING_STRESS_SPLIT_6 0x08000u #define WT_TIMING_STRESS_SPLIT_7 0x10000u +#define WT_TIMING_STRESS_TIERED_FLUSH_FINISH 0x20000u /* AUTOMATIC FLAG VALUE GENERATION STOP 64 */ uint64_t timing_stress_flags; diff --git a/src/third_party/wiredtiger/src/include/extern.h b/src/third_party/wiredtiger/src/include/extern.h index 3842d28f621..3995243b512 100644 --- a/src/third_party/wiredtiger/src/include/extern.h +++ b/src/third_party/wiredtiger/src/include/extern.h @@ -1469,10 +1469,6 @@ extern int __wt_thread_group_destroy(WT_SESSION_IMPL *session, WT_THREAD_GROUP * extern int __wt_thread_group_resize(WT_SESSION_IMPL *session, WT_THREAD_GROUP *group, uint32_t new_min, uint32_t new_max, uint32_t flags) WT_GCC_FUNC_DECL_ATTRIBUTE((warn_unused_result)); -extern int __wt_tier_do_flush(WT_SESSION_IMPL *session, WT_TIERED *tiered, uint32_t id, - const char *local_uri, const char *obj_uri) WT_GCC_FUNC_DECL_ATTRIBUTE((warn_unused_result)); -extern int __wt_tier_flush(WT_SESSION_IMPL *session, WT_TIERED *tiered, uint32_t id) - WT_GCC_FUNC_DECL_ATTRIBUTE((warn_unused_result)); extern int __wt_tiered_bucket_config(WT_SESSION_IMPL *session, const char *cfg[], WT_BUCKET_STORAGE **bstoragep) WT_GCC_FUNC_DECL_ATTRIBUTE((warn_unused_result)); extern int __wt_tiered_close(WT_SESSION_IMPL *session) @@ -1494,6 +1490,8 @@ extern int __wt_tiered_put_drop_shared(WT_SESSION_IMPL *session, WT_TIERED *tier WT_GCC_FUNC_DECL_ATTRIBUTE((warn_unused_result)); extern int __wt_tiered_put_flush(WT_SESSION_IMPL *session, WT_TIERED *tiered) WT_GCC_FUNC_DECL_ATTRIBUTE((warn_unused_result)); +extern int __wt_tiered_put_flush_finish(WT_SESSION_IMPL *session, WT_TIERED *tiered, uint32_t id) + WT_GCC_FUNC_DECL_ATTRIBUTE((warn_unused_result)); extern int __wt_tiered_set_metadata(WT_SESSION_IMPL *session, WT_TIERED *tiered, WT_ITEM *buf) WT_GCC_FUNC_DECL_ATTRIBUTE((warn_unused_result)); extern int __wt_tiered_storage_create(WT_SESSION_IMPL *session, const char *cfg[]) @@ -1856,6 +1854,7 @@ extern void __wt_tiered_get_drop_local( WT_SESSION_IMPL *session, uint64_t now, WT_TIERED_WORK_UNIT **entryp); extern void __wt_tiered_get_drop_shared(WT_SESSION_IMPL *session, WT_TIERED_WORK_UNIT **entryp); extern void __wt_tiered_get_flush(WT_SESSION_IMPL *session, WT_TIERED_WORK_UNIT **entryp); +extern void __wt_tiered_get_flush_finish(WT_SESSION_IMPL *session, WT_TIERED_WORK_UNIT **entryp); extern void __wt_tiered_pop_work( WT_SESSION_IMPL *session, uint32_t type, uint64_t maxval, WT_TIERED_WORK_UNIT **entryp); extern void __wt_tiered_push_work(WT_SESSION_IMPL *session, WT_TIERED_WORK_UNIT *entry); diff --git a/src/third_party/wiredtiger/src/include/tiered.h b/src/third_party/wiredtiger/src/include/tiered.h index 1fa146aba5c..3fe6784c80b 100644 --- a/src/third_party/wiredtiger/src/include/tiered.h +++ b/src/third_party/wiredtiger/src/include/tiered.h @@ -62,9 +62,10 @@ struct __wt_tiered_manager { * Different types of work units for tiered trees. */ /* AUTOMATIC FLAG VALUE GENERATION START 0 */ -#define WT_TIERED_WORK_DROP_LOCAL 0x1u /* Drop object from local storage. */ -#define WT_TIERED_WORK_DROP_SHARED 0x2u /* Drop object from tier. */ -#define WT_TIERED_WORK_FLUSH 0x4u /* Flush object to tier. */ +#define WT_TIERED_WORK_DROP_LOCAL 0x1u /* Drop object from local storage. */ +#define WT_TIERED_WORK_DROP_SHARED 0x2u /* Drop object from tier. */ +#define WT_TIERED_WORK_FLUSH 0x4u /* Flush object to tier. */ +#define WT_TIERED_WORK_FLUSH_FINISH 0x8u /* Perform flush finish on object. */ /* AUTOMATIC FLAG VALUE GENERATION STOP 32 */ /* diff --git a/src/third_party/wiredtiger/src/tiered/tiered_work.c b/src/third_party/wiredtiger/src/tiered/tiered_work.c index efc80ea86bd..49097e87f0b 100644 --- a/src/third_party/wiredtiger/src/tiered/tiered_work.c +++ b/src/third_party/wiredtiger/src/tiered/tiered_work.c @@ -95,6 +95,18 @@ __wt_tiered_pop_work( } /* + * __wt_tiered_get_flush_finish -- + * Get the first flush_finish work unit from the queue. The id information cannot change between + * our caller and here. The caller is responsible for freeing the work unit. + */ +void +__wt_tiered_get_flush_finish(WT_SESSION_IMPL *session, WT_TIERED_WORK_UNIT **entryp) +{ + __wt_tiered_pop_work(session, WT_TIERED_WORK_FLUSH_FINISH, 0, entryp); + return; +} + +/* * __wt_tiered_get_flush -- * Get the first flush work unit from the queue. The id information cannot change between our * caller and here. The caller is responsible for freeing the work unit. @@ -130,6 +142,23 @@ __wt_tiered_get_drop_shared(WT_SESSION_IMPL *session, WT_TIERED_WORK_UNIT **entr } /* + * __wt_tiered_put_flush_finish -- + * Add a flush_finish work unit to the queue. + */ +int +__wt_tiered_put_flush_finish(WT_SESSION_IMPL *session, WT_TIERED *tiered, uint32_t id) +{ + WT_TIERED_WORK_UNIT *entry; + + WT_RET(__wt_calloc_one(session, &entry)); + entry->type = WT_TIERED_WORK_FLUSH_FINISH; + entry->id = id; + entry->tiered = tiered; + __wt_tiered_push_work(session, entry); + return (0); +} + +/* * __wt_tiered_put_drop_local -- * Add a drop local work unit for the given ID to the queue. */ diff --git a/src/third_party/wiredtiger/test/suite/test_tiered09.py b/src/third_party/wiredtiger/test/suite/test_tiered09.py index e44b828bfd9..507eb31cd85 100755 --- a/src/third_party/wiredtiger/test/suite/test_tiered09.py +++ b/src/third_party/wiredtiger/test/suite/test_tiered09.py @@ -103,7 +103,7 @@ class test_tiered09(wttest.WiredTigerTestCase): self.close_conn() self.assertTrue(os.path.exists(self.obj1file)) self.assertTrue(os.path.exists(self.obj2file)) - bucket_obj = self.bucket + '/' + self.prefix1 + self.obj1file + bucket_obj = os.path.join(self.bucket, self.prefix1 + self.obj1file) self.assertTrue(os.path.exists(bucket_obj)) # Since we've closed and reopened the connection we lost the work units # to drop the local objects. Clean them up now to make sure we can open @@ -132,9 +132,9 @@ class test_tiered09(wttest.WiredTigerTestCase): self.session.flush_tier(None) self.close_conn() # Check each table was created with the correct prefix. - bucket_obj = self.bucket + '/' + self.prefix2 + self.obj1second + bucket_obj = os.path.join(self.bucket, self.prefix2 + self.obj1second) self.assertTrue(os.path.exists(bucket_obj)) - bucket_obj = self.bucket + '/' + self.prefix1 + self.obj2file + bucket_obj = os.path.join(self.bucket, self.prefix1 + self.obj2file) self.assertTrue(os.path.exists(bucket_obj)) # Since we've closed and reopened the connection we lost the work units # to drop the local objects. Clean them up now to make sure we can open diff --git a/src/third_party/wiredtiger/test/suite/test_tiered10.py b/src/third_party/wiredtiger/test/suite/test_tiered10.py index a6b3e5dc56c..5b851000d12 100755 --- a/src/third_party/wiredtiger/test/suite/test_tiered10.py +++ b/src/third_party/wiredtiger/test/suite/test_tiered10.py @@ -117,8 +117,8 @@ class test_tiered10(wttest.WiredTigerTestCase): session1.flush_tier(None) session2.checkpoint() session2.flush_tier(None) - conn1_obj1 = self.bucket + '/' + self.prefix1 + self.obj1file - conn2_obj1 = self.bucket + '/' + self.prefix2 + self.obj1file + conn1_obj1 = os.path.join(self.bucket, self.prefix1 + self.obj1file) + conn2_obj1 = os.path.join(self.bucket, self.prefix2 + self.obj1file) self.assertTrue(os.path.exists(conn1_obj1)) self.assertTrue(os.path.exists(conn2_obj1)) conn1.close() diff --git a/src/third_party/wiredtiger/test/suite/test_tiered12.py b/src/third_party/wiredtiger/test/suite/test_tiered12.py new file mode 100755 index 00000000000..5797ac779c9 --- /dev/null +++ b/src/third_party/wiredtiger/test/suite/test_tiered12.py @@ -0,0 +1,104 @@ +#!/usr/bin/env python +# +# Public Domain 2014-present MongoDB, Inc. +# Public Domain 2008-2014 WiredTiger, Inc. +# +# This is free and unencumbered software released into the public domain. +# +# Anyone is free to copy, modify, publish, use, compile, sell, or +# distribute this software, either in source code form or as a compiled +# binary, for any purpose, commercial or non-commercial, and by any +# means. +# +# In jurisdictions that recognize copyright laws, the author or authors +# of this software dedicate any and all copyright interest in the +# software to the public domain. We make this dedication for the benefit +# of the public at large and to the detriment of our heirs and +# successors. We intend this dedication to be an overt act of +# relinquishment in perpetuity of all present and future rights to this +# software under copyright law. +# +# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, +# EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF +# MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. +# IN NO EVENT SHALL THE AUTHORS BE LIABLE FOR ANY CLAIM, DAMAGES OR +# OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, +# ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR +# OTHER DEALINGS IN THE SOFTWARE. + +import os, time, wiredtiger, wttest +from wiredtiger import stat +StorageSource = wiredtiger.StorageSource # easy access to constants + +# test_tiered12.py +# Test tiered storage with tiered flush finish timing delay. +class test_tiered12(wttest.WiredTigerTestCase): + + # If the 'uri' changes all the other names must change with it. + base = 'test_tiered12-000000000' + obj1file = base + '1.wtobj' + uri = "table:test_tiered12" + + auth_token = "test_token" + bucket = "mybucket" + cache = "cache-mybucket" + extension_name = "local_store" + prefix1 = "1_" + retention = 1 + saved_conn = '' + def conn_config(self): + os.mkdir(self.bucket) + self.saved_conn = \ + 'statistics=(all),timing_stress_for_test=(tiered_flush_finish),' + \ + 'tiered_storage=(auth_token=%s,' % self.auth_token + \ + 'bucket=%s,' % self.bucket + \ + 'bucket_prefix=%s,' % self.prefix1 + \ + 'local_retention=%d,' % self.retention + \ + 'name=%s)' % self.extension_name + return self.saved_conn + + # Load the local store extension. + def conn_extensions(self, extlist): + # Windows doesn't support dynamically loaded extension libraries. + if os.name == 'nt': + extlist.skip_if_missing = True + extlist.extension('storage_sources', self.extension_name) + + def check(self, tc, n): + for i in range(0, n): + self.assertEqual(tc[str(i)], str(i)) + tc.set_key(str(n)) + self.assertEquals(tc.search(), wiredtiger.WT_NOTFOUND) + + def test_tiered(self): + # Create a table. Add some data. Checkpoint and flush tier. + # We have configured the timing stress for tiered caching which delays + # the internal thread calling flush_finish for 1 second. + # So after flush tier completes, check that the cached object does not + # exist. Then sleep and check that it does exist. + # + # The idea is to make sure flush_tier is not waiting for unnecessary work + # to be done, but returns as soon as the copying to shared storage completes. + self.session.create(self.uri, 'key_format=S,value_format=S,') + + # Add data. Checkpoint and flush. + c = self.session.open_cursor(self.uri) + c["0"] = "0" + self.check(c, 1) + c.close() + self.session.checkpoint() + + bucket_obj = os.path.join(self.bucket, self.prefix1 + self.obj1file) + cache_obj = os.path.join(self.cache, self.prefix1 + self.obj1file) + self.session.flush_tier(None) + # Immediately after flush_tier finishes the cached object should not yet exist + # but the bucket object does exist. + self.assertFalse(os.path.exists(cache_obj)) + self.assertTrue(os.path.exists(bucket_obj)) + # Sleep more than the one second stress timing amount and give the thread time to run. + time.sleep(2) + # After sleeping, the internal thread should have created the cached object. + self.assertTrue(os.path.exists(cache_obj)) + +if __name__ == '__main__': + wttest.run() |