diff options
Diffstat (limited to 'src/third_party/wiredtiger/bench/workgen/workgen.cxx')
-rw-r--r-- | src/third_party/wiredtiger/bench/workgen/workgen.cxx | 168 |
1 files changed, 138 insertions, 30 deletions
diff --git a/src/third_party/wiredtiger/bench/workgen/workgen.cxx b/src/third_party/wiredtiger/bench/workgen/workgen.cxx index ce9debcca2f..31e21e6f6c9 100644 --- a/src/third_party/wiredtiger/bench/workgen/workgen.cxx +++ b/src/third_party/wiredtiger/bench/workgen/workgen.cxx @@ -240,7 +240,8 @@ Context& Context::operator=(const Context &other) { } ContextInternal::ContextInternal() : _tint(), _table_names(), - _recno(NULL), _recno_alloced(0), _tint_last(0), _context_count(0) { + _table_runtime(NULL), _runtime_alloced(0), _tint_last(0), + _context_count(0) { uint32_t count; if ((count = workgen_atomic_add32(&context_count, 1)) != 1) THROW("multiple Contexts not supported"); @@ -248,20 +249,20 @@ ContextInternal::ContextInternal() : _tint(), _table_names(), } ContextInternal::~ContextInternal() { - if (_recno != NULL) - delete _recno; + if (_table_runtime != NULL) + delete _table_runtime; } int ContextInternal::create_all() { - if (_recno_alloced != _tint_last) { + if (_runtime_alloced != _tint_last) { // The array references are 1-based, we'll waste one entry. - uint64_t *new_recno = new uint64_t[_tint_last + 1]; - memcpy(new_recno, _recno, sizeof(uint64_t) * _recno_alloced); - memset(&new_recno[_recno_alloced], 0, - sizeof(uint64_t) * (_tint_last - _recno_alloced + 1)); - delete _recno; - _recno = new_recno; - _recno_alloced = _tint_last; + TableRuntime *new_table_runtime = new TableRuntime[_tint_last + 1]; + memcpy(new_table_runtime, _table_runtime, sizeof(uint64_t) * _runtime_alloced); + memset(&new_table_runtime[_runtime_alloced], 0, + sizeof(uint64_t) * (_tint_last - _runtime_alloced + 1)); + delete _table_runtime; + _table_runtime = new_table_runtime; + _runtime_alloced = _tint_last; } return (0); } @@ -301,7 +302,9 @@ int Monitor::run() { workgen_version(version, sizeof(version)); Stats prev_interval; while (!_stop) { - for (int i = 0; i < options->sample_interval && !_stop; i++) + int waitsecs = (first && options->warmup > 0) ? options->warmup : + options->sample_interval; + for (int i = 0; i < waitsecs && !_stop; i++) sleep(1); if (_stop) break; @@ -387,6 +390,22 @@ int Monitor::run() { return (0); } +ParetoOptions ParetoOptions::DEFAULT; +ParetoOptions::ParetoOptions(int param_arg) : param(param_arg), range_low(0.0), + range_high(1.0), _options() { + _options.add_int("param", param, + "0 is disabled, otherwise a range from 1 (most aggressive) to " + "100 (least aggressive)"); + _options.add_double("range_low", range_low, + "between 0.0 and 1.0, starting range of the pareto distribution"); + _options.add_double("range_high", range_high, + "between 0.0 and 1.0, ending range of the pareto distribution"); +} +ParetoOptions::ParetoOptions(const ParetoOptions &other) : + param(other.param), range_low(other.range_low), + range_high(other.range_high), _options(other._options) {} +ParetoOptions::~ParetoOptions() {} + ThreadRunner::ThreadRunner() : _errno(0), _exception(), _thread(NULL), _context(NULL), _icontext(NULL), _workload(NULL), _wrunner(NULL), _rand_state(NULL), @@ -536,9 +555,12 @@ void ThreadRunner::op_create_all(Operation *op, size_t &keysize, op->create_all(); if (op->_optype != Operation::OP_NONE) { - op->kv_compute_max(true); + op->kv_compute_max(true, false); if (OP_HAS_VALUE(op)) - op->kv_compute_max(false); + op->kv_compute_max(false, op->_table.options.random_value); + if (op->_key._keytype == Key::KEYGEN_PARETO && + op->_key._pareto.param == 0) + THROW("Key._pareto value must be set if KEYGEN_PARETO specified"); op->kv_size_buffer(true, keysize); op->kv_size_buffer(false, valuesize); @@ -575,17 +597,66 @@ void ThreadRunner::op_create_all(Operation *op, size_t &keysize, op_create_all(&*i, keysize, valuesize); } -uint64_t ThreadRunner::op_get_key_recno(Operation *op, tint_t tint) { + +#define PARETO_SHAPE 1.5 + +// Return a value within the interval [ 0, recno_max ) +// that is weighted toward lower numbers with pareto_param at 0 (the minimum), +// and more evenly distributed with pareto_param at 100 (the maximum). +// +static uint64_t +pareto_calculation(uint32_t randint, uint64_t recno_max, + ParetoOptions &pareto) { + double S1, S2, U; + uint32_t result; + double r; + + r = (double)randint; + if (pareto.range_high != 1.0 || pareto.range_low != 0.0) { + if (pareto.range_high <= pareto.range_low || + pareto.range_high > 1.0 || pareto.range_low < 0.0) + THROW("Pareto illegal range"); + r = (pareto.range_low * (double)UINT32_MAX) + + r * (pareto.range_high - pareto.range_low); + } + S1 = (-1 / PARETO_SHAPE); + S2 = recno_max * (pareto.param / 100.0) * (PARETO_SHAPE - 1); + U = 1 - r / (double)UINT32_MAX; // interval [0, 1) + result = (uint64_t)((pow(U, S1) - 1) * S2); + + // This Pareto calculation chooses out of range values less than 20% + // of the time, depending on pareto_param. For param of 0, it is + // never out of range, for param of 100, 19.2%. For the default + // pareto_param of 20, it will be out of range 2.7% of the time. + // Out of range values are channelled into the first key, + // making it "hot". Unfortunately, that means that using a higher + // param can get a lot lumped into the first bucket. + // + // XXX This matches the behavior of wtperf, we may consider instead + // retrying (modifying the random number) until we get a good value. + // + if (result > recno_max) + result = 0; + return (result); +} + +uint64_t ThreadRunner::op_get_key_recno(Operation *op, uint64_t range, + tint_t tint) { uint64_t recno_count; - uint32_t rand; + uint32_t rval; (void)op; - recno_count = _icontext->_recno[tint]; + if (range > 0) + recno_count = range; + else + recno_count = _icontext->_table_runtime[tint]._max_recno; if (recno_count == 0) // The file has no entries, returning 0 forces a WT_NOTFOUND return. return (0); - rand = workgen_random(_rand_state); - return (rand % recno_count + 1); // recnos are one-based. + rval = workgen_random(_rand_state); + if (op->_key._keytype == Key::KEYGEN_PARETO) + rval = pareto_calculation(rval, recno_count, op->_key._pareto); + return (rval % recno_count + 1); // recnos are one-based. } int ThreadRunner::op_run(Operation *op) { @@ -594,12 +665,14 @@ int ThreadRunner::op_run(Operation *op) { WT_CURSOR *cursor; WT_DECL_RET; uint64_t recno; + uint64_t range; bool measure_latency, own_cursor; track = NULL; cursor = NULL; recno = 0; own_cursor = false; + range = op->_table.options.range; if (_throttle != NULL) { if (_throttle_ops >= _throttle_limit && !_in_transaction) { WT_ERR(_throttle->throttle(_throttle_ops, @@ -621,19 +694,24 @@ int ThreadRunner::op_run(Operation *op) { switch (op->_optype) { case Operation::OP_INSERT: track = &_stats.insert; - recno = workgen_atomic_add64(&_icontext->_recno[tint], 1); + if (op->_key._keytype == Key::KEYGEN_APPEND || + op->_key._keytype == Key::KEYGEN_AUTO) + recno = workgen_atomic_add64( + &_icontext->_table_runtime[tint]._max_recno, 1); + else + recno = op_get_key_recno(op, range, tint); break; case Operation::OP_REMOVE: track = &_stats.remove; - recno = op_get_key_recno(op, tint); + recno = op_get_key_recno(op, range, tint); break; case Operation::OP_SEARCH: track = &_stats.read; - recno = op_get_key_recno(op, tint); + recno = op_get_key_recno(op, range, tint); break; case Operation::OP_UPDATE: track = &_stats.update; - recno = op_get_key_recno(op, tint); + recno = op_get_key_recno(op, range, tint); break; case Operation::OP_NONE: recno = 0; @@ -651,6 +729,7 @@ int ThreadRunner::op_run(Operation *op) { track->track_latency() && (track->ops % _workload->options.sample_rate == 0); + VERBOSE(*this, "OP " << op->_optype << " " << op->_table._uri.c_str() << ", recno=" << recno); timespec start; if (measure_latency) workgen_epoch(&start); @@ -663,10 +742,13 @@ int ThreadRunner::op_run(Operation *op) { _in_transaction = true; } if (op->_optype != Operation::OP_NONE) { - op->kv_gen(true, recno, _keybuf); + op->kv_gen(true, 0, recno, _keybuf); cursor->set_key(cursor, _keybuf); if (OP_HAS_VALUE(op)) { - op->kv_gen(false, recno, _valuebuf); + uint32_t r = 0; + if (op->_table.options.random_value) + r = workgen_random(_rand_state); + op->kv_gen(false, r, recno, _valuebuf); cursor->set_value(cursor, _valuebuf); } switch (op->_optype) { @@ -969,7 +1051,7 @@ void Operation::get_static_counts(Stats &stats, int multiplier) { i->get_static_counts(stats, multiplier * _repeatgroup); } -void Operation::kv_compute_max(bool iskey) { +void Operation::kv_compute_max(bool iskey, bool has_random) { uint64_t max; int size; @@ -981,6 +1063,14 @@ void Operation::kv_compute_max(bool iskey) { THROW("Key.size too small for table '" << _table._uri << "'"); if (!iskey && size < 1) THROW("Value.size too small for table '" << _table._uri << "'"); + if (has_random) { + if (iskey) + THROW("Random keys not allowed"); + size -= RANDOMIZER_SIZE; + if (size < 1) + THROW("Value.size with random values too small for table '" + << _table._uri << "'"); + } if (size > 1) max = power64(10, (size - 1)) - 1; @@ -1006,7 +1096,8 @@ void Operation::kv_size_buffer(bool iskey, size_t &maxsize) const { } } -void Operation::kv_gen(bool iskey, uint64_t n, char *result) const { +void Operation::kv_gen(bool iskey, uint32_t randomizer, uint64_t n, + char *result) const { uint64_t max; int size; @@ -1015,6 +1106,12 @@ void Operation::kv_gen(bool iskey, uint64_t n, char *result) const { if (n > max) THROW((iskey ? "Key" : "Value") << " (" << n << ") too large for size (" << size << ")"); + if (randomizer != 0) { + randomizer %= 1000; + snprintf(result, 6, ":%3.3d:", randomizer); + n -= RANDOMIZER_SIZE; + result += RANDOMIZER_SIZE; + } workgen_u64_to_string_zf(n, result, size); } @@ -1338,14 +1435,20 @@ void Stats::track_latency(bool latency) { truncate.track_latency(latency); } -TableOptions::TableOptions() : key_size(0), value_size(0), _options() { +TableOptions::TableOptions() : key_size(0), value_size(0), + random_value(false), range(0), _options() { _options.add_int("key_size", key_size, "default size of the key, unless overridden by Key.size"); _options.add_int("value_size", value_size, "default size of the value, unless overridden by Value.size"); + _options.add_bool("random_value", random_value, + "generate random content for the value"); + _options.add_int("range", range, + "if zero, keys are inserted at the end and reads/updates are in the current range, if non-zero, inserts/reads/updates are at a random key between 0 and the given range"); } TableOptions::TableOptions(const TableOptions &other) : key_size(other.key_size), value_size(other.value_size), + random_value(other.random_value), range(other.range), _options(other._options) {} TableOptions::~TableOptions() {} @@ -1376,7 +1479,7 @@ TableInternal::~TableInternal() {} WorkloadOptions::WorkloadOptions() : max_latency(0), report_file("workload.stat"), report_interval(0), run_time(0), - sample_file("sample.json"), sample_interval(0), sample_rate(1), + sample_file("sample.json"), sample_interval(0), sample_rate(1), warmup(0), _options() { _options.add_int("max_latency", max_latency, "prints warning if any latency measured exceeds this number of " @@ -1399,6 +1502,8 @@ WorkloadOptions::WorkloadOptions() : max_latency(0), _options.add_int("sample_rate", sample_rate, "how often the latency of operations is measured. 1 for every operation, " "2 for every second operation, 3 for every third operation etc."); + _options.add_int("warmup", warmup, + "how long to run the workload phase before starting measurements"); } WorkloadOptions::WorkloadOptions(const WorkloadOptions &other) : @@ -1569,7 +1674,8 @@ int WorkloadRunner::run_all() { workgen_epoch(&_start); timespec end = _start + options->run_time; - timespec next_report = _start + options->report_interval; + timespec next_report = _start + + ((options->warmup > 0) ? options->warmup : options->report_interval); // Start all threads if (options->sample_interval > 0) { @@ -1653,6 +1759,8 @@ int WorkloadRunner::run_all() { if (exception == NULL && !_trunners[i]._exception._str.empty()) exception = &_trunners[i]._exception; } + + workgen_epoch(&now); if (options->sample_interval > 0) { WT_TRET(pthread_join(monitor._handle, &status)); if (monitor._errno != 0) |