summaryrefslogtreecommitdiff
path: root/src/third_party/wiredtiger/bench
diff options
context:
space:
mode:
authorLuke Chen <luke.chen@mongodb.com>2020-04-09 19:33:58 +1000
committerEvergreen Agent <no-reply@evergreen.mongodb.com>2020-04-09 09:53:50 +0000
commit86339cd9b6d38ac3687b1fd6123b9e531c700f50 (patch)
tree86496fe416f9bf22d80c2d5e07573ae0d2ba1af6 /src/third_party/wiredtiger/bench
parent373feb6cc7cf0731b4a7659d4890110644c788f8 (diff)
downloadmongo-86339cd9b6d38ac3687b1fd6123b9e531c700f50.tar.gz
Import wiredtiger: da6c25fee0c5c9b0376df0dc19caa40a553cc5a2 from branch mongodb-4.4r4.4.0-rc0
ref: 5b5d798856..da6c25fee0 for: 4.4.0-rc0 WT-5357 Document WT_SESSION.truncate of a log cursor WT-5527 Free the onpage updates when restoring update chains WT-5675 Prepare support with durable history: workgen changes WT-5778 Simplify __rec_append_orig_value WT-5796 Speed up verification of file's associated history store information WT-5811 Mark ASAN tests as running on a slow machine WT-5909 Coverity unused value issue WT-5930 (4.4-only) Journaled data from 4.2 binaries not recovered at startup WT-5932 Rollback to stable aborting on-disk updates for in-memory database WT-5938 Fix a memory corruption in rollback to stable WT-5941 Document how eviction works in durable history WT-5943 Consider WT_TS_NONE timestamped update also a valid update for rollback to stable WT-5944 Relax test_wt2853_perf performance parameters temporarily WT-5948 Search shouldn't ignore globally visible tombstone of history store WT-5952 Fix freeing updates racing with application threads WT-5953 Fix format CONFIG file "mmap_all" syntax incompatibility WT-5961 Respect write generations when constructing the root addr WT-5965 Ignore leading whitespace in format CONFIG files WT-5967 Remove named snapshot support WT-5978 Unstable prepared updates causing the update chain to be unnecessarily restored WT-5979 Add Evergreen release compatibility test for forward compatibility WT-5983 Remove diagnostic tests left over from WT-5043 WT-5985 Turn off format testing for relaxed isolation levels WT-5987 Turn off history store verification in the WT_SESSION.verify method WT-5988 Do not write log records as part recovery rollback to stable operation WT-5993 Restore disk page header version WT-5994 Turn off format testing for relaxed isolation levels (part 2)
Diffstat (limited to 'src/third_party/wiredtiger/bench')
-rw-r--r--src/third_party/wiredtiger/bench/workgen/runner/example_prepare.py78
-rwxr-xr-xsrc/third_party/wiredtiger/bench/workgen/runner/runner/core.py4
-rw-r--r--src/third_party/wiredtiger/bench/workgen/workgen.cxx172
-rw-r--r--src/third_party/wiredtiger/bench/workgen/workgen.h19
-rw-r--r--src/third_party/wiredtiger/bench/workgen/workgen_int.h31
5 files changed, 267 insertions, 37 deletions
diff --git a/src/third_party/wiredtiger/bench/workgen/runner/example_prepare.py b/src/third_party/wiredtiger/bench/workgen/runner/example_prepare.py
new file mode 100644
index 00000000000..4520f2cb787
--- /dev/null
+++ b/src/third_party/wiredtiger/bench/workgen/runner/example_prepare.py
@@ -0,0 +1,78 @@
+#!/usr/bin/env python
+#
+# Public Domain 2014-2020 MongoDB, Inc.
+# Public Domain 2008-2014 WiredTiger, Inc.
+#
+# This is free and unencumbered software released into the public domain.
+#
+# Anyone is free to copy, modify, publish, use, compile, sell, or
+# distribute this software, either in source code form or as a compiled
+# binary, for any purpose, commercial or non-commercial, and by any
+# means.
+#
+# In jurisdictions that recognize copyright laws, the author or authors
+# of this software dedicate any and all copyright interest in the
+# software to the public domain. We make this dedication for the benefit
+# of the public at large and to the detriment of our heirs and
+# successors. We intend this dedication to be an overt act of
+# relinquishment in perpetuity of all present and future rights to this
+# software under copyright law.
+#
+# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
+# EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
+# MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT.
+# IN NO EVENT SHALL THE AUTHORS BE LIABLE FOR ANY CLAIM, DAMAGES OR
+# OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE,
+# ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR
+# OTHER DEALINGS IN THE SOFTWARE.
+#
+
+from runner import *
+from wiredtiger import *
+from workgen import *
+
+conn = wiredtiger_open("WT_TEST", "create,cache_size=500MB")
+s = conn.open_session()
+tname = "table:test"
+config = "key_format=S,value_format=S,"
+s.create(tname, config)
+table = Table(tname)
+table.options.key_size = 20
+table.options.value_size = 10
+
+context = Context()
+op = Operation(Operation.OP_INSERT, table)
+thread = Thread(op * 5000)
+pop_workload = Workload(context, thread)
+print('populate:')
+pop_workload.run(conn)
+
+opread = Operation(Operation.OP_SEARCH, table)
+read_txn = txn(opread * 10, 'read_timestamp')
+# read_timestamp_lag is the lag to the read_timestamp from current time
+read_txn.transaction.read_timestamp_lag = 5
+treader = Thread(read_txn)
+
+opwrite = Operation(Operation.OP_INSERT, table)
+write_txn = txn(opwrite * 10, 'isolation=snapshot')
+# use_prepare_timestamp - Commit the transaction with stable_timestamp.
+write_txn.transaction.use_prepare_timestamp = True
+twriter = Thread(write_txn)
+
+opupdate = Operation(Operation.OP_UPDATE, table)
+update_txn = txn(opupdate * 10, 'isolation=snapshot')
+# use_commit_timestamp - Commit the transaction with commit_timestamp.
+update_txn.transaction.use_commit_timestamp = True
+tupdate = Thread(update_txn)
+
+workload = Workload(context, 10 * twriter + 10 * tupdate + 10 * treader)
+workload.options.run_time = 50
+workload.options.report_interval=500
+# read_timestamp_lag - Number of seconds lag to the oldest_timestamp from current time.
+workload.options.oldest_timestamp_lag=30
+# read_timestamp_lag - Number of seconds lag to the stable_timestamp from current time.
+workload.options.stable_timestamp_lag=10
+# timestamp_advance is the number of seconds to wait before moving oldest and stable timestamp.
+workload.options.timestamp_advance=1
+print('transactional prepare workload:')
+workload.run(conn)
diff --git a/src/third_party/wiredtiger/bench/workgen/runner/runner/core.py b/src/third_party/wiredtiger/bench/workgen/runner/runner/core.py
index ae3de8efa64..158a65d1fbd 100755
--- a/src/third_party/wiredtiger/bench/workgen/runner/runner/core.py
+++ b/src/third_party/wiredtiger/bench/workgen/runner/runner/core.py
@@ -35,7 +35,7 @@ from workgen import Key, Operation, OpList, Table, Transaction, Value
# Put the operation (and any suboperations) within a transaction.
def txn(op, config=None):
t = Transaction(config)
- op._transaction = t
+ op.transaction = t
return op
# sleep --
@@ -301,7 +301,7 @@ def _op_transaction_list(oplist, txn_config):
def op_group_transaction(ops_arg, ops_per_txn, txn_config):
if ops_arg != Operation.OP_NONE:
return txn(ops_arg, txn_config)
- if ops_arg._transaction != None:
+ if ops_arg.transaction != None:
raise Exception('nested transactions not supported')
if ops_arg._repeatgroup != None:
raise Exception('grouping transactions with multipliers not supported')
diff --git a/src/third_party/wiredtiger/bench/workgen/workgen.cxx b/src/third_party/wiredtiger/bench/workgen/workgen.cxx
index 2347cf9d6b5..ca0cd4b308d 100644
--- a/src/third_party/wiredtiger/bench/workgen/workgen.cxx
+++ b/src/third_party/wiredtiger/bench/workgen/workgen.cxx
@@ -35,7 +35,6 @@
#include "wiredtiger.h"
#include "workgen.h"
#include "workgen_int.h"
-#include "workgen_time.h"
extern "C" {
// Include some specific WT files, as some files included by wt_internal.h
// have some C-ism's that don't work in C++.
@@ -43,12 +42,11 @@ extern "C" {
#include <string.h>
#include <stdint.h>
#include <stdlib.h>
-#include <unistd.h>
#include <errno.h>
-#include <math.h>
#include "error.h"
#include "misc.h"
}
+#define BUF_SIZE 100
#define LATENCY_US_BUCKETS 1000
#define LATENCY_MS_BUCKETS 1000
@@ -102,6 +100,12 @@ extern "C" {
namespace workgen {
+
+struct WorkloadRunnerConnection {
+ WorkloadRunner *runner;
+ WT_CONNECTION *connection;
+};
+
// The number of contexts. Normally there is one context created, but it will
// be possible to use several eventually. More than one is not yet
// implemented, but we must at least guard against the caller creating more
@@ -118,6 +122,48 @@ static void *thread_runner_main(void *arg) {
return (NULL);
}
+static void *thread_workload(void *arg) {
+
+ WorkloadRunnerConnection *runnerConnection = (WorkloadRunnerConnection *) arg;
+ WorkloadRunner *runner = runnerConnection->runner;
+ WT_CONNECTION *connection = runnerConnection->connection;
+
+ try {
+ runner->increment_timestamp(connection);
+ } catch (WorkgenException &wge) {
+ std::cerr << "Exception while incrementing timestamp." << std::endl;
+ }
+
+ return (NULL);
+}
+
+/*
+ * This function will sleep for "timestamp_advance" seconds, increment and set oldest_timestamp,
+ * stable_timestamp with the specified lag until stopping is set to true
+ */
+int WorkloadRunner::increment_timestamp(WT_CONNECTION *conn) {
+ char buf[BUF_SIZE];
+ uint64_t time_us;
+
+ while (!stopping)
+ {
+ if (_workload->options.oldest_timestamp_lag > 0) {
+ time_us = WorkgenTimeStamp::get_timestamp_lag(_workload->options.oldest_timestamp_lag);
+ sprintf(buf, "oldest_timestamp=%" PRIu64, time_us);
+ conn->set_timestamp(conn, buf);
+ }
+
+ if (_workload->options.stable_timestamp_lag > 0) {
+ time_us = WorkgenTimeStamp::get_timestamp_lag(_workload->options.stable_timestamp_lag);
+ sprintf(buf, "stable_timestamp=%" PRIu64, time_us);
+ conn->set_timestamp(conn, buf);
+ }
+
+ WorkgenTimeStamp::sleep(_workload->options.timestamp_advance);
+ }
+ return 0;
+}
+
static void *monitor_main(void *arg) {
Monitor *monitor = (Monitor *)arg;
try {
@@ -715,6 +761,9 @@ int ThreadRunner::op_run(Operation *op) {
uint64_t recno;
uint64_t range;
bool measure_latency, own_cursor, retry_op;
+ timespec start_time;
+ uint64_t time_us;
+ char buf[BUF_SIZE];
track = NULL;
cursor = NULL;
@@ -795,6 +844,7 @@ int ThreadRunner::op_run(Operation *op) {
timespec start;
if (measure_latency)
workgen_epoch(&start);
+
// Whether or not we are measuring latency, we track how many operations
// are in progress, or that complete.
if (track != NULL)
@@ -814,11 +864,22 @@ int ThreadRunner::op_run(Operation *op) {
}
// Retry on rollback until success.
while (retry_op) {
- if (op->_transaction != NULL) {
+ if (op->transaction != NULL) {
if (_in_transaction)
THROW("nested transactions not supported");
- WT_ERR(_session->begin_transaction(_session,
- op->_transaction->_begin_config.c_str()));
+ if (op->transaction->use_commit_timestamp && op->transaction->use_prepare_timestamp)
+ {
+ THROW("Either use_prepare_timestamp or use_commit_timestamp must be set.");
+ }
+ if (op->transaction->read_timestamp_lag > 0) {
+ uint64_t read = WorkgenTimeStamp::get_timestamp_lag(op->transaction->read_timestamp_lag);
+ sprintf(buf, "%s=%" PRIu64, op->transaction->_begin_config.c_str(), read);
+ }
+ else {
+ sprintf(buf, "%s", op->transaction->_begin_config.c_str());
+ }
+ WT_ERR(_session->begin_transaction(_session, buf));
+
_in_transaction = true;
}
if (op->is_table_op()) {
@@ -899,12 +960,28 @@ int ThreadRunner::op_run(Operation *op) {
err:
if (own_cursor)
WT_TRET(cursor->close(cursor));
- if (op->_transaction != NULL) {
- if (ret != 0 || op->_transaction->_rollback)
+ if (op->transaction != NULL) {
+ if (ret != 0 || op->transaction->_rollback)
WT_TRET(_session->rollback_transaction(_session, NULL));
- else if (_in_transaction)
- ret = _session->commit_transaction(_session,
- op->_transaction->_commit_config.c_str());
+ else if (_in_transaction) {
+ // Set prepare, commit and durable timestamp if prepare is set.
+ if (op->transaction->use_prepare_timestamp) {
+ time_us = WorkgenTimeStamp::get_timestamp();
+ sprintf(buf, "prepare_timestamp=%" PRIu64, time_us);
+ ret = _session->prepare_transaction(_session, buf);
+ sprintf(buf, "commit_timestamp=%" PRIu64 ",durable_timestamp=%" PRIu64, time_us, time_us);
+ ret = _session->commit_transaction(_session, buf);
+ }
+ else if (op->transaction->use_commit_timestamp) {
+ uint64_t commit_time_us = WorkgenTimeStamp::get_timestamp();
+ sprintf(buf, "commit_timestamp=%" PRIu64, commit_time_us);
+ ret = _session->commit_transaction(_session, buf);
+ }
+ else {
+ ret = _session->commit_transaction(_session,
+ op->transaction->_commit_config.c_str());
+ }
+ }
_in_transaction = false;
}
return (ret);
@@ -1077,27 +1154,27 @@ void Thread::describe(std::ostream &os) const {
Operation::Operation() :
_optype(OP_NONE), _internal(NULL), _table(), _key(), _value(), _config(),
- _transaction(NULL), _group(NULL), _repeatgroup(0), _timed(0.0) {
+ transaction(NULL), _group(NULL), _repeatgroup(0), _timed(0.0) {
init_internal(NULL);
}
Operation::Operation(OpType optype, Table table, Key key, Value value) :
_optype(optype), _internal(NULL), _table(table), _key(key), _value(value),
- _config(), _transaction(NULL), _group(NULL), _repeatgroup(0), _timed(0.0) {
+ _config(), transaction(NULL), _group(NULL), _repeatgroup(0), _timed(0.0) {
init_internal(NULL);
size_check();
}
Operation::Operation(OpType optype, Table table, Key key) :
_optype(optype), _internal(NULL), _table(table), _key(key), _value(),
- _config(), _transaction(NULL), _group(NULL), _repeatgroup(0), _timed(0.0) {
+ _config(), transaction(NULL), _group(NULL), _repeatgroup(0), _timed(0.0) {
init_internal(NULL);
size_check();
}
Operation::Operation(OpType optype, Table table) :
_optype(optype), _internal(NULL), _table(table), _key(), _value(),
- _config(), _transaction(NULL), _group(NULL), _repeatgroup(0), _timed(0.0) {
+ _config(), transaction(NULL), _group(NULL), _repeatgroup(0), _timed(0.0) {
init_internal(NULL);
size_check();
}
@@ -1105,22 +1182,22 @@ Operation::Operation(OpType optype, Table table) :
Operation::Operation(const Operation &other) :
_optype(other._optype), _internal(NULL), _table(other._table),
_key(other._key), _value(other._value), _config(other._config),
- _transaction(other._transaction), _group(other._group),
+ transaction(other.transaction), _group(other._group),
_repeatgroup(other._repeatgroup), _timed(other._timed) {
- // Creation and destruction of _group and _transaction is managed
+ // Creation and destruction of _group and transaction is managed
// by Python.
init_internal(other._internal);
}
Operation::Operation(OpType optype, const char *config) :
_optype(optype), _internal(NULL), _table(), _key(), _value(),
- _config(config), _transaction(NULL), _group(NULL), _repeatgroup(0),
+ _config(config), transaction(NULL), _group(NULL), _repeatgroup(0),
_timed(0.0) {
init_internal(NULL);
}
Operation::~Operation() {
- // Creation and destruction of _group, _transaction is managed by Python.
+ // Creation and destruction of _group, transaction is managed by Python.
delete _internal;
}
@@ -1129,7 +1206,7 @@ Operation& Operation::operator=(const Operation &other) {
_table = other._table;
_key = other._key;
_value = other._value;
- _transaction = other._transaction;
+ transaction = other.transaction;
_group = other._group;
_repeatgroup = other._repeatgroup;
_timed = other._timed;
@@ -1184,7 +1261,7 @@ void Operation::init_internal(OperationInternal *other) {
bool Operation::combinable() const {
return (_group != NULL && _repeatgroup == 1 && _timed == 0.0 &&
- _transaction == NULL && _config == "");
+ transaction == NULL && _config == "");
}
void Operation::create_all() {
@@ -1203,9 +1280,9 @@ void Operation::describe(std::ostream &os) const {
}
if (!_config.empty())
os << ", '" << _config << "'";
- if (_transaction != NULL) {
+ if (transaction != NULL) {
os << ", [";
- _transaction->describe(os);
+ transaction->describe(os);
os << "]";
}
if (_timed != 0.0)
@@ -1439,7 +1516,7 @@ int SleepOperationInternal::run(ThreadRunner *runner, WT_SESSION *session)
uint64_t SleepOperationInternal::sync_time_us() const
{
- return (secs_us(_sleepvalue));
+ return (secs_us(_sleepvalue));
}
void TableOperationInternal::parse_config(const std::string &config)
@@ -1857,7 +1934,8 @@ TableInternal::~TableInternal() {}
WorkloadOptions::WorkloadOptions() : max_latency(0),
report_file("workload.stat"), report_interval(0), run_time(0),
sample_file("monitor.json"), sample_interval_ms(0), sample_rate(1),
- warmup(0), _options() {
+ warmup(0), oldest_timestamp_lag(0.0), stable_timestamp_lag(0.0),
+ timestamp_advance(0.0), _options() {
_options.add_int("max_latency", max_latency,
"prints warning if any latency measured exceeds this number of "
"milliseconds. Requires sample_interval to be configured.");
@@ -1881,6 +1959,13 @@ WorkloadOptions::WorkloadOptions() : max_latency(0),
"2 for every second operation, 3 for every third operation etc.");
_options.add_int("warmup", warmup,
"how long to run the workload phase before starting measurements");
+ _options.add_double("oldest_timestamp_lag", oldest_timestamp_lag,
+ "how much lag to the oldest timestamp from epoch time");
+ _options.add_double("stable_timestamp_lag", stable_timestamp_lag,
+ "how much lag to the oldest timestamp from epoch time");
+ _options.add_double("timestamp_advance", timestamp_advance,
+ "how many seconds to wait before moving oldest and stable"
+ "timestamp forward");
}
WorkloadOptions::WorkloadOptions(const WorkloadOptions &other) :
@@ -1917,13 +2002,12 @@ Workload& Workload::operator=(const Workload &other) {
int Workload::run(WT_CONNECTION *conn) {
WorkloadRunner runner(this);
-
return (runner.run(conn));
}
WorkloadRunner::WorkloadRunner(Workload *workload) :
_workload(workload), _trunners(workload->_threads.size()),
- _report_out(&std::cout), _start() {
+ _report_out(&std::cout), _start(), stopping(false) {
ts_clear(_start);
}
WorkloadRunner::~WorkloadRunner() {}
@@ -1934,6 +2018,9 @@ int WorkloadRunner::run(WT_CONNECTION *conn) {
std::ofstream report_out;
_wt_home = conn->get_home(conn);
+
+ if ( (options->oldest_timestamp_lag > 0 || options->stable_timestamp_lag > 0) && options->timestamp_advance < 0 )
+ THROW("Workload.options.timestamp_advance must be positive if either Workload.options.oldest_timestamp_lag or Workload.options.stable_timestamp_lag is set");
if (options->sample_interval_ms > 0 && options->sample_rate <= 0)
THROW("Workload.options.sample_rate must be positive");
if (!options->report_file.empty()) {
@@ -1944,7 +2031,7 @@ int WorkloadRunner::run(WT_CONNECTION *conn) {
WT_ERR(create_all(conn, _workload->_context));
WT_ERR(open_all());
WT_ERR(ThreadRunner::cross_check(_trunners));
- WT_ERR(run_all());
+ WT_ERR(run_all(conn));
err:
//TODO: (void)close_all();
_report_out = &std::cout;
@@ -2031,16 +2118,18 @@ void WorkloadRunner::final_report(timespec &totalsecs) {
out << "Run completed: " << totalsecs << " seconds" << std::endl;
}
-int WorkloadRunner::run_all() {
+int WorkloadRunner::run_all(WT_CONNECTION *conn) {
void *status;
std::vector<pthread_t> thread_handles;
Stats counts(false);
WorkgenException *exception;
WorkloadOptions *options = &_workload->options;
+ WorkloadRunnerConnection *runnerConnection;
Monitor monitor(*this);
std::ofstream monitor_out;
std::ofstream monitor_json;
std::ostream &out = *_report_out;
+ pthread_t time_thandle;
WT_DECL_RET;
for (size_t i = 0; i < _trunners.size(); i++)
@@ -2086,6 +2175,22 @@ int WorkloadRunner::run_all() {
thread_handles.push_back(thandle);
}
+ // Start Timestamp increment thread
+ if (options->oldest_timestamp_lag > 0 || options->stable_timestamp_lag > 0) {
+
+ runnerConnection = new WorkloadRunnerConnection();
+ runnerConnection->runner = this;
+ runnerConnection->connection = conn;
+
+ if ((ret = pthread_create(&time_thandle, NULL, thread_workload,
+ runnerConnection)) != 0) {
+ std::cerr << "pthread_create failed err=" << ret << std::endl;
+ std::cerr << "Stopping Time threads." << std::endl;
+ (void)pthread_join(time_thandle, &status);
+ delete runnerConnection;
+ }
+ }
+
// Treat warmup separately from report interval so that if we have a
// warmup period we clear and ignore stats after it ends.
if (options->warmup != 0)
@@ -2132,6 +2237,9 @@ int WorkloadRunner::run_all() {
_trunners[i]._stop = true;
if (options->sample_interval_ms > 0)
monitor._stop = true;
+ if (options->oldest_timestamp_lag > 0 || options->stable_timestamp_lag > 0) {
+ stopping = true;
+ }
// wait for all threads
exception = NULL;
@@ -2146,6 +2254,12 @@ int WorkloadRunner::run_all() {
exception = &_trunners[i]._exception;
}
+ // Wait for the time increment thread
+ if (options->oldest_timestamp_lag > 0 || options->stable_timestamp_lag > 0) {
+ WT_TRET(pthread_join(time_thandle, &status));
+ delete runnerConnection;
+ }
+
workgen_epoch(&now);
if (options->sample_interval_ms > 0) {
WT_TRET(pthread_join(monitor._handle, &status));
diff --git a/src/third_party/wiredtiger/bench/workgen/workgen.h b/src/third_party/wiredtiger/bench/workgen/workgen.h
index 382ca65dcfc..b963cf3d47e 100644
--- a/src/third_party/wiredtiger/bench/workgen/workgen.h
+++ b/src/third_party/wiredtiger/bench/workgen/workgen.h
@@ -292,7 +292,7 @@ struct Operation {
Key _key;
Value _value;
std::string _config;
- Transaction *_transaction;
+ Transaction *transaction;
std::vector<Operation> *_group;
int _repeatgroup;
double _timed;
@@ -386,11 +386,15 @@ struct Thread {
struct Transaction {
bool _rollback;
+ bool use_commit_timestamp;
+ bool use_prepare_timestamp;
std::string _begin_config;
std::string _commit_config;
+ double read_timestamp_lag;
- Transaction(const char *_config = NULL) : _rollback(false),
- _begin_config(_config == NULL ? "" : _config), _commit_config() {}
+ Transaction(const char *_config = NULL) : _rollback(false), use_commit_timestamp(false), use_prepare_timestamp(false), _begin_config(_config == NULL ? "" : _config), _commit_config(),
+ read_timestamp_lag(0.0)
+ {}
void describe(std::ostream &os) const {
os << "Transaction: ";
@@ -399,6 +403,12 @@ struct Transaction {
os << "begin_config: " << _begin_config;
if (!_commit_config.empty())
os << ", commit_config: " << _commit_config;
+ if (use_commit_timestamp)
+ os << "(use_commit_timestamp) ";
+ if (use_prepare_timestamp)
+ os << "(use_prepare_timestamp) ";
+ if (read_timestamp_lag)
+ os << "(read_timestamp_lag)";
}
};
@@ -414,6 +424,9 @@ struct WorkloadOptions {
int sample_rate;
std::string sample_file;
int warmup;
+ double oldest_timestamp_lag;
+ double stable_timestamp_lag;
+ double timestamp_advance;
WorkloadOptions();
WorkloadOptions(const WorkloadOptions &other);
diff --git a/src/third_party/wiredtiger/bench/workgen/workgen_int.h b/src/third_party/wiredtiger/bench/workgen/workgen_int.h
index ca93e5c2733..d5ed99c8c53 100644
--- a/src/third_party/wiredtiger/bench/workgen/workgen_int.h
+++ b/src/third_party/wiredtiger/bench/workgen/workgen_int.h
@@ -30,11 +30,12 @@
#include <vector>
#include <map>
#include <set>
-#ifndef SWIG
extern "C" {
+#include <unistd.h>
#include "workgen_func.h"
+#include <math.h>
}
-#endif
+#include "workgen_time.h"
namespace workgen {
@@ -46,6 +47,28 @@ typedef uint32_t tint_t;
struct ThreadRunner;
struct WorkloadRunner;
+struct WorkgenTimeStamp {
+ WorkgenTimeStamp() {}
+
+ static uint64_t get_timestamp_lag(double seconds) {
+ timespec start_time;
+ workgen_epoch(&start_time);
+
+ return (ts_us(start_time) - secs_us(seconds));
+ }
+
+ static void sleep(double seconds) {
+ usleep(ceil(secs_us(seconds)));
+ }
+
+ static uint64_t get_timestamp() {
+ timespec start_time;
+ workgen_epoch(&start_time);
+
+ return (ts_us(start_time));
+ }
+};
+
// A exception generated by the workgen classes. Methods generally return an
// int errno, so this is useful primarily for notifying the caller about
// failures in constructors.
@@ -250,10 +273,12 @@ struct WorkloadRunner {
std::ostream *_report_out;
std::string _wt_home;
timespec _start;
+ bool stopping;
WorkloadRunner(Workload *);
~WorkloadRunner();
int run(WT_CONNECTION *conn);
+ int increment_timestamp(WT_CONNECTION *conn);
private:
int close_all();
@@ -263,7 +288,7 @@ private:
int open_all();
void open_report_file(std::ofstream &, const char *, const char *);
void report(time_t, time_t, Stats *stats);
- int run_all();
+ int run_all(WT_CONNECTION *conn);
WorkloadRunner(const WorkloadRunner &); // disallowed
WorkloadRunner& operator=(const WorkloadRunner &other); // disallowed