Diffstat (limited to 'src/third_party/wiredtiger/bench')
9 files changed, 299 insertions, 32 deletions
diff --git a/src/third_party/wiredtiger/bench/workgen/runner/example_prepare.py b/src/third_party/wiredtiger/bench/workgen/runner/example_prepare.py
index fe4bd4fa491..555a695a719 100644
--- a/src/third_party/wiredtiger/bench/workgen/runner/example_prepare.py
+++ b/src/third_party/wiredtiger/bench/workgen/runner/example_prepare.py
@@ -51,24 +51,28 @@ print('populate:')
 pop_workload.run(conn)
 opread = Operation(Operation.OP_SEARCH, table)
-read_txn = txn(opread, 'read_timestamp')
+read_txn = txn(opread * 5, 'read_timestamp')
 # read_timestamp_lag is the lag to the read_timestamp from current time
 read_txn.transaction.read_timestamp_lag = 2
 treader = Thread(read_txn)
 opwrite = Operation(Operation.OP_INSERT, table)
-write_txn = txn(opwrite * 10, 'isolation=snapshot')
+write_txn = txn(opwrite * 5, 'isolation=snapshot')
 # use_prepare_timestamp - Commit the transaction with stable_timestamp.
 write_txn.transaction.use_prepare_timestamp = True
 twriter = Thread(write_txn)
+# Thread.options.session_config - Session configuration.
+twriter.options.session_config="isolation=snapshot"
 opupdate = Operation(Operation.OP_UPDATE, table)
-update_txn = txn(opupdate, 'isolation=snapshot')
+update_txn = txn(opupdate * 5, 'isolation=snapshot')
 # use_commit_timestamp - Commit the transaction with commit_timestamp.
 update_txn.transaction.use_commit_timestamp = True
 tupdate = Thread(update_txn)
+# Thread.options.session_config - Session configuration.
+tupdate.options.session_config="isolation=snapshot"
-workload = Workload(context, 30 * twriter + 20 * tupdate + 10 * treader)
+workload = Workload(context, 30 * twriter + 30 * tupdate + 30 * treader)
 workload.options.run_time = 50
 workload.options.report_interval=500
 # read_timestamp_lag - Number of seconds lag to the oldest_timestamp from current time.
