summaryrefslogtreecommitdiff
path: root/src/third_party/wiredtiger/test
diff options
context:
space:
mode:
author: Luke Chen <luke.chen@mongodb.com> 2021-06-11 16:14:43 +1000
committer: Evergreen Agent <no-reply@evergreen.mongodb.com> 2021-06-11 06:40:00 +0000
commit: 82ccd6bca77e4cbcefe3c5ce26f507b6dd3dcc18 (patch)
tree: 9716869319c3f7049114c1301af55e351e8d096d /src/third_party/wiredtiger/test
parent: cbdc41e8cfa00acf42fe594e7f541b144e655050 (diff)
download: mongo-82ccd6bca77e4cbcefe3c5ce26f507b6dd3dcc18.tar.gz
Import wiredtiger: 65035cf84e7090a120d5d0ceb703c6fd2fc9f504 from branch mongodb-5.0
ref: bfb4d5ee40..65035cf84e for: 5.0.0-rc2 WT-7383 Add framework for new hs_cleanup test and refactor workload generator and database operation
Diffstat (limited to 'src/third_party/wiredtiger/test')
-rw-r--r-- src/third_party/wiredtiger/test/cppsuite/configs/config_hs_cleanup_default.txt 4
-rw-r--r-- src/third_party/wiredtiger/test/cppsuite/configs/config_poc_test_stress.txt 62
-rw-r--r-- src/third_party/wiredtiger/test/cppsuite/test_harness/test.h 24
-rw-r--r-- src/third_party/wiredtiger/test/cppsuite/test_harness/timestamp_manager.h 8
-rw-r--r-- src/third_party/wiredtiger/test/cppsuite/test_harness/util/api_const.h 7
-rw-r--r-- src/third_party/wiredtiger/test/cppsuite/test_harness/util/debug_utils.h 7
-rw-r--r-- src/third_party/wiredtiger/test/cppsuite/test_harness/workload/database_model.h 51
-rw-r--r-- src/third_party/wiredtiger/test/cppsuite/test_harness/workload/database_operation.h 349
-rw-r--r-- src/third_party/wiredtiger/test/cppsuite/test_harness/workload/random_generator.h 7
-rw-r--r-- src/third_party/wiredtiger/test/cppsuite/test_harness/workload/thread_context.h 60
-rw-r--r-- src/third_party/wiredtiger/test/cppsuite/test_harness/workload_generator.h 87
-rw-r--r-- src/third_party/wiredtiger/test/cppsuite/tests/hs_cleanup.cxx 58
-rwxr-xr-x src/third_party/wiredtiger/test/cppsuite/tests/run.cxx 8
13 files changed, 549 insertions, 183 deletions
diff --git a/src/third_party/wiredtiger/test/cppsuite/configs/config_hs_cleanup_default.txt b/src/third_party/wiredtiger/test/cppsuite/configs/config_hs_cleanup_default.txt
new file mode 100644
index 00000000000..f46526e554e
--- /dev/null
+++ b/src/third_party/wiredtiger/test/cppsuite/configs/config_hs_cleanup_default.txt
@@ -0,0 +1,4 @@
+# Configuration for hs_cleanup.
+# The options below need to be defined.
+duration_seconds=5,
+cache_size_mb=250
diff --git a/src/third_party/wiredtiger/test/cppsuite/configs/config_poc_test_stress.txt b/src/third_party/wiredtiger/test/cppsuite/configs/config_poc_test_stress.txt
index 34b6f9b89fe..deccbc85a62 100644
--- a/src/third_party/wiredtiger/test/cppsuite/configs/config_poc_test_stress.txt
+++ b/src/third_party/wiredtiger/test/cppsuite/configs/config_poc_test_stress.txt
@@ -1,13 +1,16 @@
-# Sets up a basic database with 2 collections and 50000 keys and run thread for 10 seconds.
-# All components are enabled.
# Used as a stress test for the framework.
-duration_seconds=10,
-cache_size_mb=5000,
+duration_seconds=600,
+cache_size_mb=2000,
enable_logging=true,
+statistics_config=
+(
+ type=all,
+ enable_logging=true
+),
checkpoint_manager=
(
enabled=true,
- op_rate=5s
+ op_rate=30s
),
runtime_monitor=
(
@@ -17,17 +20,48 @@ runtime_monitor=
limit=100
)
),
+timestamp_manager=
+(
+ enabled=true,
+ oldest_lag=10,
+ op_rate=1s,
+ stable_lag=10
+),
workload_generator=
(
- collection_count=2,
- key_count=50000,
- key_size=10,
- ops_per_transaction=
+ populate_config=
+ (
+ collection_count=200,
+ key_count_per_collection=5000,
+ key_size=100,
+ thread_count=20,
+ value_size=2000
+ ),
+ insert_config=
+ (
+ key_size=100,
+ op_rate=1s,
+ ops_per_transaction=(max=100,min=0),
+ thread_count=5,
+ value_size=2000
+ ),
+ read_config=
(
- min=5,
- max=50
+ op_rate=1s,
+ ops_per_transaction=(max=2000,min=100),
+ thread_count=10
),
- read_threads=1,
- update_threads=1,
- value_size=2000
+ update_config=
+ (
+ key_size=100,
+ op_rate=15ms,
+ ops_per_transaction=(max=100,min=20),
+ thread_count=10,
+ value_size=2000
+ )
),
+# Tracking is currently disabled as verification can't handle rollbacks.
+workload_tracking=
+(
+ enabled=false
+)
diff --git a/src/third_party/wiredtiger/test/cppsuite/test_harness/test.h b/src/third_party/wiredtiger/test/cppsuite/test_harness/test.h
index 2434704f6f9..b1c011a1d67 100644
--- a/src/third_party/wiredtiger/test/cppsuite/test_harness/test.h
+++ b/src/third_party/wiredtiger/test/cppsuite/test_harness/test.h
@@ -106,14 +106,28 @@ class test : public database_operation {
run()
{
int64_t cache_size_mb, duration_seconds;
- bool enable_logging;
-
+ bool enable_logging, statistics_logging;
+ configuration *statistics_config;
+ std::string statistics_type;
/* Build the database creation config string. */
std::string db_create_config = CONNECTION_CREATE;
- /* Get the cache size, and turn logging on or off. */
+ /* Get the cache size. */
cache_size_mb = _config->get_int(CACHE_SIZE_MB);
- db_create_config += ",statistics=(fast),cache_size=" + std::to_string(cache_size_mb) + "MB";
+
+ /* Get the statistics configuration for this run. */
+ statistics_config = _config->get_subconfig(STATISTICS_CONFIG);
+ statistics_type = statistics_config->get_string(TYPE);
+ statistics_logging = statistics_config->get_bool(ENABLE_LOGGING);
+
+ /* Don't forget to delete. */
+ delete statistics_config;
+
+ db_create_config += ",statistics=(" + statistics_type + ")";
+ db_create_config += statistics_logging ? "," + std::string(STATISTICS_LOG) : "";
+ db_create_config += ",cache_size=" + std::to_string(cache_size_mb) + "MB";
+
+ /* Enable or disable write ahead logging. */
enable_logging = _config->get_bool(ENABLE_LOGGING);
db_create_config += ",log=(enabled=" + std::string(enable_logging ? "true" : "false") + ")";
@@ -135,6 +149,8 @@ class test : public database_operation {
/* The test will run for the duration as defined in the config. */
duration_seconds = _config->get_int(DURATION_SECONDS);
testutil_assert(duration_seconds >= 0);
+ debug_print("Waiting {" + std::to_string(duration_seconds) + "} for testing to complete.",
+ DEBUG_INFO);
std::this_thread::sleep_for(std::chrono::seconds(duration_seconds));
/* End the test by calling finish on all known components. */
diff --git a/src/third_party/wiredtiger/test/cppsuite/test_harness/timestamp_manager.h b/src/third_party/wiredtiger/test/cppsuite/test_harness/timestamp_manager.h
index adc5b734c05..24f0e3e416d 100644
--- a/src/third_party/wiredtiger/test/cppsuite/test_harness/timestamp_manager.h
+++ b/src/third_party/wiredtiger/test/cppsuite/test_harness/timestamp_manager.h
@@ -65,14 +65,15 @@ class timestamp_manager : public component {
wt_timestamp_t latest_ts_s;
/* Timestamps are checked periodically. */
- latest_ts_s = (_latest_ts >> 32);
+ latest_ts_s = (get_next_ts() >> 32);
/*
* Keep a time window between the latest and stable ts less than the max defined in the
* configuration.
*/
testutil_assert(latest_ts_s >= _stable_ts);
if ((latest_ts_s - _stable_ts) > _stable_lag) {
- _stable_ts = latest_ts_s - _stable_lag;
+ debug_print("Timestamp_manager: Stable timestamp expired.", DEBUG_INFO);
+ _stable_ts = latest_ts_s;
config += std::string(STABLE_TS) + "=" + decimal_to_hex(_stable_ts);
}
@@ -82,6 +83,7 @@ class timestamp_manager : public component {
*/
testutil_assert(_stable_ts >= _oldest_ts);
if ((_stable_ts - _oldest_ts) > _oldest_lag) {
+ debug_print("Timestamp_manager: Oldest timestamp expired.", DEBUG_INFO);
_oldest_ts = _stable_ts - _oldest_lag;
if (!config.empty())
config += ",";
@@ -131,7 +133,7 @@ class timestamp_manager : public component {
}
private:
- std::atomic<wt_timestamp_t> _increment_ts;
+ std::atomic<wt_timestamp_t> _increment_ts{0};
wt_timestamp_t _latest_ts = 0U, _oldest_ts = 0U, _stable_ts = 0U;
/*
* _oldest_lag is the time window between the stable and oldest timestamps.
diff --git a/src/third_party/wiredtiger/test/cppsuite/test_harness/util/api_const.h b/src/third_party/wiredtiger/test/cppsuite/test_harness/util/api_const.h
index 146e98e38d6..92e7f080101 100644
--- a/src/third_party/wiredtiger/test/cppsuite/test_harness/util/api_const.h
+++ b/src/third_party/wiredtiger/test/cppsuite/test_harness/util/api_const.h
@@ -45,9 +45,8 @@ static const char *COLLECTION_COUNT = "collection_count";
static const char *DURATION_SECONDS = "duration_seconds";
static const char *ENABLED = "enabled";
static const char *ENABLE_LOGGING = "enable_logging";
-static const char *INTERVAL = "interval";
static const char *INSERT_CONFIG = "insert_config";
-static const char *KEY_COUNT = "key_count";
+static const char *KEY_COUNT_PER_COLLECTION = "key_count_per_collection";
static const char *KEY_SIZE = "key_size";
static const char *LIMIT = "limit";
static const char *MAX = "max";
@@ -55,11 +54,14 @@ static const char *MIN = "min";
static const char *OLDEST_LAG = "oldest_lag";
static const char *OP_RATE = "op_rate";
static const char *OPS_PER_TRANSACTION = "ops_per_transaction";
+static const char *POPULATE_CONFIG = "populate_config";
static const char *READ_CONFIG = "read_config";
static const char *STABLE_LAG = "stable_lag";
static const char *STAT_CACHE_SIZE = "stat_cache_size";
static const char *STAT_DB_SIZE = "stat_db_size";
+static const char *STATISTICS_CONFIG = "statistics_config";
static const char *THREAD_COUNT = "thread_count";
+static const char *TYPE = "type";
static const char *UPDATE_CONFIG = "update_config";
static const char *VALUE_SIZE = "value_size";
@@ -68,6 +70,7 @@ static const char *COMMIT_TS = "commit_timestamp";
static const char *CONNECTION_CREATE = "create";
static const char *OLDEST_TS = "oldest_timestamp";
static const char *STABLE_TS = "stable_timestamp";
+static const char *STATISTICS_LOG = "statistics_log=(json,wait=1)";
/* Test harness consts. */
static const char *DEFAULT_FRAMEWORK_SCHEMA = "key_format=S,value_format=S";
diff --git a/src/third_party/wiredtiger/test/cppsuite/test_harness/util/debug_utils.h b/src/third_party/wiredtiger/test/cppsuite/test_harness/util/debug_utils.h
index a2694f6987c..9fac3ce527c 100644
--- a/src/third_party/wiredtiger/test/cppsuite/test_harness/util/debug_utils.h
+++ b/src/third_party/wiredtiger/test/cppsuite/test_harness/util/debug_utils.h
@@ -35,10 +35,11 @@
namespace test_harness {
#define DEBUG_ERROR 0
-#define DEBUG_INFO 1
-#define DEBUG_TRACE 2
+#define DEBUG_WARN 1
+#define DEBUG_INFO 2
+#define DEBUG_TRACE 3
-static int64_t _trace_level = 0;
+static int64_t _trace_level = DEBUG_WARN;
/* Used to print out traces for debugging purpose. */
static void
diff --git a/src/third_party/wiredtiger/test/cppsuite/test_harness/workload/database_model.h b/src/third_party/wiredtiger/test/cppsuite/test_harness/workload/database_model.h
index c9562954bfd..f6b8b152f22 100644
--- a/src/third_party/wiredtiger/test/cppsuite/test_harness/workload/database_model.h
+++ b/src/third_party/wiredtiger/test/cppsuite/test_harness/workload/database_model.h
@@ -29,6 +29,7 @@
#ifndef DATABASE_MODEL_H
#define DATABASE_MODEL_H
+#include <atomic>
#include <map>
#include <string>
@@ -56,6 +57,41 @@ struct collection_t {
/* Representation of the collections in memory. */
class database {
public:
+ /*
+ * Add a new collection following the standard naming pattern. Currently this is the only way to
+ * add collections which is supported by all components.
+ */
+ std::string
+ add_collection()
+ {
+ std::lock_guard<std::mutex> lg(_mtx);
+ std::string collection_name = build_collection_name(_next_collection_id);
+ _collections[collection_name] = {};
+ ++_next_collection_id;
+ return (collection_name);
+ }
+
+ /*
+ * Retrieve the current collection count. Collection names are indexed from 0, so take care to
+ * avoid an off-by-one error when using this.
+ */
+ uint64_t
+ get_collection_count() const
+ {
+ return (_next_collection_id);
+ }
+
+ /*
+ * Get a single collection name by id.
+ */
+ std::string
+ get_collection_name(uint64_t id)
+ {
+ if (_next_collection_id <= id)
+ testutil_die(id, "requested the id, %lu, of a collection that doesn't exist", id);
+ return (build_collection_name(id));
+ }
+
std::vector<std::string>
get_collection_names()
{
@@ -75,14 +111,6 @@ class database {
return (_collections.at(collection_name).keys);
}
- void
- add_collection(const std::string &collection_name)
- {
- std::lock_guard<std::mutex> lg(_mtx);
- testutil_assert(_collections.find(collection_name) == _collections.end());
- _collections[collection_name] = {};
- }
-
value_t
get_record(const std::string &collection_name, const char *key)
{
@@ -119,6 +147,13 @@ class database {
}
private:
+ /* Take the id by value, not by reference, as we're copying in an atomic. */
+ std::string
+ build_collection_name(const uint64_t id)
+ {
+ return (std::string("table:collection_" + std::to_string(id)));
+ }
+ std::atomic<uint64_t> _next_collection_id{0};
std::map<std::string, collection_t> _collections;
std::mutex _mtx;
};
diff --git a/src/third_party/wiredtiger/test/cppsuite/test_harness/workload/database_operation.h b/src/third_party/wiredtiger/test/cppsuite/test_harness/workload/database_operation.h
index 007d39da345..274173a099f 100644
--- a/src/third_party/wiredtiger/test/cppsuite/test_harness/workload/database_operation.h
+++ b/src/third_party/wiredtiger/test/cppsuite/test_harness/workload/database_operation.h
@@ -29,9 +29,14 @@
#ifndef DATABASE_OPERATION_H
#define DATABASE_OPERATION_H
+#include <map>
+#include <thread>
+
#include "database_model.h"
#include "workload_tracking.h"
#include "thread_context.h"
+#include "random_generator.h"
+#include "../thread_manager.h"
namespace test_harness {
class database_operation {
@@ -49,80 +54,84 @@ class database_operation {
* - Store in memory the created collections.
*/
virtual void
- populate(database &database, timestamp_manager *timestamp_manager, configuration *config,
+ populate(database &database, timestamp_manager *tsm, configuration *config,
workload_tracking *tracking)
{
- WT_CURSOR *cursor;
WT_SESSION *session;
- wt_timestamp_t ts;
- int64_t collection_count, key_count, key_cpt, key_size, value_size;
- std::string collection_name, cfg, home;
- key_value_t generated_key, generated_value;
- bool ts_enabled = timestamp_manager->enabled();
-
- cursor = nullptr;
- collection_count = key_count = key_size = value_size = 0;
+ std::vector<std::string> collection_names;
+ int64_t collection_count, key_count, key_size, thread_count, value_size;
+ std::string collection_name;
+ thread_manager tm;
/* Get a session. */
session = connection_manager::instance().create_session();
- /* Create n collections as per the configuration and store each collection name. */
- collection_count = config->get_int(COLLECTION_COUNT);
- for (size_t i = 0; i < collection_count; ++i) {
- collection_name = "table:collection" + std::to_string(i);
- database.add_collection(collection_name);
- testutil_check(
- session->create(session, collection_name.c_str(), DEFAULT_FRAMEWORK_SCHEMA));
- ts = timestamp_manager->get_next_ts();
- tracking->save_schema_operation(
- tracking_operation::CREATE_COLLECTION, collection_name, ts);
- }
- debug_print(std::to_string(collection_count) + " collections created", DEBUG_TRACE);
- /* Open a cursor on each collection and use the configuration to insert key/value pairs. */
- key_count = config->get_int(KEY_COUNT);
+ /* Get our configuration values, validating that they make sense. */
+ collection_count = config->get_int(COLLECTION_COUNT);
+ key_count = config->get_int(KEY_COUNT_PER_COLLECTION);
value_size = config->get_int(VALUE_SIZE);
+ thread_count = config->get_int(THREAD_COUNT);
+ testutil_assert(collection_count % thread_count == 0);
testutil_assert(value_size > 0);
key_size = config->get_int(KEY_SIZE);
testutil_assert(key_size > 0);
+
/* Keys must be unique. */
testutil_assert(key_count <= pow(10, key_size));
- for (const auto &collection_name : database.get_collection_names()) {
- key_cpt = 0;
- /*
- * WiredTiger lets you open a cursor on a collection using the same pointer. When a
- * session is closed, WiredTiger APIs close the cursors too.
- */
+ /* Create n collections as per the configuration and store each collection name. */
+ for (int64_t i = 0; i < collection_count; ++i) {
+ /* FIXME-T-F: Should we just give collection creation power to the database? */
+ collection_name = database.add_collection();
testutil_check(
- session->open_cursor(session, collection_name.c_str(), NULL, NULL, &cursor));
- for (size_t i = 0; i < key_count; ++i) {
- /* Generation of a unique key. */
- generated_key = number_to_string(key_size, key_cpt);
- ++key_cpt;
- /*
- * Generation of a random string value using the size defined in the test
- * configuration.
- */
- generated_value =
- random_generator::random_generator::instance().generate_string(value_size);
- ts = timestamp_manager->get_next_ts();
- if (ts_enabled)
- testutil_check(session->begin_transaction(session, ""));
- insert(cursor, tracking, collection_name, generated_key.c_str(),
- generated_value.c_str(), ts);
- if (ts_enabled) {
- cfg = std::string(COMMIT_TS) + "=" + timestamp_manager->decimal_to_hex(ts);
- testutil_check(session->commit_transaction(session, cfg.c_str()));
- }
+ session->create(session, collection_name.c_str(), DEFAULT_FRAMEWORK_SCHEMA));
+ tracking->save_schema_operation(
+ tracking_operation::CREATE_COLLECTION, collection_name, tsm->get_next_ts());
+ collection_names.push_back(collection_name);
+ }
+ debug_print(
+ "Populate: " + std::to_string(collection_count) + " collections created.", DEBUG_INFO);
+
+ /*
+ * Spawn thread_count threads to populate the database. Theoretically we should be IO bound
+ * here.
+ */
+ for (int64_t i = 0; i < thread_count; ++i) {
+ int64_t collections_per_thread = collection_count / thread_count;
+ std::vector<std::string> thread_collections;
+ for (size_t j = i * collections_per_thread;
+ j < i * collections_per_thread + collections_per_thread; j++) {
+ debug_print("Populate: adding collection: " + collection_names[j] + " to thread " +
+ std::to_string(i),
+ DEBUG_TRACE);
+ thread_collections.push_back(collection_names[j]);
}
+ tm.add_thread(populate_worker, i, thread_collections,
+ connection_manager::instance().create_session(), tsm, tracking, key_count, key_size,
+ value_size);
}
- debug_print("Populate stage done", DEBUG_TRACE);
+
+ /* Wait for our populate threads to finish and then join them. */
+ debug_print("Populate: waiting for threads to complete.", DEBUG_INFO);
+ tm.join();
+
+ debug_print("Populate: finished.", DEBUG_INFO);
+ }
+
+ /* Basic insert operation that adds a new key every rate tick. */
+ virtual void
+ insert_operation(thread_context *tc)
+ {
+ debug_print(type_string(tc->type) + " thread {" + std::to_string(tc->id) + "} commencing.",
+ DEBUG_INFO);
}
/* Basic read operation that walks a cursors across all collections. */
virtual void
read_operation(thread_context *tc)
{
+ debug_print(type_string(tc->type) + " thread {" + std::to_string(tc->id) + "} commencing.",
+ DEBUG_INFO);
WT_CURSOR *cursor;
std::vector<WT_CURSOR *> cursors;
@@ -130,83 +139,146 @@ class database_operation {
for (const auto &it : tc->database.get_collection_names()) {
testutil_check(tc->session->open_cursor(tc->session, it.c_str(), NULL, NULL, &cursor));
cursors.push_back(cursor);
+ debug_print("Adding collection to read thread: " + it, DEBUG_TRACE);
}
- while (!cursors.empty() && tc->running()) {
+ while (tc->running()) {
/* Walk each cursor. */
for (const auto &it : cursors) {
if (it->next(it) != 0)
it->reset(it);
}
+ tc->sleep();
}
}
/*
- * Basic update operation that updates all the keys to a random value in each collection.
+ * Basic update operation that uses a random cursor to update values in a randomly chosen
+ * collection.
*/
virtual void
update_operation(thread_context *tc)
{
- WT_CURSOR *cursor;
+ debug_print(type_string(tc->type) + " thread {" + std::to_string(tc->id) + "} commencing.",
+ DEBUG_INFO);
+
+ /* A structure that's used to track which cursors we've opened for which collection. */
+ struct collection_cursors {
+ const std::string collection_name;
+ WT_CURSOR *random_cursor;
+ WT_CURSOR *update_cursor;
+ };
+
WT_DECL_RET;
wt_timestamp_t ts;
- std::vector<WT_CURSOR *> cursors;
- std::vector<std::string> collection_names = tc->database.get_collection_names();
+ std::map<uint64_t, collection_cursors> collections;
key_value_t key, generated_value;
+ std::string collection_name;
const char *key_tmp;
- uint64_t i = 0;
+ uint64_t collection_count = 0, collection_id = 0;
bool using_timestamps = tc->timestamp_manager->enabled();
- /* Get a cursor for each collection in collection_names. */
- for (const auto &it : collection_names) {
- testutil_check(tc->session->open_cursor(tc->session, it.c_str(), NULL, NULL, &cursor));
- cursors.push_back(cursor);
- }
-
/*
- * Update each collection while the test is running.
+ * Loop while the test is running.
*/
- while (tc->running() && !collection_names.empty()) {
- if (i >= collection_names.size())
- i = 0;
- ret = cursors[i]->next(cursors[i]);
- /* If we have reached the end of the collection, reset. */
- if (ret == WT_NOTFOUND) {
- testutil_check(cursors[i]->reset(cursors[i]));
- ++i;
- } else if (ret != 0)
- /* Stop updating in case of an error. */
- testutil_die(DEBUG_ERROR, "update_operation: cursor->next() failed: %d", ret);
- else {
- testutil_check(cursors[i]->get_key(cursors[i], &key_tmp));
+ while (tc->running()) {
+ /*
+ * Sleep the period defined by the op_rate in the configuration. Do this at the start of
+ * the loop as it could be skipped by a subsequent continue call.
+ */
+ tc->sleep();
+
+ /* Pick a random collection to update if there are any, taking care to subtract 1. */
+ collection_count = tc->database.get_collection_count();
+ if (collection_count == 0)
+ continue;
+
+ collection_id =
+ random_generator::instance().generate_integer<uint64_t>(0, collection_count - 1);
+
+ if (collections.find(collection_id) == collections.end()) {
+ WT_CURSOR *random_cursor = nullptr, *update_cursor = nullptr;
+ /* Retrieve the collection name associated with our id. */
+ collection_name = std::move(tc->database.get_collection_name(collection_id));
+ debug_print("Thread {" + std::to_string(tc->id) +
+ "} Creating cursor for collection: " + collection_name,
+ DEBUG_TRACE);
+
+ /* Open a random cursor for that collection. */
+ tc->session->open_cursor(tc->session, collection_name.c_str(), nullptr,
+ "next_random=true", &random_cursor);
/*
- * The retrieved key needs to be passed inside the update function. However, the
- * update API doesn't guarantee our buffer will still be valid once it is called, as
- * such we copy the buffer and then pass it into the API.
+ * We can't call update on a random cursor so we open two cursors here, one to do
+ * the randomized next and one to subsequently update the key.
*/
- key = key_value_t(key_tmp);
- generated_value =
- random_generator::random_generator::instance().generate_string(tc->value_size);
+ tc->session->open_cursor(
+ tc->session, collection_name.c_str(), nullptr, nullptr, &update_cursor);
- /* Start a transaction. */
- if (!tc->transaction.active())
- tc->transaction.begin(tc->session, "");
+ collections.emplace(
+ collection_id, collection_cursors{collection_name, random_cursor, update_cursor});
+ }
- ts = tc->timestamp_manager->get_next_ts();
- if (using_timestamps)
- tc->transaction.set_commit_timestamp(
- tc->session, timestamp_manager::decimal_to_hex(ts));
+ /* Start a transaction. */
+ if (!tc->transaction.active())
+ tc->transaction.begin(tc->session, "");
- update(tc->tracking, cursors[i], collection_names[i], key.c_str(),
- generated_value.c_str(), ts);
+ /* Get the random cursor associated with the collection. */
+ auto collection = collections[collection_id];
+ /* Call next to pick a new random record. */
+ ret = collection.random_cursor->next(collection.random_cursor);
+ if (ret == WT_NOTFOUND)
+ continue;
+ else if (ret != 0)
+ testutil_die(ret, "unhandled error returned by cursor->next()");
- /* Commit the current transaction. */
- tc->transaction.op_count++;
- if (tc->transaction.can_commit())
- tc->transaction.commit(tc->session, "");
- }
- tc->sleep();
+ /* Get the record's key. */
+ testutil_check(collection.random_cursor->get_key(collection.random_cursor, &key_tmp));
+
+ /*
+ * The retrieved key needs to be passed inside the update function. However, the update
+ * API doesn't guarantee our buffer will still be valid once it is called, as such we
+ * copy the buffer and then pass it into the API.
+ */
+ key = key_value_t(key_tmp);
+
+ /* Generate a new value for the record. */
+ generated_value =
+ random_generator::random_generator::instance().generate_string(tc->value_size);
+
+ /*
+ * Get a timestamp to apply to the update. We still do this even if timestamps aren't
+ * enabled as it will return WT_TS_NONE, which is then inserted into the tracking table.
+ */
+ ts = tc->timestamp_manager->get_next_ts();
+ if (using_timestamps)
+ tc->transaction.set_commit_timestamp(
+ tc->session, timestamp_manager::decimal_to_hex(ts));
+
+ /*
+ * Update the record but take care to handle WT_ROLLBACK as we may conflict with another
+ * running transaction. Here we call the pre-defined wrappers as they also update the
+ * tracking table, which is later used for validation.
+ *
+ * Additionally first get the update_cursor.
+ */
+ ret = update(tc->tracking, collection.update_cursor, collection.collection_name,
+ key.c_str(), generated_value.c_str(), ts);
+
+ /* Increment the current op count for the current transaction. */
+ tc->transaction.op_count++;
+
+ /*
+ * If the WiredTiger API has returned rollback, comply. This will need to roll back
+ * tracking table operations in the future but currently won't.
+ */
+ if (ret == WT_ROLLBACK)
+ tc->transaction.rollback(tc->session, "");
+
+ /* Commit the current transaction if we're able to. */
+ if (tc->transaction.can_commit())
+ tc->transaction.commit(tc->session, "");
}
+
/* Make sure the last operation is committed now the work is finished. */
if (tc->transaction.active())
tc->transaction.commit(tc->session, "");
@@ -215,34 +287,52 @@ class database_operation {
private:
/* WiredTiger APIs wrappers for single operations. */
template <typename K, typename V>
- void
+ static int
insert(WT_CURSOR *cursor, workload_tracking *tracking, const std::string &collection_name,
const K &key, const V &value, wt_timestamp_t ts)
{
+ WT_DECL_RET;
testutil_assert(cursor != nullptr);
cursor->set_key(cursor, key);
cursor->set_value(cursor, value);
- testutil_check(cursor->insert(cursor));
- debug_print("key/value inserted", DEBUG_TRACE);
+ ret = cursor->insert(cursor);
+ if (ret != 0) {
+ if (ret == WT_ROLLBACK)
+ return (ret);
+ else
+ testutil_die(ret, "unhandled error while trying to insert a key.");
+ }
+
+ debug_print("key/value inserted", DEBUG_TRACE);
tracking->save_operation(tracking_operation::INSERT, collection_name, key, value, ts);
+ return (0);
}
template <typename K, typename V>
- static void
+ static int
update(workload_tracking *tracking, WT_CURSOR *cursor, const std::string &collection_name,
K key, V value, wt_timestamp_t ts)
{
+ WT_DECL_RET;
testutil_assert(tracking != nullptr);
testutil_assert(cursor != nullptr);
cursor->set_key(cursor, key);
cursor->set_value(cursor, value);
- testutil_check(cursor->update(cursor));
- debug_print("key/value updated", DEBUG_TRACE);
+ ret = cursor->update(cursor);
+ if (ret != 0) {
+ if (ret == WT_ROLLBACK)
+ return (ret);
+ else
+ testutil_die(ret, "unhandled error while trying to update a key");
+ }
+
+ debug_print("key/value updated", DEBUG_TRACE);
tracking->save_operation(tracking_operation::UPDATE, collection_name, key, value, ts);
+ return (0);
}
/*
@@ -259,6 +349,57 @@ class database_operation {
str = s.append(value_str);
return (str);
}
+
+ private:
+ static void
+ populate_worker(uint64_t worker_id, std::vector<std::string> collections, WT_SESSION *session,
+ timestamp_manager *tsm, workload_tracking *tracking, int64_t key_count, int64_t key_size,
+ int64_t value_size)
+ {
+ WT_DECL_RET;
+ WT_CURSOR *cursor;
+ std::string cfg;
+ wt_timestamp_t ts;
+ key_value_t generated_key, generated_value;
+
+ for (const auto &next_collection : collections) {
+ /*
+ * WiredTiger lets you open a cursor on a collection using the same pointer. When a
+ * session is closed, WiredTiger APIs close the cursors too.
+ */
+ testutil_check(
+ session->open_cursor(session, next_collection.c_str(), NULL, NULL, &cursor));
+ for (uint64_t i = 0; i < key_count; ++i) {
+ /* Generation of a unique key. */
+ generated_key = number_to_string(key_size, i);
+ /*
+ * Generation of a random string value using the size defined in the test
+ * configuration.
+ */
+ generated_value =
+ random_generator::random_generator::instance().generate_string(value_size);
+ ts = tsm->get_next_ts();
+
+ /* Start a txn. */
+ testutil_check(session->begin_transaction(session, nullptr));
+
+ ret = insert(cursor, tracking, next_collection, generated_key.c_str(),
+ generated_value.c_str(), ts);
+
+ /* This may require some sort of "stuck" mechanism but for now is fine. */
+ if (ret == WT_ROLLBACK)
+ testutil_die(-1, "Got a rollback in populate, this is currently not handled.");
+
+ if (tsm->enabled())
+ cfg = std::string(COMMIT_TS) + "=" + timestamp_manager::decimal_to_hex(ts);
+ else
+ cfg = "";
+
+ testutil_check(session->commit_transaction(session, cfg.c_str()));
+ }
+ }
+ debug_print("Populate: thread {" + std::to_string(worker_id) + "} finished", DEBUG_TRACE);
+ }
};
} // namespace test_harness
#endif
diff --git a/src/third_party/wiredtiger/test/cppsuite/test_harness/workload/random_generator.h b/src/third_party/wiredtiger/test/cppsuite/test_harness/workload/random_generator.h
index 7df4d7da3fb..dc32de8e61a 100644
--- a/src/third_party/wiredtiger/test/cppsuite/test_harness/workload/random_generator.h
+++ b/src/third_party/wiredtiger/test/cppsuite/test_harness/workload/random_generator.h
@@ -60,10 +60,11 @@ class random_generator {
}
/* Generate a random integer between min and max. */
- int64_t
- generate_integer(int64_t min, int64_t max)
+ template <typename T>
+ T
+ generate_integer(T min, T max)
{
- std::uniform_int_distribution<> dis(min, max);
+ std::uniform_int_distribution<T> dis(min, max);
return dis(_generator);
}
diff --git a/src/third_party/wiredtiger/test/cppsuite/test_harness/workload/thread_context.h b/src/third_party/wiredtiger/test/cppsuite/test_harness/workload/thread_context.h
index 63291ac9756..7f51cc90e47 100644
--- a/src/third_party/wiredtiger/test/cppsuite/test_harness/workload/thread_context.h
+++ b/src/third_party/wiredtiger/test/cppsuite/test_harness/workload/thread_context.h
@@ -52,6 +52,22 @@ class transaction_context {
return (_in_txn);
}
+ /* Begin a transaction. */
+ void
+ begin(WT_SESSION *session, const std::string &config)
+ {
+ if (!_in_txn) {
+ testutil_check(
+ session->begin_transaction(session, config.empty() ? nullptr : config.c_str()));
+ /* This randomizes the number of operations to be executed in one transaction. */
+ _target_op_count =
+ random_generator::instance().generate_integer<int64_t>(_min_op_count, _max_op_count);
+ op_count = 0;
+ _in_txn = true;
+ } else
+ testutil_die(EINVAL, "Begin called on a currently running transaction.");
+ }
+
/*
* The current transaction can be committed if: A transaction has started and the number of
* operations executed in the current transaction has exceeded the threshold.
@@ -69,21 +85,18 @@ class transaction_context {
testutil_assert(_in_txn);
testutil_check(
session->commit_transaction(session, config.empty() ? nullptr : config.c_str()));
+ op_count = 0;
_in_txn = false;
}
void
- begin(WT_SESSION *session, const std::string &config)
+ rollback(WT_SESSION *session, const std::string &config)
{
- if (!_in_txn) {
- testutil_check(
- session->begin_transaction(session, config.empty() ? nullptr : config.c_str()));
- /* This randomizes the number of operations to be executed in one transaction. */
- _target_op_count =
- random_generator::instance().generate_integer(_min_op_count, _max_op_count);
- op_count = 0;
- _in_txn = true;
- }
+ testutil_assert(_in_txn);
+ testutil_check(
+ session->rollback_transaction(session, config.empty() ? nullptr : config.c_str()));
+ op_count = 0;
+ _in_txn = false;
}
/*
@@ -114,13 +127,30 @@ class transaction_context {
bool _in_txn = false;
};
+enum thread_type { READ, INSERT, UPDATE };
+
+static std::string
+type_string(thread_type type)
+{
+ switch (type) {
+ case thread_type::INSERT:
+ return ("insert");
+ case thread_type::READ:
+ return ("read");
+ case thread_type::UPDATE:
+ return ("update");
+ default:
+ testutil_die(EINVAL, "unexpected thread_type: %d", static_cast<int>(type));
+ }
+}
+
/* Container class for a thread and any data types it may need to interact with the database. */
class thread_context {
public:
- thread_context(configuration *config, timestamp_manager *timestamp_manager,
- workload_tracking *tracking, database &db)
- : database(db), timestamp_manager(timestamp_manager), tracking(tracking),
- transaction(transaction_context(config))
+ thread_context(uint64_t id, thread_type type, configuration *config,
+ timestamp_manager *timestamp_manager, workload_tracking *tracking, database &db)
+ : id(id), type(type), database(db), timestamp_manager(timestamp_manager),
+ tracking(tracking), transaction(transaction_context(config))
{
session = connection_manager::instance().create_session();
_throttle = throttle(config);
@@ -157,6 +187,8 @@ class thread_context {
test_harness::database &database;
int64_t key_size = 0;
int64_t value_size = 0;
+ const uint64_t id;
+ const thread_type type;
private:
throttle _throttle;
diff --git a/src/third_party/wiredtiger/test/cppsuite/test_harness/workload_generator.h b/src/third_party/wiredtiger/test/cppsuite/test_harness/workload_generator.h
index 1980cf6ac6c..10d6f4b9aa6 100644
--- a/src/third_party/wiredtiger/test/cppsuite/test_harness/workload_generator.h
+++ b/src/third_party/wiredtiger/test/cppsuite/test_harness/workload_generator.h
@@ -31,6 +31,7 @@
#include <algorithm>
#include <atomic>
+#include <functional>
#include <map>
#include "core/throttle.h"
@@ -41,6 +42,37 @@
namespace test_harness {
/*
+ * Helper class to enable scalable operation types in the database_operation.
+ */
+class operation_config {
+ public:
+ operation_config(configuration *config, thread_type type)
+ : config(config), type(type), thread_count(config->get_int(THREAD_COUNT))
+ {
+ }
+
+ /* Returns a callable bound to the matching member function of the supplied database operation. */
+ std::function<void(test_harness::thread_context *)>
+ get_func(database_operation *dbo)
+ {
+ switch (type) {
+ case thread_type::INSERT:
+ return (std::bind(&database_operation::insert_operation, dbo, std::placeholders::_1));
+ case thread_type::READ:
+ return (std::bind(&database_operation::read_operation, dbo, std::placeholders::_1));
+ case thread_type::UPDATE:
+ return (std::bind(&database_operation::update_operation, dbo, std::placeholders::_1));
+ default:
+ /* This may cause a separate testutil_die in type_string but that should be okay. */
+ testutil_die(EINVAL, "unexpected thread_type: %s", type_string(type).c_str());
+ }
+ }
+ configuration *config;
+ const thread_type type;
+ const int64_t thread_count;
+};
+
+/*
* Class that can execute operations based on a given configuration.
*/
class workload_generator : public component {
@@ -67,38 +99,41 @@ class workload_generator : public component {
void
run() override final
{
- configuration *read_config, *update_config, *insert_config;
+ configuration *populate_config;
+ std::vector<operation_config> operation_configs;
+ uint64_t thread_id = 0;
+
+ /* Retrieve useful parameters from the test configuration. */
+ operation_configs.push_back(
+ operation_config(_config->get_subconfig(INSERT_CONFIG), thread_type::INSERT));
+ operation_configs.push_back(
+ operation_config(_config->get_subconfig(READ_CONFIG), thread_type::READ));
+ operation_configs.push_back(
+ operation_config(_config->get_subconfig(UPDATE_CONFIG), thread_type::UPDATE));
+ populate_config = _config->get_subconfig(POPULATE_CONFIG);
/* Populate the database. */
- _database_operation->populate(_database, _timestamp_manager, _config, _tracking);
+ _database_operation->populate(_database, _timestamp_manager, populate_config, _tracking);
_db_populated = true;
-
- /* Retrieve useful parameters from the test configuration. */
- update_config = _config->get_subconfig(UPDATE_CONFIG);
- insert_config = _config->get_subconfig(INSERT_CONFIG);
- read_config = _config->get_subconfig(READ_CONFIG);
+ delete populate_config;
/* Generate threads to execute read operations on the collections. */
- for (size_t i = 0; i < read_config->get_int(THREAD_COUNT) && _running; ++i) {
- thread_context *tc =
- new thread_context(read_config, _timestamp_manager, _tracking, _database);
- _workers.push_back(tc);
- _thread_manager.add_thread(
- &database_operation::read_operation, _database_operation, tc);
- }
-
- /* Generate threads to execute update operations on the collections. */
- for (size_t i = 0; i < update_config->get_int(THREAD_COUNT) && _running; ++i) {
- thread_context *tc =
- new thread_context(update_config, _timestamp_manager, _tracking, _database);
- _workers.push_back(tc);
- _thread_manager.add_thread(
- &database_operation::update_operation, _database_operation, tc);
+ for (auto &it : operation_configs) {
+ debug_print("Workload_generator: Creating " + std::to_string(it.thread_count) + " " +
+ type_string(it.type) + " threads.",
+ DEBUG_INFO);
+ for (size_t i = 0; i < it.thread_count && _running; ++i) {
+ thread_context *tc = new thread_context(
+ thread_id++, it.type, it.config, _timestamp_manager, _tracking, _database);
+ _workers.push_back(tc);
+ _thread_manager.add_thread(it.get_func(_database_operation), tc);
+ }
+ /*
+ * Don't forget to delete the config we created earlier. While we do pass the config
+ * into the thread context it is not saved, so we are safe to do this.
+ */
+ delete it.config;
}
-
- delete read_config;
- delete update_config;
- delete insert_config;
}
void
diff --git a/src/third_party/wiredtiger/test/cppsuite/tests/hs_cleanup.cxx b/src/third_party/wiredtiger/test/cppsuite/tests/hs_cleanup.cxx
new file mode 100644
index 00000000000..178ae35df09
--- /dev/null
+++ b/src/third_party/wiredtiger/test/cppsuite/tests/hs_cleanup.cxx
@@ -0,0 +1,58 @@
+/*-
+ * Public Domain 2014-present MongoDB, Inc.
+ * Public Domain 2008-2014 WiredTiger, Inc.
+ *
+ * This is free and unencumbered software released into the public domain.
+ *
+ * Anyone is free to copy, modify, publish, use, compile, sell, or
+ * distribute this software, either in source code form or as a compiled
+ * binary, for any purpose, commercial or non-commercial, and by any
+ * means.
+ *
+ * In jurisdictions that recognize copyright laws, the author or authors
+ * of this software dedicate any and all copyright interest in the
+ * software to the public domain. We make this dedication for the benefit
+ * of the public at large and to the detriment of our heirs and
+ * successors. We intend this dedication to be an overt act of
+ * relinquishment in perpetuity of all present and future rights to this
+ * software under copyright law.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
+ * EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
+ * MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT.
+ * IN NO EVENT SHALL THE AUTHORS BE LIABLE FOR ANY CLAIM, DAMAGES OR
+ * OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE,
+ * ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR
+ * OTHER DEALINGS IN THE SOFTWARE.
+ */
+
+#include "test_harness/test.h"
+
+/*
+ * Class that defines operations that do nothing as an example.
+ * This shows how database operations can be overridden and customized.
+ */
+class hs_cleanup : public test_harness::test {
+ public:
+ hs_cleanup(const std::string &config, const std::string &name) : test(config, name) {}
+
+ void
+ populate(test_harness::database &database, test_harness::timestamp_manager *_timestamp_manager,
+ test_harness::configuration *_config, test_harness::workload_tracking *tracking)
+ override final
+ {
+ std::cout << "populate: nothing done." << std::endl;
+ }
+
+ void
+ read_operation(test_harness::thread_context *context) override final
+ {
+ std::cout << "read_operation: nothing done." << std::endl;
+ }
+
+ void
+ update_operation(test_harness::thread_context *context) override final
+ {
+ std::cout << "update_operation: nothing done." << std::endl;
+ }
+};
diff --git a/src/third_party/wiredtiger/test/cppsuite/tests/run.cxx b/src/third_party/wiredtiger/test/cppsuite/tests/run.cxx
index ae0d55fb39b..8a7acfee2bd 100755
--- a/src/third_party/wiredtiger/test/cppsuite/tests/run.cxx
+++ b/src/third_party/wiredtiger/test/cppsuite/tests/run.cxx
@@ -34,6 +34,7 @@
#include "test_harness/test.h"
#include "example_test.cxx"
+#include "hs_cleanup.cxx"
#include "poc_test.cxx"
std::string
@@ -87,7 +88,8 @@ print_help()
std::cout << "\t-h Output a usage message and exit." << std::endl;
std::cout << "\t-C Configuration. Cannot be used with -f." << std::endl;
std::cout << "\t-f File that contains the configuration. Cannot be used with -C." << std::endl;
- std::cout << "\t-l Trace level from 0 (default) to 2." << std::endl;
+ std::cout << "\t-l Trace level from 0 to 3. "
+ "1 is the default level, all warnings and errors are logged." << std::endl;
std::cout << "\t-t Test name to be executed." << std::endl;
}
@@ -114,6 +116,8 @@ run_test(const std::string &test_name, const std::string &config)
poc_test(config, test_name).run();
else if (test_name == "example_test")
example_test(config, test_name).run();
+ else if (test_name == "hs_cleanup")
+ hs_cleanup(config, test_name).run();
else {
test_harness::debug_print("Test not found: " + test_name, DEBUG_ERROR);
error_code = -1;
@@ -130,7 +134,7 @@ main(int argc, char *argv[])
{
std::string cfg, config_filename, test_name, current_test_name;
int64_t error_code = 0;
- const std::vector<std::string> all_tests = {"example_test", "poc_test"};
+ const std::vector<std::string> all_tests = {"example_test", "hs_cleanup", "poc_test"};
/* Set the program name for error messages. */
(void)testutil_set_progname(argv);