summaryrefslogtreecommitdiff
path: root/src/third_party/wiredtiger/bench
diff options
context:
space:
mode:
authorLuke Chen <luke.chen@mongodb.com>2021-07-19 14:42:54 +1000
committerEvergreen Agent <no-reply@evergreen.mongodb.com>2021-07-19 05:16:25 +0000
commitc379240d1fbbcfeb4153895d01a1fa8602649a48 (patch)
tree6f51e07cce0433db55f53e549b9c9518a5ceff12 /src/third_party/wiredtiger/bench
parente7e03817009d1b921e21b34cf9bb10a0ea692dd9 (diff)
downloadmongo-c379240d1fbbcfeb4153895d01a1fa8602649a48.tar.gz
Import wiredtiger: c0d466d05f48b392e7c38a1ffc4a1f5c30b58d6c from branch mongodb-master
ref: 038e0a6839..c0d466d05f for: 5.1.0 WT-7629 Run clang format on .cxx files in the codebase.
Diffstat (limited to 'src/third_party/wiredtiger/bench')
-rw-r--r--src/third_party/wiredtiger/bench/workgen/workgen.cxx1109
1 files changed, 631 insertions, 478 deletions
diff --git a/src/third_party/wiredtiger/bench/workgen/workgen.cxx b/src/third_party/wiredtiger/bench/workgen/workgen.cxx
index 5ac782da18e..2f7c0c4f5c4 100644
--- a/src/third_party/wiredtiger/bench/workgen/workgen.cxx
+++ b/src/third_party/wiredtiger/bench/workgen/workgen.cxx
@@ -26,8 +26,8 @@
* OTHER DEALINGS IN THE SOFTWARE.
*/
-#define __STDC_LIMIT_MACROS // needed to get UINT64_MAX in C++
-#define __STDC_FORMAT_MACROS // needed to get PRIuXX macros in C++
+#define __STDC_LIMIT_MACROS // needed to get UINT64_MAX in C++
+#define __STDC_FORMAT_MACROS // needed to get PRIuXX macros in C++
#include <iomanip>
#include <iostream>
#include <fstream>
@@ -52,55 +52,51 @@ extern "C" {
#define LATENCY_MS_BUCKETS 1000
#define LATENCY_SEC_BUCKETS 100
-#define THROTTLE_PER_SEC 20 // times per sec we will throttle
+#define THROTTLE_PER_SEC 20 // times per sec we will throttle
-#define MIN(a, b) ((a) < (b) ? (a) : (b))
-#define MAX(a, b) ((a) < (b) ? (b) : (a))
-#define TIMESPEC_DOUBLE(ts) ((double)(ts).tv_sec + ts.tv_nsec * 0.000000001)
-#define PCT(n, total) ((total) == 0 ? 0 : ((n) * 100) / (total))
-#define OPS_PER_SEC(ops, ts) (int) ((ts) == 0 ? 0.0 : \
- (ops) / TIMESPEC_DOUBLE(ts))
+#define MIN(a, b) ((a) < (b) ? (a) : (b))
+#define MAX(a, b) ((a) < (b) ? (b) : (a))
+#define TIMESPEC_DOUBLE(ts) ((double)(ts).tv_sec + ts.tv_nsec * 0.000000001)
+#define PCT(n, total) ((total) == 0 ? 0 : ((n)*100) / (total))
+#define OPS_PER_SEC(ops, ts) (int)((ts) == 0 ? 0.0 : (ops) / TIMESPEC_DOUBLE(ts))
// Get the value of a STL container, even if it is not present
-#define CONTAINER_VALUE(container, idx, dfault) \
+#define CONTAINER_VALUE(container, idx, dfault) \
(((container).count(idx) > 0) ? (container)[idx] : (dfault))
-#define CROSS_USAGE(a, b) \
- (((a & USAGE_READ) != 0 && (b & USAGE_WRITE) != 0) || \
- ((a & USAGE_WRITE) != 0 && (b & USAGE_READ) != 0))
-
-#define ASSERT(cond) \
- do { \
- if (!(cond)) { \
- fprintf(stderr, "%s:%d: ASSERT failed: %s\n", \
- __FILE__, __LINE__, #cond); \
- abort(); \
- } \
- } while(0)
-
-#define THROW_ERRNO(e, args) \
- do { \
- std::stringstream __sstm; \
- __sstm << args; \
- WorkgenException __wge(e, __sstm.str().c_str()); \
- throw(__wge); \
- } while(0)
-
-#define THROW(args) THROW_ERRNO(0, args)
-
-#define VERBOSE(runner, args) \
- do { \
- if ((runner)._context->_verbose) \
- std::cout << args << std::endl; \
- } while(0)
-
-#define OP_HAS_VALUE(op) \
- ((op)->_optype == Operation::OP_INSERT || \
- (op)->_optype == Operation::OP_UPDATE)
+#define CROSS_USAGE(a, b) \
+ (((a & USAGE_READ) != 0 && (b & USAGE_WRITE) != 0) || \
+ ((a & USAGE_WRITE) != 0 && (b & USAGE_READ) != 0))
+
+#define ASSERT(cond) \
+ do { \
+ if (!(cond)) { \
+ fprintf(stderr, "%s:%d: ASSERT failed: %s\n", __FILE__, __LINE__, #cond); \
+ abort(); \
+ } \
+ } while (0)
+
+#define THROW_ERRNO(e, args) \
+ do { \
+ std::stringstream __sstm; \
+ __sstm << args; \
+ WorkgenException __wge(e, __sstm.str().c_str()); \
+ throw(__wge); \
+ } while (0)
+
+#define THROW(args) THROW_ERRNO(0, args)
+
+#define VERBOSE(runner, args) \
+ do { \
+ if ((runner)._context->_verbose) \
+ std::cout << args << std::endl; \
+ } while (0)
+
+#define OP_HAS_VALUE(op) \
+ ((op)->_optype == Operation::OP_INSERT || (op)->_optype == Operation::OP_UPDATE)
namespace workgen {
-
struct WorkloadRunnerConnection {
WorkloadRunner *runner;
WT_CONNECTION *connection;
@@ -112,7 +108,9 @@ struct WorkloadRunnerConnection {
// than one.
static uint32_t context_count = 0;
-static void *thread_runner_main(void *arg) {
+static void *
+thread_runner_main(void *arg)
+{
ThreadRunner *runner = (ThreadRunner *)arg;
try {
runner->_errno = runner->run();
@@ -122,9 +120,11 @@ static void *thread_runner_main(void *arg) {
return (NULL);
}
-static void *thread_workload(void *arg) {
+static void *
+thread_workload(void *arg)
+{
- WorkloadRunnerConnection *runnerConnection = (WorkloadRunnerConnection *) arg;
+ WorkloadRunnerConnection *runnerConnection = (WorkloadRunnerConnection *)arg;
WorkloadRunner *runner = runnerConnection->runner;
WT_CONNECTION *connection = runnerConnection->connection;
@@ -137,8 +137,10 @@ static void *thread_workload(void *arg) {
return (NULL);
}
-static void *thread_idle_table_cycle_workload(void *arg) {
- WorkloadRunnerConnection *runnerConnection = (WorkloadRunnerConnection *) arg;
+static void *
+thread_idle_table_cycle_workload(void *arg)
+{
+ WorkloadRunnerConnection *runnerConnection = (WorkloadRunnerConnection *)arg;
WT_CONNECTION *connection = runnerConnection->connection;
WorkloadRunner *runner = runnerConnection->runner;
@@ -151,7 +153,9 @@ static void *thread_idle_table_cycle_workload(void *arg) {
return (NULL);
}
-int WorkloadRunner::check_timing(const char *name, uint64_t last_interval) {
+int
+WorkloadRunner::check_timing(const char *name, uint64_t last_interval)
+{
WorkloadOptions *options = &_workload->options;
int msg_err;
const char *str;
@@ -169,11 +173,13 @@ int WorkloadRunner::check_timing(const char *name, uint64_t last_interval) {
<< last_interval << " s which is longer than configured acceptable maximum of "
<< options->max_idle_table_cycle << " s. Diff is "
<< (last_interval - options->max_idle_table_cycle) << " s." << std::endl;
- }
- return (msg_err);
+ }
+ return (msg_err);
}
-int WorkloadRunner::start_table_idle_cycle(WT_CONNECTION *conn) {
+int
+WorkloadRunner::start_table_idle_cycle(WT_CONNECTION *conn)
+{
WT_SESSION *session;
WT_CURSOR *cursor;
uint64_t start, stop, last_interval;
@@ -185,7 +191,7 @@ int WorkloadRunner::start_table_idle_cycle(WT_CONNECTION *conn) {
THROW("Error Opening a Session.");
}
- for (cycle_count = 0 ; !stopping ; ++cycle_count) {
+ for (cycle_count = 0; !stopping; ++cycle_count) {
sprintf(uri, "table:test_cycle%04d", cycle_count);
workgen_clock(&start);
@@ -235,12 +241,13 @@ int WorkloadRunner::start_table_idle_cycle(WT_CONNECTION *conn) {
* This function will sleep for "timestamp_advance" seconds, increment and set oldest_timestamp,
* stable_timestamp with the specified lag until stopping is set to true
*/
-int WorkloadRunner::increment_timestamp(WT_CONNECTION *conn) {
+int
+WorkloadRunner::increment_timestamp(WT_CONNECTION *conn)
+{
uint64_t time_us;
char buf[BUF_SIZE];
- while (!stopping)
- {
+ while (!stopping) {
if (_workload->options.oldest_timestamp_lag > 0) {
time_us = WorkgenTimeStamp::get_timestamp_lag(_workload->options.oldest_timestamp_lag);
sprintf(buf, "oldest_timestamp=%" PRIu64, time_us);
@@ -258,7 +265,9 @@ int WorkloadRunner::increment_timestamp(WT_CONNECTION *conn) {
return 0;
}
-static void *monitor_main(void *arg) {
+static void *
+monitor_main(void *arg)
+{
Monitor *monitor = (Monitor *)arg;
try {
monitor->_errno = monitor->run();
@@ -271,7 +280,9 @@ static void *monitor_main(void *arg) {
// Exponentiate (like the pow function), except that it returns an exact
// integral 64 bit value, and if it overflows, returns the maximum possible
// value for the return type.
-static uint64_t power64(int base, int exp) {
+static uint64_t
+power64(int base, int exp)
+{
uint64_t last, result;
result = 1;
@@ -285,38 +296,42 @@ static uint64_t power64(int base, int exp) {
}
OptionsList::OptionsList() : _option_map() {}
-OptionsList::OptionsList(const OptionsList &other) :
- _option_map(other._option_map) {}
+OptionsList::OptionsList(const OptionsList &other) : _option_map(other._option_map) {}
-void OptionsList::add_option(const char *name, const std::string typestr,
- const char *desc) {
+void
+OptionsList::add_option(const char *name, const std::string typestr, const char *desc)
+{
TypeDescPair pair(typestr, desc);
_option_map[name] = pair;
}
-void OptionsList::add_int(const char *name, int default_value,
- const char *desc) {
+void
+OptionsList::add_int(const char *name, int default_value, const char *desc)
+{
std::stringstream sstm;
sstm << "int, default=" << default_value;
add_option(name, sstm.str(), desc);
}
-void OptionsList::add_bool(const char *name, bool default_value,
- const char *desc) {
+void
+OptionsList::add_bool(const char *name, bool default_value, const char *desc)
+{
std::stringstream sstm;
sstm << "boolean, default=" << (default_value ? "true" : "false");
add_option(name, sstm.str(), desc);
}
-void OptionsList::add_double(const char *name, double default_value,
- const char *desc) {
+void
+OptionsList::add_double(const char *name, double default_value, const char *desc)
+{
std::stringstream sstm;
sstm << "double, default=" << default_value;
add_option(name, sstm.str(), desc);
}
-void OptionsList::add_string(const char *name,
- const std::string &default_value, const char *desc) {
+void
+OptionsList::add_string(const char *name, const std::string &default_value, const char *desc)
+{
std::stringstream sstm;
sstm << "string, default=\"" << default_value << "\"";
add_option(name, sstm.str(), desc);
@@ -332,7 +347,7 @@ pretty_print(const char *p, const char *indent, std::stringstream &sstm)
break;
for (t = p + 70; t > p && *t != ' '; --t)
;
- if (t == p) /* No spaces? */
+ if (t == p) /* No spaces? */
break;
if (indent != NULL)
sstm << indent;
@@ -346,17 +361,21 @@ pretty_print(const char *p, const char *indent, std::stringstream &sstm)
}
}
-std::string OptionsList::help() const {
+std::string
+OptionsList::help() const
+{
std::stringstream sstm;
- for (std::map<std::string, TypeDescPair>::const_iterator i =
- _option_map.begin(); i != _option_map.end(); i++) {
+ for (std::map<std::string, TypeDescPair>::const_iterator i = _option_map.begin();
+ i != _option_map.end(); i++) {
sstm << i->first << " (" << i->second.first << ")" << std::endl;
pretty_print(i->second.second.c_str(), "\t", sstm);
}
return sstm.str();
}
-std::string OptionsList::help_description(const char *option_name) const {
+std::string
+OptionsList::help_description(const char *option_name) const
+{
const std::string key(option_name);
if (_option_map.count(key) == 0)
return (std::string(""));
@@ -364,7 +383,9 @@ std::string OptionsList::help_description(const char *option_name) const {
return (_option_map.find(key)->second.second);
}
-std::string OptionsList::help_type(const char *option_name) const {
+std::string
+OptionsList::help_type(const char *option_name) const
+{
const std::string key(option_name);
if (_option_map.count(key) == 0)
return std::string("");
@@ -373,28 +394,37 @@ std::string OptionsList::help_type(const char *option_name) const {
}
Context::Context() : _verbose(false), _internal(new ContextInternal()) {}
-Context::~Context() { delete _internal; }
-Context& Context::operator=(const Context &other) {
+Context::~Context()
+{
+ delete _internal;
+}
+Context &
+Context::operator=(const Context &other)
+{
_verbose = other._verbose;
*_internal = *other._internal;
return (*this);
}
-ContextInternal::ContextInternal() : _tint(), _table_names(),
- _table_runtime(NULL), _runtime_alloced(0), _tint_last(0),
- _context_count(0) {
+ContextInternal::ContextInternal()
+ : _tint(), _table_names(), _table_runtime(NULL), _runtime_alloced(0), _tint_last(0),
+ _context_count(0)
+{
uint32_t count;
if ((count = workgen_atomic_add32(&context_count, 1)) != 1)
THROW("multiple Contexts not supported");
_context_count = count;
}
-ContextInternal::~ContextInternal() {
+ContextInternal::~ContextInternal()
+{
if (_table_runtime != NULL)
delete _table_runtime;
}
-int ContextInternal::create_all() {
+int
+ContextInternal::create_all()
+{
if (_runtime_alloced < _tint_last) {
// The array references are 1-based, we'll waste one entry.
TableRuntime *new_table_runtime = new TableRuntime[_tint_last + 1];
@@ -407,12 +437,15 @@ int ContextInternal::create_all() {
return (0);
}
-Monitor::Monitor(WorkloadRunner &wrunner) :
- _errno(0), _exception(), _wrunner(wrunner), _stop(false), _handle(),
- _out(NULL), _json(NULL) {}
+Monitor::Monitor(WorkloadRunner &wrunner)
+ : _errno(0), _exception(), _wrunner(wrunner), _stop(false), _handle(), _out(NULL), _json(NULL)
+{
+}
Monitor::~Monitor() {}
-int Monitor::run() {
+int
+Monitor::run()
+{
struct timespec t;
struct tm *tm, _tm;
char version[100];
@@ -432,8 +465,7 @@ int Monitor::run() {
// The whole and fractional part of sample_interval are separated,
// we don't want to sleep longer than a second.
int sample_secs = ms_to_sec(options->sample_interval_ms);
- useconds_t sample_usecs =
- ms_to_us(options->sample_interval_ms) - sec_to_us(sample_secs);
+ useconds_t sample_usecs = ms_to_us(options->sample_interval_ms) - sec_to_us(sample_secs);
// Format JSON prefix.
if (_json != NULL)
@@ -463,16 +495,16 @@ int Monitor::run() {
tm = localtime_r(&t.tv_sec, &_tm);
Stats new_totals(true);
- for (std::vector<ThreadRunner>::iterator tr =
- _wrunner._trunners.begin(); tr != _wrunner._trunners.end(); tr++)
+ for (std::vector<ThreadRunner>::iterator tr = _wrunner._trunners.begin();
+ tr != _wrunner._trunners.end(); tr++)
new_totals.add(tr->_stats, true);
Stats interval(new_totals);
interval.subtract(prev_totals);
- bool checkpointing = new_totals.checkpoint.ops_in_progress > 0 ||
- interval.checkpoint.ops > 0;
+ bool checkpointing =
+ new_totals.checkpoint.ops_in_progress > 0 || interval.checkpoint.ops > 0;
double interval_secs = options->sample_interval_ms / 1000.0;
-
+
// Format entry into _out stream.
if (_out != NULL)
_format_out_entry(interval, interval_secs, t, checkpointing, *tm);
@@ -498,7 +530,9 @@ int Monitor::run() {
return (0);
}
-void Monitor::_format_out_header() {
+void
+Monitor::_format_out_header()
+{
(*_out) << "#time,"
<< "totalsec,"
<< "read ops per second,"
@@ -513,12 +547,13 @@ void Monitor::_format_out_header() {
<< "insert maximum latency(uS),"
<< "update average latency(uS),"
<< "update min latency(uS),"
- << "update maximum latency(uS)"
- << std::endl;
+ << "update maximum latency(uS)" << std::endl;
}
-void Monitor::_format_out_entry(const Stats &interval, double interval_secs,
- const timespec &timespec, bool checkpointing, const tm &tm) {
+void
+Monitor::_format_out_entry(const Stats &interval, double interval_secs, const timespec &timespec,
+ bool checkpointing, const tm &tm)
+{
char time_buf[64];
uint64_t cur_reads = (uint64_t)(interval.read.ops / interval_secs);
uint64_t cur_inserts = (uint64_t)(interval.insert.ops / interval_secs);
@@ -526,48 +561,41 @@ void Monitor::_format_out_entry(const Stats &interval, double interval_secs,
uint64_t totalsec = ts_sec(timespec - _wrunner._start);
(void)strftime(time_buf, sizeof(time_buf), "%b %d %H:%M:%S", &tm);
- (*_out) << time_buf
- << "," << totalsec
- << "," << cur_reads
- << "," << cur_inserts
- << "," << cur_updates
- << "," << (checkpointing ? 'Y' : 'N')
- << "," << interval.read.average_latency()
- << "," << interval.read.min_latency
- << "," << interval.read.max_latency
- << "," << interval.insert.average_latency()
- << "," << interval.insert.min_latency
- << "," << interval.insert.max_latency
- << "," << interval.update.average_latency()
- << "," << interval.update.min_latency
- << "," << interval.update.max_latency
- << std::endl;
-}
-
-void Monitor::_format_json_prefix(const std::string &version) {
+ (*_out) << time_buf << "," << totalsec << "," << cur_reads << "," << cur_inserts << ","
+ << cur_updates << "," << (checkpointing ? 'Y' : 'N') << ","
+ << interval.read.average_latency() << "," << interval.read.min_latency << ","
+ << interval.read.max_latency << "," << interval.insert.average_latency() << ","
+ << interval.insert.min_latency << "," << interval.insert.max_latency << ","
+ << interval.update.average_latency() << "," << interval.update.min_latency << ","
+ << interval.update.max_latency << std::endl;
+}
+
+void
+Monitor::_format_json_prefix(const std::string &version)
+{
(*_json) << "{";
(*_json) << "\"version\":\"" << version << "\",";
(*_json) << "\"workgen\":[";
}
-void Monitor::_format_json_entry(const tm &tm, const timespec &timespec, bool first_iteration,
- const Stats &interval, bool checkpointing, double interval_secs) {
+void
+Monitor::_format_json_entry(const tm &tm, const timespec &timespec, bool first_iteration,
+ const Stats &interval, bool checkpointing, double interval_secs)
+{
#define WORKGEN_TIMESTAMP_JSON "%Y-%m-%dT%H:%M:%S"
-#define TRACK_JSON(f, name, t, percentiles, extra) \
- do { \
- int _i; \
- (f) << "\"" << (name) << "\":{" << extra \
- << "\"ops per sec\":" \
- << (uint64_t)((t).ops / interval_secs) \
- << ",\"rollbacks\":" << ((t).rollbacks) \
- << ",\"average latency\":" << (t).average_latency() \
- << ",\"min latency\":" << (t).min_latency \
- << ",\"max latency\":" << (t).max_latency; \
- for (_i = 0; (percentiles)[_i] != 0; _i++) \
- (f) << ",\"" << (percentiles)[_i] << "% latency\":" \
- << (t).percentile_latency(percentiles[_i]); \
- (f) << "}"; \
- } while(0)
+#define TRACK_JSON(f, name, t, percentiles, extra) \
+ do { \
+ int _i; \
+ (f) << "\"" << (name) << "\":{" << extra \
+ << "\"ops per sec\":" << (uint64_t)((t).ops / interval_secs) \
+ << ",\"rollbacks\":" << ((t).rollbacks) \
+ << ",\"average latency\":" << (t).average_latency() \
+ << ",\"min latency\":" << (t).min_latency << ",\"max latency\":" << (t).max_latency; \
+ for (_i = 0; (percentiles)[_i] != 0; _i++) \
+ (f) << ",\"" << (percentiles)[_i] \
+ << "% latency\":" << (t).percentile_latency(percentiles[_i]); \
+ (f) << "}"; \
+ } while (0)
// Note: we could allow this to be configurable.
int percentiles[4] = {50, 95, 99, 0};
@@ -576,8 +604,8 @@ void Monitor::_format_json_entry(const tm &tm, const timespec &timespec, bool fi
buf_size = strftime(time_buf, sizeof(time_buf), WORKGEN_TIMESTAMP_JSON, &tm);
ASSERT(buf_size <= sizeof(time_buf));
- snprintf(&time_buf[buf_size], sizeof(time_buf) - buf_size,
- ".%3.3" PRIu64 "Z", (uint64_t)ns_to_ms(timespec.tv_nsec));
+ snprintf(&time_buf[buf_size], sizeof(time_buf) - buf_size, ".%3.3" PRIu64 "Z",
+ (uint64_t)ns_to_ms(timespec.tv_nsec));
if (!first_iteration)
(*_json) << ",";
@@ -591,64 +619,73 @@ void Monitor::_format_json_entry(const tm &tm, const timespec &timespec, bool fi
TRACK_JSON(*_json, "update", interval.update, percentiles, "");
(*_json) << ",";
TRACK_JSON(*_json, "checkpoint", interval.checkpoint, percentiles,
- "\"active\":" << (checkpointing ? "1," : "0,"));
+ "\"active\":" << (checkpointing ? "1," : "0,"));
(*_json) << "}" << std::endl;
}
-void Monitor::_format_json_suffix() {
+void
+Monitor::_format_json_suffix()
+{
(*_json) << "]}" << std::endl;
}
-void Monitor::_check_latency_threshold(const Stats &interval, uint64_t latency_max) {
+void
+Monitor::_check_latency_threshold(const Stats &interval, uint64_t latency_max)
+{
uint64_t read_max = interval.read.max_latency;
uint64_t insert_max = interval.insert.max_latency;
uint64_t update_max = interval.update.max_latency;
if (read_max > latency_max)
- std::cerr << "WARNING: max latency exceeded for read operation. Threshold "
- << latency_max << " us, recorded " << read_max << " us, diff "
- << (read_max - latency_max) << " us." << std::endl;
+ std::cerr << "WARNING: max latency exceeded for read operation. Threshold " << latency_max
+ << " us, recorded " << read_max << " us, diff " << (read_max - latency_max)
+ << " us." << std::endl;
if (insert_max > latency_max)
- std::cerr << "WARNING: max latency exceeded for insert operation. Threshold "
- << latency_max << " us, recorded " << insert_max << " us, diff "
- << (insert_max - latency_max) << " us." << std::endl;
+ std::cerr << "WARNING: max latency exceeded for insert operation. Threshold " << latency_max
+ << " us, recorded " << insert_max << " us, diff " << (insert_max - latency_max)
+ << " us." << std::endl;
if (update_max > latency_max)
- std::cerr << "WARNING: max latency exceeded for update operation. Threshold "
- << latency_max << " us, recorded " << insert_max << " us, diff "
- << (update_max - latency_max) << " us." << std::endl;
+ std::cerr << "WARNING: max latency exceeded for update operation. Threshold " << latency_max
+ << " us, recorded " << insert_max << " us, diff " << (update_max - latency_max)
+ << " us." << std::endl;
}
ParetoOptions ParetoOptions::DEFAULT;
-ParetoOptions::ParetoOptions(int param_arg) : param(param_arg), range_low(0.0),
- range_high(1.0), _options() {
+ParetoOptions::ParetoOptions(int param_arg)
+ : param(param_arg), range_low(0.0), range_high(1.0), _options()
+{
_options.add_int("param", param,
"0 is disabled, otherwise a range from 1 (most aggressive) to "
"100 (least aggressive)");
- _options.add_double("range_low", range_low,
- "between 0.0 and 1.0, starting range of the pareto distribution");
- _options.add_double("range_high", range_high,
- "between 0.0 and 1.0, ending range of the pareto distribution");
-}
-ParetoOptions::ParetoOptions(const ParetoOptions &other) :
- param(other.param), range_low(other.range_low),
- range_high(other.range_high), _options(other._options) {}
+ _options.add_double(
+ "range_low", range_low, "between 0.0 and 1.0, starting range of the pareto distribution");
+ _options.add_double(
+ "range_high", range_high, "between 0.0 and 1.0, ending range of the pareto distribution");
+}
+ParetoOptions::ParetoOptions(const ParetoOptions &other)
+ : param(other.param), range_low(other.range_low), range_high(other.range_high),
+ _options(other._options)
+{
+}
ParetoOptions::~ParetoOptions() {}
-ThreadRunner::ThreadRunner() :
- _errno(0), _exception(), _thread(NULL), _context(NULL), _icontext(NULL),
- _workload(NULL), _wrunner(NULL), _rand_state(NULL),
- _throttle(NULL), _throttle_ops(0), _throttle_limit(0),
- _in_transaction(false), _start_time_us(0), _op_time_us(0),
- _number(0), _stats(false), _table_usage(),
- _cursors(NULL), _stop(false), _session(NULL), _keybuf(NULL),
- _valuebuf(NULL), _repeat(false) {
+ThreadRunner::ThreadRunner()
+ : _errno(0), _exception(), _thread(NULL), _context(NULL), _icontext(NULL), _workload(NULL),
+ _wrunner(NULL), _rand_state(NULL), _throttle(NULL), _throttle_ops(0), _throttle_limit(0),
+ _in_transaction(false), _start_time_us(0), _op_time_us(0), _number(0), _stats(false),
+ _table_usage(), _cursors(NULL), _stop(false), _session(NULL), _keybuf(NULL), _valuebuf(NULL),
+ _repeat(false)
+{
}
-ThreadRunner::~ThreadRunner() {
+ThreadRunner::~ThreadRunner()
+{
free_all();
}
-int ThreadRunner::create_all(WT_CONNECTION *conn) {
+int
+ThreadRunner::create_all(WT_CONNECTION *conn)
+{
size_t keysize, valuesize;
WT_RET(close_all());
@@ -672,23 +709,26 @@ int ThreadRunner::create_all(WT_CONNECTION *conn) {
return (0);
}
-int ThreadRunner::open_all() {
+int
+ThreadRunner::open_all()
+{
typedef WT_CURSOR *WT_CURSOR_PTR;
if (_cursors != NULL)
delete _cursors;
_cursors = new WT_CURSOR_PTR[_icontext->_tint_last + 1];
- memset(_cursors, 0, sizeof (WT_CURSOR *) * (_icontext->_tint_last + 1));
- for (std::map<uint32_t, uint32_t>::iterator i = _table_usage.begin();
- i != _table_usage.end(); i++) {
+ memset(_cursors, 0, sizeof(WT_CURSOR *) * (_icontext->_tint_last + 1));
+ for (std::map<uint32_t, uint32_t>::iterator i = _table_usage.begin(); i != _table_usage.end();
+ i++) {
uint32_t tindex = i->first;
const char *uri = _icontext->_table_names[tindex].c_str();
- WT_RET(_session->open_cursor(_session, uri, NULL, NULL,
- &_cursors[tindex]));
+ WT_RET(_session->open_cursor(_session, uri, NULL, NULL, &_cursors[tindex]));
}
return (0);
}
-int ThreadRunner::close_all() {
+int
+ThreadRunner::close_all()
+{
if (_throttle != NULL) {
delete _throttle;
_throttle = NULL;
@@ -701,7 +741,9 @@ int ThreadRunner::close_all() {
return (0);
}
-void ThreadRunner::free_all() {
+void
+ThreadRunner::free_all()
+{
if (_rand_state != NULL) {
workgen_random_free(_rand_state);
_rand_state = NULL;
@@ -720,14 +762,15 @@ void ThreadRunner::free_all() {
}
}
-int ThreadRunner::cross_check(std::vector<ThreadRunner> &runners) {
+int
+ThreadRunner::cross_check(std::vector<ThreadRunner> &runners)
+{
std::map<uint32_t, uint32_t> usage;
// Determine which tables have cross usage
- for (std::vector<ThreadRunner>::iterator r = runners.begin();
- r != runners.end(); r++) {
+ for (std::vector<ThreadRunner>::iterator r = runners.begin(); r != runners.end(); r++) {
for (std::map<uint32_t, uint32_t>::iterator i = r->_table_usage.begin();
- i != r->_table_usage.end(); i++) {
+ i != r->_table_usage.end(); i++) {
uint32_t tindex = i->first;
uint32_t thisusage = i->second;
uint32_t curusage = CONTAINER_VALUE(usage, tindex, 0);
@@ -736,11 +779,9 @@ int ThreadRunner::cross_check(std::vector<ThreadRunner> &runners) {
usage[tindex] = curusage;
}
}
- for (std::map<uint32_t, uint32_t>::iterator i = usage.begin();
- i != usage.end(); i++) {
+ for (std::map<uint32_t, uint32_t>::iterator i = usage.begin(); i != usage.end(); i++) {
if ((i->second & USAGE_MIXED) != 0) {
- for (std::vector<ThreadRunner>::iterator r = runners.begin();
- r != runners.end(); r++) {
+ for (std::vector<ThreadRunner>::iterator r = runners.begin(); r != runners.end(); r++) {
r->_table_usage[i->first] |= USAGE_MIXED;
}
}
@@ -748,7 +789,9 @@ int ThreadRunner::cross_check(std::vector<ThreadRunner> &runners) {
return (0);
}
-int ThreadRunner::run() {
+int
+ThreadRunner::run()
+{
WT_DECL_RET;
ThreadOptions *options = &_thread->options;
std::string name = options->name;
@@ -760,20 +803,18 @@ int ThreadRunner::run() {
VERBOSE(*this, "thread " << name << " running");
if (options->throttle != 0) {
- _throttle = new Throttle(*this, options->throttle,
- options->throttle_burst);
+ _throttle = new Throttle(*this, options->throttle, options->throttle_burst);
}
for (int cnt = 0; !_stop && (_repeat || cnt < 1) && ret == 0; cnt++)
WT_ERR(op_run(&_thread->_op));
-err:
+err :
#ifdef _DEBUG
- {
- std::string messages = this->get_debug();
- if (!messages.empty())
- std::cerr << "DEBUG (thread " << name << "): "
- << messages << std::endl;
- }
+{
+ std::string messages = this->get_debug();
+ if (!messages.empty())
+ std::cerr << "DEBUG (thread " << name << "): " << messages << std::endl;
+}
#endif
if (ret != 0)
std::cerr << "thread " << name << " failed err=" << ret << std::endl;
@@ -781,12 +822,15 @@ err:
return (ret);
}
-void ThreadRunner::get_static_counts(Stats &stats) {
+void
+ThreadRunner::get_static_counts(Stats &stats)
+{
_thread->_op.get_static_counts(stats, 1);
}
-void ThreadRunner::op_create_all(Operation *op, size_t &keysize,
- size_t &valuesize) {
+void
+ThreadRunner::op_create_all(Operation *op, size_t &keysize, size_t &valuesize)
+{
tint_t tint;
op->create_all();
@@ -794,8 +838,7 @@ void ThreadRunner::op_create_all(Operation *op, size_t &keysize,
op->kv_compute_max(true, false);
if (OP_HAS_VALUE(op))
op->kv_compute_max(false, op->_table.options.random_value);
- if (op->_key._keytype == Key::KEYGEN_PARETO &&
- op->_key._pareto.param == 0)
+ if (op->_key._keytype == Key::KEYGEN_PARETO && op->_key._pareto.param == 0)
THROW("Key._pareto value must be set if KEYGEN_PARETO specified");
op->kv_size_buffer(true, keysize);
op->kv_size_buffer(false, valuesize);
@@ -819,8 +862,7 @@ void ThreadRunner::op_create_all(Operation *op, size_t &keysize,
tint = _icontext->_tint[uri];
op->_table._internal->_tint = tint;
}
- uint32_t usage_flags = CONTAINER_VALUE(_table_usage,
- op->_table._internal->_tint, 0);
+ uint32_t usage_flags = CONTAINER_VALUE(_table_usage, op->_table._internal->_tint, 0);
if (op->_optype == Operation::OP_SEARCH)
usage_flags |= ThreadRunner::USAGE_READ;
else
@@ -828,36 +870,33 @@ void ThreadRunner::op_create_all(Operation *op, size_t &keysize,
_table_usage[op->_table._internal->_tint] = usage_flags;
}
if (op->_group != NULL)
- for (std::vector<Operation>::iterator i = op->_group->begin();
- i != op->_group->end(); i++)
+ for (std::vector<Operation>::iterator i = op->_group->begin(); i != op->_group->end(); i++)
op_create_all(&*i, keysize, valuesize);
}
-
-#define PARETO_SHAPE 1.5
+#define PARETO_SHAPE 1.5
// Return a value within the interval [ 0, recno_max )
// that is weighted toward lower numbers with pareto_param at 0 (the minimum),
// and more evenly distributed with pareto_param at 100 (the maximum).
//
static uint64_t
-pareto_calculation(uint32_t randint, uint64_t recno_max,
- ParetoOptions &pareto) {
+pareto_calculation(uint32_t randint, uint64_t recno_max, ParetoOptions &pareto)
+{
double S1, S2, U;
uint32_t result;
double r;
r = (double)randint;
if (pareto.range_high != 1.0 || pareto.range_low != 0.0) {
- if (pareto.range_high <= pareto.range_low ||
- pareto.range_high > 1.0 || pareto.range_low < 0.0)
+ if (pareto.range_high <= pareto.range_low || pareto.range_high > 1.0 ||
+ pareto.range_low < 0.0)
THROW("Pareto illegal range");
- r = (pareto.range_low * (double)UINT32_MAX) +
- r * (pareto.range_high - pareto.range_low);
+ r = (pareto.range_low * (double)UINT32_MAX) + r * (pareto.range_high - pareto.range_low);
}
S1 = (-1 / PARETO_SHAPE);
S2 = recno_max * (pareto.param / 100.0) * (PARETO_SHAPE - 1);
- U = 1 - r / (double)UINT32_MAX; // interval [0, 1)
+ U = 1 - r / (double)UINT32_MAX; // interval [0, 1)
result = (uint64_t)((pow(U, S1) - 1) * S2);
// This Pareto calculation chooses out of range values less than 20%
@@ -876,8 +915,9 @@ pareto_calculation(uint32_t randint, uint64_t recno_max,
return (result);
}
-uint64_t ThreadRunner::op_get_key_recno(Operation *op, uint64_t range,
- tint_t tint) {
+uint64_t
+ThreadRunner::op_get_key_recno(Operation *op, uint64_t range, tint_t tint)
+{
uint64_t recno_count;
uint32_t rval;
@@ -892,10 +932,12 @@ uint64_t ThreadRunner::op_get_key_recno(Operation *op, uint64_t range,
rval = random_value();
if (op->_key._keytype == Key::KEYGEN_PARETO)
rval = pareto_calculation(rval, recno_count, op->_key._pareto);
- return (rval % recno_count + 1); // recnos are one-based.
+ return (rval % recno_count + 1); // recnos are one-based.
}
-int ThreadRunner::op_run(Operation *op) {
+int
+ThreadRunner::op_run(Operation *op)
+{
Track *track;
tint_t tint = op->_table._internal->_tint;
WT_CURSOR *cursor;
@@ -943,10 +985,8 @@ int ThreadRunner::op_run(Operation *op) {
break;
case Operation::OP_INSERT:
track = &_stats.insert;
- if (op->_key._keytype == Key::KEYGEN_APPEND ||
- op->_key._keytype == Key::KEYGEN_AUTO)
- recno = workgen_atomic_add64(
- &_icontext->_table_runtime[tint]._max_recno, 1);
+ if (op->_key._keytype == Key::KEYGEN_APPEND || op->_key._keytype == Key::KEYGEN_AUTO)
+ recno = workgen_atomic_add64(&_icontext->_table_runtime[tint]._max_recno, 1);
else
recno = op_get_key_recno(op, range, tint);
break;
@@ -972,14 +1012,12 @@ int ThreadRunner::op_run(Operation *op) {
break;
}
if ((op->_internal->_flags & WORKGEN_OP_REOPEN) != 0) {
- WT_ERR(_session->open_cursor(_session, op->_table._uri.c_str(), NULL,
- NULL, &cursor));
+ WT_ERR(_session->open_cursor(_session, op->_table._uri.c_str(), NULL, NULL, &cursor));
own_cursor = true;
} else
cursor = _cursors[tint];
- measure_latency = track != NULL && track->ops != 0 &&
- track->track_latency() &&
+ measure_latency = track != NULL && track->ops != 0 && track->track_latency() &&
(track->ops % _workload->options.sample_rate == 0);
VERBOSE(*this, "OP " << op->_optype << " " << op->_table._uri.c_str() << ", recno=" << recno);
@@ -998,8 +1036,8 @@ int ThreadRunner::op_run(Operation *op) {
op->kv_gen(this, true, 100, recno, _keybuf);
cursor->set_key(cursor, _keybuf);
if (OP_HAS_VALUE(op)) {
- uint64_t compressibility = op->_table.options.random_value ?
- 0 : op->_table.options.value_compressibility;
+ uint64_t compressibility =
+ op->_table.options.random_value ? 0 : op->_table.options.value_compressibility;
op->kv_gen(this, false, compressibility, recno, _valuebuf);
cursor->set_value(cursor, _valuebuf);
}
@@ -1009,15 +1047,14 @@ int ThreadRunner::op_run(Operation *op) {
if (op->transaction != NULL) {
if (_in_transaction)
THROW("nested transactions not supported");
- if (op->transaction->use_commit_timestamp && op->transaction->use_prepare_timestamp)
- {
+ if (op->transaction->use_commit_timestamp && op->transaction->use_prepare_timestamp) {
THROW("Either use_prepare_timestamp or use_commit_timestamp must be set.");
}
if (op->transaction->read_timestamp_lag > 0) {
- uint64_t read = WorkgenTimeStamp::get_timestamp_lag(op->transaction->read_timestamp_lag);
+ uint64_t read =
+ WorkgenTimeStamp::get_timestamp_lag(op->transaction->read_timestamp_lag);
sprintf(buf, "%s=%" PRIu64, op->transaction->_begin_config.c_str(), read);
- }
- else {
+ } else {
sprintf(buf, "%s", op->transaction->_begin_config.c_str());
}
WT_ERR(_session->begin_transaction(_session, buf));
@@ -1084,8 +1121,8 @@ int ThreadRunner::op_run(Operation *op) {
if (op->_timed != 0.0)
endtime = _op_time_us + secs_us(op->_timed);
- VERBOSE(*this, "GROUP operation " << op->_timed << " secs, "
- << op->_repeatgroup << "times");
+ VERBOSE(
+ *this, "GROUP operation " << op->_timed << " secs, " << op->_repeatgroup << "times");
do {
for (int count = 0; !_stop && count < op->_repeatgroup; count++) {
@@ -1111,17 +1148,16 @@ err:
time_us = WorkgenTimeStamp::get_timestamp();
sprintf(buf, "prepare_timestamp=%" PRIu64, time_us);
ret = _session->prepare_transaction(_session, buf);
- sprintf(buf, "commit_timestamp=%" PRIu64 ",durable_timestamp=%" PRIu64, time_us, time_us);
+ sprintf(
+ buf, "commit_timestamp=%" PRIu64 ",durable_timestamp=%" PRIu64, time_us, time_us);
ret = _session->commit_transaction(_session, buf);
- }
- else if (op->transaction->use_commit_timestamp) {
+ } else if (op->transaction->use_commit_timestamp) {
uint64_t commit_time_us = WorkgenTimeStamp::get_timestamp();
sprintf(buf, "commit_timestamp=%" PRIu64, commit_time_us);
- ret = _session->commit_transaction(_session, buf);
- }
- else {
- ret = _session->commit_transaction(_session,
- op->transaction->_commit_config.c_str());
+ ret = _session->commit_transaction(_session, buf);
+ } else {
+ ret =
+ _session->commit_transaction(_session, op->transaction->_commit_config.c_str());
}
}
_in_transaction = false;
@@ -1130,28 +1166,34 @@ err:
}
#ifdef _DEBUG
-std::string ThreadRunner::get_debug() {
+std::string
+ThreadRunner::get_debug()
+{
return (_debug_messages.str());
}
#endif
-uint32_t ThreadRunner::random_value() {
+uint32_t
+ThreadRunner::random_value()
+{
return (workgen_random(_rand_state));
}
// Generate a random 32-bit value then return a float value equally distributed
// between -1.0 and 1.0.
-float ThreadRunner::random_signed() {
+float
+ThreadRunner::random_signed()
+{
uint32_t r = random_value();
int sign = ((r & 0x1) == 0 ? 1 : -1);
return (((float)r * sign) / UINT32_MAX);
}
-Throttle::Throttle(ThreadRunner &runner, double throttle,
- double throttle_burst) : _runner(runner), _throttle(throttle),
- _burst(throttle_burst), _next_div(), _ops_delta(0), _ops_prev(0),
- _ops_per_div(0), _ms_per_div(0), _ops_left_this_second(throttle),
- _div_pos(0), _started(false) {
+Throttle::Throttle(ThreadRunner &runner, double throttle, double throttle_burst)
+ : _runner(runner), _throttle(throttle), _burst(throttle_burst), _next_div(), _ops_delta(0),
+ _ops_prev(0), _ops_per_div(0), _ms_per_div(0), _ops_left_this_second(throttle), _div_pos(0),
+ _started(false)
+{
// Our throttling is done by dividing each second into THROTTLE_PER_SEC
// parts (we call the parts divisions). In each division, we perform
@@ -1160,7 +1202,7 @@ Throttle::Throttle(ThreadRunner &runner, double throttle,
// a multiple of THROTTLE_PER_SEC, nor is it even necessarily an integer.
// (That way we can have 1000 threads each inserting 0.5 a second).
ts_clear(_next_div);
- ASSERT(1000 % THROTTLE_PER_SEC == 0); // must evenly divide
+ ASSERT(1000 % THROTTLE_PER_SEC == 0); // must evenly divide
_ms_per_div = 1000 / THROTTLE_PER_SEC;
_ops_per_div = (uint64_t)ceill(_throttle / THROTTLE_PER_SEC);
}
@@ -1181,7 +1223,9 @@ Throttle::~Throttle() {}
// greater). This has the effect of randomizing how much clumping happens, and
// ensures that multiple threads aren't executing in lock step.
//
-int Throttle::throttle(uint64_t op_count, uint64_t *op_limit) {
+int
+Throttle::throttle(uint64_t op_count, uint64_t *op_limit)
+{
uint64_t ops;
int64_t sleep_ms;
timespec now;
@@ -1238,36 +1282,42 @@ int Throttle::throttle(uint64_t op_count, uint64_t *op_limit) {
return (0);
}
-ThreadOptions::ThreadOptions() : name(), session_config(), throttle(0.0), throttle_burst(1.0),
- synchronized(false), _options() {
+ThreadOptions::ThreadOptions()
+ : name(), session_config(), throttle(0.0), throttle_burst(1.0), synchronized(false), _options()
+{
_options.add_string("name", name, "name of the thread");
- _options.add_string("session_config", session_config, "session config which is passed to open_session");
- _options.add_double("throttle", throttle,
- "Limit to this number of operations per second");
+ _options.add_string(
+ "session_config", session_config, "session config which is passed to open_session");
+ _options.add_double("throttle", throttle, "Limit to this number of operations per second");
_options.add_double("throttle_burst", throttle_burst,
"Changes characteristic of throttling from smooth (0.0) "
"to having large bursts with lulls (10.0 or larger)");
}
-ThreadOptions::ThreadOptions(const ThreadOptions &other) :
- name(other.name), session_config(other.session_config), throttle(other.throttle),
- throttle_burst(other.throttle_burst), synchronized(other.synchronized),
- _options(other._options) {}
+ThreadOptions::ThreadOptions(const ThreadOptions &other)
+ : name(other.name), session_config(other.session_config), throttle(other.throttle),
+ throttle_burst(other.throttle_burst), synchronized(other.synchronized),
+ _options(other._options)
+{
+}
ThreadOptions::~ThreadOptions() {}
void
-ThreadListWrapper::extend(const ThreadListWrapper &other) {
- for (std::vector<Thread>::const_iterator i = other._threads.begin();
- i != other._threads.end(); i++)
+ThreadListWrapper::extend(const ThreadListWrapper &other)
+{
+ for (std::vector<Thread>::const_iterator i = other._threads.begin(); i != other._threads.end();
+ i++)
_threads.push_back(*i);
}
void
-ThreadListWrapper::append(const Thread &t) {
+ThreadListWrapper::append(const Thread &t)
+{
_threads.push_back(t);
}
void
-ThreadListWrapper::multiply(const int n) {
+ThreadListWrapper::multiply(const int n)
+{
if (n == 0) {
_threads.clear();
} else {
@@ -1277,74 +1327,80 @@ ThreadListWrapper::multiply(const int n) {
}
}
-Thread::Thread() : options(), _op() {
-}
+Thread::Thread() : options(), _op() {}
-Thread::Thread(const Operation &op) : options(), _op(op) {
-}
+Thread::Thread(const Operation &op) : options(), _op(op) {}
-Thread::Thread(const Thread &other) : options(other.options), _op(other._op) {
-}
+Thread::Thread(const Thread &other) : options(other.options), _op(other._op) {}
-Thread::~Thread() {
-}
+Thread::~Thread() {}
-void Thread::describe(std::ostream &os) const {
+void
+Thread::describe(std::ostream &os) const
+{
os << "Thread: [" << std::endl;
- _op.describe(os); os << std::endl;
+ _op.describe(os);
+ os << std::endl;
os << "]";
}
-Operation::Operation() :
- _optype(OP_NONE), _internal(NULL), _table(), _key(), _value(), _config(),
- transaction(NULL), _group(NULL), _repeatgroup(0), _timed(0.0) {
+Operation::Operation()
+ : _optype(OP_NONE), _internal(NULL), _table(), _key(), _value(), _config(), transaction(NULL),
+ _group(NULL), _repeatgroup(0), _timed(0.0)
+{
init_internal(NULL);
}
-Operation::Operation(OpType optype, Table table, Key key, Value value) :
- _optype(optype), _internal(NULL), _table(table), _key(key), _value(value),
- _config(), transaction(NULL), _group(NULL), _repeatgroup(0), _timed(0.0) {
+Operation::Operation(OpType optype, Table table, Key key, Value value)
+ : _optype(optype), _internal(NULL), _table(table), _key(key), _value(value), _config(),
+ transaction(NULL), _group(NULL), _repeatgroup(0), _timed(0.0)
+{
init_internal(NULL);
size_check();
}
-Operation::Operation(OpType optype, Table table, Key key) :
- _optype(optype), _internal(NULL), _table(table), _key(key), _value(),
- _config(), transaction(NULL), _group(NULL), _repeatgroup(0), _timed(0.0) {
+Operation::Operation(OpType optype, Table table, Key key)
+ : _optype(optype), _internal(NULL), _table(table), _key(key), _value(), _config(),
+ transaction(NULL), _group(NULL), _repeatgroup(0), _timed(0.0)
+{
init_internal(NULL);
size_check();
}
-Operation::Operation(OpType optype, Table table) :
- _optype(optype), _internal(NULL), _table(table), _key(), _value(),
- _config(), transaction(NULL), _group(NULL), _repeatgroup(0), _timed(0.0) {
+Operation::Operation(OpType optype, Table table)
+ : _optype(optype), _internal(NULL), _table(table), _key(), _value(), _config(),
+ transaction(NULL), _group(NULL), _repeatgroup(0), _timed(0.0)
+{
init_internal(NULL);
size_check();
}
-Operation::Operation(const Operation &other) :
- _optype(other._optype), _internal(NULL), _table(other._table),
- _key(other._key), _value(other._value), _config(other._config),
- transaction(other.transaction), _group(other._group),
- _repeatgroup(other._repeatgroup), _timed(other._timed) {
+Operation::Operation(const Operation &other)
+ : _optype(other._optype), _internal(NULL), _table(other._table), _key(other._key),
+ _value(other._value), _config(other._config), transaction(other.transaction),
+ _group(other._group), _repeatgroup(other._repeatgroup), _timed(other._timed)
+{
// Creation and destruction of _group and transaction is managed
// by Python.
init_internal(other._internal);
}
-Operation::Operation(OpType optype, const char *config) :
- _optype(optype), _internal(NULL), _table(), _key(), _value(),
- _config(config), transaction(NULL), _group(NULL), _repeatgroup(0),
- _timed(0.0) {
+Operation::Operation(OpType optype, const char *config)
+ : _optype(optype), _internal(NULL), _table(), _key(), _value(), _config(config),
+ transaction(NULL), _group(NULL), _repeatgroup(0), _timed(0.0)
+{
init_internal(NULL);
}
-Operation::~Operation() {
+Operation::~Operation()
+{
// Creation and destruction of _group, transaction is managed by Python.
delete _internal;
}
-Operation& Operation::operator=(const Operation &other) {
+Operation &
+Operation::operator=(const Operation &other)
+{
_optype = other._optype;
_table = other._table;
_key = other._key;
@@ -1359,7 +1415,9 @@ Operation& Operation::operator=(const Operation &other) {
return (*this);
}
-void Operation::init_internal(OperationInternal *other) {
+void
+Operation::init_internal(OperationInternal *other)
+{
ASSERT(_internal == NULL);
switch (_optype) {
@@ -1367,8 +1425,7 @@ void Operation::init_internal(OperationInternal *other) {
if (other == NULL)
_internal = new CheckpointOperationInternal();
else
- _internal = new CheckpointOperationInternal(
- *(CheckpointOperationInternal *)other);
+ _internal = new CheckpointOperationInternal(*(CheckpointOperationInternal *)other);
break;
case OP_INSERT:
case OP_REMOVE:
@@ -1377,8 +1434,7 @@ void Operation::init_internal(OperationInternal *other) {
if (other == NULL)
_internal = new TableOperationInternal();
else
- _internal = new TableOperationInternal(
- *(TableOperationInternal *)other);
+ _internal = new TableOperationInternal(*(TableOperationInternal *)other);
break;
case OP_LOG_FLUSH:
_internal = new LogFlushOperationInternal();
@@ -1394,32 +1450,40 @@ void Operation::init_internal(OperationInternal *other) {
if (other == NULL)
_internal = new SleepOperationInternal();
else
- _internal = new SleepOperationInternal(
- *(SleepOperationInternal *)other);
+ _internal = new SleepOperationInternal(*(SleepOperationInternal *)other);
break;
default:
ASSERT(false);
}
}
-bool Operation::combinable() const {
- return (_group != NULL && _repeatgroup == 1 && _timed == 0.0 &&
- transaction == NULL && _config == "");
+bool
+Operation::combinable() const
+{
+ return (
+ _group != NULL && _repeatgroup == 1 && _timed == 0.0 && transaction == NULL && _config == "");
}
-void Operation::create_all() {
+void
+Operation::create_all()
+{
size_check();
_internal->_flags = 0;
_internal->parse_config(_config);
}
-void Operation::describe(std::ostream &os) const {
+void
+Operation::describe(std::ostream &os) const
+{
os << "Operation: " << _optype;
if (is_table_op()) {
- os << ", "; _table.describe(os);
- os << ", "; _key.describe(os);
- os << ", "; _value.describe(os);
+ os << ", ";
+ _table.describe(os);
+ os << ", ";
+ _key.describe(os);
+ os << ", ";
+ _value.describe(os);
}
if (!_config.empty())
os << ", '" << _config << "'";
@@ -1436,8 +1500,7 @@ void Operation::describe(std::ostream &os) const {
os << "[repeat " << _repeatgroup << "]";
os << ": {";
bool first = true;
- for (std::vector<Operation>::const_iterator i = _group->begin();
- i != _group->end(); i++) {
+ for (std::vector<Operation>::const_iterator i = _group->begin(); i != _group->end(); i++) {
if (!first)
os << "}, {";
i->describe(os);
@@ -1447,7 +1510,9 @@ void Operation::describe(std::ostream &os) const {
}
}
-void Operation::get_static_counts(Stats &stats, int multiplier) {
+void
+Operation::get_static_counts(Stats &stats, int multiplier)
+{
if (is_table_op())
switch (_optype) {
case OP_INSERT:
@@ -1469,17 +1534,20 @@ void Operation::get_static_counts(Stats &stats, int multiplier) {
stats.checkpoint.ops += multiplier;
if (_group != NULL)
- for (std::vector<Operation>::iterator i = _group->begin();
- i != _group->end(); i++)
+ for (std::vector<Operation>::iterator i = _group->begin(); i != _group->end(); i++)
i->get_static_counts(stats, multiplier * _repeatgroup);
}
-bool Operation::is_table_op() const {
- return (_optype == OP_INSERT || _optype == OP_REMOVE ||
- _optype == OP_SEARCH || _optype == OP_UPDATE);
+bool
+Operation::is_table_op() const
+{
+ return (
+ _optype == OP_INSERT || _optype == OP_REMOVE || _optype == OP_SEARCH || _optype == OP_UPDATE);
}
-void Operation::kv_compute_max(bool iskey, bool has_random) {
+void
+Operation::kv_compute_max(bool iskey, bool has_random)
+{
uint64_t max;
int size;
TableOperationInternal *internal;
@@ -1514,7 +1582,9 @@ void Operation::kv_compute_max(bool iskey, bool has_random) {
}
}
-void Operation::kv_size_buffer(bool iskey, size_t &maxsize) const {
+void
+Operation::kv_size_buffer(bool iskey, size_t &maxsize) const
+{
TableOperationInternal *internal;
ASSERT(is_table_op());
@@ -1529,8 +1599,10 @@ void Operation::kv_size_buffer(bool iskey, size_t &maxsize) const {
}
}
-void Operation::kv_gen(ThreadRunner *runner, bool iskey,
- uint64_t compressibility, uint64_t n, char *result) const {
+void
+Operation::kv_gen(
+ ThreadRunner *runner, bool iskey, uint64_t compressibility, uint64_t n, char *result) const
+{
TableOperationInternal *internal;
uint_t max;
uint_t size;
@@ -1541,14 +1613,13 @@ void Operation::kv_gen(ThreadRunner *runner, bool iskey,
size = iskey ? internal->_keysize : internal->_valuesize;
max = iskey ? internal->_keymax : internal->_valuemax;
if (n > max)
- THROW((iskey ? "Key" : "Value") << " (" << n
- << ") too large for size (" << size << ")");
+ THROW((iskey ? "Key" : "Value") << " (" << n << ") too large for size (" << size << ")");
/* Setup the buffer, defaulting to zero filled. */
workgen_u64_to_string_zf(n, result, size);
/*
- * Compressibility is a percentage, 100 is all zeroes, it applies to the
- * proportion of the value that can't be used for the identifier.
+ * Compressibility is a percentage, 100 is all zeroes, it applies to the proportion of the value
+ * that can't be used for the identifier.
*/
if (size > 20 && compressibility < 100) {
static const char alphanum[] =
@@ -1571,50 +1642,54 @@ void Operation::kv_gen(ThreadRunner *runner, bool iskey,
for (uint64_t i = 0; i < random_len; ++i)
/*
- * TODO: It'd be nice to use workgen_rand here, but this class
- * is without the context of a runner thread, so it's not easy
- * to get access to a state.
+ * TODO: It'd be nice to use workgen_rand here, but this class is without the context of
+ * a runner thread, so it's not easy to get access to a state.
*/
result[i] = alphanum[runner->random_value() % (sizeof(alphanum) - 1)];
}
}
-void Operation::size_check() const {
+void
+Operation::size_check() const
+{
if (is_table_op()) {
if (_key._size == 0 && _table.options.key_size == 0)
THROW("operation requires a key size");
- if (OP_HAS_VALUE(this) && _value._size == 0 &&
- _table.options.value_size == 0)
+ if (OP_HAS_VALUE(this) && _value._size == 0 && _table.options.value_size == 0)
THROW("operation requires a value size");
}
}
-void Operation::synchronized_check() const {
+void
+Operation::synchronized_check() const
+{
if (_timed != 0.0)
return;
if (_optype != Operation::OP_NONE) {
if (is_table_op() || _internal->sync_time_us() == 0)
THROW("operation cannot be synchronized, needs to be timed()");
} else if (_group != NULL) {
- for (std::vector<Operation>::iterator i = _group->begin();
- i != _group->end(); i++)
+ for (std::vector<Operation>::iterator i = _group->begin(); i != _group->end(); i++)
i->synchronized_check();
}
}
-int CheckpointOperationInternal::run(ThreadRunner *runner, WT_SESSION *session)
+int
+CheckpointOperationInternal::run(ThreadRunner *runner, WT_SESSION *session)
{
- (void)runner; /* not used */
+ (void)runner; /* not used */
return (session->checkpoint(session, NULL));
}
-int LogFlushOperationInternal::run(ThreadRunner *runner, WT_SESSION *session)
+int
+LogFlushOperationInternal::run(ThreadRunner *runner, WT_SESSION *session)
{
- (void)runner; /* not used */
+ (void)runner; /* not used */
return (session->log_flush(session, NULL));
}
-void SleepOperationInternal::parse_config(const std::string &config)
+void
+SleepOperationInternal::parse_config(const std::string &config)
{
const char *configp;
char *endp;
@@ -1622,17 +1697,19 @@ void SleepOperationInternal::parse_config(const std::string &config)
configp = config.c_str();
_sleepvalue = strtod(configp, &endp);
if (configp == endp || *endp != '\0' || _sleepvalue < 0.0)
- THROW("sleep operation requires a configuration string as "
+ THROW(
+ "sleep operation requires a configuration string as "
"a non-negative float, e.g. '1.5'");
}
-int SleepOperationInternal::run(ThreadRunner *runner, WT_SESSION *session)
+int
+SleepOperationInternal::run(ThreadRunner *runner, WT_SESSION *session)
{
uint64_t endtime;
uint64_t now, now_us;
- (void)runner; /* not used */
- (void)session; /* not used */
+ (void)runner; /* not used */
+ (void)session; /* not used */
workgen_clock(&now);
now_us = ns_to_us(now);
@@ -1656,12 +1733,14 @@ int SleepOperationInternal::run(ThreadRunner *runner, WT_SESSION *session)
return (0);
}
-uint64_t SleepOperationInternal::sync_time_us() const
+uint64_t
+SleepOperationInternal::sync_time_us() const
{
return (secs_us(_sleepvalue));
}
-void TableOperationInternal::parse_config(const std::string &config)
+void
+TableOperationInternal::parse_config(const std::string &config)
{
if (!config.empty()) {
if (config == "reopen")
@@ -1671,17 +1750,18 @@ void TableOperationInternal::parse_config(const std::string &config)
}
}
-Track::Track(bool latency_tracking) : ops_in_progress(0), ops(0), rollbacks(0),
- latency_ops(0), latency(0), bucket_ops(0), min_latency(0), max_latency(0),
- us(NULL), ms(NULL), sec(NULL) {
+Track::Track(bool latency_tracking)
+ : ops_in_progress(0), ops(0), rollbacks(0), latency_ops(0), latency(0), bucket_ops(0),
+ min_latency(0), max_latency(0), us(NULL), ms(NULL), sec(NULL)
+{
track_latency(latency_tracking);
}
-Track::Track(const Track &other) : ops_in_progress(other.ops_in_progress),
- ops(other.ops), rollbacks(other.rollbacks),
- latency_ops(other.latency_ops), latency(other.latency),
- bucket_ops(other.bucket_ops), min_latency(other.min_latency),
- max_latency(other.max_latency), us(NULL), ms(NULL), sec(NULL) {
+Track::Track(const Track &other)
+ : ops_in_progress(other.ops_in_progress), ops(other.ops), rollbacks(other.rollbacks),
+ latency_ops(other.latency_ops), latency(other.latency), bucket_ops(other.bucket_ops),
+ min_latency(other.min_latency), max_latency(other.max_latency), us(NULL), ms(NULL), sec(NULL)
+{
if (other.us != NULL) {
us = new uint32_t[LATENCY_US_BUCKETS];
ms = new uint32_t[LATENCY_MS_BUCKETS];
@@ -1692,7 +1772,8 @@ Track::Track(const Track &other) : ops_in_progress(other.ops_in_progress),
}
}
-Track::~Track() {
+Track::~Track()
+{
if (us != NULL) {
delete us;
delete ms;
@@ -1700,7 +1781,9 @@ Track::~Track() {
}
}
-void Track::add(Track &other, bool reset) {
+void
+Track::add(Track &other, bool reset)
+{
ops_in_progress += other.ops_in_progress;
ops += other.ops;
latency_ops += other.latency_ops;
@@ -1723,7 +1806,9 @@ void Track::add(Track &other, bool reset) {
}
}
-void Track::assign(const Track &other) {
+void
+Track::assign(const Track &other)
+{
ops_in_progress = other.ops_in_progress;
ops = other.ops;
latency_ops = other.latency_ops;
@@ -1738,8 +1823,7 @@ void Track::assign(const Track &other) {
us = NULL;
ms = NULL;
sec = NULL;
- }
- else if (other.us != NULL && us == NULL) {
+ } else if (other.us != NULL && us == NULL) {
us = new uint32_t[LATENCY_US_BUCKETS];
ms = new uint32_t[LATENCY_MS_BUCKETS];
sec = new uint32_t[LATENCY_SEC_BUCKETS];
@@ -1751,18 +1835,24 @@ void Track::assign(const Track &other) {
}
}
-uint64_t Track::average_latency() const {
+uint64_t
+Track::average_latency() const
+{
if (latency_ops == 0)
return (0);
else
return (latency / latency_ops);
}
-void Track::begin() {
+void
+Track::begin()
+{
ops_in_progress++;
}
-void Track::clear() {
+void
+Track::clear()
+{
ops_in_progress = 0;
ops = 0;
rollbacks = 0;
@@ -1778,12 +1868,16 @@ void Track::clear() {
}
}
-void Track::complete() {
+void
+Track::complete()
+{
--ops_in_progress;
ops++;
}
-void Track::complete_with_latency(uint64_t usecs) {
+void
+Track::complete_with_latency(uint64_t usecs)
+{
ASSERT(us != NULL);
--ops_in_progress;
@@ -1816,7 +1910,9 @@ void Track::complete_with_latency(uint64_t usecs) {
// Return the latency for which the given percent is lower than it.
// E.g. for percent == 95, returns the latency for which 95% of latencies
// are faster (lower), and 5% are slower (higher).
-uint64_t Track::percentile_latency(int percent) const {
+uint64_t
+Track::percentile_latency(int percent) const
+{
// Get the total number of operations in the latency buckets.
// We can't reliably use latency_ops, because this struct was
// added up from Track structures that were being copied while
@@ -1856,7 +1952,9 @@ uint64_t Track::percentile_latency(int percent) const {
return (0);
}
-void Track::subtract(const Track &other) {
+void
+Track::subtract(const Track &other)
+{
ops_in_progress -= other.ops_in_progress;
ops -= other.ops;
latency_ops -= other.latency_ops;
@@ -1874,7 +1972,9 @@ void Track::subtract(const Track &other) {
}
}
-void Track::track_latency(bool newval) {
+void
+Track::track_latency(bool newval)
+{
if (newval) {
if (us == NULL) {
us = new uint32_t[LATENCY_US_BUCKETS];
@@ -1896,21 +1996,27 @@ void Track::track_latency(bool newval) {
}
}
-void Track::_get_us(long *result) {
+void
+Track::_get_us(long *result)
+{
if (us != NULL) {
for (int i = 0; i < LATENCY_US_BUCKETS; i++)
result[i] = (long)us[i];
} else
memset(result, 0, sizeof(long) * LATENCY_US_BUCKETS);
}
-void Track::_get_ms(long *result) {
+void
+Track::_get_ms(long *result)
+{
if (ms != NULL) {
for (int i = 0; i < LATENCY_MS_BUCKETS; i++)
result[i] = (long)ms[i];
} else
memset(result, 0, sizeof(long) * LATENCY_MS_BUCKETS);
}
-void Track::_get_sec(long *result) {
+void
+Track::_get_sec(long *result)
+{
if (sec != NULL) {
for (int i = 0; i < LATENCY_SEC_BUCKETS; i++)
result[i] = (long)sec[i];
@@ -1918,19 +2024,23 @@ void Track::_get_sec(long *result) {
memset(result, 0, sizeof(long) * LATENCY_SEC_BUCKETS);
}
-Stats::Stats(bool latency) : checkpoint(latency), insert(latency),
- not_found(latency), read(latency), remove(latency), update(latency),
- truncate(latency) {
+Stats::Stats(bool latency)
+ : checkpoint(latency), insert(latency), not_found(latency), read(latency), remove(latency),
+ update(latency), truncate(latency)
+{
}
-Stats::Stats(const Stats &other) : checkpoint(other.checkpoint),
- insert(other.insert), not_found(other.not_found), read(other.read),
- remove(other.remove), update(other.update), truncate(other.truncate) {
+Stats::Stats(const Stats &other)
+ : checkpoint(other.checkpoint), insert(other.insert), not_found(other.not_found),
+ read(other.read), remove(other.remove), update(other.update), truncate(other.truncate)
+{
}
Stats::~Stats() {}
-void Stats::add(Stats &other, bool reset) {
+void
+Stats::add(Stats &other, bool reset)
+{
checkpoint.add(other.checkpoint, reset);
insert.add(other.insert, reset);
not_found.add(other.not_found, reset);
@@ -1940,7 +2050,9 @@ void Stats::add(Stats &other, bool reset) {
truncate.add(other.truncate, reset);
}
-void Stats::assign(const Stats &other) {
+void
+Stats::assign(const Stats &other)
+{
checkpoint.assign(other.checkpoint);
insert.assign(other.insert);
not_found.assign(other.not_found);
@@ -1950,7 +2062,9 @@ void Stats::assign(const Stats &other) {
truncate.assign(other.truncate);
}
-void Stats::clear() {
+void
+Stats::clear()
+{
checkpoint.clear();
insert.clear();
not_found.clear();
@@ -1960,7 +2074,9 @@ void Stats::clear() {
truncate.clear();
}
-void Stats::describe(std::ostream &os) const {
+void
+Stats::describe(std::ostream &os) const
+{
os << "Stats: reads " << read.ops;
if (not_found.ops > 0) {
os << " (" << not_found.ops << " not found)";
@@ -1972,7 +2088,9 @@ void Stats::describe(std::ostream &os) const {
os << ", checkpoints " << checkpoint.ops;
}
-void Stats::final_report(std::ostream &os, timespec &totalsecs) const {
+void
+Stats::final_report(std::ostream &os, timespec &totalsecs) const
+{
uint64_t ops = 0;
ops += checkpoint.ops;
ops += read.ops;
@@ -1982,10 +2100,9 @@ void Stats::final_report(std::ostream &os, timespec &totalsecs) const {
ops += truncate.ops;
ops += remove.ops;
-#define FINAL_OUTPUT(os, field, singular, ops, totalsecs) \
- os << "Executed " << field << " " #singular " operations (" \
- << PCT(field, ops) << "%) " << OPS_PER_SEC(field, totalsecs) \
- << " ops/sec" << std::endl
+#define FINAL_OUTPUT(os, field, singular, ops, totalsecs) \
+ os << "Executed " << field << " " #singular " operations (" << PCT(field, ops) << "%) " \
+ << OPS_PER_SEC(field, totalsecs) << " ops/sec" << std::endl
FINAL_OUTPUT(os, read.ops, read, ops, totalsecs);
FINAL_OUTPUT(os, not_found.ops, not found, ops, totalsecs);
@@ -1996,7 +2113,9 @@ void Stats::final_report(std::ostream &os, timespec &totalsecs) const {
FINAL_OUTPUT(os, checkpoint.ops, checkpoint, ops, totalsecs);
}
-void Stats::report(std::ostream &os) const {
+void
+Stats::report(std::ostream &os) const
+{
os << read.ops << " reads";
if (not_found.ops > 0) {
os << " (" << not_found.ops << " not found)";
@@ -2008,7 +2127,9 @@ void Stats::report(std::ostream &os) const {
os << checkpoint.ops << " checkpoints";
}
-void Stats::subtract(const Stats &other) {
+void
+Stats::subtract(const Stats &other)
+{
checkpoint.subtract(other.checkpoint);
insert.subtract(other.insert);
not_found.subtract(other.not_found);
@@ -2018,7 +2139,9 @@ void Stats::subtract(const Stats &other) {
truncate.subtract(other.truncate);
}
-void Stats::track_latency(bool latency) {
+void
+Stats::track_latency(bool latency)
+{
checkpoint.track_latency(latency);
insert.track_latency(latency);
not_found.track_latency(latency);
@@ -2028,56 +2151,67 @@ void Stats::track_latency(bool latency) {
truncate.track_latency(latency);
}
-TableOptions::TableOptions() : key_size(0), value_size(0),
- value_compressibility(100), random_value(false), range(0), _options() {
- _options.add_int("key_size", key_size,
- "default size of the key, unless overridden by Key.size");
- _options.add_int("value_size", value_size,
- "default size of the value, unless overridden by Value.size");
- _options.add_bool("random_value", random_value,
- "generate random content for the value");
+TableOptions::TableOptions()
+ : key_size(0), value_size(0), value_compressibility(100), random_value(false), range(0),
+ _options()
+{
+ _options.add_int(
+ "key_size", key_size, "default size of the key, unless overridden by Key.size");
+ _options.add_int(
+ "value_size", value_size, "default size of the value, unless overridden by Value.size");
+ _options.add_bool("random_value", random_value, "generate random content for the value");
_options.add_bool("value_compressibility", value_compressibility,
"How compressible the generated value should be");
_options.add_int("range", range,
- "if zero, keys are inserted at the end and reads/updates are in the current range, if non-zero, inserts/reads/updates are at a random key between 0 and the given range");
+ "if zero, keys are inserted at the end and reads/updates are in the current range, if "
+ "non-zero, inserts/reads/updates are at a random key between 0 and the given range");
+}
+TableOptions::TableOptions(const TableOptions &other)
+ : key_size(other.key_size), value_size(other.value_size),
+ value_compressibility(other.value_compressibility), random_value(other.random_value),
+ range(other.range), _options(other._options)
+{
}
-TableOptions::TableOptions(const TableOptions &other) :
- key_size(other.key_size), value_size(other.value_size),
- value_compressibility(other.value_compressibility),
- random_value(other.random_value), range(other.range),
- _options(other._options) {}
TableOptions::~TableOptions() {}
-Table::Table() : options(), _uri(), _internal(new TableInternal()) {
-}
-Table::Table(const char *uri) : options(), _uri(uri),
- _internal(new TableInternal()) {
+Table::Table() : options(), _uri(), _internal(new TableInternal()) {}
+Table::Table(const char *uri) : options(), _uri(uri), _internal(new TableInternal()) {}
+Table::Table(const Table &other)
+ : options(other.options), _uri(other._uri), _internal(new TableInternal(*other._internal))
+{
}
-Table::Table(const Table &other) : options(other.options), _uri(other._uri),
- _internal(new TableInternal(*other._internal)) {
+Table::~Table()
+{
+ delete _internal;
}
-Table::~Table() { delete _internal; }
-Table& Table::operator=(const Table &other) {
+Table &
+Table::operator=(const Table &other)
+{
options = other.options;
_uri = other._uri;
*_internal = *other._internal;
return (*this);
}
-void Table::describe(std::ostream &os) const {
+void
+Table::describe(std::ostream &os) const
+{
os << "Table: " << _uri;
}
TableInternal::TableInternal() : _tint(0), _context_count(0) {}
-TableInternal::TableInternal(const TableInternal &other) : _tint(other._tint),
- _context_count(other._context_count) {}
+TableInternal::TableInternal(const TableInternal &other)
+ : _tint(other._tint), _context_count(other._context_count)
+{
+}
TableInternal::~TableInternal() {}
-WorkloadOptions::WorkloadOptions() : max_latency(0),
- report_file("workload.stat"), report_interval(0), run_time(0),
- sample_file("monitor.json"), sample_interval_ms(0), max_idle_table_cycle(0),
- sample_rate(1), warmup(0), oldest_timestamp_lag(0.0), stable_timestamp_lag(0.0),
- timestamp_advance(0.0), max_idle_table_cycle_fatal(false), _options() {
+WorkloadOptions::WorkloadOptions()
+ : max_latency(0), report_file("workload.stat"), report_interval(0), run_time(0),
+ sample_file("monitor.json"), sample_interval_ms(0), max_idle_table_cycle(0), sample_rate(1),
+ warmup(0), oldest_timestamp_lag(0.0), stable_timestamp_lag(0.0), timestamp_advance(0.0),
+ max_idle_table_cycle_fatal(false), _options()
+{
_options.add_int("max_latency", max_latency,
"prints warning if any latency measured exceeds this number of "
"milliseconds. Requires sample_interval to be configured.");
@@ -2102,8 +2236,8 @@ WorkloadOptions::WorkloadOptions() : max_latency(0),
_options.add_int("sample_rate", sample_rate,
"how often the latency of operations is measured. 1 for every operation, "
"2 for every second operation, 3 for every third operation etc.");
- _options.add_int("warmup", warmup,
- "how long to run the workload phase before starting measurements");
+ _options.add_int(
+ "warmup", warmup, "how long to run the workload phase before starting measurements");
_options.add_double("oldest_timestamp_lag", oldest_timestamp_lag,
"how much lag to the oldest timestamp from epoch time");
_options.add_double("stable_timestamp_lag", stable_timestamp_lag,
@@ -2115,31 +2249,38 @@ WorkloadOptions::WorkloadOptions() : max_latency(0),
"print warning (false) or abort (true) of max_idle_table_cycle failure");
}
-WorkloadOptions::WorkloadOptions(const WorkloadOptions &other) :
- max_latency(other.max_latency), report_interval(other.report_interval),
- run_time(other.run_time), sample_interval_ms(other.sample_interval_ms),
- sample_rate(other.sample_rate), _options(other._options) {}
+WorkloadOptions::WorkloadOptions(const WorkloadOptions &other)
+ : max_latency(other.max_latency), report_interval(other.report_interval),
+ run_time(other.run_time), sample_interval_ms(other.sample_interval_ms),
+ sample_rate(other.sample_rate), _options(other._options)
+{
+}
WorkloadOptions::~WorkloadOptions() {}
-Workload::Workload(Context *context, const ThreadListWrapper &tlw) :
- options(), stats(), _context(context), _threads(tlw._threads) {
+Workload::Workload(Context *context, const ThreadListWrapper &tlw)
+ : options(), stats(), _context(context), _threads(tlw._threads)
+{
if (context == NULL)
THROW("Workload contructor requires a Context");
}
-Workload::Workload(Context *context, const Thread &thread) :
- options(), stats(), _context(context), _threads() {
+Workload::Workload(Context *context, const Thread &thread)
+ : options(), stats(), _context(context), _threads()
+{
if (context == NULL)
THROW("Workload contructor requires a Context");
_threads.push_back(thread);
}
-Workload::Workload(const Workload &other) :
- options(other.options), stats(other.stats), _context(other._context),
- _threads(other._threads) {}
+Workload::Workload(const Workload &other)
+ : options(other.options), stats(other.stats), _context(other._context), _threads(other._threads)
+{
+}
Workload::~Workload() {}
-Workload& Workload::operator=(const Workload &other) {
+Workload &
+Workload::operator=(const Workload &other)
+{
options = other.options;
stats.assign(other.stats);
*_context = *other._context;
@@ -2147,53 +2288,63 @@ Workload& Workload::operator=(const Workload &other) {
return (*this);
}
-int Workload::run(WT_CONNECTION *conn) {
+int
+Workload::run(WT_CONNECTION *conn)
+{
WorkloadRunner runner(this);
return (runner.run(conn));
}
-WorkloadRunner::WorkloadRunner(Workload *workload) :
- _workload(workload), _trunners(workload->_threads.size()),
- _report_out(&std::cout), _start(), stopping(false) {
+WorkloadRunner::WorkloadRunner(Workload *workload)
+ : _workload(workload), _trunners(workload->_threads.size()), _report_out(&std::cout), _start(),
+ stopping(false)
+{
ts_clear(_start);
}
WorkloadRunner::~WorkloadRunner() {}
-int WorkloadRunner::run(WT_CONNECTION *conn) {
+int
+WorkloadRunner::run(WT_CONNECTION *conn)
+{
WT_DECL_RET;
WorkloadOptions *options = &_workload->options;
std::ofstream report_out;
_wt_home = conn->get_home(conn);
- if ( (options->oldest_timestamp_lag > 0 || options->stable_timestamp_lag > 0) && options->timestamp_advance < 0 )
- THROW("Workload.options.timestamp_advance must be positive if either Workload.options.oldest_timestamp_lag or Workload.options.stable_timestamp_lag is set");
+ if ((options->oldest_timestamp_lag > 0 || options->stable_timestamp_lag > 0) &&
+ options->timestamp_advance < 0)
+ THROW(
+ "Workload.options.timestamp_advance must be positive if either "
+ "Workload.options.oldest_timestamp_lag or Workload.options.stable_timestamp_lag is set");
if (options->sample_interval_ms > 0 && options->sample_rate <= 0)
THROW("Workload.options.sample_rate must be positive");
if (!options->report_file.empty()) {
- open_report_file(report_out, options->report_file.c_str(),
- "Workload.options.report_file");
+ open_report_file(report_out, options->report_file.c_str(), "Workload.options.report_file");
_report_out = &report_out;
}
WT_ERR(create_all(conn, _workload->_context));
WT_ERR(open_all());
WT_ERR(ThreadRunner::cross_check(_trunners));
WT_ERR(run_all(conn));
- err:
- //TODO: (void)close_all();
+err:
+ // TODO: (void)close_all();
_report_out = &std::cout;
return (ret);
}
-int WorkloadRunner::open_all() {
+int
+WorkloadRunner::open_all()
+{
for (size_t i = 0; i < _trunners.size(); i++) {
WT_RET(_trunners[i].open_all());
}
return (0);
}
-void WorkloadRunner::open_report_file(std::ofstream &of, const char *filename,
- const char *desc) {
+void
+WorkloadRunner::open_report_file(std::ofstream &of, const char *filename, const char *desc)
+{
std::stringstream sstm;
if (!_wt_home.empty())
@@ -2201,11 +2352,12 @@ void WorkloadRunner::open_report_file(std::ofstream &of, const char *filename,
sstm << filename;
of.open(sstm.str().c_str(), std::fstream::app);
if (!of)
- THROW_ERRNO(errno, desc << ": \"" << sstm.str()
- << "\" could not be opened");
+ THROW_ERRNO(errno, desc << ": \"" << sstm.str() << "\" could not be opened");
}
-int WorkloadRunner::create_all(WT_CONNECTION *conn, Context *context) {
+int
+WorkloadRunner::create_all(WT_CONNECTION *conn, Context *context)
+{
for (size_t i = 0; i < _trunners.size(); i++) {
ThreadRunner *runner = &_trunners[i];
std::stringstream sstm;
@@ -2227,20 +2379,25 @@ int WorkloadRunner::create_all(WT_CONNECTION *conn, Context *context) {
return (0);
}
-int WorkloadRunner::close_all() {
+int
+WorkloadRunner::close_all()
+{
for (size_t i = 0; i < _trunners.size(); i++)
_trunners[i].close_all();
return (0);
}
-void WorkloadRunner::get_stats(Stats *result) {
+void
+WorkloadRunner::get_stats(Stats *result)
+{
for (size_t i = 0; i < _trunners.size(); i++)
result->add(_trunners[i]._stats);
}
-void WorkloadRunner::report(time_t interval, time_t totalsecs,
- Stats *prev_totals) {
+void
+WorkloadRunner::report(time_t interval, time_t totalsecs, Stats *prev_totals)
+{
std::ostream &out = *_report_out;
Stats new_totals(prev_totals->track_latency());
@@ -2249,11 +2406,12 @@ void WorkloadRunner::report(time_t interval, time_t totalsecs,
diff.subtract(*prev_totals);
prev_totals->assign(new_totals);
diff.report(out);
- out << " in " << interval << " secs ("
- << totalsecs << " total secs)" << std::endl;
+ out << " in " << interval << " secs (" << totalsecs << " total secs)" << std::endl;
}
-void WorkloadRunner::final_report(timespec &totalsecs) {
+void
+WorkloadRunner::final_report(timespec &totalsecs)
+{
std::ostream &out = *_report_out;
Stats *stats = &_workload->stats;
@@ -2265,7 +2423,9 @@ void WorkloadRunner::final_report(timespec &totalsecs) {
out << "Run completed: " << totalsecs << " seconds" << std::endl;
}
-int WorkloadRunner::run_all(WT_CONNECTION *conn) {
+int
+WorkloadRunner::run_all(WT_CONNECTION *conn)
+{
void *status;
std::vector<pthread_t> thread_handles;
Stats counts(false);
@@ -2293,13 +2453,11 @@ int WorkloadRunner::run_all(WT_CONNECTION *conn) {
monitor._out = &monitor_out;
if (!options->sample_file.empty()) {
- open_report_file(monitor_json, options->sample_file.c_str(),
- "sample JSON output file");
+ open_report_file(monitor_json, options->sample_file.c_str(), "sample JSON output file");
monitor._json = &monitor_json;
}
- if ((ret = pthread_create(&monitor._handle, NULL, monitor_main,
- &monitor)) != 0) {
+ if ((ret = pthread_create(&monitor._handle, NULL, monitor_main, &monitor)) != 0) {
std::cerr << "monitor thread failed err=" << ret << std::endl;
return (ret);
}
@@ -2310,8 +2468,7 @@ int WorkloadRunner::run_all(WT_CONNECTION *conn) {
ThreadRunner *runner = &_trunners[i];
runner->_stop = false;
runner->_repeat = (options->run_time != 0);
- if ((ret = pthread_create(&thandle, NULL, thread_runner_main,
- runner)) != 0) {
+ if ((ret = pthread_create(&thandle, NULL, thread_runner_main, runner)) != 0) {
std::cerr << "pthread_create failed err=" << ret << std::endl;
std::cerr << "Stopping all threads." << std::endl;
for (size_t j = 0; j < thread_handles.size(); j++) {
@@ -2331,8 +2488,7 @@ int WorkloadRunner::run_all(WT_CONNECTION *conn) {
runnerConnection->runner = this;
runnerConnection->connection = conn;
- if ((ret = pthread_create(&time_thandle, NULL, thread_workload,
- runnerConnection)) != 0) {
+ if ((ret = pthread_create(&time_thandle, NULL, thread_workload, runnerConnection)) != 0) {
std::cerr << "pthread_create failed err=" << ret << std::endl;
std::cerr << "Stopping Time threads." << std::endl;
(void)pthread_join(time_thandle, &status);
@@ -2350,7 +2506,7 @@ int WorkloadRunner::run_all(WT_CONNECTION *conn) {
createDropTableCycle->connection = conn;
if ((ret = pthread_create(&idle_table_thandle, NULL, thread_idle_table_cycle_workload,
- createDropTableCycle)) != 0) {
+ createDropTableCycle)) != 0) {
std::cerr << "pthread_create failed err=" << ret << std::endl;
std::cerr << "Stopping Create Drop table idle cycle threads." << std::endl;
(void)pthread_join(idle_table_thandle, &status);
@@ -2363,8 +2519,7 @@ int WorkloadRunner::run_all(WT_CONNECTION *conn) {
timespec now;
/* Don't run the test if any of the above pthread_create fails. */
- if (!stopping && ret == 0)
- {
+ if (!stopping && ret == 0) {
// Treat warmup separately from report interval so that if we have a
// warmup period we clear and ignore stats after it ends.
if (options->warmup != 0)
@@ -2395,7 +2550,7 @@ int WorkloadRunner::run_all(WT_CONNECTION *conn) {
if (sleep_amt.tv_sec > 0)
sleep((unsigned int)sleep_amt.tv_sec);
else
- usleep((useconds_t)((sleep_amt.tv_nsec + 999)/ 1000));
+ usleep((useconds_t)((sleep_amt.tv_nsec + 999) / 1000));
workgen_epoch(&now);
if (now >= next_report && now < end && options->report_interval != 0) {
@@ -2421,8 +2576,7 @@ int WorkloadRunner::run_all(WT_CONNECTION *conn) {
for (size_t i = 0; i < _trunners.size(); i++) {
WT_TRET(pthread_join(thread_handles[i], &status));
if (_trunners[i]._errno != 0)
- VERBOSE(_trunners[i],
- "Thread " << i << " has errno " << _trunners[i]._errno);
+ VERBOSE(_trunners[i], "Thread " << i << " has errno " << _trunners[i]._errno);
WT_TRET(_trunners[i]._errno);
_trunners[i].close_all();
if (exception == NULL && !_trunners[i]._exception._str.empty())
@@ -2445,8 +2599,7 @@ int WorkloadRunner::run_all(WT_CONNECTION *conn) {
if (options->sample_interval_ms > 0) {
WT_TRET(pthread_join(monitor._handle, &status));
if (monitor._errno != 0)
- std::cerr << "Monitor thread has errno " << monitor._errno
- << std::endl;
+ std::cerr << "Monitor thread has errno " << monitor._errno << std::endl;
if (exception == NULL && !monitor._exception._str.empty())
exception = &monitor._exception;
@@ -2467,4 +2620,4 @@ int WorkloadRunner::run_all(WT_CONNECTION *conn) {
return (ret);
}
-}
+} // namespace workgen