summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorLuke Chen <luke.chen@mongodb.com>2021-03-18 16:04:25 +1100
committerEvergreen Agent <no-reply@evergreen.mongodb.com>2021-03-18 05:26:55 +0000
commit50f5691a67c71f3cf969205037aff2c44f6160e2 (patch)
tree65735c6472fa15d7e2be116aa7ea78ee68ec94d1
parentfad9de4ece9f5399acc804f206e40621930be5af (diff)
downloadmongo-50f5691a67c71f3cf969205037aff2c44f6160e2.tar.gz
Import wiredtiger: 31624ffd8c6eb39794ddd6c41cb48db42a646a87 from branch mongodb-5.0
ref: 98b3599256..31624ffd8c for: 4.9.0 WT-7275 Add timestamp and transaction management to the test framework
-rw-r--r--src/third_party/wiredtiger/dist/test_data.py19
-rw-r--r--src/third_party/wiredtiger/import.data2
-rw-r--r--src/third_party/wiredtiger/src/config/test_config.c13
-rw-r--r--src/third_party/wiredtiger/test/cppsuite/test_harness/api_const.h10
-rw-r--r--src/third_party/wiredtiger/test/cppsuite/test_harness/configuration.h4
-rw-r--r--src/third_party/wiredtiger/test/cppsuite/test_harness/connection_manager.h15
-rw-r--r--src/third_party/wiredtiger/test/cppsuite/test_harness/random_generator.h11
-rw-r--r--src/third_party/wiredtiger/test/cppsuite/test_harness/runtime_monitor.h2
-rw-r--r--src/third_party/wiredtiger/test/cppsuite/test_harness/test.h9
-rw-r--r--src/third_party/wiredtiger/test/cppsuite/test_harness/thread_context.h105
-rw-r--r--src/third_party/wiredtiger/test/cppsuite/test_harness/timestamp_manager.h118
-rw-r--r--src/third_party/wiredtiger/test/cppsuite/test_harness/workload_generator.h146
-rw-r--r--src/third_party/wiredtiger/test/cppsuite/test_harness/workload_tracking.h22
-rwxr-xr-xsrc/third_party/wiredtiger/test/cppsuite/tests/poc.cxx7
14 files changed, 413 insertions, 70 deletions
diff --git a/src/third_party/wiredtiger/dist/test_data.py b/src/third_party/wiredtiger/dist/test_data.py
index 15d3d2e7150..4f6aab5f5c7 100644
--- a/src/third_party/wiredtiger/dist/test_data.py
+++ b/src/third_party/wiredtiger/dist/test_data.py
@@ -106,6 +106,23 @@ runtime_monitor_config = throttle_config +[
type='category', subconfig=limit_stat)
]
+transaction_config = [
+ Config('min_operation_per_transaction', 1, r'''
+ The minimum number of operations per transaction''', min=1, max=200000),
+ Config('max_operation_per_transaction', 1, r'''
+ The maximum number of operations per transaction''', min=1, max=200000),
+]
+
+timestamp_config = [
+ Config('enable_timestamp', 'true', r'''
+ Enables timestamp management''', type='boolean'),
+ Config('oldest_lag', 0, r'''
+ The duration between the stable and oldest timestamps''', min=0, max=1000000),
+ Config('stable_lag', 0, r'''
+ The duration between the latest and stable timestamps''', min=0, max=1000000),
+]
+
methods = {
-'poc_test' : Method(load_config + workload_config + runtime_monitor_config + test_config),
+ 'poc_test' : Method(load_config + workload_config + runtime_monitor_config + transaction_config
+ + timestamp_config + test_config),
}
diff --git a/src/third_party/wiredtiger/import.data b/src/third_party/wiredtiger/import.data
index ef42cb5b1f1..c0278929937 100644
--- a/src/third_party/wiredtiger/import.data
+++ b/src/third_party/wiredtiger/import.data
@@ -2,5 +2,5 @@
"vendor": "wiredtiger",
"github": "wiredtiger/wiredtiger.git",
"branch": "mongodb-5.0",
- "commit": "98b3599256c8ce629ba632021edd0ab2b9d695d9"
+ "commit": "31624ffd8c6eb39794ddd6c41cb48db42a646a87"
}
diff --git a/src/third_party/wiredtiger/src/config/test_config.c b/src/third_party/wiredtiger/src/config/test_config.c
index 8012ea026c7..dd2264c32f9 100644
--- a/src/third_party/wiredtiger/src/config/test_config.c
+++ b/src/third_party/wiredtiger/src/config/test_config.c
@@ -10,13 +10,18 @@ static const WT_CONFIG_CHECK confchk_poc_test[] = {
{"cache_size_mb", "int", NULL, "min=0,max=100000000000", NULL, 0},
{"collection_count", "int", NULL, "min=0,max=200000", NULL, 0},
{"duration_seconds", "int", NULL, "min=0,max=1000000", NULL, 0},
+ {"enable_timestamp", "boolean", NULL, NULL, NULL, 0},
{"enable_tracking", "boolean", NULL, NULL, NULL, 0},
{"insert_config", "string", NULL, NULL, NULL, 0},
{"insert_threads", "int", NULL, "min=0,max=20", NULL, 0},
{"key_count", "int", NULL, "min=0,max=1000000", NULL, 0},
{"key_size", "int", NULL, "min=0,max=10000", NULL, 0},
+ {"max_operation_per_transaction", "int", NULL, "min=1,max=200000", NULL, 0},
+ {"min_operation_per_transaction", "int", NULL, "min=1,max=200000", NULL, 0},
+ {"oldest_lag", "int", NULL, "min=0,max=1000000", NULL, 0},
{"rate_per_second", "int", NULL, "min=1,max=1000", NULL, 0},
{"read_threads", "int", NULL, "min=0,max=100", NULL, 0},
+ {"stable_lag", "int", NULL, "min=0,max=1000000", NULL, 0},
{"stat_cache_size", "category", NULL, NULL, confchk_stat_cache_size_subconfigs, 2},
{"update_config", "string", NULL, NULL, NULL, 0},
{"update_threads", "int", NULL, "min=0,max=20", NULL, 0},
@@ -25,11 +30,13 @@ static const WT_CONFIG_CHECK confchk_poc_test[] = {
static const WT_CONFIG_ENTRY config_entries[] = {
{"poc_test",
"cache_size_mb=0,collection_count=1,duration_seconds=0,"
- "enable_tracking=true,insert_config=,insert_threads=0,key_count=0"
- ",key_size=0,rate_per_second=1,read_threads=0,"
+ "enable_timestamp=true,enable_tracking=true,insert_config=,"
+ "insert_threads=0,key_count=0,key_size=0,"
+ "max_operation_per_transaction=1,min_operation_per_transaction=1,"
+ "oldest_lag=0,rate_per_second=1,read_threads=0,stable_lag=0,"
"stat_cache_size=(enabled=false,limit=),update_config=,"
"update_threads=0,value_size=0",
- confchk_poc_test, 14},
+ confchk_poc_test, 19},
{NULL, NULL, NULL, 0}};
/*
diff --git a/src/third_party/wiredtiger/test/cppsuite/test_harness/api_const.h b/src/third_party/wiredtiger/test/cppsuite/test_harness/api_const.h
index 61f61c9a838..0ada02bdde3 100644
--- a/src/third_party/wiredtiger/test/cppsuite/test_harness/api_const.h
+++ b/src/third_party/wiredtiger/test/cppsuite/test_harness/api_const.h
@@ -36,17 +36,25 @@ namespace test_harness {
static const char *CACHE_SIZE_MB = "cache_size_mb";
static const char *COLLECTION_COUNT = "collection_count";
static const char *DURATION_SECONDS = "duration_seconds";
-static const char *ENABLE_TRACKING = "enable_tracking";
static const char *ENABLED = "enabled";
+static const char *ENABLE_TIMESTAMP = "enable_timestamp";
+static const char *ENABLE_TRACKING = "enable_tracking";
static const char *KEY_COUNT = "key_count";
static const char *LIMIT = "limit";
+static const char *MAX_OPERATION_PER_TRANSACTION = "max_operation_per_transaction";
+static const char *MIN_OPERATION_PER_TRANSACTION = "min_operation_per_transaction";
+static const char *OLDEST_LAG = "oldest_lag";
static const char *RATE_PER_SECOND = "rate_per_second";
static const char *READ_THREADS = "read_threads";
+static const char *STABLE_LAG = "stable_lag";
static const char *STAT_CACHE_SIZE = "stat_cache_size";
static const char *VALUE_SIZE = "value_size";
/* WiredTiger API consts. */
+static const char *COMMIT_TS = "commit_timestamp";
static const char *CONNECTION_CREATE = "create";
+static const char *OLDEST_TS = "oldest_timestamp";
+static const char *STABLE_TS = "stable_timestamp";
/* Test harness consts. */
static const char *TABLE_OPERATION_TRACKING = "table:operation_tracking";
diff --git a/src/third_party/wiredtiger/test/cppsuite/test_harness/configuration.h b/src/third_party/wiredtiger/test/cppsuite/test_harness/configuration.h
index b0c40e3a6e3..2e38dc4da52 100644
--- a/src/third_party/wiredtiger/test/cppsuite/test_harness/configuration.h
+++ b/src/third_party/wiredtiger/test/cppsuite/test_harness/configuration.h
@@ -26,8 +26,8 @@
* OTHER DEALINGS IN THE SOFTWARE.
*/
-#ifndef CONFIGURATION_SETTINGS_H
-#define CONFIGURATION_SETTINGS_H
+#ifndef CONFIGURATION_H
+#define CONFIGURATION_H
extern "C" {
#include "test_util.h"
diff --git a/src/third_party/wiredtiger/test/cppsuite/test_harness/connection_manager.h b/src/third_party/wiredtiger/test/cppsuite/test_harness/connection_manager.h
index f4d50e4778b..f3344ff4a30 100644
--- a/src/third_party/wiredtiger/test/cppsuite/test_harness/connection_manager.h
+++ b/src/third_party/wiredtiger/test/cppsuite/test_harness/connection_manager.h
@@ -54,7 +54,7 @@ class connection_manager {
instance()
{
static connection_manager _instance;
- return _instance;
+ return (_instance);
}
void
@@ -96,7 +96,18 @@ class connection_manager {
testutil_check(_conn->open_session(_conn, NULL, NULL, &session));
_conn_mutex.unlock();
- return session;
+ return (session);
+ }
+
+ /*
+ * set_timestamp calls into the connection API in a thread safe manner to set global timestamps.
+ */
+ void
+ set_timestamp(const std::string &config)
+ {
+ _conn_mutex.lock();
+ testutil_check(_conn->set_timestamp(_conn, config.c_str()));
+ _conn_mutex.unlock();
}
private:
diff --git a/src/third_party/wiredtiger/test/cppsuite/test_harness/random_generator.h b/src/third_party/wiredtiger/test/cppsuite/test_harness/random_generator.h
index ebcc0033a38..455d4173b86 100644
--- a/src/third_party/wiredtiger/test/cppsuite/test_harness/random_generator.h
+++ b/src/third_party/wiredtiger/test/cppsuite/test_harness/random_generator.h
@@ -33,7 +33,7 @@
namespace test_harness {
-/* Helper class to generate random values. */
+/* Helper class to generate random values using uniform distributions. */
class random_generator {
public:
/* No copies of the singleton allowed. */
@@ -47,6 +47,7 @@ class random_generator {
return _instance;
}
+ /* Generate a random string of a given length. */
std::string
generate_string(std::size_t length)
{
@@ -58,6 +59,14 @@ class random_generator {
return (random_string);
}
+ /* Generate a random integer between min and max. */
+ int64_t
+ generate_integer(int64_t min, int64_t max)
+ {
+ std::uniform_int_distribution<> dis(min, max);
+ return dis(_generator);
+ }
+
private:
random_generator()
{
diff --git a/src/third_party/wiredtiger/test/cppsuite/test_harness/runtime_monitor.h b/src/third_party/wiredtiger/test/cppsuite/test_harness/runtime_monitor.h
index cee7d3185eb..e1a4f3e4360 100644
--- a/src/third_party/wiredtiger/test/cppsuite/test_harness/runtime_monitor.h
+++ b/src/third_party/wiredtiger/test/cppsuite/test_harness/runtime_monitor.h
@@ -141,7 +141,7 @@ class runtime_monitor : public component {
testutil_check(_config->get(STAT_CACHE_SIZE, &nested));
configuration sub_config = configuration(nested);
_stats.push_back(new cache_limit_statistic(&sub_config));
- _running = true;
+ component::load();
}
void
diff --git a/src/third_party/wiredtiger/test/cppsuite/test_harness/test.h b/src/third_party/wiredtiger/test/cppsuite/test_harness/test.h
index d6768ce7e48..4bcc4e92689 100644
--- a/src/third_party/wiredtiger/test/cppsuite/test_harness/test.h
+++ b/src/third_party/wiredtiger/test/cppsuite/test_harness/test.h
@@ -61,7 +61,8 @@ class test {
_timestamp_manager = new timestamp_manager(_configuration);
_workload_tracking = new workload_tracking(_configuration, OPERATION_TRACKING_TABLE_CONFIG,
TABLE_OPERATION_TRACKING, SCHEMA_TRACKING_TABLE_CONFIG, TABLE_SCHEMA_TRACKING);
- _workload_generator = new workload_generator(_configuration, _workload_tracking);
+ _workload_generator =
+ new workload_generator(_configuration, _timestamp_manager, _workload_tracking);
_thread_manager = new thread_manager();
/*
* Ordering is not important here, any dependencies between components should be resolved
@@ -95,8 +96,7 @@ class test {
void
run()
{
- int64_t cache_size_mb = 100;
- int64_t duration_seconds = 0;
+ int64_t cache_size_mb = 100, duration_seconds = 0;
bool enable_tracking = false, is_success = true;
/* Build the database creation config string. */
@@ -118,6 +118,7 @@ class test {
/* Sleep duration seconds. */
testutil_check(_configuration->get_int(DURATION_SECONDS, duration_seconds));
+ testutil_assert(duration_seconds >= 0);
std::this_thread::sleep_for(std::chrono::seconds(duration_seconds));
/* End the test. */
@@ -175,8 +176,8 @@ class test {
std::vector<component *> _components;
configuration *_configuration;
runtime_monitor *_runtime_monitor;
- timestamp_manager *_timestamp_manager;
thread_manager *_thread_manager;
+ timestamp_manager *_timestamp_manager;
workload_generator *_workload_generator;
workload_tracking *_workload_tracking;
};
diff --git a/src/third_party/wiredtiger/test/cppsuite/test_harness/thread_context.h b/src/third_party/wiredtiger/test/cppsuite/test_harness/thread_context.h
index 2613989f983..61ee7e01a88 100644
--- a/src/third_party/wiredtiger/test/cppsuite/test_harness/thread_context.h
+++ b/src/third_party/wiredtiger/test/cppsuite/test_harness/thread_context.h
@@ -29,6 +29,9 @@
#ifndef THREAD_CONTEXT_H
#define THREAD_CONTEXT_H
+#include "random_generator.h"
+#include "workload_tracking.h"
+
namespace test_harness {
/* Define the different thread operations. */
enum class thread_operation {
@@ -45,13 +48,16 @@ enum class thread_operation {
/* Container class for a thread and any data types it may need to interact with the database. */
class thread_context {
public:
- thread_context(std::vector<std::string> collection_names, thread_operation type)
- : _collection_names(collection_names), _running(false), _type(type)
+ thread_context(timestamp_manager *timestamp_manager, workload_tracking *tracking,
+ std::vector<std::string> collection_names, thread_operation type, int64_t max_op,
+ int64_t min_op, int64_t value_size)
+ : _collection_names(collection_names), _current_op_count(0U), _in_txn(false),
+ _running(false), _min_op(min_op), _max_op(max_op), _max_op_count(0),
+ _timestamp_manager(timestamp_manager), _type(type), _tracking(tracking),
+ _value_size(value_size)
{
}
- thread_context(thread_operation type) : _running(false), _type(type) {}
-
void
finish()
{
@@ -61,19 +67,31 @@ class thread_context {
const std::vector<std::string> &
get_collection_names() const
{
- return _collection_names;
+ return (_collection_names);
}
thread_operation
get_thread_operation() const
{
- return _type;
+ return (_type);
+ }
+
+ workload_tracking *
+ get_tracking() const
+ {
+ return (_tracking);
+ }
+
+ int64_t
+ get_value_size() const
+ {
+ return (_value_size);
}
bool
is_running() const
{
- return _running;
+ return (_running);
}
void
@@ -82,10 +100,81 @@ class thread_context {
_running = running;
}
+ void
+ begin_transaction(WT_SESSION *session, const std::string &config)
+ {
+ if (!_in_txn && _timestamp_manager->is_enabled()) {
+ testutil_check(
+ session->begin_transaction(session, config.empty() ? NULL : config.c_str()));
+ /* This randomizes the number of operations to be executed in one transaction. */
+ _max_op_count = random_generator::instance().generate_integer(_min_op, _max_op);
+ _current_op_count = 0;
+ _in_txn = true;
+ }
+ }
+
+ /* Returns true if the current transaction has been committed. */
+ bool
+ commit_transaction(WT_SESSION *session, const std::string &config)
+ {
+ if (!_timestamp_manager->is_enabled())
+ return (true);
+
+ /* A transaction cannot be committed if not started. */
+ testutil_assert(_in_txn);
+ /* The current transaction should be committed if:
+ * - The thread is done working. This is useful when the test is ended and the thread has
+ * not reached the maximum number of operations per transaction.
+ * - The number of operations executed in the current transaction has exceeded the
+ * threshold.
+ */
+ if (!_running || (++_current_op_count > _max_op_count)) {
+ testutil_check(
+ session->commit_transaction(session, config.empty() ? nullptr : config.c_str()));
+ _in_txn = false;
+ }
+
+ return (!_in_txn);
+ }
+
+ /*
+ * Set a commit timestamp if the timestamp manager is enabled and always return the timestamp
+ * that should have been used for the commit.
+ */
+ wt_timestamp_t
+ set_commit_timestamp(WT_SESSION *session)
+ {
+
+ wt_timestamp_t ts = _timestamp_manager->get_next_ts();
+ std::string config;
+
+ if (_timestamp_manager->is_enabled()) {
+ config = std::string(COMMIT_TS) + "=" + _timestamp_manager->decimal_to_hex(ts);
+ testutil_check(session->timestamp_transaction(session, config.c_str()));
+ }
+
+ return (ts);
+ }
+
private:
const std::vector<std::string> _collection_names;
- bool _running;
+ /*
+ * _current_op_count is the current number of operations that have been executed in the current
+ * transaction.
+ */
+ uint64_t _current_op_count;
+ bool _in_txn, _running;
+ /*
+ * _min_op and _max_op are the minimum and maximum number of operations within one transaction.
+ * _max_op_count is the current maximum number of operations that can be executed in the current
+ * transaction. _max_op_count will always be <= _max_op.
+ */
+ int64_t _min_op, _max_op, _max_op_count;
+ timestamp_manager *_timestamp_manager;
const thread_operation _type;
+ workload_tracking *_tracking;
+ /* Temporary member that comes from the test configuration. */
+ int64_t _value_size;
};
} // namespace test_harness
diff --git a/src/third_party/wiredtiger/test/cppsuite/test_harness/timestamp_manager.h b/src/third_party/wiredtiger/test/cppsuite/test_harness/timestamp_manager.h
index 10aa0482bf4..a1cf1ad8d1d 100644
--- a/src/third_party/wiredtiger/test/cppsuite/test_harness/timestamp_manager.h
+++ b/src/third_party/wiredtiger/test/cppsuite/test_harness/timestamp_manager.h
@@ -30,23 +30,133 @@
#define TIMESTAMP_MANAGER_H
#include "component.h"
+#include <atomic>
+#include <chrono>
+#include <sstream>
+#include <thread>
namespace test_harness {
/*
* The timestamp monitor class manages global timestamp state for all components in the test
- * harness. It also manages the global timestamps within WiredTiger.
+ * harness. It also manages the global timestamps within WiredTiger. All timestamps are in seconds
+ * unless specified otherwise.
*/
class timestamp_manager : public component {
public:
- timestamp_manager(configuration *config) : component(config) {}
+ timestamp_manager(configuration *config)
+ : /* _periodic_update_s is hardcoded to 1 second for now. */
+ component(config), _increment_ts(0U), _is_enabled(false), _latest_ts(0U), _oldest_lag(0),
+ _oldest_ts(0U), _periodic_update_s(1), _stable_lag(0), _stable_ts(0U)
+ {
+ }
+
+ /* Delete the copy constructor. */
+ timestamp_manager(const timestamp_manager &) = delete;
+
+ void
+ load()
+ {
+ testutil_assert(_config != nullptr);
+ testutil_check(_config->get_int(OLDEST_LAG, _oldest_lag));
+ testutil_assert(_oldest_lag >= 0);
+ testutil_check(_config->get_int(STABLE_LAG, _stable_lag));
+ testutil_assert(_stable_lag >= 0);
+ testutil_check(_config->get_bool(ENABLE_TIMESTAMP, _is_enabled));
+ component::load();
+ }
void
run()
{
- while (_running) {
- /* Do something. */
+ std::string config;
+ /* latest_ts_s represents the time component of the latest timestamp provided. */
+ wt_timestamp_t latest_ts_s;
+
+ while (_is_enabled && _running) {
+ /* Timestamps are checked periodically. */
+ std::this_thread::sleep_for(std::chrono::seconds(_periodic_update_s));
+ latest_ts_s = (_latest_ts >> 32);
+ /*
+ * Keep a time window between the latest and stable ts less than the max defined in the
+ * configuration.
+ */
+ testutil_assert(latest_ts_s >= _stable_ts);
+ if ((latest_ts_s - _stable_ts) > _stable_lag) {
+ _stable_ts = latest_ts_s - _stable_lag;
+ config += std::string(STABLE_TS) + "=" + decimal_to_hex(_stable_ts);
+ }
+
+ /*
+ * Keep a time window between the stable and oldest ts less than the max defined in the
+ * configuration.
+ */
+ testutil_assert(_stable_ts > _oldest_ts);
+ if ((_stable_ts - _oldest_ts) > _oldest_lag) {
+ _oldest_ts = _stable_ts - _oldest_lag;
+ if (!config.empty())
+ config += ",";
+ config += std::string(OLDEST_TS) + "=" + decimal_to_hex(_oldest_ts);
+ }
+
+ /* Save the new timestamps. */
+ if (!config.empty()) {
+ connection_manager::instance().set_timestamp(config);
+ config = "";
+ }
}
}
+
+ bool
+ is_enabled() const
+ {
+ return _is_enabled;
+ }
+
+ /*
+ * Get a unique commit timestamp. The first 32 bits represent the epoch time in seconds. The
+ * last 32 bits represent an increment for uniqueness.
+ */
+ wt_timestamp_t
+ get_next_ts()
+ {
+ uint64_t current_time = get_time_now_s();
+ _increment_ts.fetch_add(1);
+
+ current_time = (current_time << 32) | (_increment_ts & 0x00000000FFFFFFFF);
+ _latest_ts = current_time;
+
+ return (_latest_ts);
+ }
+
+ static const std::string
+ decimal_to_hex(int64_t value)
+ {
+ std::stringstream ss;
+ ss << std::hex << value;
+ std::string res(ss.str());
+ return (res);
+ }
+
+ private:
+ uint64_t
+ get_time_now_s() const
+ {
+ auto now = std::chrono::system_clock::now().time_since_epoch();
+ uint64_t current_time_s =
+ static_cast<uint64_t>(std::chrono::duration_cast<std::chrono::seconds>(now).count());
+ return (current_time_s);
+ }
+
+ private:
+ bool _is_enabled;
+ const wt_timestamp_t _periodic_update_s;
+ std::atomic<wt_timestamp_t> _increment_ts;
+ wt_timestamp_t _latest_ts, _oldest_ts, _stable_ts;
+ /*
+ * _oldest_lag is the time window between the stable and oldest timestamps.
+ * _stable_lag is the time window between the latest and stable timestamps.
+ */
+ int64_t _oldest_lag, _stable_lag;
};
} // namespace test_harness
diff --git a/src/third_party/wiredtiger/test/cppsuite/test_harness/workload_generator.h b/src/third_party/wiredtiger/test/cppsuite/test_harness/workload_generator.h
index c5f81553d81..0b6cc1b2a84 100644
--- a/src/third_party/wiredtiger/test/cppsuite/test_harness/workload_generator.h
+++ b/src/third_party/wiredtiger/test/cppsuite/test_harness/workload_generator.h
@@ -30,6 +30,7 @@
#define WORKLOAD_GENERATOR_H
#include <algorithm>
+#include <atomic>
#include <map>
#include "random_generator.h"
@@ -41,8 +42,9 @@ namespace test_harness {
*/
class workload_generator : public component {
public:
- workload_generator(configuration *configuration, workload_tracking *tracking)
- : component(configuration), _tracking(tracking)
+ workload_generator(configuration *configuration, timestamp_manager *timestamp_manager,
+ workload_tracking *tracking)
+ : component(configuration), _timestamp_manager(timestamp_manager), _tracking(tracking)
{
}
@@ -71,8 +73,10 @@ class workload_generator : public component {
{
WT_CURSOR *cursor;
WT_SESSION *session;
+ wt_timestamp_t ts;
int64_t collection_count, key_count, value_size;
- std::string collection_name, home;
+ std::string collection_name, config, generated_value, home;
+ bool ts_enabled = _timestamp_manager->is_enabled();
cursor = nullptr;
collection_count = key_count = value_size = 0;
@@ -85,7 +89,8 @@ class workload_generator : public component {
for (int i = 0; i < collection_count; ++i) {
collection_name = "table:collection" + std::to_string(i);
testutil_check(session->create(session, collection_name.c_str(), DEFAULT_TABLE_SCHEMA));
- testutil_check(_tracking->save(tracking_operation::CREATE, collection_name, 0, ""));
+ ts = _timestamp_manager->get_next_ts();
+ testutil_check(_tracking->save(tracking_operation::CREATE, collection_name, 0, "", ts));
_collection_names.push_back(collection_name);
}
debug_info(
@@ -94,17 +99,27 @@ class workload_generator : public component {
/* Open a cursor on each collection and use the configuration to insert key/value pairs. */
testutil_check(_config->get_int(KEY_COUNT, key_count));
testutil_check(_config->get_int(VALUE_SIZE, value_size));
+ testutil_assert(value_size >= 0);
for (const auto &collection_name : _collection_names) {
/* WiredTiger lets you open a cursor on a collection using the same pointer. When a
* session is closed, WiredTiger APIs close the cursors too. */
testutil_check(
session->open_cursor(session, collection_name.c_str(), NULL, NULL, &cursor));
for (size_t j = 0; j < key_count; ++j) {
- /* Generation of a random string value using the size defined in the test
- * configuration. */
- std::string generated_value =
+ /*
+ * Generation of a random string value using the size defined in the test
+ * configuration.
+ */
+ generated_value =
random_generator::random_generator::instance().generate_string(value_size);
- testutil_check(insert(cursor, collection_name, j + 1, generated_value.c_str()));
+ ts = _timestamp_manager->get_next_ts();
+ if (ts_enabled)
+ testutil_check(session->begin_transaction(session, ""));
+ testutil_check(insert(cursor, collection_name, j + 1, generated_value.c_str(), ts));
+ if (ts_enabled) {
+ config = std::string(COMMIT_TS) + "=" + _timestamp_manager->decimal_to_hex(ts);
+ testutil_check(session->commit_transaction(session, config.c_str()));
+ }
}
}
debug_info("Populate stage done", _trace_level, DEBUG_INFO);
@@ -114,18 +129,30 @@ class workload_generator : public component {
void
run()
{
- int64_t duration_seconds, read_threads;
-
- duration_seconds = read_threads = 0;
+ WT_SESSION *session = nullptr;
+ int64_t duration_seconds, read_threads, min_operation_per_transaction,
+ max_operation_per_transaction, value_size;
/* Populate the database. */
populate();
+ /* Retrieve useful parameters from the test configuration. */
testutil_check(_config->get_int(DURATION_SECONDS, duration_seconds));
+ testutil_assert(duration_seconds >= 0);
testutil_check(_config->get_int(READ_THREADS, read_threads));
+ testutil_check(
+ _config->get_int(MIN_OPERATION_PER_TRANSACTION, min_operation_per_transaction));
+ testutil_check(
+ _config->get_int(MAX_OPERATION_PER_TRANSACTION, max_operation_per_transaction));
+ testutil_assert(max_operation_per_transaction >= min_operation_per_transaction);
+ testutil_check(_config->get_int(VALUE_SIZE, value_size));
+ testutil_assert(value_size >= 0);
+
/* Generate threads to execute read operations on the collections. */
for (int i = 0; i < read_threads; ++i) {
- thread_context *tc = new thread_context(_collection_names, thread_operation::READ);
+ thread_context *tc = new thread_context(_timestamp_manager, _tracking,
+ _collection_names, thread_operation::READ, max_operation_per_transaction,
+ min_operation_per_transaction, value_size);
_workers.push_back(tc);
_thread_manager.add_thread(tc, &execute_operation);
}
@@ -134,11 +161,11 @@ class workload_generator : public component {
void
finish()
{
- debug_info("Workload generator stage done", _trace_level, DEBUG_INFO);
for (const auto &it : _workers) {
it->finish();
}
_thread_manager.join();
+ debug_info("Workload generator: run stage done", _trace_level, DEBUG_INFO);
}
/* Workload threaded operations. */
@@ -155,11 +182,13 @@ class workload_generator : public component {
break;
case thread_operation::REMOVE:
case thread_operation::INSERT:
- case thread_operation::UPDATE:
/* Sleep until it is implemented. */
while (context.is_running())
std::this_thread::sleep_for(std::chrono::seconds(1));
break;
+ case thread_operation::UPDATE:
+ update_operation(context, session);
+ break;
default:
testutil_die(DEBUG_ABORT, "system: thread_operation is unknown : %d",
static_cast<int>(context.get_thread_operation()));
@@ -167,6 +196,51 @@ class workload_generator : public component {
}
}
+ /*
+ * Basic update operation that currently update the same key with a random value in each
+ * collection.
+ */
+ static void
+ update_operation(thread_context &context, WT_SESSION *session)
+ {
+ WT_CURSOR *cursor;
+ WT_DECL_RET;
+ wt_timestamp_t ts;
+ std::vector<WT_CURSOR *> cursors;
+ std::vector<std::string> collection_names;
+ std::string generated_value;
+ bool has_committed = true;
+ int64_t cpt, value_size = context.get_value_size();
+
+ testutil_assert(session != nullptr);
+ /* Get a cursor for each collection in collection_names. */
+ for (const auto &it : context.get_collection_names()) {
+ testutil_check(session->open_cursor(session, it.c_str(), NULL, NULL, &cursor));
+ cursors.push_back(cursor);
+ collection_names.push_back(it);
+ }
+
+ while (context.is_running()) {
+ /* Walk each cursor. */
+ context.begin_transaction(session, "");
+ ts = context.set_commit_timestamp(session);
+ cpt = 0;
+ for (const auto &it : cursors) {
+ generated_value =
+ random_generator::random_generator::instance().generate_string(value_size);
+ /* Key is hard coded for now. */
+ testutil_check(update(context.get_tracking(), it, collection_names[cpt], 1,
+ generated_value.c_str(), ts));
+ ++cpt;
+ }
+ has_committed = context.commit_transaction(session, "");
+ }
+
+ /* Make sure the last operation is committed now the work is finished. */
+ if (!has_committed)
+ context.commit_transaction(session, "");
+ }
+
/* Basic read operation that walks a cursors across all collections. */
static void
read_operation(thread_context &context, WT_SESSION *session)
@@ -175,6 +249,7 @@ class workload_generator : public component {
WT_DECL_RET;
std::vector<WT_CURSOR *> cursors;
+ testutil_assert(session != nullptr);
/* Get a cursor for each collection in collection_names. */
for (const auto &it : context.get_collection_names()) {
testutil_check(session->open_cursor(session, it.c_str(), NULL, NULL, &cursor));
@@ -193,55 +268,68 @@ class workload_generator : public component {
/* WiredTiger APIs wrappers for single operations. */
template <typename K, typename V>
int
- insert(WT_CURSOR *cursor, const std::string &collection_name, K key, V value)
+ insert(WT_CURSOR *cursor, const std::string &collection_name, K key, V value, wt_timestamp_t ts)
{
int error_code;
- if (cursor == nullptr)
- throw std::invalid_argument("Failed to call insert, invalid cursor");
-
+ testutil_assert(cursor != nullptr);
cursor->set_key(cursor, key);
cursor->set_value(cursor, value);
error_code = cursor->insert(cursor);
if (error_code == 0) {
debug_info("key/value inserted", _trace_level, DEBUG_INFO);
- error_code = _tracking->save(tracking_operation::INSERT, collection_name, key, value);
+ error_code =
+ _tracking->save(tracking_operation::INSERT, collection_name, key, value, ts);
} else
debug_info("key/value insertion failed", _trace_level, DEBUG_ERROR);
- return error_code;
+ return (error_code);
}
static int
search(WT_CURSOR *cursor)
{
- if (cursor == nullptr)
- throw std::invalid_argument("Failed to call search, invalid cursor");
+ testutil_assert(cursor != nullptr);
return (cursor->search(cursor));
}
static int
search_near(WT_CURSOR *cursor, int *exact)
{
- if (cursor == nullptr)
- throw std::invalid_argument("Failed to call search_near, invalid cursor");
+ testutil_assert(cursor != nullptr);
return (cursor->search_near(cursor, exact));
}
+ template <typename K, typename V>
static int
- update(WT_CURSOR *cursor)
+ update(workload_tracking *tracking, WT_CURSOR *cursor, const std::string &collection_name,
+ K key, V value, wt_timestamp_t ts)
{
- if (cursor == nullptr)
- throw std::invalid_argument("Failed to call update, invalid cursor");
- return (cursor->update(cursor));
+ int error_code;
+
+ testutil_assert(tracking != nullptr);
+ testutil_assert(cursor != nullptr);
+ cursor->set_key(cursor, key);
+ cursor->set_value(cursor, value);
+ error_code = cursor->update(cursor);
+
+ if (error_code == 0) {
+ debug_info("key/value update", _trace_level, DEBUG_INFO);
+ error_code =
+ tracking->save(tracking_operation::UPDATE, collection_name, key, value, ts);
+ } else
+ debug_info("key/value update failed", _trace_level, DEBUG_ERROR);
+
+ return (error_code);
}
private:
std::vector<std::string> _collection_names;
thread_manager _thread_manager;
- std::vector<thread_context *> _workers;
+ timestamp_manager *_timestamp_manager;
workload_tracking *_tracking;
+ std::vector<thread_context *> _workers;
};
} // namespace test_harness
diff --git a/src/third_party/wiredtiger/test/cppsuite/test_harness/workload_tracking.h b/src/third_party/wiredtiger/test/cppsuite/test_harness/workload_tracking.h
index 12d543df68e..e8c7d748e79 100644
--- a/src/third_party/wiredtiger/test/cppsuite/test_harness/workload_tracking.h
+++ b/src/third_party/wiredtiger/test/cppsuite/test_harness/workload_tracking.h
@@ -49,7 +49,7 @@
namespace test_harness {
/* Tracking operations. */
-enum class tracking_operation { CREATE, DELETE_COLLECTION, DELETE_KEY, INSERT };
+enum class tracking_operation { CREATE, DELETE_COLLECTION, DELETE_KEY, INSERT, UPDATE };
/* Class used to track operations performed on collections */
class workload_tracking : public component {
@@ -57,13 +57,16 @@ class workload_tracking : public component {
workload_tracking(configuration *_config, const std::string &operation_table_config,
const std::string &operation_table_name, const std::string &schema_table_config,
const std::string &schema_table_name)
- : component(_config), _cursor_operations(nullptr), _cursor_schema(nullptr),
+ : component(_config), _cursor_operations(nullptr), _cursor_schema(nullptr), _enabled(false),
_operation_table_config(operation_table_config),
_operation_table_name(operation_table_name), _schema_table_config(schema_table_config),
- _schema_table_name(schema_table_name), _timestamp(0U), _enabled(false)
+ _schema_table_name(schema_table_name)
{
}
+ /* Delete the copy constructor. */
+ workload_tracking(const workload_tracking &) = delete;
+
const std::string &
get_schema_table_name() const
{
@@ -110,7 +113,7 @@ class workload_tracking : public component {
template <typename K, typename V>
int
save(const tracking_operation &operation, const std::string &collection_name, const K &key,
- const V &value)
+ const V &value, wt_timestamp_t ts)
{
WT_CURSOR *cursor;
int error_code = 0;
@@ -123,13 +126,13 @@ class workload_tracking : public component {
case tracking_operation::CREATE:
case tracking_operation::DELETE_COLLECTION:
cursor = _cursor_schema;
- cursor->set_key(cursor, collection_name.c_str(), _timestamp++);
+ cursor->set_key(cursor, collection_name.c_str(), ts);
cursor->set_value(cursor, static_cast<int>(operation));
break;
default:
cursor = _cursor_operations;
- cursor->set_key(cursor, collection_name.c_str(), key, _timestamp++);
+ cursor->set_key(cursor, collection_name.c_str(), key, ts);
cursor->set_value(cursor, static_cast<int>(operation), value);
break;
}
@@ -145,14 +148,13 @@ class workload_tracking : public component {
}
private:
+ WT_CURSOR *_cursor_operations;
+ WT_CURSOR *_cursor_schema;
+ bool _enabled;
const std::string _operation_table_config;
const std::string _operation_table_name;
const std::string _schema_table_config;
const std::string _schema_table_name;
- WT_CURSOR *_cursor_operations;
- WT_CURSOR *_cursor_schema;
- uint64_t _timestamp;
- bool _enabled;
};
} // namespace test_harness
diff --git a/src/third_party/wiredtiger/test/cppsuite/tests/poc.cxx b/src/third_party/wiredtiger/test/cppsuite/tests/poc.cxx
index 2d8cc319565..cf25e5e9a93 100755
--- a/src/third_party/wiredtiger/test/cppsuite/tests/poc.cxx
+++ b/src/third_party/wiredtiger/test/cppsuite/tests/poc.cxx
@@ -48,9 +48,10 @@ class poc_test : public test_harness::test {
const std::string poc_test::test::name = "poc_test";
const std::string poc_test::test::default_config =
- "collection_count=2,key_count=5,value_size=10,"
- "read_threads=1,duration_seconds=10,cache_size_mb=1000,"
- "stat_cache_size=(enabled=true,limit=100),rate_per_second=10,enable_tracking=true";
+ "collection_count=2,key_count=5,value_size=10,read_threads=1,duration_seconds=10,"
+ "cache_size_mb=1000,stat_cache_size=(enabled=true,limit=100),rate_per_second=10,"
+ "enable_tracking=true,enable_timestamp=true,oldest_lag=1,stable_lag=1,"
+ "min_operation_per_transaction=1,max_operation_per_transaction=1";
int
main(int argc, char *argv[])