Diffstat (limited to 'bench/workgen/workgen.cxx')
-rw-r--r--  bench/workgen/workgen.cxx  1651
1 file changed, 1651 insertions, 0 deletions
diff --git a/bench/workgen/workgen.cxx b/bench/workgen/workgen.cxx
new file mode 100644
index 00000000000..880b8ca6467
--- /dev/null
+++ b/bench/workgen/workgen.cxx
@@ -0,0 +1,1651 @@
+/*-
+ * Public Domain 2014-2017 MongoDB, Inc.
+ * Public Domain 2008-2014 WiredTiger, Inc.
+ *
+ * This is free and unencumbered software released into the public domain.
+ *
+ * Anyone is free to copy, modify, publish, use, compile, sell, or
+ * distribute this software, either in source code form or as a compiled
+ * binary, for any purpose, commercial or non-commercial, and by any
+ * means.
+ *
+ * In jurisdictions that recognize copyright laws, the author or authors
+ * of this software dedicate any and all copyright interest in the
+ * software to the public domain. We make this dedication for the benefit
+ * of the public at large and to the detriment of our heirs and
+ * successors. We intend this dedication to be an overt act of
+ * relinquishment in perpetuity of all present and future rights to this
+ * software under copyright law.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
+ * EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
+ * MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT.
+ * IN NO EVENT SHALL THE AUTHORS BE LIABLE FOR ANY CLAIM, DAMAGES OR
+ * OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE,
+ * ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR
+ * OTHER DEALINGS IN THE SOFTWARE.
+ */
+
+#define __STDC_LIMIT_MACROS // needed to get UINT64_MAX in C++
+#include <iomanip>
+#include <iostream>
+#include <fstream>
+#include <sstream>
+#include "wiredtiger.h"
+#include "workgen.h"
+#include "workgen_int.h"
+#include "workgen_time.h"
+extern "C" {
+// Include some specific WT files, as some files included by wt_internal.h
+// have some C-isms that don't work in C++.
+#include <pthread.h>
+#include <string.h>
+#include <stdint.h>
+#include <stdlib.h>
+#include <unistd.h>
+#include <errno.h>
+#include <math.h>
+#include "error.h"
+#include "misc.h"
+}
+
+#define LATENCY_US_BUCKETS 1000
+#define LATENCY_MS_BUCKETS 1000
+#define LATENCY_SEC_BUCKETS 100
+
+#define THROTTLE_PER_SEC 20 // times per sec we will throttle
+
+#define MIN(a, b) ((a) < (b) ? (a) : (b))
+#define MAX(a, b) ((a) < (b) ? (b) : (a))
+#define TIMESPEC_DOUBLE(ts) ((double)(ts).tv_sec + ts.tv_nsec * 0.000000001)
+#define PCT(n, total) ((total) == 0 ? 0 : ((n) * 100) / (total))
+#define OPS_PER_SEC(ops, ts) (int) ((ts) == 0 ? 0.0 : \
+ (ops) / TIMESPEC_DOUBLE(ts))
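+// For example, PCT(25, 100) evaluates to 25, and OPS_PER_SEC(1000, ts) for a
+// two-second timespec evaluates to 500; both guard against division by zero.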
+
+// Get the value of a STL container, even if it is not present
+#define CONTAINER_VALUE(container, idx, dfault) \
+ (((container).count(idx) > 0) ? (container)[idx] : (dfault))
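+// For example, given std::map<int, int> m with m[3] == 7,
+// CONTAINER_VALUE(m, 3, 0) yields 7, while CONTAINER_VALUE(m, 5, 0) yields 0
+// without inserting key 5 into the map.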
+
+#define CROSS_USAGE(a, b) \
+ (((a & USAGE_READ) != 0 && (b & USAGE_WRITE) != 0) || \
+ ((a & USAGE_WRITE) != 0 && (b & USAGE_READ) != 0))
+
+#define ASSERT(cond) \
+ do { \
+ if (!(cond)) { \
+ fprintf(stderr, "%s:%d: ASSERT failed: %s\n", \
+ __FILE__, __LINE__, #cond); \
+ abort(); \
+ } \
+ } while(0)
+
+#define THROW_ERRNO(e, args) \
+ do { \
+ std::stringstream __sstm; \
+ __sstm << args; \
+ WorkgenException __wge(e, __sstm.str().c_str()); \
+ throw(__wge); \
+ } while(0)
+
+#define THROW(args) THROW_ERRNO(0, args)
+
+#define VERBOSE(runner, args) \
+ do { \
+ if ((runner)._context->_verbose) \
+ std::cout << args << std::endl; \
+ } while(0)
+
+#define OP_HAS_VALUE(op) \
+ ((op)->_optype == Operation::OP_INSERT || \
+ (op)->_optype == Operation::OP_UPDATE)
+
+namespace workgen {
+
+// The number of contexts. Normally there is one context created, but it will
+// be possible to use several eventually. More than one is not yet
+// implemented, but we must at least guard against the caller creating more
+// than one.
+static uint32_t context_count = 0;
+
+static void *thread_runner_main(void *arg) {
+ ThreadRunner *runner = (ThreadRunner *)arg;
+ try {
+ runner->_errno = runner->run();
+ } catch (WorkgenException &wge) {
+ runner->_exception = wge;
+ }
+ return (NULL);
+}
+
+static void *monitor_main(void *arg) {
+ Monitor *monitor = (Monitor *)arg;
+ try {
+ monitor->_errno = monitor->run();
+ } catch (WorkgenException &wge) {
+ monitor->_exception = wge;
+ }
+ return (NULL);
+}
+
+// Exponentiate (like the pow function), except that it returns an exact
+// integral 64 bit value, and if it overflows, returns the maximum possible
+// value for the return type.
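+// For example, power64(10, 3) returns 1000, while power64(2, 70) would
+// overflow and so returns UINT64_MAX.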
+static uint64_t power64(int base, int exp) {
+ uint64_t last, result;
+
+ result = 1;
+ for (int i = 0; i < exp; i++) {
+ last = result;
+ result *= base;
+ if (result < last)
+ return UINT64_MAX;
+ }
+ return result;
+}
+
+OptionsList::OptionsList() : _option_map() {}
+OptionsList::OptionsList(const OptionsList &other) :
+ _option_map(other._option_map) {}
+
+void OptionsList::add_option(const char *name, const std::string typestr,
+ const char *desc) {
+ TypeDescPair pair(typestr, desc);
+ _option_map[name] = pair;
+}
+
+void OptionsList::add_int(const char *name, int default_value,
+ const char *desc) {
+ std::stringstream sstm;
+ sstm << "int, default=" << default_value;
+ add_option(name, sstm.str(), desc);
+}
+
+void OptionsList::add_bool(const char *name, bool default_value,
+ const char *desc) {
+ std::stringstream sstm;
+ sstm << "boolean, default=" << (default_value ? "true" : "false");
+ add_option(name, sstm.str(), desc);
+}
+
+void OptionsList::add_double(const char *name, double default_value,
+ const char *desc) {
+ std::stringstream sstm;
+ sstm << "double, default=" << default_value;
+ add_option(name, sstm.str(), desc);
+}
+
+void OptionsList::add_string(const char *name,
+ const std::string &default_value, const char *desc) {
+ std::stringstream sstm;
+ sstm << "string, default=\"" << default_value << "\"";
+ add_option(name, sstm.str(), desc);
+}
+
+static void
+pretty_print(const char *p, const char *indent, std::stringstream &sstm)
+{
+ const char *t;
+
+ for (;; p = t + 1) {
+ if (strlen(p) <= 70)
+ break;
+ for (t = p + 70; t > p && *t != ' '; --t)
+ ;
+ if (t == p) /* No spaces? */
+ break;
+ if (indent != NULL)
+ sstm << indent;
+ std::string line(p, (size_t)(t - p));
+ sstm << line << std::endl;
+ }
+ if (*p != '\0') {
+ if (indent != NULL)
+ sstm << indent;
+ sstm << p << std::endl;
+ }
+}
+
+std::string OptionsList::help() const {
+ std::stringstream sstm;
+ for (std::map<std::string, TypeDescPair>::const_iterator i =
+ _option_map.begin(); i != _option_map.end(); i++) {
+ sstm << i->first << " (" << i->second.first << ")" << std::endl;
+ pretty_print(i->second.second.c_str(), "\t", sstm);
+ }
+ return sstm.str();
+}
+
+std::string OptionsList::help_description(const char *option_name) const {
+ const std::string key(option_name);
+ if (_option_map.count(key) == 0)
+ return (std::string(""));
+ else
+ return (_option_map.find(key)->second.second);
+}
+
+std::string OptionsList::help_type(const char *option_name) const {
+ const std::string key(option_name);
+ if (_option_map.count(key) == 0)
+ return std::string("");
+ else
+ return (_option_map.find(key)->second.first);
+}
+
+Context::Context() : _verbose(false), _internal(new ContextInternal()) {}
+Context::~Context() { delete _internal; }
+Context& Context::operator=(const Context &other) {
+ _verbose = other._verbose;
+ *_internal = *other._internal;
+ return (*this);
+}
+
+ContextInternal::ContextInternal() : _tint(), _table_names(),
+ _recno(NULL), _recno_alloced(0), _tint_last(0), _context_count(0) {
+ uint32_t count;
+ if ((count = workgen_atomic_add32(&context_count, 1)) != 1)
+ THROW("multiple Contexts not supported");
+ _context_count = count;
+}
+
+ContextInternal::~ContextInternal() {
+ if (_recno != NULL)
+ delete [] _recno;
+}
+
+int ContextInternal::create_all() {
+ if (_recno_alloced != _tint_last) {
+ // The array references are 1-based, so we'll waste one entry.
+ uint64_t *new_recno = new uint64_t[_tint_last + 1];
+ if (_recno != NULL)
+ memcpy(new_recno, _recno, sizeof(uint64_t) * _recno_alloced);
+ memset(&new_recno[_recno_alloced], 0,
+ sizeof(uint64_t) * (_tint_last - _recno_alloced + 1));
+ delete [] _recno;
+ _recno = new_recno;
+ _recno_alloced = _tint_last;
+ }
+ return (0);
+}
+
+Monitor::Monitor(WorkloadRunner &wrunner) :
+ _errno(0), _exception(), _wrunner(wrunner), _stop(false), _handle(),
+ _out(NULL), _json(NULL) {}
+Monitor::~Monitor() {}
+
+int Monitor::run() {
+ struct timespec t;
+ struct tm *tm, _tm;
+ char time_buf[64], version[100];
+ Stats prev_totals;
+ WorkloadOptions *options = &_wrunner._workload->options;
+ // The max_latency option is in milliseconds; latencies are tracked in us.
+ uint64_t latency_max = (uint64_t)options->max_latency * 1000;
+ bool first;
+
+ (*_out) << "#time,"
+ << "totalsec,"
+ << "read ops per second,"
+ << "insert ops per second,"
+ << "update ops per second,"
+ << "checkpoints,"
+ << "read average latency(uS),"
+ << "read minimum latency(uS),"
+ << "read maximum latency(uS),"
+ << "insert average latency(uS),"
+ << "insert min latency(uS),"
+ << "insert maximum latency(uS),"
+ << "update average latency(uS),"
+ << "update min latency(uS),"
+ << "update maximum latency(uS)"
+ << std::endl;
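+ // A sample data row, with illustrative values:
+ // Mar 01 12:00:05,5,1820,310,295,N,42,5,380,55,6,410,60,7,430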
+
+ first = true;
+ workgen_version(version, sizeof(version));
+ Stats prev_interval;
+ while (!_stop) {
+ for (int i = 0; i < options->sample_interval && !_stop; i++)
+ sleep(1);
+ if (_stop)
+ break;
+
+ workgen_epoch(&t);
+ tm = localtime_r(&t.tv_sec, &_tm);
+ (void)strftime(time_buf, sizeof(time_buf), "%b %d %H:%M:%S", tm);
+
+ Stats new_totals(true);
+ for (std::vector<ThreadRunner>::iterator tr =
+ _wrunner._trunners.begin(); tr != _wrunner._trunners.end(); tr++)
+ new_totals.add(tr->_stats, true);
+ Stats interval(new_totals);
+ interval.subtract(prev_totals);
+ interval.smooth(prev_interval);
+
+ int interval_secs = options->sample_interval;
+ uint64_t cur_reads = interval.read.ops / interval_secs;
+ uint64_t cur_inserts = interval.insert.ops / interval_secs;
+ uint64_t cur_updates = interval.update.ops / interval_secs;
+
+ uint64_t totalsec = ts_sec(t - _wrunner._start);
+ (*_out) << time_buf
+ << "," << totalsec
+ << "," << cur_reads
+ << "," << cur_inserts
+ << "," << cur_updates
+ << "," << 'N' // checkpoint in progress
+ << "," << interval.read.average_latency()
+ << "," << interval.read.min_latency
+ << "," << interval.read.max_latency
+ << "," << interval.insert.average_latency()
+ << "," << interval.insert.min_latency
+ << "," << interval.insert.max_latency
+ << "," << interval.update.average_latency()
+ << "," << interval.update.min_latency
+ << "," << interval.update.max_latency
+ << std::endl;
+
+ if (_json != NULL) {
+#define WORKGEN_TIMESTAMP_JSON "%Y-%m-%dT%H:%M:%S.000Z"
+ (void)strftime(time_buf, sizeof(time_buf),
+ WORKGEN_TIMESTAMP_JSON, tm);
+
+#define TRACK_JSON(name, t) \
+ "\"" << (name) << "\":{" \
+ << "\"ops per sec\":" << ((t).ops / interval_secs) \
+ << ",\"average latency\":" << (t).average_latency() \
+ << ",\"min latency\":" << (t).min_latency \
+ << ",\"max latency\":" << (t).max_latency \
+ << "}"
+
+ (*_json) << "{";
+ if (first) {
+ (*_json) << "\"version\":\"" << version << "\",";
+ first = false;
+ }
+ (*_json) << "\"localTime\":\"" << time_buf
+ << "\",\"workgen\":{"
+ << TRACK_JSON("read", interval.read) << ","
+ << TRACK_JSON("insert", interval.insert) << ","
+ << TRACK_JSON("update", interval.update)
+ << "}}" << std::endl;
+ }
+
+ uint64_t read_max = interval.read.max_latency;
+ uint64_t insert_max = interval.insert.max_latency;
+ uint64_t update_max = interval.update.max_latency;
+
+ if (latency_max != 0 &&
+ (read_max > latency_max || insert_max > latency_max ||
+ update_max > latency_max)) {
+ std::cerr << "WARNING: max latency exceeded:"
+ << " threshold " << latency_max
+ << " read max " << read_max
+ << " insert max " << insert_max
+ << " update max " << update_max << std::endl;
+ }
+
+ prev_interval.assign(interval);
+ prev_totals.assign(new_totals);
+ }
+ return (0);
+}
+
+ThreadRunner::ThreadRunner() :
+ _errno(0), _exception(), _thread(NULL), _context(NULL), _icontext(NULL),
+ _workload(NULL), _wrunner(NULL), _rand_state(NULL),
+ _throttle(NULL), _throttle_ops(0), _throttle_limit(0),
+ _in_transaction(false), _number(0), _stats(false), _table_usage(),
+ _cursors(NULL), _stop(false), _session(NULL), _keybuf(NULL),
+ _valuebuf(NULL), _repeat(false) {
+}
+
+ThreadRunner::~ThreadRunner() {
+ free_all();
+}
+
+int ThreadRunner::create_all(WT_CONNECTION *conn) {
+ size_t keysize, valuesize;
+
+ WT_RET(close_all());
+ ASSERT(_session == NULL);
+ WT_RET(conn->open_session(conn, NULL, NULL, &_session));
+ _table_usage.clear();
+ _stats.track_latency(_workload->options.sample_interval > 0);
+ WT_RET(workgen_random_alloc(_session, &_rand_state));
+ _throttle_ops = 0;
+ _throttle_limit = 0;
+ _in_transaction = false;
+ keysize = 1;
+ valuesize = 1;
+ op_create_all(&_thread->_op, keysize, valuesize);
+ _keybuf = new char[keysize];
+ _valuebuf = new char[valuesize];
+ _keybuf[keysize - 1] = '\0';
+ _valuebuf[valuesize - 1] = '\0';
+ return (0);
+}
+
+int ThreadRunner::open_all() {
+ typedef WT_CURSOR *WT_CURSOR_PTR;
+ if (_cursors != NULL)
+ delete [] _cursors;
+ _cursors = new WT_CURSOR_PTR[_icontext->_tint_last + 1];
+ memset(_cursors, 0, sizeof (WT_CURSOR *) * (_icontext->_tint_last + 1));
+ for (std::map<uint32_t, uint32_t>::iterator i = _table_usage.begin();
+ i != _table_usage.end(); i++) {
+ uint32_t tindex = i->first;
+ const char *uri = _icontext->_table_names[tindex].c_str();
+ WT_RET(_session->open_cursor(_session, uri, NULL, NULL,
+ &_cursors[tindex]));
+ }
+ return (0);
+}
+
+int ThreadRunner::close_all() {
+ if (_throttle != NULL) {
+ delete _throttle;
+ _throttle = NULL;
+ }
+ if (_session != NULL) {
+ WT_RET(_session->close(_session, NULL));
+ _session = NULL;
+ }
+ free_all();
+ return (0);
+}
+
+void ThreadRunner::free_all() {
+ if (_rand_state != NULL) {
+ workgen_random_free(_rand_state);
+ _rand_state = NULL;
+ }
+ if (_cursors != NULL) {
+ delete [] _cursors;
+ _cursors = NULL;
+ }
+ if (_keybuf != NULL) {
+ delete [] _keybuf;
+ _keybuf = NULL;
+ }
+ if (_valuebuf != NULL) {
+ delete [] _valuebuf;
+ _valuebuf = NULL;
+ }
+}
+
+int ThreadRunner::cross_check(std::vector<ThreadRunner> &runners) {
+ std::map<uint32_t, uint32_t> usage;
+
+ // Determine which tables have cross usage
+ for (std::vector<ThreadRunner>::iterator r = runners.begin();
+ r != runners.end(); r++) {
+ for (std::map<uint32_t, uint32_t>::iterator i = r->_table_usage.begin();
+ i != r->_table_usage.end(); i++) {
+ uint32_t tindex = i->first;
+ uint32_t thisusage = i->second;
+ uint32_t curusage = CONTAINER_VALUE(usage, tindex, 0);
+ if (CROSS_USAGE(curusage, thisusage))
+ curusage |= USAGE_MIXED;
+ usage[tindex] = curusage;
+ }
+ }
+ for (std::map<uint32_t, uint32_t>::iterator i = usage.begin();
+ i != usage.end(); i++) {
+ if ((i->second & USAGE_MIXED) != 0) {
+ for (std::vector<ThreadRunner>::iterator r = runners.begin();
+ r != runners.end(); r++) {
+ r->_table_usage[i->first] |= USAGE_MIXED;
+ }
+ }
+ }
+ return (0);
+}
+
+int ThreadRunner::run() {
+ WT_DECL_RET;
+ ThreadOptions *options = &_thread->options;
+ std::string name = options->name;
+
+ VERBOSE(*this, "thread " << name << " running");
+ if (options->throttle != 0) {
+ _throttle = new Throttle(*this, options->throttle,
+ options->throttle_burst);
+ }
+ for (int cnt = 0; !_stop && (_repeat || cnt < 1) && ret == 0; cnt++)
+ WT_ERR(op_run(&_thread->_op));
+
+err:
+#ifdef _DEBUG
+ {
+ std::string messages = this->get_debug();
+ if (!messages.empty())
+ std::cerr << "DEBUG (thread " << name << "): "
+ << messages << std::endl;
+ }
+#endif
+ if (ret != 0)
+ std::cerr << "thread " << name << " failed err=" << ret << std::endl;
+ VERBOSE(*this, "thread " << name << "finished");
+ return (ret);
+}
+
+void ThreadRunner::get_static_counts(Stats &stats) {
+ _thread->_op.get_static_counts(stats, 1);
+}
+
+void ThreadRunner::op_create_all(Operation *op, size_t &keysize,
+ size_t &valuesize) {
+ tint_t tint;
+
+ op->size_check();
+ if (op->_optype != Operation::OP_NONE) {
+ op->kv_compute_max(true);
+ if (OP_HAS_VALUE(op))
+ op->kv_compute_max(false);
+ op->kv_size_buffer(true, keysize);
+ op->kv_size_buffer(false, valuesize);
+
+ // Note: to support multiple contexts we'd need a generation
+ // count whenever we execute.
+ if (op->_table._internal->_context_count != 0 &&
+ op->_table._internal->_context_count != _icontext->_context_count)
+ THROW("multiple Contexts not supported");
+ if ((tint = op->_table._internal->_tint) == 0) {
+ std::string uri = op->_table._uri;
+
+ // We are single threaded in this function, so do not have
+ // to worry about locking.
+ if (_icontext->_tint.count(uri) == 0) {
+ // TODO: don't use atomic add, it's overkill.
+ tint = workgen_atomic_add32(&_icontext->_tint_last, 1);
+ _icontext->_tint[uri] = tint;
+ _icontext->_table_names[tint] = uri;
+ } else
+ tint = _icontext->_tint[uri];
+ op->_table._internal->_tint = tint;
+ }
+ uint32_t usage_flags = CONTAINER_VALUE(_table_usage,
+ op->_table._internal->_tint, 0);
+ if (op->_optype == Operation::OP_SEARCH)
+ usage_flags |= ThreadRunner::USAGE_READ;
+ else
+ usage_flags |= ThreadRunner::USAGE_WRITE;
+ _table_usage[op->_table._internal->_tint] = usage_flags;
+ }
+ if (op->_group != NULL)
+ for (std::vector<Operation>::iterator i = op->_group->begin();
+ i != op->_group->end(); i++)
+ op_create_all(&*i, keysize, valuesize);
+}
+
+uint64_t ThreadRunner::op_get_key_recno(Operation *op, tint_t tint) {
+ uint64_t recno_count;
+ uint32_t rand;
+
+ recno_count = _icontext->_recno[tint];
+ if (recno_count == 0)
+ // The file has no entries, returning 0 forces a WT_NOTFOUND return.
+ return (0);
+ rand = workgen_random(_rand_state);
+ return (rand % recno_count + 1); // recnos are one-based.
+}
+
+int ThreadRunner::op_run(Operation *op) {
+ Track *track;
+ tint_t tint = op->_table._internal->_tint;
+ WT_CURSOR *cursor = _cursors[tint];
+ WT_DECL_RET;
+ uint64_t recno;
+ bool measure_latency;
+
+ recno = 0;
+ track = NULL;
+ if (_throttle != NULL) {
+ if (_throttle_ops >= _throttle_limit && !_in_transaction) {
+ WT_ERR(_throttle->throttle(_throttle_ops,
+ &_throttle_limit));
+ _throttle_ops = 0;
+ }
+ if (op->_optype != Operation::OP_NONE)
+ ++_throttle_ops;
+ }
+
+ // A potential race: thread1 is inserting, and increments
+ // Context->_recno[] for fileX.wt. thread2 is doing one of
+ // remove/search/update and grabs the new value of Context->_recno[]
+ // for fileX.wt. thread2 randomly chooses the highest recno (which
+ // has not yet been inserted by thread1), and when it accesses
+ // the record will get WT_NOTFOUND. It should be somewhat rare
+ // (and most likely when the threads are first beginning). Any
+ // WT_NOTFOUND returns are allowed and get their own statistic bumped.
+ switch (op->_optype) {
+ case Operation::OP_INSERT:
+ track = &_stats.insert;
+ recno = workgen_atomic_add64(&_icontext->_recno[tint], 1);
+ break;
+ case Operation::OP_REMOVE:
+ track = &_stats.remove;
+ recno = op_get_key_recno(op, tint);
+ break;
+ case Operation::OP_SEARCH:
+ track = &_stats.read;
+ recno = op_get_key_recno(op, tint);
+ break;
+ case Operation::OP_UPDATE:
+ track = &_stats.update;
+ recno = op_get_key_recno(op, tint);
+ break;
+ case Operation::OP_NONE:
+ recno = 0;
+ break;
+ }
+
+ measure_latency = track != NULL && track->ops != 0 &&
+ track->track_latency() &&
+ (track->ops % _workload->options.sample_rate == 0);
+
+ timespec start;
+ if (measure_latency)
+ workgen_epoch(&start);
+
+ if (op->_transaction != NULL) {
+ if (_in_transaction)
+ THROW("nested transactions not supported");
+ _session->begin_transaction(_session,
+ op->_transaction->_begin_config.c_str());
+ _in_transaction = true;
+ }
+ if (op->_optype != Operation::OP_NONE) {
+ op->kv_gen(true, recno, _keybuf);
+ cursor->set_key(cursor, _keybuf);
+ if (OP_HAS_VALUE(op)) {
+ op->kv_gen(false, recno, _valuebuf);
+ cursor->set_value(cursor, _valuebuf);
+ }
+ switch (op->_optype) {
+ case Operation::OP_INSERT:
+ WT_ERR(cursor->insert(cursor));
+ break;
+ case Operation::OP_REMOVE:
+ WT_ERR_NOTFOUND_OK(cursor->remove(cursor));
+ break;
+ case Operation::OP_SEARCH:
+ ret = cursor->search(cursor);
+ break;
+ case Operation::OP_UPDATE:
+ WT_ERR_NOTFOUND_OK(cursor->update(cursor));
+ break;
+ default:
+ ASSERT(false);
+ }
+ if (ret != 0) {
+ track = &_stats.not_found;
+ ret = 0; // WT_NOTFOUND allowed.
+ }
+ cursor->reset(cursor);
+ }
+ if (measure_latency) {
+ timespec stop;
+ workgen_epoch(&stop);
+ track->incr_with_latency(ts_us(stop - start));
+ } else if (track != NULL)
+ track->incr();
+
+ if (op->_group != NULL)
+ for (int count = 0; !_stop && count < op->_repeatgroup; count++)
+ for (std::vector<Operation>::iterator i = op->_group->begin();
+ i != op->_group->end(); i++)
+ WT_ERR(op_run(&*i));
+err:
+ if (op->_transaction != NULL) {
+ if (ret != 0 || op->_transaction->_rollback)
+ WT_TRET(_session->rollback_transaction(_session, NULL));
+ else
+ ret = _session->commit_transaction(_session,
+ op->_transaction->_commit_config.c_str());
+ _in_transaction = false;
+ }
+ return (ret);
+}
+
+#ifdef _DEBUG
+std::string ThreadRunner::get_debug() {
+ return (_debug_messages.str());
+}
+#endif
+
+Throttle::Throttle(ThreadRunner &runner, double throttle,
+ double throttle_burst) : _runner(runner), _throttle(throttle),
+ _burst(throttle_burst), _next_div(), _ops_delta(0), _ops_prev(0),
+ _ops_per_div(0), _ms_per_div(0), _started(false) {
+ ts_clear(_next_div);
+ _ms_per_div = ceill(1000.0 / THROTTLE_PER_SEC);
+ _ops_per_div = ceill(_throttle / THROTTLE_PER_SEC);
+}
+
+Throttle::~Throttle() {}
+
+// Given a random 32-bit value, return a float value equally distributed
+// between -1.0 and 1.0.
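+// For example, r == 0x80000001 (odd) maps to a value near -0.5, and
+// r == UINT32_MAX - 1 (even) maps to a value near +1.0.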
+static float rand_signed(uint32_t r) {
+ int sign = ((r & 0x1) == 0 ? 1 : -1);
+ return (((float)r * sign) / UINT32_MAX);
+}
+
+// Each time throttle is called, we sleep and return a number of operations to
+// perform next. To implement this we keep a time calculation in _next_div set
+// initially to the current time + 1/THROTTLE_PER_SEC. Each call to throttle
+// advances _next_div by 1/THROTTLE_PER_SEC, and if _next_div is in the future,
+// we sleep for the difference between the _next_div and the current_time. We
+// always return (Thread.options.throttle / THROTTLE_PER_SEC) as the number of
+// operations.
+//
+// The only variation is that the amount of individual sleeps is modified by a
+// random amount (which varies more widely as Thread.options.throttle_burst is
+// greater). This has the effect of randomizing how much clumping happens, and
+// ensures that multiple threads aren't executing in lock step.
+//
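+// For example, with Thread.options.throttle=100 and THROTTLE_PER_SEC=20,
+// _ms_per_div is 50 and _ops_per_div is 5: each call sleeps until the next
+// 50ms boundary (plus or minus the random burst adjustment) and grants
+// roughly 5 operations.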
+int Throttle::throttle(uint64_t op_count, uint64_t *op_limit) {
+ uint64_t ops;
+ int64_t sleep_ms;
+ timespec now;
+
+ workgen_epoch(&now);
+ DEBUG_CAPTURE(_runner, "throttle: ops=" << op_count);
+ if (!_started) {
+ _next_div = ts_add_ms(now, _ms_per_div);
+ _started = true;
+ } else {
+ _ops_delta += (op_count - _ops_prev);
+ if (now < _next_div) {
+ sleep_ms = ts_ms(_next_div - now);
+ sleep_ms += (_ms_per_div * _burst *
+ rand_signed(workgen_random(_runner._rand_state)));
+ if (sleep_ms > 0) {
+ DEBUG_CAPTURE(_runner, ", sleep=" << sleep_ms);
+ usleep((useconds_t)ms_to_us(sleep_ms));
+ }
+ }
+ _next_div = ts_add_ms(_next_div, _ms_per_div);
+ }
+ ops = _ops_per_div;
+ if (_ops_delta < (int64_t)ops) {
+ ops -= _ops_delta;
+ _ops_delta = 0;
+ } else {
+ _ops_delta -= ops;
+ ops = 0;
+ }
+ *op_limit = ops;
+ _ops_prev = ops;
+ DEBUG_CAPTURE(_runner, ", return=" << ops << std::endl);
+ return (0);
+}
+
+ThreadOptions::ThreadOptions() : name(), throttle(0.0), throttle_burst(1.0),
+ _options() {
+ _options.add_string("name", name, "name of the thread");
+ _options.add_double("throttle", throttle,
+ "Limit to this number of operations per second");
+ _options.add_double("throttle_burst", throttle_burst,
+ "Changes characteristic of throttling from smooth (0.0) "
+ "to having large bursts with lulls (10.0 or larger)");
+}
+ThreadOptions::ThreadOptions(const ThreadOptions &other) :
+ name(other.name), throttle(other.throttle),
+ throttle_burst(other.throttle_burst), _options(other._options) {}
+ThreadOptions::~ThreadOptions() {}
+
+void
+ThreadListWrapper::extend(const ThreadListWrapper &other) {
+ for (std::vector<Thread>::const_iterator i = other._threads.begin();
+ i != other._threads.end(); i++)
+ _threads.push_back(*i);
+}
+
+void
+ThreadListWrapper::append(const Thread &t) {
+ _threads.push_back(t);
+}
+
+void
+ThreadListWrapper::multiply(const int n) {
+ if (n == 0) {
+ _threads.clear();
+ } else {
+ std::vector<Thread> copy(_threads);
+ for (int cnt = 1; cnt < n; cnt++)
+ extend(copy);
+ }
+}
+
+Thread::Thread() : options(), _op() {
+}
+
+Thread::Thread(const Operation &op) : options(), _op(op) {
+}
+
+Thread::Thread(const Thread &other) : options(other.options), _op(other._op) {
+}
+
+Thread::~Thread() {
+}
+
+void Thread::describe(std::ostream &os) const {
+ os << "Thread: [" << std::endl;
+ _op.describe(os); os << std::endl;
+ os << "]";
+}
+
+Operation::Operation() :
+ _optype(OP_NONE), _table(), _key(), _value(), _transaction(NULL),
+ _group(NULL), _repeatgroup(0),
+ _keysize(0), _valuesize(0), _keymax(0), _valuemax(0) {
+}
+
+Operation::Operation(OpType optype, Table table, Key key, Value value) :
+ _optype(optype), _table(table), _key(key), _value(value),
+ _transaction(NULL), _group(NULL), _repeatgroup(0),
+ _keysize(0), _valuesize(0), _keymax(0), _valuemax(0) {
+ size_check();
+}
+
+Operation::Operation(OpType optype, Table table, Key key) :
+ _optype(optype), _table(table), _key(key), _value(), _transaction(NULL),
+ _group(NULL), _repeatgroup(0),
+ _keysize(0), _valuesize(0), _keymax(0), _valuemax(0) {
+ size_check();
+}
+
+Operation::Operation(OpType optype, Table table) :
+ _optype(optype), _table(table), _key(), _value(), _transaction(NULL),
+ _group(NULL), _repeatgroup(0),
+ _keysize(0), _valuesize(0), _keymax(0), _valuemax(0) {
+ size_check();
+}
+
+Operation::Operation(const Operation &other) :
+ _optype(other._optype), _table(other._table), _key(other._key),
+ _value(other._value), _transaction(other._transaction),
+ _group(other._group), _repeatgroup(other._repeatgroup),
+ _keysize(other._keysize), _valuesize(other._valuesize),
+ _keymax(other._keymax), _valuemax(other._valuemax) {
+ // Creation and destruction of _group and _transaction is managed
+ // by Python.
+}
+
+Operation::~Operation() {
+ // Creation and destruction of _group, _transaction is managed by Python.
+}
+
+Operation& Operation::operator=(const Operation &other) {
+ _optype = other._optype;
+ _table = other._table;
+ _key = other._key;
+ _value = other._value;
+ _transaction = other._transaction;
+ _group = other._group;
+ _repeatgroup = other._repeatgroup;
+ _keysize = other._keysize;
+ _valuesize = other._valuesize;
+ _keymax = other._keymax;
+ _valuemax = other._valuemax;
+ return (*this);
+}
+
+void Operation::describe(std::ostream &os) const {
+ os << "Operation: " << _optype;
+ if (_optype != OP_NONE) {
+ os << ", "; _table.describe(os);
+ os << ", "; _key.describe(os);
+ os << ", "; _value.describe(os);
+ }
+ if (_transaction != NULL) {
+ os << ", ["; _transaction->describe(os); os << "]";
+ }
+ if (_group != NULL) {
+ os << ", group[" << _repeatgroup << "]: {";
+ bool first = true;
+ for (std::vector<Operation>::const_iterator i = _group->begin();
+ i != _group->end(); i++) {
+ if (!first)
+ os << "}, {";
+ i->describe(os);
+ first = false;
+ }
+ os << "}";
+ }
+}
+
+void Operation::get_static_counts(Stats &stats, int multiplier) {
+ switch (_optype) {
+ case OP_NONE:
+ break;
+ case OP_INSERT:
+ stats.insert.ops += multiplier;
+ break;
+ case OP_REMOVE:
+ stats.remove.ops += multiplier;
+ break;
+ case OP_SEARCH:
+ stats.read.ops += multiplier;
+ break;
+ case OP_UPDATE:
+ stats.update.ops += multiplier;
+ break;
+ default:
+ ASSERT(false);
+ }
+ if (_group != NULL)
+ for (std::vector<Operation>::iterator i = _group->begin();
+ i != _group->end(); i++)
+ i->get_static_counts(stats, multiplier * _repeatgroup);
+}
+
+void Operation::kv_compute_max(bool iskey) {
+ uint64_t max;
+ int size;
+
+ size = iskey ? _key._size : _value._size;
+ if (size == 0)
+ size = iskey ? _table.options.key_size : _table.options.value_size;
+
+ if (iskey && size < 2)
+ THROW("Key.size too small for table '" << _table._uri << "'");
+ if (!iskey && size < 1)
+ THROW("Value.size too small for table '" << _table._uri << "'");
+
+ if (size > 1)
+ max = power64(10, (size - 1)) - 1;
+ else
+ max = 0;
+
+ if (iskey) {
+ _keysize = size;
+ _keymax = max;
+ } else {
+ _valuesize = size;
+ _valuemax = max;
+ }
+}
+
+void Operation::kv_size_buffer(bool iskey, size_t &maxsize) const {
+ if (iskey) {
+ if ((size_t)_keysize > maxsize)
+ maxsize = _keysize;
+ } else {
+ if ((size_t)_valuesize > maxsize)
+ maxsize = _valuesize;
+ }
+}
+
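+// Generate the zero-filled decimal form of n into the result buffer.
+// For example, with a configured key size of 6 the largest usable recno is
+// 99999, and recno 42 would presumably be rendered as the string "00042".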
+void Operation::kv_gen(bool iskey, uint64_t n, char *result) const {
+ uint64_t max;
+ int size;
+
+ size = iskey ? _keysize : _valuesize;
+ max = iskey ? _keymax : _valuemax;
+ if (n > max)
+ THROW((iskey ? "Key" : "Value") << " (" << n
+ << ") too large for size (" << size << ")");
+ workgen_u64_to_string_zf(n, result, size);
+}
+
+void Operation::size_check() const {
+ if (_optype != OP_NONE && _key._size == 0 && _table.options.key_size == 0)
+ THROW("operation requires a key size");
+ if (OP_HAS_VALUE(this) && _value._size == 0 &&
+ _table.options.value_size == 0)
+ THROW("operation requires a value size");
+}
+
+Track::Track(bool latency_tracking) : ops(0), latency_ops(0), latency(0),
+ min_latency(0), max_latency(0), us(NULL), ms(NULL), sec(NULL) {
+ track_latency(latency_tracking);
+}
+
+Track::Track(const Track &other) : ops(other.ops),
+ latency_ops(other.latency_ops), latency(other.latency),
+ min_latency(other.min_latency), max_latency(other.max_latency),
+ us(NULL), ms(NULL), sec(NULL) {
+ if (other.us != NULL) {
+ us = new uint32_t[LATENCY_US_BUCKETS];
+ ms = new uint32_t[LATENCY_MS_BUCKETS];
+ sec = new uint32_t[LATENCY_SEC_BUCKETS];
+ memcpy(us, other.us, sizeof(uint32_t) * LATENCY_US_BUCKETS);
+ memcpy(ms, other.ms, sizeof(uint32_t) * LATENCY_MS_BUCKETS);
+ memcpy(sec, other.sec, sizeof(uint32_t) * LATENCY_SEC_BUCKETS);
+ }
+}
+
+Track::~Track() {
+ if (us != NULL) {
+ delete [] us;
+ delete [] ms;
+ delete [] sec;
+ }
+}
+
+void Track::add(Track &other, bool reset) {
+ ops += other.ops;
+ latency_ops += other.latency_ops;
+ latency += other.latency;
+
+ min_latency = MIN(min_latency, other.min_latency);
+ if (reset)
+ other.min_latency = 0;
+ max_latency = MAX(max_latency, other.max_latency);
+ if (reset)
+ other.max_latency = 0;
+
+ if (us != NULL && other.us != NULL) {
+ for (int i = 0; i < LATENCY_US_BUCKETS; i++)
+ us[i] += other.us[i];
+ for (int i = 0; i < LATENCY_MS_BUCKETS; i++)
+ ms[i] += other.ms[i];
+ for (int i = 0; i < LATENCY_SEC_BUCKETS; i++)
+ sec[i] += other.sec[i];
+ }
+}
+
+void Track::assign(const Track &other) {
+ ops = other.ops;
+ latency_ops = other.latency_ops;
+ latency = other.latency;
+ min_latency = other.min_latency;
+ max_latency = other.max_latency;
+
+ if (other.us == NULL && us != NULL) {
+ delete [] us;
+ delete [] ms;
+ delete [] sec;
+ us = NULL;
+ ms = NULL;
+ sec = NULL;
+ }
+ else if (other.us != NULL && us == NULL) {
+ us = new uint32_t[LATENCY_US_BUCKETS];
+ ms = new uint32_t[LATENCY_MS_BUCKETS];
+ sec = new uint32_t[LATENCY_SEC_BUCKETS];
+ }
+ if (us != NULL) {
+ memcpy(us, other.us, sizeof(uint32_t) * LATENCY_US_BUCKETS);
+ memcpy(ms, other.ms, sizeof(uint32_t) * LATENCY_MS_BUCKETS);
+ memcpy(sec, other.sec, sizeof(uint32_t) * LATENCY_SEC_BUCKETS);
+ }
+}
+
+uint64_t Track::average_latency() const {
+ if (latency_ops == 0)
+ return (0);
+ else
+ return (latency / latency_ops);
+}
+
+void Track::clear() {
+ ops = 0;
+ latency_ops = 0;
+ latency = 0;
+ min_latency = 0;
+ max_latency = 0;
+ if (us != NULL) {
+ memset(us, 0, sizeof(uint32_t) * LATENCY_US_BUCKETS);
+ memset(ms, 0, sizeof(uint32_t) * LATENCY_MS_BUCKETS);
+ memset(sec, 0, sizeof(uint32_t) * LATENCY_SEC_BUCKETS);
+ }
+}
+
+void Track::incr() {
+ ops++;
+}
+
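+// Record one operation and its latency in microseconds, placing the latency
+// into the appropriate histogram bucket. For example, 250us lands in us[250],
+// 2500us (2.5ms) in ms[2], and 1500000us (1.5s) in sec[1].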
+void Track::incr_with_latency(uint64_t usecs) {
+ ASSERT(us != NULL);
+
+ ops++;
+ latency_ops++;
+ latency += usecs;
+ if (usecs > max_latency)
+ max_latency = (uint32_t)usecs;
+ if (usecs < min_latency)
+ min_latency = (uint32_t)usecs;
+
+ // Update a latency bucket.
+ // First buckets: usecs from 0us to 1000us, at 1us each.
+ if (usecs < LATENCY_US_BUCKETS)
+ us[usecs]++;
+
+ // Second buckets: milliseconds from 1ms to 1000ms, at 1ms each.
+ else if (usecs < ms_to_us(LATENCY_MS_BUCKETS))
+ ms[us_to_ms(usecs)]++;
+
+ // Third buckets are seconds from 1s to 100s, at 1s each.
+ else if (usecs < sec_to_us(LATENCY_SEC_BUCKETS))
+ sec[us_to_sec(usecs)]++;
+
+ // >100 seconds, accumulate in the biggest bucket.
+ else
+ sec[LATENCY_SEC_BUCKETS - 1]++;
+}
+
+void Track::subtract(const Track &other) {
+ ops -= other.ops;
+ latency_ops -= other.latency_ops;
+ latency -= other.latency;
+
+ // There's no sensible thing to be done for min/max_latency.
+
+ if (us != NULL && other.us != NULL) {
+ for (int i = 0; i < LATENCY_US_BUCKETS; i++)
+ us[i] -= other.us[i];
+ for (int i = 0; i < LATENCY_MS_BUCKETS; i++)
+ ms[i] -= other.ms[i];
+ for (int i = 0; i < LATENCY_SEC_BUCKETS; i++)
+ sec[i] -= other.sec[i];
+ }
+}
+
+// If there are no entries in this Track, take them from
+// a previous Track. Used to smooth graphs. We don't worry
+// about latency buckets here.
+void Track::smooth(const Track &other) {
+ if (latency_ops == 0) {
+ ops = other.ops;
+ latency = other.latency;
+ latency_ops = other.latency_ops;
+ min_latency = other.min_latency;
+ max_latency = other.max_latency;
+ }
+}
+
+void Track::track_latency(bool newval) {
+ if (newval) {
+ if (us == NULL) {
+ us = new uint32_t[LATENCY_US_BUCKETS];
+ ms = new uint32_t[LATENCY_MS_BUCKETS];
+ sec = new uint32_t[LATENCY_SEC_BUCKETS];
+ memset(us, 0, sizeof(uint32_t) * LATENCY_US_BUCKETS);
+ memset(ms, 0, sizeof(uint32_t) * LATENCY_MS_BUCKETS);
+ memset(sec, 0, sizeof(uint32_t) * LATENCY_SEC_BUCKETS);
+ }
+ } else {
+ if (us != NULL) {
+ delete [] us;
+ delete [] ms;
+ delete [] sec;
+ us = NULL;
+ ms = NULL;
+ sec = NULL;
+ }
+ }
+}
+
+void Track::_get_us(long *result) {
+ if (us != NULL) {
+ for (int i = 0; i < LATENCY_US_BUCKETS; i++)
+ result[i] = (long)us[i];
+ } else
+ memset(result, 0, sizeof(long) * LATENCY_US_BUCKETS);
+}
+void Track::_get_ms(long *result) {
+ if (ms != NULL) {
+ for (int i = 0; i < LATENCY_MS_BUCKETS; i++)
+ result[i] = (long)ms[i];
+ } else
+ memset(result, 0, sizeof(long) * LATENCY_MS_BUCKETS);
+}
+void Track::_get_sec(long *result) {
+ if (sec != NULL) {
+ for (int i = 0; i < LATENCY_SEC_BUCKETS; i++)
+ result[i] = (long)sec[i];
+ } else
+ memset(result, 0, sizeof(long) * LATENCY_SEC_BUCKETS);
+}
+
+Stats::Stats(bool latency) : insert(latency), not_found(latency),
+ read(latency), remove(latency), update(latency), truncate(latency) {
+}
+
+Stats::Stats(const Stats &other) : insert(other.insert),
+ not_found(other.not_found), read(other.read), remove(other.remove),
+ update(other.update), truncate(other.truncate) {
+}
+
+Stats::~Stats() {}
+
+void Stats::add(Stats &other, bool reset) {
+ insert.add(other.insert, reset);
+ not_found.add(other.not_found, reset);
+ read.add(other.read, reset);
+ remove.add(other.remove, reset);
+ update.add(other.update, reset);
+ truncate.add(other.truncate, reset);
+}
+
+void Stats::assign(const Stats &other) {
+ insert.assign(other.insert);
+ not_found.assign(other.not_found);
+ read.assign(other.read);
+ remove.assign(other.remove);
+ update.assign(other.update);
+ truncate.assign(other.truncate);
+}
+
+void Stats::clear() {
+ insert.clear();
+ not_found.clear();
+ read.clear();
+ remove.clear();
+ update.clear();
+ truncate.clear();
+}
+
+void Stats::describe(std::ostream &os) const {
+ os << "Stats: reads " << read.ops;
+ if (not_found.ops > 0) {
+ os << " (" << not_found.ops << " not found)";
+ }
+ os << ", inserts " << insert.ops;
+ os << ", updates " << update.ops;
+ os << ", truncates " << truncate.ops;
+ os << ", removes " << remove.ops;
+}
+
+void Stats::final_report(std::ostream &os, timespec &totalsecs) const {
+ uint64_t ops = 0;
+ ops += read.ops;
+ ops += not_found.ops;
+ ops += insert.ops;
+ ops += update.ops;
+ ops += truncate.ops;
+ ops += remove.ops;
+
+#define FINAL_OUTPUT(os, field, singular, ops, totalsecs) \
+ os << "Executed " << field << " " #singular " operations (" \
+ << PCT(field, ops) << "%) " << OPS_PER_SEC(field, totalsecs) \
+ << " ops/sec" << std::endl
+
+ FINAL_OUTPUT(os, read.ops, read, ops, totalsecs);
+ FINAL_OUTPUT(os, not_found.ops, not found, ops, totalsecs);
+ FINAL_OUTPUT(os, insert.ops, insert, ops, totalsecs);
+ FINAL_OUTPUT(os, update.ops, update, ops, totalsecs);
+ FINAL_OUTPUT(os, truncate.ops, truncate, ops, totalsecs);
+ FINAL_OUTPUT(os, remove.ops, remove, ops, totalsecs);
+}
+
+void Stats::report(std::ostream &os) const {
+ os << read.ops << " reads";
+ if (not_found.ops > 0) {
+ os << " (" << not_found.ops << " not found)";
+ }
+ os << ", " << insert.ops << " inserts, ";
+ os << update.ops << " updates, ";
+ os << truncate.ops << " truncates, ";
+ os << remove.ops << " removes";
+}
+
+void Stats::smooth(const Stats &other) {
+ insert.smooth(other.insert);
+ not_found.smooth(other.not_found);
+ read.smooth(other.read);
+ remove.smooth(other.remove);
+ update.smooth(other.update);
+ truncate.smooth(other.truncate);
+}
+
+void Stats::subtract(const Stats &other) {
+ insert.subtract(other.insert);
+ not_found.subtract(other.not_found);
+ read.subtract(other.read);
+ remove.subtract(other.remove);
+ update.subtract(other.update);
+ truncate.subtract(other.truncate);
+}
+
+void Stats::track_latency(bool latency) {
+ insert.track_latency(latency);
+ not_found.track_latency(latency);
+ read.track_latency(latency);
+ remove.track_latency(latency);
+ update.track_latency(latency);
+ truncate.track_latency(latency);
+}
+
+TableOptions::TableOptions() : key_size(0), value_size(0), _options() {
+ _options.add_int("key_size", key_size,
+ "default size of the key, unless overridden by Key.size");
+ _options.add_int("value_size", value_size,
+ "default size of the value, unless overridden by Value.size");
+}
+TableOptions::TableOptions(const TableOptions &other) :
+ key_size(other.key_size), value_size(other.value_size),
+ _options(other._options) {}
+TableOptions::~TableOptions() {}
+
+Table::Table() : options(), _uri(), _internal(new TableInternal()) {
+}
+Table::Table(const char *uri) : options(), _uri(uri),
+ _internal(new TableInternal()) {
+}
+Table::Table(const Table &other) : options(other.options), _uri(other._uri),
+ _internal(new TableInternal(*other._internal)) {
+}
+Table::~Table() { delete _internal; }
+Table& Table::operator=(const Table &other) {
+ options = other.options;
+ _uri = other._uri;
+ *_internal = *other._internal;
+ return (*this);
+}
+
+void Table::describe(std::ostream &os) const {
+ os << "Table: " << _uri;
+}
+
+TableInternal::TableInternal() : _tint(0), _context_count(0) {}
+TableInternal::TableInternal(const TableInternal &other) : _tint(other._tint),
+ _context_count(other._context_count) {}
+TableInternal::~TableInternal() {}
+
+WorkloadOptions::WorkloadOptions() : max_latency(0),
+ report_file("workload.stat"), report_interval(0), run_time(0),
+ sample_file("sample.json"), sample_interval(0), sample_rate(1),
+ _options() {
+ _options.add_int("max_latency", max_latency,
+ "prints warning if any latency measured exceeds this number of "
+ "milliseconds. Requires sample_interval to be configured.");
+ _options.add_int("report_interval", report_interval,
+ "output throughput information every interval seconds, 0 to disable");
+ _options.add_string("report_file", report_file,
+ "file name for collecting run output, "
+ "including output from the report_interval option. "
+ "The file name is relative to the connection's home directory. "
+ "When set to the empty string, stdout is used.");
+ _options.add_int("run_time", run_time, "total workload seconds");
+ _options.add_string("sample_file", sample_file,
+ "file name for collecting latency output in a JSON-like format, "
+ "enabled by the report_interval option. "
+ "The file name is relative to the connection's home directory. "
+ "When set to the empty string, no JSON is emitted.");
+ _options.add_int("sample_interval", sample_interval,
+ "performance logging every interval seconds, 0 to disable");
+ _options.add_int("sample_rate", sample_rate,
+ "how often the latency of operations is measured. 1 for every operation, "
+ "2 for every second operation, 3 for every third operation etc.");
+}
+
+WorkloadOptions::WorkloadOptions(const WorkloadOptions &other) :
+ max_latency(other.max_latency), report_file(other.report_file),
+ report_interval(other.report_interval), run_time(other.run_time),
+ sample_file(other.sample_file), sample_interval(other.sample_interval),
+ sample_rate(other.sample_rate), _options(other._options) {}
+WorkloadOptions::~WorkloadOptions() {}
+
+Workload::Workload(Context *context, const ThreadListWrapper &tlw) :
+ options(), stats(), _context(context), _threads(tlw._threads) {
+ if (context == NULL)
+ THROW("Workload contructor requires a Context");
+}
+
+Workload::Workload(Context *context, const Thread &thread) :
+ options(), stats(), _context(context), _threads() {
+ if (context == NULL)
+ THROW("Workload contructor requires a Context");
+ _threads.push_back(thread);
+}
+
+Workload::Workload(const Workload &other) :
+ options(other.options), stats(other.stats), _context(other._context),
+ _threads(other._threads) {}
+Workload::~Workload() {}
+
+Workload& Workload::operator=(const Workload &other) {
+ options = other.options;
+ stats.assign(other.stats);
+ *_context = *other._context;
+ _threads = other._threads;
+ return (*this);
+}
+
+int Workload::run(WT_CONNECTION *conn) {
+ WorkloadRunner runner(this);
+
+ return (runner.run(conn));
+}
+
+WorkloadRunner::WorkloadRunner(Workload *workload) :
+ _workload(workload), _trunners(workload->_threads.size()),
+ _report_out(&std::cout), _start() {
+ ts_clear(_start);
+}
+WorkloadRunner::~WorkloadRunner() {}
+
+int WorkloadRunner::run(WT_CONNECTION *conn) {
+ WT_DECL_RET;
+ WorkloadOptions *options = &_workload->options;
+ std::ofstream report_out;
+
+ _wt_home = conn->get_home(conn);
+ if (options->sample_interval > 0 && options->sample_rate <= 0)
+ THROW("Workload.options.sample_rate must be positive");
+ if (!options->report_file.empty()) {
+ open_report_file(report_out, options->report_file.c_str(),
+ "Workload.options.report_file");
+ _report_out = &report_out;
+ }
+ WT_ERR(create_all(conn, _workload->_context));
+ WT_ERR(open_all());
+ WT_ERR(ThreadRunner::cross_check(_trunners));
+ WT_ERR(run_all());
+ err:
+ //TODO: (void)close_all();
+ _report_out = &std::cout;
+ return (ret);
+}
+
+int WorkloadRunner::open_all() {
+ for (size_t i = 0; i < _trunners.size(); i++) {
+ WT_RET(_trunners[i].open_all());
+ }
+ return (0);
+}
+
+void WorkloadRunner::open_report_file(std::ofstream &of, const char *filename,
+ const char *desc) {
+ std::stringstream sstm;
+
+ if (!_wt_home.empty())
+ sstm << _wt_home << "/";
+ sstm << filename;
+ of.open(sstm.str().c_str(), std::fstream::app);
+ if (!of)
+ THROW_ERRNO(errno, desc << ": \"" << sstm.str()
+ << "\" could not be opened");
+}
+
+int WorkloadRunner::create_all(WT_CONNECTION *conn, Context *context) {
+ for (size_t i = 0; i < _trunners.size(); i++) {
+ ThreadRunner *runner = &_trunners[i];
+ std::stringstream sstm;
+ Thread *thread = &_workload->_threads[i];
+ if (thread->options.name.empty()) {
+ sstm << "thread" << i;
+ thread->options.name = sstm.str();
+ }
+ runner->_thread = thread;
+ runner->_context = context;
+ runner->_icontext = context->_internal;
+ runner->_workload = _workload;
+ runner->_wrunner = this;
+ runner->_number = (uint32_t)i;
+ // TODO: recover from partial failure here
+ WT_RET(runner->create_all(conn));
+ }
+ WT_RET(context->_internal->create_all());
+ return (0);
+}
+
+int WorkloadRunner::close_all() {
+ for (size_t i = 0; i < _trunners.size(); i++)
+ _trunners[i].close_all();
+
+ return (0);
+}
+
+void WorkloadRunner::get_stats(Stats *result) {
+ for (size_t i = 0; i < _trunners.size(); i++)
+ result->add(_trunners[i]._stats);
+}
+
+void WorkloadRunner::report(time_t interval, time_t totalsecs,
+ Stats *prev_totals) {
+ std::ostream &out = *_report_out;
+ Stats new_totals(prev_totals->track_latency());
+
+ get_stats(&new_totals);
+ Stats diff(new_totals);
+ diff.subtract(*prev_totals);
+ prev_totals->assign(new_totals);
+ diff.report(out);
+ out << " in " << interval << " secs ("
+ << totalsecs << " total secs)" << std::endl;
+}
+
+void WorkloadRunner::final_report(timespec &totalsecs) {
+ std::ostream &out = *_report_out;
+ Stats *stats = &_workload->stats;
+
+ stats->clear();
+ stats->track_latency(_workload->options.sample_interval > 0);
+
+ get_stats(stats);
+ stats->final_report(out, totalsecs);
+ out << "Run completed: " << totalsecs << " seconds" << std::endl;
+}
+
+int WorkloadRunner::run_all() {
+ void *status;
+ std::vector<pthread_t> thread_handles;
+ Stats counts(false);
+ WorkgenException *exception;
+ WorkloadOptions *options = &_workload->options;
+ Monitor monitor(*this);
+ std::ofstream monitor_out;
+ std::ofstream monitor_json;
+ std::ostream &out = *_report_out;
+ WT_DECL_RET;
+
+ for (size_t i = 0; i < _trunners.size(); i++)
+ _trunners[i].get_static_counts(counts);
+ out << "Starting workload: " << _trunners.size() << " threads, ";
+ counts.report(out);
+ out << std::endl;
+
+ workgen_epoch(&_start);
+ timespec end = _start + options->run_time;
+ timespec next_report = _start + options->report_interval;
+
+ // Start all threads
+ if (options->sample_interval > 0) {
+ open_report_file(monitor_out, "monitor", "monitor output file");
+ monitor._out = &monitor_out;
+
+ if (!options->sample_file.empty()) {
+ open_report_file(monitor_json, options->sample_file.c_str(),
+ "sample JSON output file");
+ monitor._json = &monitor_json;
+ }
+
+ if ((ret = pthread_create(&monitor._handle, NULL, monitor_main,
+ &monitor)) != 0) {
+ std::cerr << "monitor thread failed err=" << ret << std::endl;
+ return (ret);
+ }
+ }
+
+ for (size_t i = 0; i < _trunners.size(); i++) {
+ pthread_t thandle;
+ ThreadRunner *runner = &_trunners[i];
+ runner->_stop = false;
+ runner->_repeat = (options->run_time != 0);
+ if ((ret = pthread_create(&thandle, NULL, thread_runner_main,
+ runner)) != 0) {
+ std::cerr << "pthread_create failed err=" << ret << std::endl;
+ std::cerr << "Stopping all threads." << std::endl;
+ for (size_t j = 0; j < thread_handles.size(); j++) {
+ _trunners[j]._stop = true;
+ (void)pthread_join(thread_handles[j], &status);
+ _trunners[j].close_all();
+ }
+ return (ret);
+ }
+ thread_handles.push_back(thandle);
+ runner->_stats.clear();
+ }
+
+ // Let the test run, reporting as needed.
+ Stats curstats(false);
+ timespec now = _start;
+ while (now < end) {
+ timespec sleep_amt;
+
+ sleep_amt = end - now;
+ if (next_report != 0) {
+ timespec next_diff = next_report - now;
+ if (next_diff < sleep_amt)
+ sleep_amt = next_diff;
+ }
+ if (sleep_amt.tv_sec > 0)
+ sleep((unsigned int)sleep_amt.tv_sec);
+ else
+ usleep((useconds_t)((sleep_amt.tv_nsec + 999)/ 1000));
+
+ workgen_epoch(&now);
+ if (now >= next_report && now < end && options->report_interval != 0) {
+ report(options->report_interval, (now - _start).tv_sec, &curstats);
+ while (now >= next_report)
+ next_report += options->report_interval;
+ }
+ }
+
+ // signal all threads to stop
+ if (options->run_time != 0)
+ for (size_t i = 0; i < _trunners.size(); i++)
+ _trunners[i]._stop = true;
+ if (options->sample_interval > 0)
+ monitor._stop = true;
+
+ // wait for all threads
+ exception = NULL;
+ for (size_t i = 0; i < _trunners.size(); i++) {
+ WT_TRET(pthread_join(thread_handles[i], &status));
+ if (_trunners[i]._errno != 0)
+ VERBOSE(_trunners[i],
+ "Thread " << i << " has errno " << _trunners[i]._errno);
+ WT_TRET(_trunners[i]._errno);
+ _trunners[i].close_all();
+ if (exception == NULL && !_trunners[i]._exception._str.empty())
+ exception = &_trunners[i]._exception;
+ }
+ if (options->sample_interval > 0) {
+ WT_TRET(pthread_join(monitor._handle, &status));
+ if (monitor._errno != 0)
+ std::cerr << "Monitor thread has errno " << monitor._errno
+ << std::endl;
+ if (exception == NULL && !monitor._exception._str.empty())
+ exception = &monitor._exception;
+
+ monitor_out.close();
+ if (!options->sample_file.empty())
+ monitor_json.close();
+ }
+
+ // issue the final report
+ timespec finalsecs = now - _start;
+ final_report(finalsecs);
+
+ if (ret != 0)
+ std::cerr << "run_all failed err=" << ret << std::endl;
+ (*_report_out) << std::endl;
+ if (exception != NULL)
+ throw *exception;
+ return (ret);
+}
+
+};