@@ -85,6 +89,6 @@ run_time = end_time - start_time
 print('Workload took %d minutes' %(run_time//60))
-latency_filename = homedir + "/latency.out"
+latency_filename = os.path.join(context.args.home, "latency.out")
 latency.workload_latency(workload, latency_filename)
 conn.close()
diff --git a/src/third_party/wiredtiger/bench/workgen/runner/prepare_stress.py b/src/third_party/wiredtiger/bench/workgen/runner/prepare_stress.py
new file mode 100755
index 00000000000..a79edf71af8
--- /dev/null
+++ b/src/third_party/wiredtiger/bench/workgen/runner/prepare_stress.py
@@ -0,0 +1,199 @@
+#!/usr/bin/env python
+#
+# Public Domain 2014-2020 MongoDB, Inc.
+# Public Domain 2008-2014 WiredTiger, Inc.
+#
+# This is free and unencumbered software released into the public domain.
+#
+# Anyone is free to copy, modify, publish, use, compile, sell, or
+# distribute this software, either in source code form or as a compiled
+# binary, for any purpose, commercial or non-commercial, and by any
+# means.
+#
+# In jurisdictions that recognize copyright laws, the author or authors
+# of this software dedicate any and all copyright interest in the
+# software to the public domain. We make this dedication for the benefit
+# of the public at large and to the detriment of our heirs and
+# successors. We intend this dedication to be an overt act of
+# relinquishment in perpetuity of all present and future rights to this
+# software under copyright law.
+#
+# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
+# EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
+# MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT.
+# IN NO EVENT SHALL THE AUTHORS BE LIABLE FOR ANY CLAIM, DAMAGES OR
+# OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE,
+# ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR
+# OTHER DEALINGS IN THE SOFTWARE.
+
+# This file is derived from evict-btree-hs.py, see that file for its purpose and
+# derivation. This benchmark is designed to stress the cache effects of prepared transactions,
+# in particular eviction of data in general and prepared transactions in particular.
+# These are the ways that this workload differs from evict-btree-hs.py:
+# - Insert operations use a prepare call, and commit with a durable timestamp
+#   (see use_prepare_timestamp).
+# - Update operations commit with a commit timestamp (see use_commit_timestamp).
+# - Read transactions use a "read_timestamp" that lags the current time (see read_timestamp_lag).
+# - The system-wide "oldest_timestamp" and "stable_timestamp" advance periodically, but lag the
+#   current time (see oldest_timestamp_lag, stable_timestamp_lag and timestamp_advance).
+# - Sessions and transactions use snapshot isolation.
+
+###################################################################################################
+# These wtperf constants were used to originally generate this python file, which has since been
+# edited. The table_count, icount, and other variables have been changed below.
+'''
+# wtperf options file: evict btree configuration
+conn_config="cache_size=40G,checkpoint=(wait=60,log_size=2GB),eviction=(threads_min=12,
+threads_max=12),log=(enabled=true),session_max=600,eviction_target=60,statistics=(fast),
+statistics_log=(wait=1,json)"
+
+# 1B records * (key=12 + value=138) is about 150G total data size
+key_sz=12
+value_sz=138
+log_like_table=true
+table_config="type=file"
+icount=1000000000
+report_interval=5
+run_time=3600
+# Scans every 10 minutes for all the scan specific tables.
+# .4B records * (key=12 + value=138) is about 60G total data size for scan
+# Running on a machine with 64G physical memory, this exhausts both the
+# WT cache and the system cache.
+scan_interval=600
+scan_pct=100
+scan_table_count=20
+scan_icount=400000000
+populate_threads=5
+table_count=100
+threads=((count=400,reads=1),(count=20,inserts=1,throttle=500),(count=10,updates=1,throttle=500))
+# Add throughput/latency monitoring
+max_latency=50000
+sample_interval=5
+'''
+###################################################################################################
+
+from runner import *
+from wiredtiger import *
+from workgen import *
+import time
+
+context = Context()
+conn_config = "cache_size=1G,checkpoint=(wait=60,log_size=2GB),\
+    eviction=(threads_min=12,threads_max=12),log=(enabled=true),session_max=800,\
+    debug_mode=(table_logging=true),\
+    eviction_target=60,statistics=(fast),statistics_log=(wait=1,json)"# explicitly added
+conn = context.wiredtiger_open("create," + conn_config)
+s = conn.open_session("")
+
+wtperf_table_config = "key_format=S,value_format=S," +\
+    "exclusive=true,allocation_size=4kb," +\
+    "internal_page_max=64kb,leaf_page_max=4kb,split_pct=100,"
+compress_table_config = ""
+table_config = "type=file"
+tables = []
+table_count = 1
+for i in range(0, table_count):
+    tname = "table:test" + str(i)
+    table = Table(tname)
+    s.create(tname, wtperf_table_config +\
+             compress_table_config + table_config)
+    table.options.key_size = 200
+    table.options.value_size = 5000
+    tables.append(table)
+
+populate_threads = 40
+icount = 500000
+
+start_time = time.time()
+
+# If there are multiple tables to be filled during populate,
+# the icount is split between them all.
+pop_ops = Operation(Operation.OP_INSERT, tables[0])
+pop_ops = txn(pop_ops, 'isolation=snapshot')
+pop_ops = op_multi_table(pop_ops, tables)
+nops_per_thread = icount // (populate_threads * table_count)
+pop_thread = Thread(pop_ops * nops_per_thread)
+pop_thread.options.session_config="isolation=snapshot"
+pop_workload = Workload(context, populate_threads * pop_thread)
+pop_workload.run(conn)
+
+# Log like file, requires that logging be enabled in the connection config.
+log_name = "table:log"
+s.create(log_name, wtperf_table_config + "key_format=S,value_format=S," +\
+         compress_table_config + table_config + ",log=(enabled=true)")
+log_table = Table(log_name)
+
+# Read operation with read_timestamp_lag
+ops = Operation(Operation.OP_SEARCH, tables[0], Key(Key.KEYGEN_PARETO, 0, ParetoOptions(1)))
+ops = txn(ops, 'read_timestamp')
+ops.transaction.read_timestamp_lag = 2
+ops = op_multi_table(ops, tables, False)
+ops = op_log_like(ops, log_table, 0)
+thread0 = Thread(ops)
+
+# Insert operations with snapshot isolation level and prepare_timestamp.
+ops = Operation(Operation.OP_INSERT, tables[0])
+ops = txn(ops, 'isolation=snapshot')
+# use_prepare_timestamp - Commit the transaction with prepare, commit and durable timestamp.
+ops.transaction.use_prepare_timestamp = True
+ops = op_multi_table(ops, tables, False)
+ops = op_log_like(ops, log_table, 0)
+thread1 = Thread(ops)
+# Thread.options.session_config - Session configuration.
+thread1.options.session_config="isolation=snapshot"
+# These operations include log_like operations, which will increase the number
+# of insert/update operations by a factor of 2.0. This may cause the
+# actual operations performed to be above the throttle.
+
+# Update operations with snapshot isolation level and a commit timestamp.
+ops = Operation(Operation.OP_UPDATE, tables[0])
+ops = txn(ops, 'isolation=snapshot')
+# use_commit_timestamp - Commit the transaction with commit_timestamp.
+ops.transaction.use_commit_timestamp = True
+ops = op_multi_table(ops, tables, False)
+ops = op_log_like(ops, log_table, 0)
+thread2 = Thread(ops)
+# Thread.options.session_config - Session configuration.
+thread2.options.session_config="isolation=snapshot"
+# These operations include log_like operations, which will increase the number
+# of insert/update operations by a factor of 2.0. This may cause the
+# actual operations performed to be above the throttle.
+thread2.options.throttle=500
+thread2.options.throttle_burst=1.0
+
+# Long running transactions. There is a 0.1 second sleep after a series of search and update
+# operations. The sleep op is repeated 10000 times, which makes these transactions run for
+# at least ~17 minutes.
+search_op = Operation(Operation.OP_SEARCH, tables[0], Key(Key.KEYGEN_PARETO, 0, ParetoOptions(1)))
+update_op = Operation(Operation.OP_UPDATE, tables[0], Key(Key.KEYGEN_PARETO, 0, ParetoOptions(1)))
+ops = txn(((search_op + update_op) * 1000 + sleep(0.1)) * 10000, 'isolation=snapshot')
+ops.transaction.use_commit_timestamp = True
+thread3 = Thread(ops)
+thread3.options.session_config="isolation=snapshot"
+
+ops = Operation(Operation.OP_SLEEP, "0.1") + \
+      Operation(Operation.OP_LOG_FLUSH, "")
+logging_thread = Thread(ops)
+logging_thread.options.session_config="isolation=snapshot"
+
+workload = Workload(context, 50 * thread0 + 50 * thread1 +\
+                    10 * thread2 + 100 * thread3 + logging_thread)
+workload.options.report_interval=5
+workload.options.run_time=500
+workload.options.max_latency=50000
+# oldest_timestamp_lag - Number of seconds lag to the oldest_timestamp from current time.
+workload.options.oldest_timestamp_lag=30
+# stable_timestamp_lag - Number of seconds lag to the stable_timestamp from current time.
+workload.options.stable_timestamp_lag=10
+# timestamp_advance is the number of seconds to wait before moving oldest and stable timestamp.
+workload.options.timestamp_advance=1
+workload.run(conn)
+
+end_time = time.time()
+run_time = end_time - start_time
+
+print('Workload took %d minutes' %(run_time//60))
+
+latency_filename = os.path.join(context.args.home, "latency.out")
+latency.workload_latency(workload, latency_filename)
+conn.close()
diff --git a/src/third_party/wiredtiger/bench/workgen/runner/runner/core.py b/src/third_party/wiredtiger/bench/workgen/runner/runner/core.py
index 158a65d1fbd..5cf875a073c 100755
--- a/src/third_party/wiredtiger/bench/workgen/runner/runner/core.py
+++ b/src/third_party/wiredtiger/bench/workgen/runner/runner/core.py
@@ -34,7 +34,9 @@ from workgen import Key, Operation, OpList, Table, Transaction, Value
 # txn --
 # Put the operation (and any suboperations) within a transaction.
 def txn(op, config=None):
-    t = Transaction(config)
+    t = Transaction()
+    if config != None:
+        t._begin_config = config
     op.transaction = t
     return op
@@ -159,13 +161,39 @@ def _op_get_group_list(op):
         result.extend(grouplist)
     return result
+# This function is used by op_copy to modify a "tree" of operations to change the table
+# and/or key for each operation to a given value. It operates on the current operation,
+# and recursively on any in its group list.
+def _op_copy_mod(op, table, key):
+    if op._optype != Operation.OP_NONE:
+        if table != None:
+            op._table = table
+        if key != None:
+            op._key = key
+    if op._group != None:
+        newgroup = []
+        for subop in _op_get_group_list(op):
+            newgroup.append(_op_copy_mod(subop, table, key))
+        op._group = OpList(newgroup)
+    return op
+
+# This is a convenience function that copies an operation and all its
+# "sub-operations", as well as any attached transaction.
+def op_copy(src, table=None, key=None):
+    # Copy constructor does a deep copy, including subordinate
+    # operations and any attached transaction.
+    op = Operation(src)
+    if table != None or key != None:
+        _op_copy_mod(op, table, key)
+    return op
+
 def _op_multi_table_as_list(ops_arg, tables, pareto_tables, multiplier):
     result = []
     if ops_arg._optype != Operation.OP_NONE:
         if pareto_tables <= 0:
             for table in tables:
                 for i in range(0, multiplier):
-                    result.append(Operation(ops_arg._optype, table, ops_arg._key, ops_arg._value))
+                    result.append(op_copy(ops_arg, table=table))
         else:
             # Use the multiplier unless the length of the list will be large.
             # In any case, make sure there's at least a multiplier of 3, to
@@ -186,12 +214,21 @@ def _op_multi_table_as_list(ops_arg, tables, pareto_tables, multiplier):
                     key = Key(ops_arg._key)
                     key._pareto.range_low = (1.0 * i)/count
                     key._pareto.range_high = (1.0 * (i + 1))/count
-                    result.append(Operation(ops_arg._optype, table, key, ops_arg._value))
+                    result.append(op_copy(ops_arg, table=table, key=key))
     else:
-        for op in _op_get_group_list(ops_arg):
-            for o in _op_multi_table_as_list(op, tables, pareto_tables, \
-                multiplier):
-                result.append(Operation(o))
+        copy = op_copy(ops_arg, table=tables[1])
+        if ops_arg.transaction == None:
+            for op in _op_get_group_list(ops_arg):
+                for o in _op_multi_table_as_list(op, tables, pareto_tables, \
+                    multiplier):
+                    result.append(Operation(o))
+        elif pareto_tables <= 0:
+            entries = len(tables) * multiplier
+            for i in range(0, entries):
+                copy = op_copy(ops_arg, table=tables[i])
+                result.append(copy)
+        else:
+            raise Exception('(pareto, range partition, transaction) combination not supported')
     return result
 # A convenient way to build a list of operations
@@ -268,8 +305,14 @@ def _optype_is_write(optype):
         optype == Operation.OP_REMOVE
 # Emulate wtperf's log_like option. For all operations, add a second
-# insert operation going to a log table.
+# insert operation going to a log table. ops_per_txn is only checked
+# for zero vs non-zero; non-zero means don't add new transactions.
+# If we have ops_per_txn, wtperf.py ensures that op_group_transactions was previously called
+# to insert the needed transactions.
 def op_log_like(op, log_table, ops_per_txn):
+    if op.transaction != None:
+        # Any non-zero number indicates that we already have a transaction around this.
+        ops_per_txn = 1
     if op._optype != Operation.OP_NONE:
         if _optype_is_write(op._optype):
             op += _op_log_op(op, log_table)
@@ -279,10 +322,13 @@ def op_log_like(op, log_table, ops_per_txn):
         oplist = []
         for op2 in _op_get_group_list(op):
             if op2._optype == Operation.OP_NONE:
-                oplist.append(op_log_like(op2, log_table))
+                oplist.append(op_log_like(op2, log_table, ops_per_txn))
             elif ops_per_txn == 0 and _optype_is_write(op2._optype):
                 op2 += _op_log_op(op2, log_table)
-                oplist.append(txn(op2)) # txn for each action.
+ else: + oplist.append(op2) # already have a txn else: oplist.append(op2) if _optype_is_write(op2._optype): diff --git a/src/third_party/wiredtiger/bench/workgen/workgen.cxx b/src/third_party/wiredtiger/bench/workgen/workgen.cxx index ca0cd4b308d..739b50ebb27 100644 --- a/src/third_party/wiredtiger/bench/workgen/workgen.cxx +++ b/src/third_party/wiredtiger/bench/workgen/workgen.cxx @@ -513,7 +513,7 @@ int ThreadRunner::create_all(WT_CONNECTION *conn) { ASSERT(_session == NULL); if (_thread->options.synchronized) _thread->_op.synchronized_check(); - WT_RET(conn->open_session(conn, NULL, NULL, &_session)); + WT_RET(conn->open_session(conn, NULL, _thread->options.session_config.c_str(), &_session)); _table_usage.clear(); _stats.track_latency(_workload->options.sample_interval_ms > 0); WT_RET(workgen_random_alloc(_session, &_rand_state)); @@ -1096,9 +1096,10 @@ int Throttle::throttle(uint64_t op_count, uint64_t *op_limit) { return (0); } -ThreadOptions::ThreadOptions() : name(), throttle(0.0), throttle_burst(1.0), +ThreadOptions::ThreadOptions() : name(), session_config(), throttle(0.0), throttle_burst(1.0), synchronized(false), _options() { _options.add_string("name", name, "name of the thread"); + _options.add_string("session_config", session_config, "session config which is passed to open_session"); _options.add_double("throttle", throttle, "Limit to this number of operations per second"); _options.add_double("throttle_burst", throttle_burst, @@ -1106,7 +1107,7 @@ ThreadOptions::ThreadOptions() : name(), throttle(0.0), throttle_burst(1.0), "to having large bursts with lulls (10.0 or larger)"); } ThreadOptions::ThreadOptions(const ThreadOptions &other) : - name(other.name), throttle(other.throttle), + name(other.name), session_config(other.session_config), throttle(other.throttle), throttle_burst(other.throttle_burst), synchronized(other.synchronized), _options(other._options) {} ThreadOptions::~ThreadOptions() {} diff --git a/src/third_party/wiredtiger/bench/workgen/workgen.h b/src/third_party/wiredtiger/bench/workgen/workgen.h index b963cf3d47e..61e2b76d5c7 100644 --- a/src/third_party/wiredtiger/bench/workgen/workgen.h +++ b/src/third_party/wiredtiger/bench/workgen/workgen.h @@ -232,7 +232,7 @@ struct ParetoOptions { ~ParetoOptions(); void describe(std::ostream &os) const { - os << "parameter " << param; + os << "Pareto: parameter " << param; if (range_low != 0.0 || range_high != 1.0) { os << "range [" << range_low << "-" << range_high << "]"; } @@ -266,7 +266,12 @@ struct Key { ~Key() {} void describe(std::ostream &os) const { - os << "Key: type " << _keytype << ", size " << _size; } + os << "Key: type " << _keytype << ", size " << _size; + if (_pareto.param != ParetoOptions::DEFAULT.param) { + os << ", "; + _pareto.describe(os); + } + } }; struct Value { @@ -330,6 +335,7 @@ struct Operation { // struct ThreadOptions { std::string name; + std::string session_config; double throttle; double throttle_burst; bool synchronized; @@ -342,6 +348,7 @@ struct ThreadOptions { os << "throttle " << throttle; os << ", throttle_burst " << throttle_burst; os << ", synchronized " << synchronized; + os << ", session_config " << session_config; } std::string help() const { return _options.help(); } @@ -392,23 +399,30 @@ struct Transaction { std::string _commit_config; double read_timestamp_lag; - Transaction(const char *_config = NULL) : _rollback(false), use_commit_timestamp(false), use_prepare_timestamp(false), _begin_config(_config == NULL ? 
"" : _config), _commit_config(), - read_timestamp_lag(0.0) - {} + Transaction() : _rollback(false), use_commit_timestamp(false), + use_prepare_timestamp(false), _begin_config(""), _commit_config(), read_timestamp_lag(0.0) + {} + + Transaction(const Transaction &other) : _rollback(other._rollback), + use_commit_timestamp(other.use_commit_timestamp), + use_prepare_timestamp(other.use_prepare_timestamp), + _begin_config(other._begin_config), _commit_config(other._commit_config), + read_timestamp_lag(other.read_timestamp_lag) + {} void describe(std::ostream &os) const { os << "Transaction: "; if (_rollback) os << "(rollback) "; + if (use_commit_timestamp) + os << "(use_commit_timestamp) "; + if (use_prepare_timestamp) + os << "(use_prepare_timestamp) "; os << "begin_config: " << _begin_config; if (!_commit_config.empty()) os << ", commit_config: " << _commit_config; - if (use_commit_timestamp) - os << "(use_commit_timestamp) "; - if (use_prepare_timestamp) - os << "(use_prepare_timestamp) "; - if (read_timestamp_lag) - os << "(read_timestamp_lag)"; + if (read_timestamp_lag != 0.0) + os << ", read_timestamp_lag: " << read_timestamp_lag; } }; diff --git a/src/third_party/wiredtiger/bench/wtperf/runners/500m-btree-50r50u.wtperf b/src/third_party/wiredtiger/bench/wtperf/runners/500m-btree-50r50u.wtperf index 74f835e78ff..a829c7aafa0 100644 --- a/src/third_party/wiredtiger/bench/wtperf/runners/500m-btree-50r50u.wtperf +++ b/src/third_party/wiredtiger/bench/wtperf/runners/500m-btree-50r50u.wtperf @@ -9,6 +9,7 @@ conn_config="cache_size=16G,checkpoint=(wait=60,log_size=2GB),session_max=20000, create=false compression="snappy" sess_config="isolation=snapshot" +table_config="type=file" table_count=2 # close_conn as false allows this test to close/finish faster, but if running # as the set, the next test will need to run recovery. diff --git a/src/third_party/wiredtiger/bench/wtperf/runners/500m-btree-80r20u.wtperf b/src/third_party/wiredtiger/bench/wtperf/runners/500m-btree-80r20u.wtperf index 8b56a86e022..f1e912448d0 100644 --- a/src/third_party/wiredtiger/bench/wtperf/runners/500m-btree-80r20u.wtperf +++ b/src/third_party/wiredtiger/bench/wtperf/runners/500m-btree-80r20u.wtperf @@ -12,6 +12,7 @@ compression="snappy" # as the set, the next test will need to run recovery. close_conn=false sess_config="isolation=snapshot" +table_config="type=file" table_count=2 key_sz=40 value_sz=120 diff --git a/src/third_party/wiredtiger/bench/wtperf/runners/500m-btree-rdonly.wtperf b/src/third_party/wiredtiger/bench/wtperf/runners/500m-btree-rdonly.wtperf index 8e25334ea07..eada818dec3 100644 --- a/src/third_party/wiredtiger/bench/wtperf/runners/500m-btree-rdonly.wtperf +++ b/src/third_party/wiredtiger/bench/wtperf/runners/500m-btree-rdonly.wtperf @@ -9,6 +9,7 @@ conn_config="cache_size=16G,checkpoint=(wait=60,log_size=2GB),session_max=20000, create=false compression="snappy" sess_config="isolation=snapshot" +table_config="type=file" table_count=2 key_sz=40 value_sz=120 diff --git a/src/third_party/wiredtiger/bench/wtperf/wtperf_opt.i b/src/third_party/wiredtiger/bench/wtperf/wtperf_opt.i index cf310d33046..2706558c887 100644 --- a/src/third_party/wiredtiger/bench/wtperf/wtperf_opt.i +++ b/src/third_party/wiredtiger/bench/wtperf/wtperf_opt.i @@ -170,10 +170,10 @@ DEF_OPT_AS_UINT32(scan_table_count, 0, "that tables are shared with other operations") DEF_OPT_AS_CONFIG_STRING(sess_config, "", "session configuration string") DEF_OPT_AS_UINT32(session_count_idle, 0, "number of idle sessions to create. 
Default 0.") +/* The following table configuration is based on the configuration MongoDB uses for collections. */ DEF_OPT_AS_CONFIG_STRING(table_config, - "key_format=S,value_format=S,type=lsm,exclusive=true," - "allocation_size=4kb,internal_page_max=64kb,leaf_page_max=4kb," - "split_pct=100", + "key_format=S,value_format=S,type=file,exclusive=true," + "leaf_value_max=64MB,memory_page_max=10m,split_pct=90,checksum=on", "table configuration string") DEF_OPT_AS_UINT32(table_count, 1, "number of tables to run operations over. Keys are divided evenly " |