author     Luke Chen <luke.chen@mongodb.com>  2018-10-09 14:52:18 -0400
committer  Luke Chen <luke.chen@mongodb.com>  2018-10-09 14:52:18 -0400
commit     fc95450eafb94f080cf6769948358d42770f05ca (patch)
tree       614aad734b6854ea9b9c1b2b9835028157c5ccd5 /src/third_party/wiredtiger/bench/workgen
parent     addc24e5dcc8c2053df56697d35162a8776446e5 (diff)
download   mongo-fc95450eafb94f080cf6769948358d42770f05ca.tar.gz
Import wiredtiger: d235e0e71ef84c3f9d5a870f08feeff9a7c5581e from branch mongodb-4.2
ref: e7d742daa2..d235e0e71e for: 4.1.4

WT-4214 Simplify timestamp handling for timestamp abort test
WT-4220 Enable long running prepared support
WT-4293 WT_CURSOR.remove can lose a cursor position
WT-4297 Enhance steady throughput workload
WT-4346 Remove prepared updates from lookaside on reading the page.
WT-4351 Ensure resolving prepared transactions use updates from itself
Diffstat (limited to 'src/third_party/wiredtiger/bench/workgen')
-rw-r--r--               src/third_party/wiredtiger/bench/workgen/workgen.cxx     423
-rw-r--r--               src/third_party/wiredtiger/bench/workgen/workgen.h        42
-rw-r--r--               src/third_party/wiredtiger/bench/workgen/workgen_int.h    42
-rwxr-xr-x [-rw-r--r--]  src/third_party/wiredtiger/bench/workgen/wtperf.py        24
4 files changed, 380 insertions, 151 deletions
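
This change adds checkpoint and sleep operation types to workgen and extends the
monitor output with latency percentiles. As a quick orientation, here is a minimal
sketch of how the new operations compose in a workgen Python script, mirroring the
code the updated wtperf.py translator emits below; the import line and the
read_thread mentioned in the comments are assumptions for the sketch:

    # Sketch only: assumes the standard workgen Python bindings and that
    # read/insert threads are defined elsewhere in the script.
    from workgen import Operation, Thread

    # Sleep for the checkpoint interval, then checkpoint, repeatedly. The string
    # argument is the per-operation config: a duration in seconds for OP_SLEEP,
    # empty for OP_CHECKPOINT (see the new Operation(OpType, const char *)
    # constructor in workgen.h).
    ops = Operation(Operation.OP_SLEEP, "120") + \
          Operation(Operation.OP_CHECKPOINT, "")
    checkpoint_thread = Thread(ops)

    # Threads compose with '+' and replicate with '*', e.g.:
    #   threads = 8 * read_thread + checkpoint_thread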
diff --git a/src/third_party/wiredtiger/bench/workgen/workgen.cxx b/src/third_party/wiredtiger/bench/workgen/workgen.cxx
index 9ae63682f9c..39aacb89dc8 100644
--- a/src/third_party/wiredtiger/bench/workgen/workgen.cxx
+++ b/src/third_party/wiredtiger/bench/workgen/workgen.cxx
@@ -257,7 +257,7 @@ int ContextInternal::create_all() {
if (_runtime_alloced < _tint_last) {
// The array references are 1-based, we'll waste one entry.
TableRuntime *new_table_runtime = new TableRuntime[_tint_last + 1];
- for (int i = 0; i < _runtime_alloced; i++)
+ for (uint32_t i = 0; i < _runtime_alloced; i++)
new_table_runtime[i + 1] = _table_runtime[i + 1];
delete _table_runtime;
_table_runtime = new_table_runtime;
@@ -318,12 +318,13 @@ int Monitor::run() {
new_totals.add(tr->_stats, true);
Stats interval(new_totals);
interval.subtract(prev_totals);
- interval.smooth(prev_interval);
int interval_secs = options->sample_interval;
uint64_t cur_reads = interval.read.ops / interval_secs;
uint64_t cur_inserts = interval.insert.ops / interval_secs;
uint64_t cur_updates = interval.update.ops / interval_secs;
+ bool checkpointing = new_totals.checkpoint.ops_in_progress > 0 ||
+ interval.checkpoint.ops > 0;
uint64_t totalsec = ts_sec(t - _wrunner._start);
(*_out) << time_buf
@@ -331,7 +332,7 @@ int Monitor::run() {
<< "," << cur_reads
<< "," << cur_inserts
<< "," << cur_updates
- << "," << 'N' // checkpoint in progress
+ << "," << (checkpointing ? 'Y' : 'N')
<< "," << interval.read.average_latency()
<< "," << interval.read.min_latency
<< "," << interval.read.max_latency
@@ -348,13 +349,22 @@ int Monitor::run() {
(void)strftime(time_buf, sizeof(time_buf),
WORKGEN_TIMESTAMP_JSON, tm);
-#define TRACK_JSON(name, t) \
- "\"" << (name) << "\":{" \
- << "\"ops per sec\":" << ((t).ops / interval_secs) \
- << ",\"average latency\":" << (t).average_latency() \
- << ",\"min latency\":" << (t).min_latency \
- << ",\"max latency\":" << (t).max_latency \
- << "}"
+ // Note: we could allow this to be configurable.
+ int percentiles[4] = {50, 95, 99, 0};
+
+#define TRACK_JSON(f, name, t, percentiles, extra) \
+ do { \
+ int _i; \
+ (f) << "\"" << (name) << "\":{" << extra \
+ << "\"ops per sec\":" << ((t).ops / interval_secs) \
+ << ",\"average latency\":" << (t).average_latency() \
+ << ",\"min latency\":" << (t).min_latency \
+ << ",\"max latency\":" << (t).max_latency; \
+ for (_i = 0; (percentiles)[_i] != 0; _i++) \
+ (f) << ",\"" << (percentiles)[_i] << "% latency\":" \
+ << (t).percentile_latency(percentiles[_i]); \
+ (f) << "}"; \
+ } while(0)
(*_json) << "{";
if (first) {
@@ -362,11 +372,16 @@ int Monitor::run() {
first = false;
}
(*_json) << "\"localTime\":\"" << time_buf
- << "\",\"workgen\":{"
- << TRACK_JSON("read", interval.read) << ","
- << TRACK_JSON("insert", interval.insert) << ","
- << TRACK_JSON("update", interval.update)
- << "}}" << std::endl;
+ << "\",\"workgen\":{";
+ TRACK_JSON(*_json, "read", interval.read, percentiles, "");
+ (*_json) << ",";
+ TRACK_JSON(*_json, "insert", interval.insert, percentiles, "");
+ (*_json) << ",";
+ TRACK_JSON(*_json, "update", interval.update, percentiles, "");
+ (*_json) << ",";
+ TRACK_JSON(*_json, "checkpoint", interval.checkpoint, percentiles,
+ "\"active\":" << (checkpointing ? "1," : "0,"));
+ (*_json) << "}}" << std::endl;
}
uint64_t read_max = interval.read.max_latency;
@@ -553,7 +568,7 @@ void ThreadRunner::op_create_all(Operation *op, size_t &keysize,
tint_t tint;
op->create_all();
- if (op->_optype != Operation::OP_NONE) {
+ if (op->is_table_op()) {
op->kv_compute_max(true, false);
if (OP_HAS_VALUE(op))
op->kv_compute_max(false, op->_table.options.random_value);
@@ -678,7 +693,7 @@ int ThreadRunner::op_run(Operation *op) {
&_throttle_limit));
_throttle_ops = 0;
}
- if (op->_optype != Operation::OP_NONE)
+ if (op->is_table_op())
++_throttle_ops;
}
@@ -691,6 +706,10 @@ int ThreadRunner::op_run(Operation *op) {
// (and most likely when the threads are first beginning). Any
// WT_NOTFOUND returns are allowed and get their own statistic bumped.
switch (op->_optype) {
+ case Operation::OP_CHECKPOINT:
+ recno = 0;
+ track = &_stats.checkpoint;
+ break;
case Operation::OP_INSERT:
track = &_stats.insert;
if (op->_key._keytype == Key::KEYGEN_APPEND ||
@@ -700,6 +719,10 @@ int ThreadRunner::op_run(Operation *op) {
else
recno = op_get_key_recno(op, range, tint);
break;
+ case Operation::OP_NONE:
+ case Operation::OP_NOOP:
+ recno = 0;
+ break;
case Operation::OP_REMOVE:
track = &_stats.remove;
recno = op_get_key_recno(op, range, tint);
@@ -712,12 +735,11 @@ int ThreadRunner::op_run(Operation *op) {
track = &_stats.update;
recno = op_get_key_recno(op, range, tint);
break;
- case Operation::OP_NONE:
+ case Operation::OP_SLEEP:
recno = 0;
break;
}
- if ((op->_flags & WORKGEN_OP_REOPEN) != 0 &&
- op->_optype != Operation::OP_NONE) {
+ if ((op->_internal->_flags & WORKGEN_OP_REOPEN) != 0) {
WT_ERR(_session->open_cursor(_session, op->_table._uri.c_str(), NULL,
NULL, &cursor));
own_cursor = true;
@@ -732,6 +754,10 @@ int ThreadRunner::op_run(Operation *op) {
timespec start;
if (measure_latency)
workgen_epoch(&start);
+ // Whether or not we are measuring latency, we track how many operations
+ // are in progress, or that complete.
+ if (track != NULL)
+ track->begin();
if (op->_transaction != NULL) {
if (_in_transaction)
@@ -740,7 +766,7 @@ int ThreadRunner::op_run(Operation *op) {
op->_transaction->_begin_config.c_str());
_in_transaction = true;
}
- if (op->_optype != Operation::OP_NONE) {
+ if (op->is_table_op()) {
op->kv_gen(this, true, 100, recno, _keybuf);
cursor->set_key(cursor, _keybuf);
if (OP_HAS_VALUE(op)) {
@@ -771,15 +797,18 @@ int ThreadRunner::op_run(Operation *op) {
ret = 0; // WT_NOTFOUND allowed.
}
cursor->reset(cursor);
- }
+ } else
+ WT_ERR(op->_internal->run(this, _session));
+
if (measure_latency) {
timespec stop;
workgen_epoch(&stop);
- track->incr_with_latency(ts_us(stop - start));
+ track->complete_with_latency(ts_us(stop - start));
} else if (track != NULL)
- track->incr();
+ track->complete();
if (op->_group != NULL)
+ VERBOSE(*this, "GROUP operation " << op->_repeatgroup << " times");
for (int count = 0; !_stop && count < op->_repeatgroup; count++)
for (std::vector<Operation>::iterator i = op->_group->begin();
i != op->_group->end(); i++)
@@ -932,45 +961,51 @@ void Thread::describe(std::ostream &os) const {
}
Operation::Operation() :
- _optype(OP_NONE), _table(), _key(), _value(), _config(), _transaction(NULL),
- _group(NULL), _repeatgroup(0), _flags(0),
- _keysize(0), _valuesize(0), _keymax(0), _valuemax(0) {
+ _optype(OP_NONE), _internal(NULL), _table(), _key(), _value(), _config(),
+ _transaction(NULL), _group(NULL), _repeatgroup(0) {
+ init_internal(NULL);
}
Operation::Operation(OpType optype, Table table, Key key, Value value) :
- _optype(optype), _table(table), _key(key), _value(value), _config(),
- _transaction(NULL), _group(NULL), _repeatgroup(0), _flags(0),
- _keysize(0), _valuesize(0), _keymax(0), _valuemax(0) {
+ _optype(optype), _internal(NULL), _table(table), _key(key), _value(value),
+ _config(), _transaction(NULL), _group(NULL), _repeatgroup(0) {
+ init_internal(NULL);
size_check();
}
Operation::Operation(OpType optype, Table table, Key key) :
- _optype(optype), _table(table), _key(key), _value(), _config(),
- _transaction(NULL), _group(NULL), _repeatgroup(0), _flags(0),
- _keysize(0), _valuesize(0), _keymax(0), _valuemax(0) {
+ _optype(optype), _internal(NULL), _table(table), _key(key), _value(),
+ _config(), _transaction(NULL), _group(NULL), _repeatgroup(0) {
+ init_internal(NULL);
size_check();
}
Operation::Operation(OpType optype, Table table) :
- _optype(optype), _table(table), _key(), _value(), _config(),
- _transaction(NULL), _group(NULL), _repeatgroup(0), _flags(0),
- _keysize(0), _valuesize(0), _keymax(0), _valuemax(0) {
+ _optype(optype), _internal(NULL), _table(table), _key(), _value(),
+ _config(), _transaction(NULL), _group(NULL), _repeatgroup(0) {
+ init_internal(NULL);
size_check();
}
Operation::Operation(const Operation &other) :
- _optype(other._optype), _table(other._table), _key(other._key),
- _value(other._value), _config(other._config),
+ _optype(other._optype), _internal(NULL), _table(other._table),
+ _key(other._key), _value(other._value), _config(other._config),
_transaction(other._transaction), _group(other._group),
- _repeatgroup(other._repeatgroup), _flags(other._flags),
- _keysize(other._keysize), _valuesize(other._valuesize),
- _keymax(other._keymax), _valuemax(other._valuemax) {
+ _repeatgroup(other._repeatgroup) {
// Creation and destruction of _group and _transaction is managed
// by Python.
+ init_internal(other._internal);
+}
+
+Operation::Operation(OpType optype, const char *config) :
+ _optype(optype), _internal(NULL), _table(), _key(), _value(),
+ _config(config), _transaction(NULL), _group(NULL), _repeatgroup(0) {
+ init_internal(NULL);
}
Operation::~Operation() {
// Creation and destruction of _group, _transaction is managed by Python.
+ delete _internal;
}
Operation& Operation::operator=(const Operation &other) {
@@ -981,28 +1016,62 @@ Operation& Operation::operator=(const Operation &other) {
_transaction = other._transaction;
_group = other._group;
_repeatgroup = other._repeatgroup;
- _keysize = other._keysize;
- _valuesize = other._valuesize;
- _keymax = other._keymax;
- _valuemax = other._valuemax;
+ delete _internal;
+ _internal = NULL;
+ init_internal(other._internal);
return (*this);
}
-void Operation::create_all() {
- size_check();
+void Operation::init_internal(OperationInternal *other) {
+ ASSERT(_internal == NULL);
- _flags = 0;
- if (!_config.empty()) {
- if (_config == "reopen")
- _flags |= WORKGEN_OP_REOPEN;
+ switch (_optype) {
+ case OP_CHECKPOINT:
+ if (other == NULL)
+ _internal = new CheckpointOperationInternal();
+ else
+ _internal = new CheckpointOperationInternal(
+ *(CheckpointOperationInternal *)other);
+ break;
+ case OP_INSERT:
+ case OP_REMOVE:
+ case OP_SEARCH:
+ case OP_UPDATE:
+ if (other == NULL)
+ _internal = new TableOperationInternal();
+ else
+ _internal = new TableOperationInternal(
+ *(TableOperationInternal *)other);
+ break;
+ case OP_NONE:
+ case OP_NOOP:
+ if (other == NULL)
+ _internal = new OperationInternal();
+ else
+ _internal = new OperationInternal(*other);
+ break;
+ case OP_SLEEP:
+ if (other == NULL)
+ _internal = new SleepOperationInternal();
else
- THROW("operation has illegal config: \"" << _config << "\"");
+ _internal = new SleepOperationInternal(
+ *(SleepOperationInternal *)other);
+ break;
+ default:
+ ASSERT(false);
}
}
+void Operation::create_all() {
+ size_check();
+
+ _internal->_flags = 0;
+ _internal->parse_config(_config);
+}
+
void Operation::describe(std::ostream &os) const {
os << "Operation: " << _optype;
- if (_optype != OP_NONE) {
+ if (is_table_op()) {
os << ", "; _table.describe(os);
os << ", "; _key.describe(os);
os << ", "; _value.describe(os);
@@ -1029,33 +1098,44 @@ void Operation::describe(std::ostream &os) const {
}
void Operation::get_static_counts(Stats &stats, int multiplier) {
- switch (_optype) {
- case OP_NONE:
- break;
- case OP_INSERT:
- stats.insert.ops += multiplier;
- break;
- case OP_REMOVE:
- stats.remove.ops += multiplier;
- break;
- case OP_SEARCH:
- stats.read.ops += multiplier;
- break;
- case OP_UPDATE:
- stats.update.ops += multiplier;
- break;
- default:
- ASSERT(false);
- }
+ if (is_table_op())
+ switch (_optype) {
+ case OP_INSERT:
+ stats.insert.ops += multiplier;
+ break;
+ case OP_REMOVE:
+ stats.remove.ops += multiplier;
+ break;
+ case OP_SEARCH:
+ stats.read.ops += multiplier;
+ break;
+ case OP_UPDATE:
+ stats.update.ops += multiplier;
+ break;
+ default:
+ ASSERT(false);
+ }
+ else if (_optype == OP_CHECKPOINT)
+ stats.checkpoint.ops += multiplier;
+
if (_group != NULL)
for (std::vector<Operation>::iterator i = _group->begin();
i != _group->end(); i++)
i->get_static_counts(stats, multiplier * _repeatgroup);
}
+bool Operation::is_table_op() const {
+ return (_optype == OP_INSERT || _optype == OP_REMOVE ||
+ _optype == OP_SEARCH || _optype == OP_UPDATE);
+}
+
void Operation::kv_compute_max(bool iskey, bool has_random) {
uint64_t max;
int size;
+ TableOperationInternal *internal;
+
+ ASSERT(is_table_op());
+ internal = (TableOperationInternal *)_internal;
size = iskey ? _key._size : _value._size;
if (size == 0)
@@ -1076,31 +1156,40 @@ void Operation::kv_compute_max(bool iskey, bool has_random) {
max = 0;
if (iskey) {
- _keysize = size;
- _keymax = max;
+ internal->_keysize = size;
+ internal->_keymax = max;
} else {
- _valuesize = size;
- _valuemax = max;
+ internal->_valuesize = size;
+ internal->_valuemax = max;
}
}
void Operation::kv_size_buffer(bool iskey, size_t &maxsize) const {
+ TableOperationInternal *internal;
+
+ ASSERT(is_table_op());
+ internal = (TableOperationInternal *)_internal;
+
if (iskey) {
- if ((size_t)_keysize > maxsize)
- maxsize = _keysize;
+ if ((size_t)internal->_keysize > maxsize)
+ maxsize = internal->_keysize;
} else {
- if ((size_t)_valuesize > maxsize)
- maxsize = _valuesize;
+ if ((size_t)internal->_valuesize > maxsize)
+ maxsize = internal->_valuesize;
}
}
void Operation::kv_gen(ThreadRunner *runner, bool iskey,
uint64_t compressibility, uint64_t n, char *result) const {
- uint64_t max;
- int size;
+ TableOperationInternal *internal;
+ uint_t max;
+ uint_t size;
- size = iskey ? _keysize : _valuesize;
- max = iskey ? _keymax : _valuemax;
+ ASSERT(is_table_op());
+ internal = (TableOperationInternal *)_internal;
+
+ size = iskey ? internal->_keysize : internal->_valuesize;
+ max = iskey ? internal->_keymax : internal->_valuemax;
if (n > max)
THROW((iskey ? "Key" : "Value") << " (" << n
<< ") too large for size (" << size << ")");
@@ -1124,13 +1213,13 @@ void Operation::kv_gen(ThreadRunner *runner, bool iskey,
* That means that 75% of the string will be random numbers, and 25
* will be easily compressible zero-fill.
*/
- uint64_t random_len = size - ((size * compressibility) / 100);
+ uint_t random_len = size - ((size * compressibility) / 100);
/* Never overwrite the record number identifier */
if (random_len > size - 20)
random_len = size - 20;
- for (int i = 0; i < random_len; ++i)
+ for (uint64_t i = 0; i < random_len; ++i)
/*
* TODO: It'd be nice to use workgen_rand here, but this class
* is without the context of a runner thread, so it's not easy
@@ -1141,22 +1230,60 @@ void Operation::kv_gen(ThreadRunner *runner, bool iskey,
}
void Operation::size_check() const {
- if (_optype != OP_NONE && _key._size == 0 && _table.options.key_size == 0)
- THROW("operation requires a key size");
- if (OP_HAS_VALUE(this) && _value._size == 0 &&
- _table.options.value_size == 0)
- THROW("operation requires a value size");
+ if (is_table_op()) {
+ if (_key._size == 0 && _table.options.key_size == 0)
+ THROW("operation requires a key size");
+ if (OP_HAS_VALUE(this) && _value._size == 0 &&
+ _table.options.value_size == 0)
+ THROW("operation requires a value size");
+ }
}
-Track::Track(bool latency_tracking) : ops(0), latency_ops(0), latency(0),
- min_latency(0), max_latency(0), us(NULL), ms(NULL), sec(NULL) {
- track_latency(latency_tracking);
+int CheckpointOperationInternal::run(ThreadRunner *runner, WT_SESSION *session)
+{
+ return (session->checkpoint(session, NULL));
+}
+
+void SleepOperationInternal::parse_config(const std::string &config)
+{
+ int amount = 0;
+ const char *configp;
+ char *endp;
+
+ configp = config.c_str();
+ _sleepvalue = strtod(configp, &endp);
+ if (configp == endp || *endp != '\0' || _sleepvalue < 0.0)
+ THROW("sleep operation requires a configuration string as "
+ "a non-negative float, e.g. '1.5'");
+}
+
+int SleepOperationInternal::run(ThreadRunner *runner, WT_SESSION *session)
+{
+ (void)session; /* not used */
+ sleep(_sleepvalue);
+ return (0);
}
-Track::Track(const Track &other) : ops(other.ops),
- latency_ops(other.latency_ops), latency(other.latency),
- min_latency(other.min_latency), max_latency(other.max_latency),
+void TableOperationInternal::parse_config(const std::string &config)
+{
+ if (!config.empty()) {
+ if (config == "reopen")
+ _flags |= WORKGEN_OP_REOPEN;
+ else
+ THROW("table operation has illegal config: \"" << config << "\"");
+ }
+}
+
+Track::Track(bool latency_tracking) : ops_in_progress(0), ops(0),
+ latency_ops(0), latency(0), bucket_ops(0), min_latency(0), max_latency(0),
us(NULL), ms(NULL), sec(NULL) {
+ track_latency(latency_tracking);
+}
+
+Track::Track(const Track &other) : ops_in_progress(other.ops_in_progress),
+ ops(other.ops), latency_ops(other.latency_ops), latency(other.latency),
+ bucket_ops(other.bucket_ops), min_latency(other.min_latency),
+ max_latency(other.max_latency), us(NULL), ms(NULL), sec(NULL) {
if (other.us != NULL) {
us = new uint32_t[LATENCY_US_BUCKETS];
ms = new uint32_t[LATENCY_MS_BUCKETS];
@@ -1176,6 +1303,7 @@ Track::~Track() {
}
void Track::add(Track &other, bool reset) {
+ ops_in_progress += other.ops_in_progress;
ops += other.ops;
latency_ops += other.latency_ops;
latency += other.latency;
@@ -1198,6 +1326,7 @@ void Track::add(Track &other, bool reset) {
}
void Track::assign(const Track &other) {
+ ops_in_progress = other.ops_in_progress;
ops = other.ops;
latency_ops = other.latency_ops;
latency = other.latency;
@@ -1231,10 +1360,16 @@ uint64_t Track::average_latency() const {
return (latency / latency_ops);
}
+void Track::begin() {
+ ops_in_progress++;
+}
+
void Track::clear() {
+ ops_in_progress = 0;
ops = 0;
latency_ops = 0;
latency = 0;
+ bucket_ops = 0;
min_latency = 0;
max_latency = 0;
if (us != NULL) {
@@ -1244,13 +1379,15 @@ void Track::clear() {
}
}
-void Track::incr() {
+void Track::complete() {
+ --ops_in_progress;
ops++;
}
-void Track::incr_with_latency(uint64_t usecs) {
+void Track::complete_with_latency(uint64_t usecs) {
ASSERT(us != NULL);
+ --ops_in_progress;
ops++;
latency_ops++;
latency += usecs;
@@ -1277,7 +1414,51 @@ void Track::incr_with_latency(uint64_t usecs) {
sec[LATENCY_SEC_BUCKETS - 1]++;
}
+// Return the latency for which the given percent is lower than it.
+// E.g. for percent == 95, returns the latency for which 95% of latencies
+// are faster (lower), and 5% are slower (higher).
+uint64_t Track::percentile_latency(int percent) const {
+ // Get the total number of operations in the latency buckets.
+ // We can't reliably use latency_ops, because this struct was
+ // added up from Track structures that were being copied while
+ // being updated.
+ uint64_t total = 0;
+ for (int i = 0; i < LATENCY_SEC_BUCKETS; i++)
+ total += sec[i];
+ for (int i = 0; i < LATENCY_MS_BUCKETS; i++)
+ total += ms[i];
+ for (int i = 0; i < LATENCY_US_BUCKETS; i++)
+ total += us[i];
+ if (total == 0)
+ return (0);
+
+ // optimized for percent values over 50, we start counting from above.
+ uint64_t n = 0;
+ uint64_t k = (100 - percent) * total / 100;
+ if (k == 0)
+ return (0);
+ for (int i = LATENCY_SEC_BUCKETS - 1; i >= 0; --i) {
+ n += sec[i];
+ if (n >= k)
+ return (sec_to_us(i));
+ }
+ for (int i = LATENCY_MS_BUCKETS - 1; i >= 0; --i) {
+ n += ms[i];
+ if (n >= k)
+ return (ms_to_us(i));
+ }
+ for (int i = LATENCY_US_BUCKETS - 1; i >= 0; --i) {
+ n += us[i];
+ if (n >= k)
+ return (100 * i);
+ }
+ // We should have accounted for all the buckets.
+ ASSERT(false);
+ return (0);
+}
+
void Track::subtract(const Track &other) {
+ ops_in_progress -= other.ops_in_progress;
ops -= other.ops;
latency_ops -= other.latency_ops;
latency -= other.latency;
@@ -1294,19 +1475,6 @@ void Track::subtract(const Track &other) {
}
}
-// If there are no entries in this Track, take them from
-// a previous Track. Used to smooth graphs. We don't worry
-// about latency buckets here.
-void Track::smooth(const Track &other) {
- if (latency_ops == 0) {
- ops = other.ops;
- latency = other.latency;
- latency_ops = other.latency_ops;
- min_latency = other.min_latency;
- max_latency = other.max_latency;
- }
-}
-
void Track::track_latency(bool newval) {
if (newval) {
if (us == NULL) {
@@ -1351,18 +1519,20 @@ void Track::_get_sec(long *result) {
memset(result, 0, sizeof(long) * LATENCY_SEC_BUCKETS);
}
-Stats::Stats(bool latency) : insert(latency), not_found(latency),
- read(latency), remove(latency), update(latency), truncate(latency) {
+Stats::Stats(bool latency) : checkpoint(latency), insert(latency),
+ not_found(latency), read(latency), remove(latency), update(latency),
+ truncate(latency) {
}
-Stats::Stats(const Stats &other) : insert(other.insert),
- not_found(other.not_found), read(other.read), remove(other.remove),
- update(other.update), truncate(other.truncate) {
+Stats::Stats(const Stats &other) : checkpoint(other.checkpoint),
+ insert(other.insert), not_found(other.not_found), read(other.read),
+ remove(other.remove), update(other.update), truncate(other.truncate) {
}
Stats::~Stats() {}
void Stats::add(Stats &other, bool reset) {
+ checkpoint.add(other.checkpoint, reset);
insert.add(other.insert, reset);
not_found.add(other.not_found, reset);
read.add(other.read, reset);
@@ -1372,6 +1542,7 @@ void Stats::add(Stats &other, bool reset) {
}
void Stats::assign(const Stats &other) {
+ checkpoint.assign(other.checkpoint);
insert.assign(other.insert);
not_found.assign(other.not_found);
read.assign(other.read);
@@ -1381,6 +1552,7 @@ void Stats::assign(const Stats &other) {
}
void Stats::clear() {
+ checkpoint.clear();
insert.clear();
not_found.clear();
read.clear();
@@ -1398,10 +1570,12 @@ void Stats::describe(std::ostream &os) const {
os << ", updates " << update.ops;
os << ", truncates " << truncate.ops;
os << ", removes " << remove.ops;
+ os << ", checkpoints " << checkpoint.ops;
}
void Stats::final_report(std::ostream &os, timespec &totalsecs) const {
uint64_t ops = 0;
+ ops += checkpoint.ops;
ops += read.ops;
ops += not_found.ops;
ops += insert.ops;
@@ -1420,6 +1594,7 @@ void Stats::final_report(std::ostream &os, timespec &totalsecs) const {
FINAL_OUTPUT(os, update.ops, update, ops, totalsecs);
FINAL_OUTPUT(os, truncate.ops, truncate, ops, totalsecs);
FINAL_OUTPUT(os, remove.ops, remove, ops, totalsecs);
+ FINAL_OUTPUT(os, checkpoint.ops, checkpoint, ops, totalsecs);
}
void Stats::report(std::ostream &os) const {
@@ -1430,19 +1605,12 @@ void Stats::report(std::ostream &os) const {
os << ", " << insert.ops << " inserts, ";
os << update.ops << " updates, ";
os << truncate.ops << " truncates, ";
- os << remove.ops << " removes";
-}
-
-void Stats::smooth(const Stats &other) {
- insert.smooth(other.insert);
- not_found.smooth(other.not_found);
- read.smooth(other.read);
- remove.smooth(other.remove);
- update.smooth(other.update);
- truncate.smooth(other.truncate);
+ os << remove.ops << " removes, ";
+ os << checkpoint.ops << " checkpoints";
}
void Stats::subtract(const Stats &other) {
+ checkpoint.subtract(other.checkpoint);
insert.subtract(other.insert);
not_found.subtract(other.not_found);
read.subtract(other.read);
@@ -1452,6 +1620,7 @@ void Stats::subtract(const Stats &other) {
}
void Stats::track_latency(bool latency) {
+ checkpoint.track_latency(latency);
insert.track_latency(latency);
not_found.track_latency(latency);
read.track_latency(latency);
@@ -1507,7 +1676,7 @@ TableInternal::~TableInternal() {}
WorkloadOptions::WorkloadOptions() : max_latency(0),
report_file("workload.stat"), report_interval(0), run_time(0),
- sample_file("sample.json"), sample_interval(0), sample_rate(1), warmup(0),
+ sample_file("monitor.json"), sample_interval(0), sample_rate(1), warmup(0),
_options() {
_options.add_int("max_latency", max_latency,
"prints warning if any latency measured exceeds this number of "
diff --git a/src/third_party/wiredtiger/bench/workgen/workgen.h b/src/third_party/wiredtiger/bench/workgen/workgen.h
index 7de03a90f17..cc93409b388 100644
--- a/src/third_party/wiredtiger/bench/workgen/workgen.h
+++ b/src/third_party/wiredtiger/bench/workgen/workgen.h
@@ -30,6 +30,9 @@
#include <vector>
#include <map>
+// For convenience: A type exposed to Python that cannot be negative.
+typedef unsigned int uint_t;
+
namespace workgen {
struct ContextInternal;
@@ -81,9 +84,11 @@ struct Track {
// Threads maintain the total thread operation and total latency they've
// experienced.
- uint64_t ops; // Total operations */
+ uint64_t ops_in_progress; // Total operations not completed */
+ uint64_t ops; // Total operations completed */
uint64_t latency_ops; // Total ops sampled for latency
uint64_t latency; // Total latency */
+ uint64_t bucket_ops; // Computed for percentile_latency
// Minimum/maximum latency, shared with the monitor thread, that is, the
// monitor thread clears it so it's recalculated again for each period.
@@ -98,10 +103,11 @@ struct Track {
void add(Track&, bool reset = false);
void assign(const Track&);
uint64_t average_latency() const;
+ void begin();
void clear();
- void incr();
- void incr_with_latency(uint64_t usecs);
- void smooth(const Track&);
+ void complete();
+ void complete_with_latency(uint64_t usecs);
+ uint64_t percentile_latency(int percent) const;
void subtract(const Track&);
void track_latency(bool);
bool track_latency() const { return (us != NULL); }
@@ -120,6 +126,7 @@ private:
};
struct Stats {
+ Track checkpoint;
Track insert;
Track not_found;
Track read;
@@ -139,7 +146,6 @@ struct Stats {
void final_report(std::ostream &os, timespec &totalsecs) const;
void report(std::ostream &os) const;
#endif
- void smooth(const Stats&);
void subtract(const Stats&);
void track_latency(bool);
bool track_latency() const { return (insert.track_latency()); }
@@ -170,11 +176,11 @@ struct Context {
// properties are prevented, only existing properties can be set.
//
struct TableOptions {
- int key_size;
- int value_size;
- uint64_t value_compressibility;
+ uint_t key_size;
+ uint_t value_size;
+ uint_t value_compressibility;
bool random_value;
- int range;
+ uint_t range;
TableOptions();
TableOptions(const TableOptions &other);
@@ -276,8 +282,10 @@ struct Value {
struct Operation {
enum OpType {
- OP_NONE, OP_INSERT, OP_REMOVE, OP_SEARCH, OP_UPDATE };
+ OP_CHECKPOINT, OP_INSERT, OP_NONE, OP_NOOP, OP_REMOVE, OP_SEARCH,
+ OP_SLEEP, OP_UPDATE };
OpType _optype;
+ OperationInternal *_internal;
Table _table;
Key _key;
@@ -287,28 +295,22 @@ struct Operation {
std::vector<Operation> *_group;
int _repeatgroup;
-#ifndef SWIG
-#define WORKGEN_OP_REOPEN 0x0001 // reopen cursor for each op
- uint32_t _flags;
-
- int _keysize; // derived from Key._size and Table.options.key_size
- int _valuesize;
- uint64_t _keymax;
- uint64_t _valuemax;
-#endif
-
Operation();
Operation(OpType optype, Table table, Key key, Value value);
Operation(OpType optype, Table table, Key key);
Operation(OpType optype, Table table);
+ // Constructor with string applies to NOOP, SLEEP, CHECKPOINT
+ Operation(OpType optype, const char *config);
Operation(const Operation &other);
~Operation();
void describe(std::ostream &os) const;
#ifndef SWIG
Operation& operator=(const Operation &other);
+ void init_internal(OperationInternal *other);
void create_all();
void get_static_counts(Stats &stats, int multiplier);
+ bool is_table_op() const;
void kv_compute_max(bool iskey, bool has_random);
void kv_gen(ThreadRunner *runner, bool iskey, uint64_t compressibility,
uint64_t n, char *result) const;
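
Track::percentile_latency(), declared above and implemented in workgen.cxx, scans
the latency histograms from the slowest buckets downward, which is cheap for the
high percentiles (50/95/99) the monitor requests. A rough Python rendering of that
walk, with the bucket-to-microsecond conversions treated as assumptions of the
sketch:

    # Illustrative only: mirrors the bucket walk in Track::percentile_latency().
    # 'us', 'ms' and 'sec' are latency histograms; the conversions assume
    # one-millisecond and one-second wide buckets, and the 'us' case copies the
    # 100 * i expression from the C++ code.
    def percentile_latency(us, ms, sec, percent):
        total = sum(sec) + sum(ms) + sum(us)
        if total == 0:
            return 0
        # Count down from the slowest buckets; for percent >= 50 this touches
        # fewer buckets than counting up from the fastest.
        k = (100 - percent) * total // 100
        if k == 0:
            return 0
        n = 0
        for i in range(len(sec) - 1, -1, -1):
            n += sec[i]
            if n >= k:
                return i * 1000000   # seconds bucket -> microseconds
        for i in range(len(ms) - 1, -1, -1):
            n += ms[i]
            if n >= k:
                return i * 1000      # milliseconds bucket -> microseconds
        for i in range(len(us) - 1, -1, -1):
            n += us[i]
            if n >= k:
                return 100 * i       # as in the C++ microsecond buckets
        return 0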
diff --git a/src/third_party/wiredtiger/bench/workgen/workgen_int.h b/src/third_party/wiredtiger/bench/workgen/workgen_int.h
index c38f709efa1..dbcde8472b5 100644
--- a/src/third_party/wiredtiger/bench/workgen/workgen_int.h
+++ b/src/third_party/wiredtiger/bench/workgen/workgen_int.h
@@ -176,6 +176,48 @@ struct ContextInternal {
int create_all();
};
+struct OperationInternal {
+#define WORKGEN_OP_REOPEN 0x0001 // reopen cursor for each op
+ uint32_t _flags;
+
+ OperationInternal() : _flags(0) {}
+ OperationInternal(const OperationInternal &other) : _flags(other._flags) {}
+ virtual ~OperationInternal() {}
+ virtual void parse_config(const std::string &config) {}
+ virtual int run(ThreadRunner *runner, WT_SESSION *session) {
+ (void)runner; (void)session; return (0); }
+};
+
+struct CheckpointOperationInternal : OperationInternal {
+ CheckpointOperationInternal() : OperationInternal() {}
+ CheckpointOperationInternal(const CheckpointOperationInternal &other) {}
+ virtual int run(ThreadRunner *runner, WT_SESSION *session);
+};
+
+struct TableOperationInternal : OperationInternal {
+ uint_t _keysize; // derived from Key._size and Table.options.key_size
+ uint_t _valuesize;
+ uint_t _keymax;
+ uint_t _valuemax;
+
+ TableOperationInternal() : OperationInternal(), _keysize(0), _valuesize(0),
+ _keymax(0),_valuemax(0) {}
+ TableOperationInternal(const TableOperationInternal &other) :
+ _keysize(other._keysize), _valuesize(other._valuesize),
+ _keymax(other._keymax), _valuemax(other._valuemax) {}
+ virtual void parse_config(const std::string &config);
+};
+
+struct SleepOperationInternal : OperationInternal {
+ float _sleepvalue;
+
+ SleepOperationInternal() : OperationInternal(), _sleepvalue(0) {}
+ SleepOperationInternal(const SleepOperationInternal &other) :
+ _sleepvalue(other._sleepvalue) {}
+ virtual void parse_config(const std::string &config);
+ virtual int run(ThreadRunner *runner, WT_SESSION *session);
+};
+
struct TableInternal {
tint_t _tint;
uint32_t _context_count;
diff --git a/src/third_party/wiredtiger/bench/workgen/wtperf.py b/src/third_party/wiredtiger/bench/workgen/wtperf.py
index 9da8c37fd3a..b059b31f8db 100644..100755
--- a/src/third_party/wiredtiger/bench/workgen/wtperf.py
+++ b/src/third_party/wiredtiger/bench/workgen/wtperf.py
@@ -76,7 +76,8 @@ class Translator:
self.error(msg)
raise TranslateException(errtype)
- supported_opt_list = [ 'close_conn', 'compression', 'compact',
+ supported_opt_list = [ 'checkpoint_interval', 'checkpoint_threads',
+ 'close_conn', 'compact', 'compression',
'conn_config', 'create', 'icount',
'key_sz', 'log_like_table', 'pareto',
'populate_ops_per_txn', 'populate_threads',
@@ -219,13 +220,14 @@ class Translator:
str(factor) + ' to compensate for log_like operations.\n'
return (new_throttle, comment)
- def parse_threads(self, threads_config):
+ def parse_threads(self, threads_config, checkpoint_threads):
opts = self.options
tdecls = ''
tlist = self.split_config_parens(threads_config)
table_count = self.get_int_opt('table_count', 1)
log_like_table = self.get_boolean_opt('log_like_table', False)
txn_config = self.get_string_opt('transaction_config', '')
+ checkpoint_interval = self.get_int_opt('checkpoint_interval', 120)
run_ops = self.get_int_opt('run_ops', -1)
if log_like_table:
tdecls += 'log_name = "table:log"\n'
@@ -317,6 +319,18 @@ class Translator:
tnames += str(topts.count) + ' * '
tnames += thread_name + ' + '
+ if checkpoint_threads != 0:
+ thread_name = 'checkpoint_thread'
+
+ tdecls += 'ops = Operation(Operation.OP_SLEEP, "' + \
+ str(checkpoint_interval) + \
+ '") + \\\n Operation(Operation.OP_CHECKPOINT, "")\n'
+ tdecls += thread_name + ' = Thread(ops)\n'
+ tdecls += '\n'
+ if checkpoint_threads > 1:
+ tnames += str(checkpoint_threads) + ' * '
+ tnames += thread_name + ' + '
+
tnames = tnames.rstrip(' +')
return (tdecls, tnames)
@@ -552,8 +566,10 @@ class Translator:
s += self.translate_populate()
thread_config = self.get_string_opt('threads', '')
- if thread_config != '':
- (t_create, t_var) = self.parse_threads(thread_config)
+ checkpoint_threads = self.get_int_opt('checkpoint_threads', 0)
+ if thread_config != '' or checkpoint_threads != 0:
+ (t_create, t_var) = self.parse_threads(thread_config,
+ checkpoint_threads)
s += '\n' + t_create
if reopen_connection:
s += '\n# reopen the connection\n'
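
With these changes the monitor's per-interval JSON (written to the file named by
the sample_file option, now defaulting to "monitor.json") carries the configured
latency percentiles for each operation type plus a checkpoint entry with an
"active" flag. A small sketch of reading it back, assuming one JSON object per
line and the default file name:

    # Sketch: field names follow the TRACK_JSON macro in this change.
    import json

    with open("monitor.json") as f:
        for line in f:
            sample = json.loads(line)
            wg = sample["workgen"]
            print(sample["localTime"],
                  "read 95% latency (us):", wg["read"]["95% latency"],
                  "checkpoint active:", wg["checkpoint"]["active"])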