From f7f28d5da8d685796ba7634460287268dfd524a4 Mon Sep 17 00:00:00 2001 From: Etienne Petrel Date: Tue, 21 Sep 2021 04:36:31 +0000 Subject: Import wiredtiger: 7813885d2cf3b975d03bfcb7499defb16cae26fc from branch mongodb-master ref: b97d0cdf23..7813885d2c for: 5.1.0 WT-8121 Create a long running stress test which inserts a large amount of data over a long period --- src/third_party/wiredtiger/dist/test_data.py | 5 +- src/third_party/wiredtiger/import.data | 2 +- .../wiredtiger/src/config/test_config.c | 33 +++++ .../cppsuite/configs/burst_inserts_default.txt | 4 + .../test/cppsuite/configs/burst_inserts_stress.txt | 41 ++++++ .../wiredtiger/test/cppsuite/test_harness/test.h | 4 +- .../test_harness/workload/database_operation.cxx | 47 +++--- .../test_harness/workload/thread_context.cxx | 28 ++-- .../test_harness/workload/thread_context.h | 8 +- .../test/cppsuite/tests/burst_inserts.cxx | 162 +++++++++++++++++++++ .../wiredtiger/test/cppsuite/tests/run.cxx | 6 +- 11 files changed, 296 insertions(+), 44 deletions(-) create mode 100644 src/third_party/wiredtiger/test/cppsuite/configs/burst_inserts_default.txt create mode 100644 src/third_party/wiredtiger/test/cppsuite/configs/burst_inserts_stress.txt create mode 100644 src/third_party/wiredtiger/test/cppsuite/tests/burst_inserts.cxx diff --git a/src/third_party/wiredtiger/dist/test_data.py b/src/third_party/wiredtiger/dist/test_data.py index 62789154113..c074877056a 100644 --- a/src/third_party/wiredtiger/dist/test_data.py +++ b/src/third_party/wiredtiger/dist/test_data.py @@ -214,7 +214,10 @@ test_config = [ ] methods = { + 'base_test' : Method(test_config), + 'burst_inserts' : Method(test_config + [ + Config("burst_duration", 90, r''' + How long the insertions will occur for.''')]), 'example_test' : Method(test_config), 'hs_cleanup' : Method(test_config), - 'base_test' : Method(test_config), } diff --git a/src/third_party/wiredtiger/import.data b/src/third_party/wiredtiger/import.data index bf8e1e6be95..5e10fd648f9 
100644 --- a/src/third_party/wiredtiger/import.data +++ b/src/third_party/wiredtiger/import.data @@ -2,5 +2,5 @@ "vendor": "wiredtiger", "github": "wiredtiger/wiredtiger.git", "branch": "mongodb-master", - "commit": "b97d0cdf23ad071e6dc02902293942ec3f9d07a3" + "commit": "7813885d2cf3b975d03bfcb7499defb16cae26fc" } diff --git a/src/third_party/wiredtiger/src/config/test_config.c b/src/third_party/wiredtiger/src/config/test_config.c index 135cf6f01a2..30bf6d8a3c3 100644 --- a/src/third_party/wiredtiger/src/config/test_config.c +++ b/src/third_party/wiredtiger/src/config/test_config.c @@ -85,6 +85,20 @@ static const WT_CONFIG_CHECK confchk_base_test[] = { {"workload_tracking", "category", NULL, NULL, confchk_workload_tracking_subconfigs, 2}, {NULL, NULL, NULL, NULL, NULL, 0}}; +static const WT_CONFIG_CHECK confchk_burst_inserts[] = { + {"burst_duration", "string", NULL, NULL, NULL, 0}, + {"cache_size_mb", "int", NULL, "min=0,max=100000000000", NULL, 0}, + {"checkpoint_manager", "category", NULL, NULL, confchk_checkpoint_manager_subconfigs, 2}, + {"compression_enabled", "boolean", NULL, NULL, NULL, 0}, + {"duration_seconds", "int", NULL, "min=0,max=1000000", NULL, 0}, + {"enable_logging", "boolean", NULL, NULL, NULL, 0}, + {"runtime_monitor", "category", NULL, NULL, confchk_runtime_monitor_subconfigs, 5}, + {"statistics_config", "category", NULL, NULL, confchk_statistics_config_subconfigs, 2}, + {"timestamp_manager", "category", NULL, NULL, confchk_timestamp_manager_subconfigs, 4}, + {"workload_generator", "category", NULL, NULL, confchk_workload_generator_subconfigs, 6}, + {"workload_tracking", "category", NULL, NULL, confchk_workload_tracking_subconfigs, 2}, + {NULL, NULL, NULL, NULL, NULL, 0}}; + static const WT_CONFIG_CHECK confchk_example_test[] = { {"cache_size_mb", "int", NULL, "min=0,max=100000000000", NULL, 0}, {"checkpoint_manager", "category", NULL, NULL, confchk_checkpoint_manager_subconfigs, 2}, @@ -130,6 +144,25 @@ static const WT_CONFIG_ENTRY 
config_entries[] = { "min=0),thread_count=0,value_size=5))," "workload_tracking=(enabled=true,op_rate=1s)", confchk_base_test, 10}, + {"burst_inserts", + "burst_duration=90,cache_size_mb=0," + "checkpoint_manager=(enabled=false,op_rate=1s)," + "compression_enabled=false,duration_seconds=0," + "enable_logging=false,runtime_monitor=(enabled=true,op_rate=1s," + "postrun_statistics=[],stat_cache_size=(enabled=false,limit=0)," + "stat_db_size=(enabled=false,limit=0))," + "statistics_config=(enable_logging=true,type=all)," + "timestamp_manager=(enabled=true,oldest_lag=1,op_rate=1s," + "stable_lag=1),workload_generator=(enabled=true," + "insert_config=(key_size=5,op_rate=1s,ops_per_transaction=(max=1," + "min=0),thread_count=0,value_size=5),op_rate=1s," + "populate_config=(collection_count=1,key_count_per_collection=0," + "key_size=5,thread_count=1,value_size=5),read_config=(op_rate=1s," + "ops_per_transaction=(max=1,min=0),thread_count=0)," + "update_config=(key_size=5,op_rate=1s,ops_per_transaction=(max=1," + "min=0),thread_count=0,value_size=5))," + "workload_tracking=(enabled=true,op_rate=1s)", + confchk_burst_inserts, 11}, {"example_test", "cache_size_mb=0,checkpoint_manager=(enabled=false,op_rate=1s)," "compression_enabled=false,duration_seconds=0," diff --git a/src/third_party/wiredtiger/test/cppsuite/configs/burst_inserts_default.txt b/src/third_party/wiredtiger/test/cppsuite/configs/burst_inserts_default.txt new file mode 100644 index 00000000000..1f8efba08a3 --- /dev/null +++ b/src/third_party/wiredtiger/test/cppsuite/configs/burst_inserts_default.txt @@ -0,0 +1,4 @@ +# Configuration for burst_inserts. +# Only the test duration and cache size need to be defined; other settings use the test defaults. 
+duration_seconds=5, +cache_size_mb=250 diff --git a/src/third_party/wiredtiger/test/cppsuite/configs/burst_inserts_stress.txt b/src/third_party/wiredtiger/test/cppsuite/configs/burst_inserts_stress.txt new file mode 100644 index 00000000000..7dabeb9af7b --- /dev/null +++ b/src/third_party/wiredtiger/test/cppsuite/configs/burst_inserts_stress.txt @@ -0,0 +1,41 @@ +# Used as a stress test for the framework. +duration_seconds=14400, +cache_size_mb=2000, +checkpoint_manager= +( + enabled=true, + op_rate=60s +), +runtime_monitor= +( + enabled=false +), +timestamp_manager= +( + enabled=true, + oldest_lag=10, + stable_lag=10 +), +workload_generator= +( + populate_config= + ( + collection_count=160, + key_count_per_collection=500, + key_size=200, + thread_count=40, + value_size=20000 + ), + insert_config= + ( + key_size=200, + op_rate=10s, + ops_per_transaction=(max=2000,min=1000), + thread_count=40, + value_size=20000 + ) +), +workload_tracking= +( + enabled=false, +) diff --git a/src/third_party/wiredtiger/test/cppsuite/test_harness/test.h b/src/third_party/wiredtiger/test/cppsuite/test_harness/test.h index 875255e5969..8c5e2d17434 100644 --- a/src/third_party/wiredtiger/test/cppsuite/test_harness/test.h +++ b/src/third_party/wiredtiger/test/cppsuite/test_harness/test.h @@ -80,10 +80,12 @@ class test : public database_operation { timestamp_manager *get_timestamp_manager(); thread_manager *get_thread_manager(); + protected: + configuration *_config; + private: const test_args &_args; std::vector _components; - configuration *_config; checkpoint_manager *_checkpoint_manager = nullptr; runtime_monitor *_runtime_monitor = nullptr; thread_manager *_thread_manager = nullptr; diff --git a/src/third_party/wiredtiger/test/cppsuite/test_harness/workload/database_operation.cxx b/src/third_party/wiredtiger/test/cppsuite/test_harness/workload/database_operation.cxx index fc1f39dee23..2f85cab681e 100644 --- 
a/src/third_party/wiredtiger/test/cppsuite/test_harness/workload/database_operation.cxx +++ b/src/third_party/wiredtiger/test/cppsuite/test_harness/workload/database_operation.cxx @@ -49,16 +49,16 @@ populate_worker(thread_context *tc) * is closed, WiredTiger APIs close the cursors too. */ scoped_cursor cursor = tc->session.open_scoped_cursor(coll.name.c_str()); - for (uint64_t i = 0; i < tc->key_count; ++i) { - /* Start a txn. */ + uint64_t j = 0; + while (j < tc->key_count) { tc->transaction.begin(); - if (tc->insert(cursor, coll.id, i)) { - /* We failed to insert, rollback our transaction and retry. */ + if (tc->insert(cursor, coll.id, j)) { + if (tc->transaction.commit()) { + ++j; + } + } else { tc->transaction.rollback(); - --i; - continue; } - tc->transaction.commit(); } } logger::log_msg(LOG_TRACE, "Populate: thread {" + std::to_string(tc->id) + "} finished"); @@ -161,35 +161,35 @@ database_operation::insert_operation(thread_context *tc) /* Collection cursor. */ auto &cc = ccv[counter]; while (tc->transaction.active() && tc->running()) { - /* Insert a key value pair. */ - bool rollback_required = tc->insert(cc.cursor, cc.coll.id, start_key + added_count); - if (!rollback_required) { + /* Insert a key value pair, rolling back the transaction if required. */ + if (!tc->insert(cc.cursor, cc.coll.id, start_key + added_count)) { + added_count = 0; + tc->transaction.rollback(); + } else { added_count++; if (tc->transaction.can_commit()) { - rollback_required = tc->transaction.commit(); - if (!rollback_required) + if (tc->transaction.commit()) { /* * We need to inform the database model that we've added these keys as some * other thread may rely on the key_count data. Only do so if we * successfully committed. */ cc.coll.increase_key_count(added_count); + } else { + added_count = 0; + } } } - if (rollback_required) { - added_count = 0; - tc->transaction.rollback(); - } - /* Sleep the duration defined by the op_rate. 
*/ tc->sleep(); } /* Reset our cursor to avoid pinning content. */ testutil_check(cc.cursor->reset(cc.cursor.get())); counter++; - if (counter >= collections_per_thread) + if (counter == collections_per_thread) counter = 0; + testutil_assert(counter < collections_per_thread); } /* Make sure the last transaction is rolled back now the work is finished. */ if (tc->transaction.active()) @@ -278,17 +278,16 @@ database_operation::update_operation(thread_context *tc) /* Choose a random key to update. */ uint64_t key_id = random_generator::instance().generate_integer(0, coll.get_key_count() - 1); - bool rollback_required = tc->update(cursor, coll.id, tc->key_to_string(key_id)); + if (!tc->update(cursor, coll.id, tc->key_to_string(key_id))) { + tc->transaction.rollback(); + } /* Reset our cursor to avoid pinning content. */ testutil_check(cursor->reset(cursor.get())); /* Commit the current transaction if we're able to. */ - if (!rollback_required && tc->transaction.can_commit()) - rollback_required = tc->transaction.commit(); - - if (rollback_required) - tc->transaction.rollback(); + if (tc->transaction.can_commit()) + tc->transaction.commit(); } /* Make sure the last operation is rolled back now the work is finished. 
*/ diff --git a/src/third_party/wiredtiger/test/cppsuite/test_harness/workload/thread_context.cxx b/src/third_party/wiredtiger/test/cppsuite/test_harness/workload/thread_context.cxx index 6216f8f25b5..f087d45cd06 100644 --- a/src/third_party/wiredtiger/test/cppsuite/test_harness/workload/thread_context.cxx +++ b/src/third_party/wiredtiger/test/cppsuite/test_harness/workload/thread_context.cxx @@ -29,6 +29,7 @@ #include "../core/configuration.h" #include "../timestamp_manager.h" #include "../util/api_const.h" +#include "../util/logger.h" #include "workload_tracking.h" #include "random_generator.h" #include "thread_context.h" @@ -81,22 +82,23 @@ transaction_context::try_begin(const std::string &config) begin(config); } -/* It's possible to receive rollback in commit which is handled internally. */ +/* + * It's possible to receive rollback in commit, when this happens the API will rollback the + * transaction internally. + */ bool transaction_context::commit(const std::string &config) { WT_DECL_RET; - testutil_assert(_in_txn); + testutil_assert(_in_txn && !_needs_rollback); if ((ret = _session->commit_transaction(_session, config.empty() ? 
nullptr : config.c_str())) != 0) { logger::log_msg(LOG_WARN, "Failed to commit transaction in commit, received error code: " + std::to_string(ret)); - _needs_rollback = true; - } else { - _op_count = 0; - _in_txn = false; } - return (_needs_rollback); + _op_count = 0; + _in_txn = false; + return (ret == 0); } void @@ -200,7 +202,7 @@ thread_context::update(scoped_cursor &cursor, uint64_t collection_id, const std: if (ret != 0) { if (ret == WT_ROLLBACK) { transaction.set_needs_rollback(true); - return (true); + return (false); } else testutil_die(ret, "unhandled error while trying to update a key"); } @@ -209,13 +211,13 @@ thread_context::update(scoped_cursor &cursor, uint64_t collection_id, const std: if (ret != 0) { if (ret == WT_ROLLBACK) { transaction.set_needs_rollback(true); - return (true); + return (false); } else testutil_die( ret, "unhandled error while trying to save an update to the tracking table"); } transaction.add_op(); - return (false); + return (true); } bool @@ -250,7 +252,7 @@ thread_context::insert( if (ret != 0) { if (ret == WT_ROLLBACK) { transaction.set_needs_rollback(true); - return (true); + return (false); } else testutil_die(ret, "unhandled error while trying to insert a key"); } @@ -259,13 +261,13 @@ thread_context::insert( if (ret != 0) { if (ret == WT_ROLLBACK) { transaction.set_needs_rollback(true); - return (true); + return (false); } else testutil_die( ret, "unhandled error while trying to save an insert to the tracking table"); } transaction.add_op(); - return (false); + return (true); } void diff --git a/src/third_party/wiredtiger/test/cppsuite/test_harness/workload/thread_context.h b/src/third_party/wiredtiger/test/cppsuite/test_harness/workload/thread_context.h index e530ce9e111..61e1b99f28a 100644 --- a/src/third_party/wiredtiger/test/cppsuite/test_harness/workload/thread_context.h +++ b/src/third_party/wiredtiger/test/cppsuite/test_harness/workload/thread_context.h @@ -72,7 +72,7 @@ class transaction_context { /* Begin a 
transaction if we are not currently in one. */ void try_begin(const std::string &config = ""); /* - * Commit a transaction and return true if a rollback is required. + * Commit a transaction and return true if the commit was successful. */ bool commit(const std::string &config = ""); /* Rollback a transaction, failure will abort the test. */ @@ -136,7 +136,8 @@ class thread_context { /* * Generic update function, takes a collection_id and key, will generate the value. * - * Returns true if a rollback is required. + * Return true if the operation was successful, a return value of false implies the transaction + * needs to be rolled back. */ bool update(scoped_cursor &cursor, uint64_t collection_id, const std::string &key); @@ -144,7 +145,8 @@ class thread_context { * Generic insert function, takes a collection_id and key_id, will generate the value. If a * timestamp is not specified, the timestamp manager will generate one. * - * Returns true if a rollback is required. + * Return true if the operation was successful, a return value of false implies the transaction + * needs to be rolled back. */ bool insert( scoped_cursor &cursor, uint64_t collection_id, uint64_t key_id, wt_timestamp_t ts = 0); diff --git a/src/third_party/wiredtiger/test/cppsuite/tests/burst_inserts.cxx b/src/third_party/wiredtiger/test/cppsuite/tests/burst_inserts.cxx new file mode 100644 index 00000000000..9d5318e7acb --- /dev/null +++ b/src/third_party/wiredtiger/test/cppsuite/tests/burst_inserts.cxx @@ -0,0 +1,162 @@ +/*- + * Public Domain 2014-present MongoDB, Inc. + * Public Domain 2008-2014 WiredTiger, Inc. + * + * This is free and unencumbered software released into the public domain. + * + * Anyone is free to copy, modify, publish, use, compile, sell, or + * distribute this software, either in source code form or as a compiled + * binary, for any purpose, commercial or non-commercial, and by any + * means. 
+ * + * In jurisdictions that recognize copyright laws, the author or authors + * of this software dedicate any and all copyright interest in the + * software to the public domain. We make this dedication for the benefit + * of the public at large and to the detriment of our heirs and + * successors. We intend this dedication to be an overt act of + * relinquishment in perpetuity of all present and future rights to this + * software under copyright law. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, + * EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF + * MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. + * IN NO EVENT SHALL THE AUTHORS BE LIABLE FOR ANY CLAIM, DAMAGES OR + * OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, + * ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR + * OTHER DEALINGS IN THE SOFTWARE. + */ + +#include "test_harness/test.h" +#include "test_harness/workload/random_generator.h" +#include "test_harness/timestamp_manager.h" + +using namespace test_harness; + +/* + * This test inserts and reads a large quantity of data in bursts, this is intended to simulate a + * mongod instance loading a large amount of data over a long period of time. + */ +class burst_inserts : public test { + public: + burst_inserts(const test_args &args) : test(args) + { + _burst_duration = _config->get_int("burst_duration"); + logger::log_msg(LOG_INFO, "Burst duration set to: " + std::to_string(_burst_duration)); + } + + /* + * Insert operation that inserts continuously for burst_duration with no throttling. It then + * sleeps for op_rate. + */ + void + insert_operation(thread_context *tc) override final + { + logger::log_msg( + LOG_INFO, type_string(tc->type) + " thread {" + std::to_string(tc->id) + "} commencing."); + + /* Helper struct which stores a pointer to a collection and a cursor associated with it. 
*/ + struct collection_cursor { + collection_cursor( + collection &coll, scoped_cursor &&write_cursor, scoped_cursor &&read_cursor) + : coll(coll), write_cursor(std::move(write_cursor)), + read_cursor(std::move(read_cursor)) + { + } + collection &coll; + scoped_cursor read_cursor; + scoped_cursor write_cursor; + }; + + /* Collection cursor vector. */ + std::vector ccv; + uint64_t collection_count = tc->db.get_collection_count(); + uint64_t collections_per_thread = collection_count / tc->thread_count; + /* Must have unique collections for each thread. */ + testutil_assert(collection_count % tc->thread_count == 0); + int thread_offset = tc->id * collections_per_thread; + for (int i = thread_offset; i < thread_offset + collections_per_thread && tc->running(); + ++i) { + collection &coll = tc->db.get_collection(i); + /* + * Create a reading cursor that will read random documents for every next call. This + * will help generate cache pressure. + */ + ccv.push_back({coll, std::move(tc->session.open_scoped_cursor(coll.name.c_str())), + std::move(tc->session.open_scoped_cursor(coll.name.c_str(), "next_random=true"))}); + } + + uint64_t counter = 0; + while (tc->running()) { + uint64_t start_key = ccv[counter].coll.get_key_count(); + uint64_t added_count = 0; + bool committed = true; + auto &cc = ccv[counter]; + auto burst_start = std::chrono::system_clock::now(); + while (tc->running() && + std::chrono::system_clock::now() - burst_start < + std::chrono::seconds(_burst_duration)) { + tc->transaction.try_begin(); + cc.write_cursor->set_key( + cc.write_cursor.get(), tc->key_to_string(start_key + added_count).c_str()); + cc.write_cursor->search(cc.write_cursor.get()); + + /* A return value of true implies the insert was successful. */ + if (!tc->insert(cc.write_cursor, cc.coll.id, start_key + added_count)) { + tc->transaction.rollback(); + added_count = 0; + continue; + } + added_count++; + + /* Walk our random reader intended to generate cache pressure. 
*/ + int ret = 0; + if ((ret = cc.read_cursor->next(cc.read_cursor.get())) != 0) { + if (ret == WT_NOTFOUND) { + cc.read_cursor->reset(cc.read_cursor.get()); + } else if (ret == WT_ROLLBACK) { + tc->transaction.rollback(); + added_count = 0; + continue; + } else { + testutil_die(ret, "Unhandled error in cursor->next()"); + } + } + + if (tc->transaction.can_commit()) { + if (tc->transaction.commit()) { + cc.coll.increase_key_count(added_count); + start_key = cc.coll.get_key_count(); + } + added_count = 0; + } + + /* Sleep as currently this loop is too fast. */ + std::this_thread::sleep_for(std::chrono::milliseconds(10)); + } + /* Close out our current txn. */ + if (tc->transaction.active()) { + if (tc->transaction.commit()) { + logger::log_msg(LOG_TRACE, + "Committed an insertion of " + std::to_string(added_count) + " keys."); + cc.coll.increase_key_count(added_count); + start_key = cc.coll.get_key_count(); + } + added_count = 0; + } + + testutil_check(cc.write_cursor->reset(cc.write_cursor.get())); + testutil_check(cc.read_cursor->reset(cc.read_cursor.get())); + counter++; + if (counter == collections_per_thread) + counter = 0; + testutil_assert(counter < collections_per_thread); + tc->sleep(); + } + /* Make sure the last transaction is rolled back now the work is finished. 
*/ + if (tc->transaction.active()) + tc->transaction.rollback(); + } + + private: + int _burst_duration = 0; +}; diff --git a/src/third_party/wiredtiger/test/cppsuite/tests/run.cxx b/src/third_party/wiredtiger/test/cppsuite/tests/run.cxx index 4e436ff7af3..0c231e6568d 100755 --- a/src/third_party/wiredtiger/test/cppsuite/tests/run.cxx +++ b/src/third_party/wiredtiger/test/cppsuite/tests/run.cxx @@ -35,6 +35,7 @@ #include "base_test.cxx" #include "example_test.cxx" +#include "burst_inserts.cxx" #include "hs_cleanup.cxx" std::string @@ -114,6 +115,8 @@ run_test(const std::string &test_name, const std::string &config, const std::str example_test(test_harness::test_args{config, test_name, wt_open_config}).run(); else if (test_name == "hs_cleanup") hs_cleanup(test_harness::test_args{config, test_name, wt_open_config}).run(); + else if (test_name == "burst_inserts") + burst_inserts(test_harness::test_args{config, test_name, wt_open_config}).run(); else { test_harness::logger::log_msg(LOG_ERROR, "Test not found: " + test_name); error_code = -1; @@ -136,7 +139,8 @@ main(int argc, char *argv[]) { std::string cfg, config_filename, current_cfg, current_test_name, test_name, wt_open_config; int64_t error_code = 0; - const std::vector all_tests = {"example_test", "hs_cleanup", "base_test"}; + const std::vector all_tests = { + "example_test", "burst_inserts", "hs_cleanup", "base_test"}; /* Set the program name for error messages. */ (void)testutil_set_progname(argv); -- cgit v1.2.1