diff options
author | Luke Chen <luke.chen@mongodb.com> | 2021-03-18 16:04:25 +1100 |
---|---|---|
committer | Evergreen Agent <no-reply@evergreen.mongodb.com> | 2021-03-18 05:26:55 +0000 |
commit | 50f5691a67c71f3cf969205037aff2c44f6160e2 (patch) | |
tree | 65735c6472fa15d7e2be116aa7ea78ee68ec94d1 | |
parent | fad9de4ece9f5399acc804f206e40621930be5af (diff) | |
download | mongo-50f5691a67c71f3cf969205037aff2c44f6160e2.tar.gz |
Import wiredtiger: 31624ffd8c6eb39794ddd6c41cb48db42a646a87 from branch mongodb-5.0
ref: 98b3599256..31624ffd8c
for: 4.9.0
WT-7275 Add timestamp and transaction management to the test framework
14 files changed, 413 insertions, 70 deletions
diff --git a/src/third_party/wiredtiger/dist/test_data.py b/src/third_party/wiredtiger/dist/test_data.py index 15d3d2e7150..4f6aab5f5c7 100644 --- a/src/third_party/wiredtiger/dist/test_data.py +++ b/src/third_party/wiredtiger/dist/test_data.py @@ -106,6 +106,23 @@ runtime_monitor_config = throttle_config +[ type='category', subconfig=limit_stat) ] +transaction_config = [ + Config('min_operation_per_transaction', 1, r''' + The minimum number of operations per transaction''', min=1, max=200000), + Config('max_operation_per_transaction', 1, r''' + The maximum number of operations per transaction''', min=1, max=200000), +] + +timestamp_config = [ + Config('enable_timestamp', 'true', r''' + Enables timestamp management''', type='boolean'), + Config('oldest_lag', 0, r''' + The duration between the stable and oldest timestamps''', min=0, max=1000000), + Config('stable_lag', 0, r''' + The duration between the latest and stable timestamps''', min=0, max=1000000), +] + methods = { -'poc_test' : Method(load_config + workload_config + runtime_monitor_config + test_config), + 'poc_test' : Method(load_config + workload_config + runtime_monitor_config + transaction_config + + timestamp_config + test_config), } diff --git a/src/third_party/wiredtiger/import.data b/src/third_party/wiredtiger/import.data index ef42cb5b1f1..c0278929937 100644 --- a/src/third_party/wiredtiger/import.data +++ b/src/third_party/wiredtiger/import.data @@ -2,5 +2,5 @@ "vendor": "wiredtiger", "github": "wiredtiger/wiredtiger.git", "branch": "mongodb-5.0", - "commit": "98b3599256c8ce629ba632021edd0ab2b9d695d9" + "commit": "31624ffd8c6eb39794ddd6c41cb48db42a646a87" } diff --git a/src/third_party/wiredtiger/src/config/test_config.c b/src/third_party/wiredtiger/src/config/test_config.c index 8012ea026c7..dd2264c32f9 100644 --- a/src/third_party/wiredtiger/src/config/test_config.c +++ b/src/third_party/wiredtiger/src/config/test_config.c @@ -10,13 +10,18 @@ static const WT_CONFIG_CHECK confchk_poc_test[] = { {"cache_size_mb", "int", NULL, "min=0,max=100000000000", NULL, 0}, {"collection_count", "int", NULL, "min=0,max=200000", NULL, 0}, {"duration_seconds", "int", NULL, "min=0,max=1000000", NULL, 0}, + {"enable_timestamp", "boolean", NULL, NULL, NULL, 0}, {"enable_tracking", "boolean", NULL, NULL, NULL, 0}, {"insert_config", "string", NULL, NULL, NULL, 0}, {"insert_threads", "int", NULL, "min=0,max=20", NULL, 0}, {"key_count", "int", NULL, "min=0,max=1000000", NULL, 0}, {"key_size", "int", NULL, "min=0,max=10000", NULL, 0}, + {"max_operation_per_transaction", "int", NULL, "min=1,max=200000", NULL, 0}, + {"min_operation_per_transaction", "int", NULL, "min=1,max=200000", NULL, 0}, + {"oldest_lag", "int", NULL, "min=0,max=1000000", NULL, 0}, {"rate_per_second", "int", NULL, "min=1,max=1000", NULL, 0}, {"read_threads", "int", NULL, "min=0,max=100", NULL, 0}, + {"stable_lag", "int", NULL, "min=0,max=1000000", NULL, 0}, {"stat_cache_size", "category", NULL, NULL, confchk_stat_cache_size_subconfigs, 2}, {"update_config", "string", NULL, NULL, NULL, 0}, {"update_threads", "int", NULL, "min=0,max=20", NULL, 0}, @@ -25,11 +30,13 @@ static const WT_CONFIG_CHECK confchk_poc_test[] = { static const WT_CONFIG_ENTRY config_entries[] = { {"poc_test", "cache_size_mb=0,collection_count=1,duration_seconds=0," - "enable_tracking=true,insert_config=,insert_threads=0,key_count=0" - ",key_size=0,rate_per_second=1,read_threads=0," + "enable_timestamp=true,enable_tracking=true,insert_config=," + "insert_threads=0,key_count=0,key_size=0," + "max_operation_per_transaction=1,min_operation_per_transaction=1," + "oldest_lag=0,rate_per_second=1,read_threads=0,stable_lag=0," "stat_cache_size=(enabled=false,limit=),update_config=," "update_threads=0,value_size=0", - confchk_poc_test, 14}, + confchk_poc_test, 19}, {NULL, NULL, NULL, 0}}; /* diff --git a/src/third_party/wiredtiger/test/cppsuite/test_harness/api_const.h b/src/third_party/wiredtiger/test/cppsuite/test_harness/api_const.h index 61f61c9a838..0ada02bdde3 100644 --- a/src/third_party/wiredtiger/test/cppsuite/test_harness/api_const.h +++ b/src/third_party/wiredtiger/test/cppsuite/test_harness/api_const.h @@ -36,17 +36,25 @@ namespace test_harness { static const char *CACHE_SIZE_MB = "cache_size_mb"; static const char *COLLECTION_COUNT = "collection_count"; static const char *DURATION_SECONDS = "duration_seconds"; -static const char *ENABLE_TRACKING = "enable_tracking"; static const char *ENABLED = "enabled"; +static const char *ENABLE_TIMESTAMP = "enable_timestamp"; +static const char *ENABLE_TRACKING = "enable_tracking"; static const char *KEY_COUNT = "key_count"; static const char *LIMIT = "limit"; +static const char *MAX_OPERATION_PER_TRANSACTION = "max_operation_per_transaction"; +static const char *MIN_OPERATION_PER_TRANSACTION = "min_operation_per_transaction"; +static const char *OLDEST_LAG = "oldest_lag"; static const char *RATE_PER_SECOND = "rate_per_second"; static const char *READ_THREADS = "read_threads"; +static const char *STABLE_LAG = "stable_lag"; static const char *STAT_CACHE_SIZE = "stat_cache_size"; static const char *VALUE_SIZE = "value_size"; /* WiredTiger API consts. */ +static const char *COMMIT_TS = "commit_timestamp"; static const char *CONNECTION_CREATE = "create"; +static const char *OLDEST_TS = "oldest_timestamp"; +static const char *STABLE_TS = "stable_timestamp"; /* Test harness consts. */ static const char *TABLE_OPERATION_TRACKING = "table:operation_tracking"; diff --git a/src/third_party/wiredtiger/test/cppsuite/test_harness/configuration.h b/src/third_party/wiredtiger/test/cppsuite/test_harness/configuration.h index b0c40e3a6e3..2e38dc4da52 100644 --- a/src/third_party/wiredtiger/test/cppsuite/test_harness/configuration.h +++ b/src/third_party/wiredtiger/test/cppsuite/test_harness/configuration.h @@ -26,8 +26,8 @@ * OTHER DEALINGS IN THE SOFTWARE. */ -#ifndef CONFIGURATION_SETTINGS_H -#define CONFIGURATION_SETTINGS_H +#ifndef CONFIGURATION_H +#define CONFIGURATION_H extern "C" { #include "test_util.h" diff --git a/src/third_party/wiredtiger/test/cppsuite/test_harness/connection_manager.h b/src/third_party/wiredtiger/test/cppsuite/test_harness/connection_manager.h index f4d50e4778b..f3344ff4a30 100644 --- a/src/third_party/wiredtiger/test/cppsuite/test_harness/connection_manager.h +++ b/src/third_party/wiredtiger/test/cppsuite/test_harness/connection_manager.h @@ -54,7 +54,7 @@ class connection_manager { instance() { static connection_manager _instance; - return _instance; + return (_instance); } void @@ -96,7 +96,18 @@ class connection_manager { testutil_check(_conn->open_session(_conn, NULL, NULL, &session)); _conn_mutex.unlock(); - return session; + return (session); + } + + /* + * set_timestamp calls into the connection API in a thread safe manner to set global timestamps. + */ + void + set_timestamp(const std::string &config) + { + _conn_mutex.lock(); + testutil_check(_conn->set_timestamp(_conn, config.c_str())); + _conn_mutex.unlock(); } private: diff --git a/src/third_party/wiredtiger/test/cppsuite/test_harness/random_generator.h b/src/third_party/wiredtiger/test/cppsuite/test_harness/random_generator.h index ebcc0033a38..455d4173b86 100644 --- a/src/third_party/wiredtiger/test/cppsuite/test_harness/random_generator.h +++ b/src/third_party/wiredtiger/test/cppsuite/test_harness/random_generator.h @@ -33,7 +33,7 @@ namespace test_harness { -/* Helper class to generate random values. */ +/* Helper class to generate random values using uniform distributions. */ class random_generator { public: /* No copies of the singleton allowed. */ @@ -47,6 +47,7 @@ class random_generator { return _instance; } + /* Generate a random string of a given length. */ std::string generate_string(std::size_t length) { @@ -58,6 +59,14 @@ class random_generator { return (random_string); } + /* Generate a random integer between min and max. */ + int64_t + generate_integer(int64_t min, int64_t max) + { + std::uniform_int_distribution<> dis(min, max); + return dis(_generator); + } + private: random_generator() { diff --git a/src/third_party/wiredtiger/test/cppsuite/test_harness/runtime_monitor.h b/src/third_party/wiredtiger/test/cppsuite/test_harness/runtime_monitor.h index cee7d3185eb..e1a4f3e4360 100644 --- a/src/third_party/wiredtiger/test/cppsuite/test_harness/runtime_monitor.h +++ b/src/third_party/wiredtiger/test/cppsuite/test_harness/runtime_monitor.h @@ -141,7 +141,7 @@ class runtime_monitor : public component { testutil_check(_config->get(STAT_CACHE_SIZE, &nested)); configuration sub_config = configuration(nested); _stats.push_back(new cache_limit_statistic(&sub_config)); - _running = true; + component::load(); } void diff --git a/src/third_party/wiredtiger/test/cppsuite/test_harness/test.h b/src/third_party/wiredtiger/test/cppsuite/test_harness/test.h index d6768ce7e48..4bcc4e92689 100644 --- a/src/third_party/wiredtiger/test/cppsuite/test_harness/test.h +++ b/src/third_party/wiredtiger/test/cppsuite/test_harness/test.h @@ -61,7 +61,8 @@ class test { _timestamp_manager = new timestamp_manager(_configuration); _workload_tracking = new workload_tracking(_configuration, OPERATION_TRACKING_TABLE_CONFIG, TABLE_OPERATION_TRACKING, SCHEMA_TRACKING_TABLE_CONFIG, TABLE_SCHEMA_TRACKING); - _workload_generator = new workload_generator(_configuration, _workload_tracking); + _workload_generator = + new workload_generator(_configuration, _timestamp_manager, _workload_tracking); _thread_manager = new thread_manager(); /* * Ordering is not important here, any dependencies between components should be resolved @@ -95,8 +96,7 @@ class test { void run() { - int64_t cache_size_mb = 100; - int64_t duration_seconds = 0; + int64_t cache_size_mb = 100, duration_seconds = 0; bool enable_tracking = false, is_success = true; /* Build the database creation config string. */ @@ -118,6 +118,7 @@ class test { /* Sleep duration seconds. */ testutil_check(_configuration->get_int(DURATION_SECONDS, duration_seconds)); + testutil_assert(duration_seconds >= 0); std::this_thread::sleep_for(std::chrono::seconds(duration_seconds)); /* End the test. */ @@ -175,8 +176,8 @@ class test { std::vector<component *> _components; configuration *_configuration; runtime_monitor *_runtime_monitor; - timestamp_manager *_timestamp_manager; thread_manager *_thread_manager; + timestamp_manager *_timestamp_manager; workload_generator *_workload_generator; workload_tracking *_workload_tracking; }; diff --git a/src/third_party/wiredtiger/test/cppsuite/test_harness/thread_context.h b/src/third_party/wiredtiger/test/cppsuite/test_harness/thread_context.h index 2613989f983..61ee7e01a88 100644 --- a/src/third_party/wiredtiger/test/cppsuite/test_harness/thread_context.h +++ b/src/third_party/wiredtiger/test/cppsuite/test_harness/thread_context.h @@ -29,6 +29,9 @@ #ifndef THREAD_CONTEXT_H #define THREAD_CONTEXT_H +#include "random_generator.h" +#include "workload_tracking.h" + namespace test_harness { /* Define the different thread operations. */ enum class thread_operation { @@ -45,13 +48,16 @@ enum class thread_operation { /* Container class for a thread and any data types it may need to interact with the database. */ class thread_context { public: - thread_context(std::vector<std::string> collection_names, thread_operation type) - : _collection_names(collection_names), _running(false), _type(type) + thread_context(timestamp_manager *timestamp_manager, workload_tracking *tracking, + std::vector<std::string> collection_names, thread_operation type, int64_t max_op, + int64_t min_op, int64_t value_size) + : _collection_names(collection_names), _current_op_count(0U), _in_txn(false), + _running(false), _min_op(min_op), _max_op(max_op), _max_op_count(0), + _timestamp_manager(timestamp_manager), _type(type), _tracking(tracking), + _value_size(value_size) { } - thread_context(thread_operation type) : _running(false), _type(type) {} - void finish() { @@ -61,19 +67,31 @@ class thread_context { const std::vector<std::string> & get_collection_names() const { - return _collection_names; + return (_collection_names); } thread_operation get_thread_operation() const { - return _type; + return (_type); + } + + workload_tracking * + get_tracking() const + { + return (_tracking); + } + + int64_t + get_value_size() const + { + return (_value_size); } bool is_running() const { - return _running; + return (_running); } void @@ -82,10 +100,81 @@ class thread_context { _running = running; } + void + begin_transaction(WT_SESSION *session, const std::string &config) + { + if (!_in_txn && _timestamp_manager->is_enabled()) { + testutil_check( + session->begin_transaction(session, config.empty() ? NULL : config.c_str())); + /* This randomizes the number of operations to be executed in one transaction. */ + _max_op_count = random_generator::instance().generate_integer(_min_op, _max_op); + _current_op_count = 0; + _in_txn = true; + } + } + + /* Returns true if the current transaction has been committed. */ + bool + commit_transaction(WT_SESSION *session, const std::string &config) + { + if (!_timestamp_manager->is_enabled()) + return (true); + + /* A transaction cannot be committed if not started. */ + testutil_assert(_in_txn); + /* The current transaction should be committed if: + * - The thread is done working. This is useful when the test is ended and the thread has + * not reached the maximum number of operations per transaction. + * - The number of operations executed in the current transaction has exceeded the + * threshold. + */ + if (!_running || (++_current_op_count > _max_op_count)) { + testutil_check( + session->commit_transaction(session, config.empty() ? nullptr : config.c_str())); + _in_txn = false; + } + + return (!_in_txn); + } + + /* + * Set a commit timestamp if the timestamp manager is enabled and always return the timestamp + * that should have been used for the commit. + */ + wt_timestamp_t + set_commit_timestamp(WT_SESSION *session) + { + + wt_timestamp_t ts = _timestamp_manager->get_next_ts(); + std::string config; + + if (_timestamp_manager->is_enabled()) { + config = std::string(COMMIT_TS) + "=" + _timestamp_manager->decimal_to_hex(ts); + testutil_check(session->timestamp_transaction(session, config.c_str())); + } + + return (ts); + } + private: const std::vector<std::string> _collection_names; - bool _running; + /* + * _current_op_count is the current number of operations that have been executed in the current + * transaction. + */ + uint64_t _current_op_count; + bool _in_txn, _running; + /* + * _min_op and _max_op are the minimum and maximum number of operations within one transaction. + * _max_op_count is the current maximum number of operations that can be executed in the current + * transaction. _max_op_count will always be <= _max_op. + */ + int64_t _min_op, _max_op, _max_op_count; + timestamp_manager *_timestamp_manager; const thread_operation _type; + workload_tracking *_tracking; + /* Temporary member that comes from the test configuration. */ + int64_t _value_size; }; } // namespace test_harness diff --git a/src/third_party/wiredtiger/test/cppsuite/test_harness/timestamp_manager.h b/src/third_party/wiredtiger/test/cppsuite/test_harness/timestamp_manager.h index 10aa0482bf4..a1cf1ad8d1d 100644 --- a/src/third_party/wiredtiger/test/cppsuite/test_harness/timestamp_manager.h +++ b/src/third_party/wiredtiger/test/cppsuite/test_harness/timestamp_manager.h @@ -30,23 +30,133 @@ #define TIMESTAMP_MANAGER_H #include "component.h" +#include <atomic> +#include <chrono> +#include <sstream> +#include <thread> namespace test_harness { /* * The timestamp monitor class manages global timestamp state for all components in the test - * harness. It also manages the global timestamps within WiredTiger. + * harness. It also manages the global timestamps within WiredTiger. All timestamps are in seconds + * unless specified otherwise. */ class timestamp_manager : public component { public: - timestamp_manager(configuration *config) : component(config) {} + timestamp_manager(configuration *config) + : /* _periodic_update_s is hardcoded to 1 second for now. */ + component(config), _increment_ts(0U), _is_enabled(false), _latest_ts(0U), _oldest_lag(0), + _oldest_ts(0U), _periodic_update_s(1), _stable_lag(0), _stable_ts(0U) + { + } + + /* Delete the copy constructor. */ + timestamp_manager(const timestamp_manager &) = delete; + + void + load() + { + testutil_assert(_config != nullptr); + testutil_check(_config->get_int(OLDEST_LAG, _oldest_lag)); + testutil_assert(_oldest_lag >= 0); + testutil_check(_config->get_int(STABLE_LAG, _stable_lag)); + testutil_assert(_stable_lag >= 0); + testutil_check(_config->get_bool(ENABLE_TIMESTAMP, _is_enabled)); + component::load(); + } void run() { - while (_running) { - /* Do something. */ + std::string config; + /* latest_ts_s represents the time component of the latest timestamp provided. */ + wt_timestamp_t latest_ts_s; + + while (_is_enabled && _running) { + /* Timestamps are checked periodically. */ + std::this_thread::sleep_for(std::chrono::seconds(_periodic_update_s)); + latest_ts_s = (_latest_ts >> 32); + /* + * Keep a time window between the latest and stable ts less than the max defined in the + * configuration. + */ + testutil_assert(latest_ts_s >= _stable_ts); + if ((latest_ts_s - _stable_ts) > _stable_lag) { + _stable_ts = latest_ts_s - _stable_lag; + config += std::string(STABLE_TS) + "=" + decimal_to_hex(_stable_ts); + } + + /* + * Keep a time window between the stable and oldest ts less than the max defined in the + * configuration. + */ + testutil_assert(_stable_ts > _oldest_ts); + if ((_stable_ts - _oldest_ts) > _oldest_lag) { + _oldest_ts = _stable_ts - _oldest_lag; + if (!config.empty()) + config += ","; + config += std::string(OLDEST_TS) + "=" + decimal_to_hex(_oldest_ts); + } + + /* Save the new timestamps. */ + if (!config.empty()) { + connection_manager::instance().set_timestamp(config); + config = ""; + } } } + + bool + is_enabled() const + { + return _is_enabled; + } + + /* + * Get a unique commit timestamp. The first 32 bits represent the epoch time in seconds. The + * last 32 bits represent an increment for uniqueness. + */ + wt_timestamp_t + get_next_ts() + { + uint64_t current_time = get_time_now_s(); + _increment_ts.fetch_add(1); + + current_time = (current_time << 32) | (_increment_ts & 0x00000000FFFFFFFF); + _latest_ts = current_time; + + return (_latest_ts); + } + + static const std::string + decimal_to_hex(int64_t value) + { + std::stringstream ss; + ss << std::hex << value; + std::string res(ss.str()); + return (res); + } + + private: + uint64_t + get_time_now_s() const + { + auto now = std::chrono::system_clock::now().time_since_epoch(); + uint64_t current_time_s = + static_cast<uint64_t>(std::chrono::duration_cast<std::chrono::seconds>(now).count()); + return (current_time_s); + } + + private: + bool _is_enabled; + const wt_timestamp_t _periodic_update_s; + std::atomic<wt_timestamp_t> _increment_ts; + wt_timestamp_t _latest_ts, _oldest_ts, _stable_ts; + /* + * _oldest_lag is the time window between the stable and oldest timestamps. + * _stable_lag is the time window between the latest and stable timestamps. + */ + int64_t _oldest_lag, _stable_lag; }; } // namespace test_harness diff --git a/src/third_party/wiredtiger/test/cppsuite/test_harness/workload_generator.h b/src/third_party/wiredtiger/test/cppsuite/test_harness/workload_generator.h index c5f81553d81..0b6cc1b2a84 100644 --- a/src/third_party/wiredtiger/test/cppsuite/test_harness/workload_generator.h +++ b/src/third_party/wiredtiger/test/cppsuite/test_harness/workload_generator.h @@ -30,6 +30,7 @@ #define WORKLOAD_GENERATOR_H #include <algorithm> +#include <atomic> #include <map> #include "random_generator.h" @@ -41,8 +42,9 @@ namespace test_harness { */ class workload_generator : public component { public: - workload_generator(configuration *configuration, workload_tracking *tracking) - : component(configuration), _tracking(tracking) + workload_generator(configuration *configuration, timestamp_manager *timestamp_manager, + workload_tracking *tracking) + : component(configuration), _timestamp_manager(timestamp_manager), _tracking(tracking) { } @@ -71,8 +73,10 @@ class workload_generator : public component { { WT_CURSOR *cursor; WT_SESSION *session; + wt_timestamp_t ts; int64_t collection_count, key_count, value_size; - std::string collection_name, home; + std::string collection_name, config, generated_value, home; + bool ts_enabled = _timestamp_manager->is_enabled(); cursor = nullptr; collection_count = key_count = value_size = 0; @@ -85,7 +89,8 @@ class workload_generator : public component { for (int i = 0; i < collection_count; ++i) { collection_name = "table:collection" + std::to_string(i); testutil_check(session->create(session, collection_name.c_str(), DEFAULT_TABLE_SCHEMA)); - testutil_check(_tracking->save(tracking_operation::CREATE, collection_name, 0, "")); + ts = _timestamp_manager->get_next_ts(); + testutil_check(_tracking->save(tracking_operation::CREATE, collection_name, 0, "", ts)); _collection_names.push_back(collection_name); } debug_info( @@ -94,17 +99,27 @@ class workload_generator : public component { /* Open a cursor on each collection and use the configuration to insert key/value pairs. */ testutil_check(_config->get_int(KEY_COUNT, key_count)); testutil_check(_config->get_int(VALUE_SIZE, value_size)); + testutil_assert(value_size >= 0); for (const auto &collection_name : _collection_names) { /* WiredTiger lets you open a cursor on a collection using the same pointer. When a * session is closed, WiredTiger APIs close the cursors too. */ testutil_check( session->open_cursor(session, collection_name.c_str(), NULL, NULL, &cursor)); for (size_t j = 0; j < key_count; ++j) { - /* Generation of a random string value using the size defined in the test - * configuration. */ - std::string generated_value = + /* + * Generation of a random string value using the size defined in the test + * configuration. + */ + generated_value = random_generator::random_generator::instance().generate_string(value_size); - testutil_check(insert(cursor, collection_name, j + 1, generated_value.c_str())); + ts = _timestamp_manager->get_next_ts(); + if (ts_enabled) + testutil_check(session->begin_transaction(session, "")); + testutil_check(insert(cursor, collection_name, j + 1, generated_value.c_str(), ts)); + if (ts_enabled) { + config = std::string(COMMIT_TS) + "=" + _timestamp_manager->decimal_to_hex(ts); + testutil_check(session->commit_transaction(session, config.c_str())); + } } } debug_info("Populate stage done", _trace_level, DEBUG_INFO); @@ -114,18 +129,30 @@ class workload_generator : public component { void run() { - int64_t duration_seconds, read_threads; - - duration_seconds = read_threads = 0; + WT_SESSION *session = nullptr; + int64_t duration_seconds, read_threads, min_operation_per_transaction, + max_operation_per_transaction, value_size; /* Populate the database. */ populate(); + /* Retrieve useful parameters from the test configuration. */ testutil_check(_config->get_int(DURATION_SECONDS, duration_seconds)); + testutil_assert(duration_seconds >= 0); testutil_check(_config->get_int(READ_THREADS, read_threads)); + testutil_check( + _config->get_int(MIN_OPERATION_PER_TRANSACTION, min_operation_per_transaction)); + testutil_check( + _config->get_int(MAX_OPERATION_PER_TRANSACTION, max_operation_per_transaction)); + testutil_assert(max_operation_per_transaction >= min_operation_per_transaction); + testutil_check(_config->get_int(VALUE_SIZE, value_size)); + testutil_assert(value_size >= 0); + /* Generate threads to execute read operations on the collections. */ for (int i = 0; i < read_threads; ++i) { - thread_context *tc = new thread_context(_collection_names, thread_operation::READ); + thread_context *tc = new thread_context(_timestamp_manager, _tracking, + _collection_names, thread_operation::READ, max_operation_per_transaction, + min_operation_per_transaction, value_size); _workers.push_back(tc); _thread_manager.add_thread(tc, &execute_operation); } @@ -134,11 +161,11 @@ class workload_generator : public component { void finish() { - debug_info("Workload generator stage done", _trace_level, DEBUG_INFO); for (const auto &it : _workers) { it->finish(); } _thread_manager.join(); + debug_info("Workload generator: run stage done", _trace_level, DEBUG_INFO); } /* Workload threaded operations. */ @@ -155,11 +182,13 @@ class workload_generator : public component { break; case thread_operation::REMOVE: case thread_operation::INSERT: - case thread_operation::UPDATE: /* Sleep until it is implemented. */ while (context.is_running()) std::this_thread::sleep_for(std::chrono::seconds(1)); break; + case thread_operation::UPDATE: + update_operation(context, session); + break; default: testutil_die(DEBUG_ABORT, "system: thread_operation is unknown : %d", static_cast<int>(context.get_thread_operation())); @@ -167,6 +196,51 @@ class workload_generator : public component { } } + /* + * Basic update operation that currently update the same key with a random value in each + * collection. + */ + static void + update_operation(thread_context &context, WT_SESSION *session) + { + WT_CURSOR *cursor; + WT_DECL_RET; + wt_timestamp_t ts; + std::vector<WT_CURSOR *> cursors; + std::vector<std::string> collection_names; + std::string generated_value; + bool has_committed = true; + int64_t cpt, value_size = context.get_value_size(); + + testutil_assert(session != nullptr); + /* Get a cursor for each collection in collection_names. */ + for (const auto &it : context.get_collection_names()) { + testutil_check(session->open_cursor(session, it.c_str(), NULL, NULL, &cursor)); + cursors.push_back(cursor); + collection_names.push_back(it); + } + + while (context.is_running()) { + /* Walk each cursor. */ + context.begin_transaction(session, ""); + ts = context.set_commit_timestamp(session); + cpt = 0; + for (const auto &it : cursors) { + generated_value = + random_generator::random_generator::instance().generate_string(value_size); + /* Key is hard coded for now. */ + testutil_check(update(context.get_tracking(), it, collection_names[cpt], 1, + generated_value.c_str(), ts)); + ++cpt; + } + has_committed = context.commit_transaction(session, ""); + } + + /* Make sure the last operation is committed now the work is finished. */ + if (!has_committed) + context.commit_transaction(session, ""); + } + /* Basic read operation that walks a cursors across all collections. */ static void read_operation(thread_context &context, WT_SESSION *session) @@ -175,6 +249,7 @@ class workload_generator : public component { WT_DECL_RET; std::vector<WT_CURSOR *> cursors; + testutil_assert(session != nullptr); /* Get a cursor for each collection in collection_names. */ for (const auto &it : context.get_collection_names()) { testutil_check(session->open_cursor(session, it.c_str(), NULL, NULL, &cursor)); @@ -193,55 +268,68 @@ class workload_generator : public component { /* WiredTiger APIs wrappers for single operations. */ template <typename K, typename V> int - insert(WT_CURSOR *cursor, const std::string &collection_name, K key, V value) + insert(WT_CURSOR *cursor, const std::string &collection_name, K key, V value, wt_timestamp_t ts) { int error_code; - if (cursor == nullptr) - throw std::invalid_argument("Failed to call insert, invalid cursor"); - + testutil_assert(cursor != nullptr); cursor->set_key(cursor, key); cursor->set_value(cursor, value); error_code = cursor->insert(cursor); if (error_code == 0) { debug_info("key/value inserted", _trace_level, DEBUG_INFO); - error_code = _tracking->save(tracking_operation::INSERT, collection_name, key, value); + error_code = + _tracking->save(tracking_operation::INSERT, collection_name, key, value, ts); } else debug_info("key/value insertion failed", _trace_level, DEBUG_ERROR); - return error_code; + return (error_code); } static int search(WT_CURSOR *cursor) { - if (cursor == nullptr) - throw std::invalid_argument("Failed to call search, invalid cursor"); + testutil_assert(cursor != nullptr); return (cursor->search(cursor)); } static int search_near(WT_CURSOR *cursor, int *exact) { - if (cursor == nullptr) - throw std::invalid_argument("Failed to call search_near, invalid cursor"); + testutil_assert(cursor != nullptr); return (cursor->search_near(cursor, exact)); } + template <typename K, typename V> static int - update(WT_CURSOR *cursor) + update(workload_tracking *tracking, WT_CURSOR *cursor, const std::string &collection_name, + K key, V value, wt_timestamp_t ts) { - if (cursor == nullptr) - throw std::invalid_argument("Failed to call update, invalid cursor"); - return (cursor->update(cursor)); + int error_code; + + testutil_assert(tracking != nullptr); + testutil_assert(cursor != nullptr); + cursor->set_key(cursor, key); + cursor->set_value(cursor, value); + error_code = cursor->update(cursor); + + if (error_code == 0) { + debug_info("key/value update", _trace_level, DEBUG_INFO); + error_code = + tracking->save(tracking_operation::UPDATE, collection_name, key, value, ts); + } else + debug_info("key/value update failed", _trace_level, DEBUG_ERROR); + + return (error_code); } private: std::vector<std::string> _collection_names; thread_manager _thread_manager; - std::vector<thread_context *> _workers; + timestamp_manager *_timestamp_manager; workload_tracking *_tracking; + std::vector<thread_context *> _workers; }; } // namespace test_harness diff --git a/src/third_party/wiredtiger/test/cppsuite/test_harness/workload_tracking.h b/src/third_party/wiredtiger/test/cppsuite/test_harness/workload_tracking.h index 12d543df68e..e8c7d748e79 100644 --- a/src/third_party/wiredtiger/test/cppsuite/test_harness/workload_tracking.h +++ b/src/third_party/wiredtiger/test/cppsuite/test_harness/workload_tracking.h @@ -49,7 +49,7 @@ namespace test_harness { /* Tracking operations. */ -enum class tracking_operation { CREATE, DELETE_COLLECTION, DELETE_KEY, INSERT }; +enum class tracking_operation { CREATE, DELETE_COLLECTION, DELETE_KEY, INSERT, UPDATE }; /* Class used to track operations performed on collections */ class workload_tracking : public component { @@ -57,13 +57,16 @@ class workload_tracking : public component { workload_tracking(configuration *_config, const std::string &operation_table_config, const std::string &operation_table_name, const std::string &schema_table_config, const std::string &schema_table_name) - : component(_config), _cursor_operations(nullptr), _cursor_schema(nullptr), + : component(_config), _cursor_operations(nullptr), _cursor_schema(nullptr), _enabled(false), _operation_table_config(operation_table_config), _operation_table_name(operation_table_name), _schema_table_config(schema_table_config), - _schema_table_name(schema_table_name), _timestamp(0U), _enabled(false) + _schema_table_name(schema_table_name) { } + /* Delete the copy constructor. */ + workload_tracking(const workload_tracking &) = delete; + const std::string & get_schema_table_name() const { @@ -110,7 +113,7 @@ class workload_tracking : public component { template <typename K, typename V> int save(const tracking_operation &operation, const std::string &collection_name, const K &key, - const V &value) + const V &value, wt_timestamp_t ts) { WT_CURSOR *cursor; int error_code = 0; @@ -123,13 +126,13 @@ class workload_tracking : public component { case tracking_operation::CREATE: case tracking_operation::DELETE_COLLECTION: cursor = _cursor_schema; - cursor->set_key(cursor, collection_name.c_str(), _timestamp++); + cursor->set_key(cursor, collection_name.c_str(), ts); cursor->set_value(cursor, static_cast<int>(operation)); break; default: cursor = _cursor_operations; - cursor->set_key(cursor, collection_name.c_str(), key, _timestamp++); + cursor->set_key(cursor, collection_name.c_str(), key, ts); cursor->set_value(cursor, static_cast<int>(operation), value); break; } @@ -145,14 +148,13 @@ class workload_tracking : public component { } private: + WT_CURSOR *_cursor_operations; + WT_CURSOR *_cursor_schema; + bool _enabled; const std::string _operation_table_config; const std::string _operation_table_name; const std::string _schema_table_config; const std::string _schema_table_name; - WT_CURSOR *_cursor_operations; - WT_CURSOR *_cursor_schema; - uint64_t _timestamp; - bool _enabled; }; } // namespace test_harness diff --git a/src/third_party/wiredtiger/test/cppsuite/tests/poc.cxx b/src/third_party/wiredtiger/test/cppsuite/tests/poc.cxx index 2d8cc319565..cf25e5e9a93 100755 --- a/src/third_party/wiredtiger/test/cppsuite/tests/poc.cxx +++ b/src/third_party/wiredtiger/test/cppsuite/tests/poc.cxx @@ -48,9 +48,10 @@ class poc_test : public test_harness::test { const std::string poc_test::test::name = "poc_test"; const std::string poc_test::test::default_config = - "collection_count=2,key_count=5,value_size=10," - "read_threads=1,duration_seconds=10,cache_size_mb=1000," - "stat_cache_size=(enabled=true,limit=100),rate_per_second=10,enable_tracking=true"; + "collection_count=2,key_count=5,value_size=10,read_threads=1,duration_seconds=10," + "cache_size_mb=1000,stat_cache_size=(enabled=true,limit=100),rate_per_second=10," + "enable_tracking=true,enable_timestamp=true,oldest_lag=1,stable_lag=1," + "min_operation_per_transaction=1,max_operation_per_transaction=1"; int main(int argc, char *argv[]) |