summaryrefslogtreecommitdiff
path: root/src/third_party/wiredtiger/bench/workgen
diff options
context:
space:
mode:
authorAlex Gorrod <alexander.gorrod@mongodb.com>2017-06-14 19:39:53 +0000
committerAlex Gorrod <alexander.gorrod@mongodb.com>2017-06-14 19:39:53 +0000
commit11ca50ae96399aa8b0eaeee5dc115398d78fee2b (patch)
treea54af4da3e0dc8f216c191ce23e650e318ce00ac /src/third_party/wiredtiger/bench/workgen
parenta16bc9371921766d41e7a86c3ca10080c677ce6d (diff)
downloadmongo-11ca50ae96399aa8b0eaeee5dc115398d78fee2b.tar.gz
Import wiredtiger: 47e8c3d1d22018eaaa09f91dfd78addb49e0b49b from branch mongodb-3.6
ref: 7aaeaaa054..47e8c3d1d2 for: 3.5.9 WT-2596 Document behavior after a crash with a backup cursor open WT-3169 Add ability to log a message when the lookaside file is used WT-3326 workgen: run wtperf "runner" files WT-3332 Add statistic tracking connection wide transaction conflicts WT-3346 workgen: create JSON output for latency sampling WT-3349 Add timing stats to rwlocks WT-3361 Resolve Windows build warnings, build more test programs on Windows. WT-3362 Cursor opens should never block for the duration of a checkpoint WT-3369 WT_CURSOR->uri should always match the URI used to open the cursor
Diffstat (limited to 'src/third_party/wiredtiger/bench/workgen')
-rw-r--r--src/third_party/wiredtiger/bench/workgen/runner/runner/__init__.py2
-rw-r--r--src/third_party/wiredtiger/bench/workgen/runner/runner/core.py104
-rwxr-xr-xsrc/third_party/wiredtiger/bench/workgen/runner/workgen_stat.sh75
-rw-r--r--src/third_party/wiredtiger/bench/workgen/workgen.cxx54
-rw-r--r--src/third_party/wiredtiger/bench/workgen/workgen.h1
-rw-r--r--src/third_party/wiredtiger/bench/workgen/workgen_func.c13
-rw-r--r--src/third_party/wiredtiger/bench/workgen/workgen_func.h2
-rw-r--r--src/third_party/wiredtiger/bench/workgen/workgen_int.h1
-rw-r--r--src/third_party/wiredtiger/bench/workgen/wtperf.py440
9 files changed, 685 insertions, 7 deletions
diff --git a/src/third_party/wiredtiger/bench/workgen/runner/runner/__init__.py b/src/third_party/wiredtiger/bench/workgen/runner/runner/__init__.py
index 67b547bc51b..ed21fffe8dc 100644
--- a/src/third_party/wiredtiger/bench/workgen/runner/runner/__init__.py
+++ b/src/third_party/wiredtiger/bench/workgen/runner/runner/__init__.py
@@ -88,5 +88,5 @@ except:
shutil.rmtree('WT_TEST', True)
os.mkdir('WT_TEST')
-from .core import txn, extensions_config
+from .core import txn, extensions_config, op_group_transaction, op_log_like, op_multi_table
from .latency import workload_latency
diff --git a/src/third_party/wiredtiger/bench/workgen/runner/runner/core.py b/src/third_party/wiredtiger/bench/workgen/runner/runner/core.py
index a0f0d4d77cd..2c8311c4ca7 100644
--- a/src/third_party/wiredtiger/bench/workgen/runner/runner/core.py
+++ b/src/third_party/wiredtiger/bench/workgen/runner/runner/core.py
@@ -29,12 +29,12 @@
# runner/core.py
# Core functions available to all runners
import glob, os
-import workgen
+from workgen import Key, Operation, OpList, Table, Transaction, Value
# txn --
# Put the operation (and any suboperations) within a transaction.
def txn(op, config=None):
- t = workgen.Transaction(config)
+ t = Transaction(config)
op._transaction = t
return op
@@ -99,3 +99,103 @@ def extensions_config(exts):
if len(extfiles) != 0:
result = ',extensions=[' + ','.join(extfiles.values()) + ']'
return result
+
+def _op_multi_table_as_list(ops_arg, tables):
+ result = []
+ if ops_arg._optype != Operation.OP_NONE:
+ for table in tables:
+ result.append(Operation(ops_arg._optype, table, ops_arg._key, ops_arg._value))
+ else:
+ for op in ops._group:
+ result.extend(_op_multi_table_as_list(op, tables))
+ return result
+
+# A convenient way to build a list of operations
+def op_append(op1, op2):
+ if op1 == None:
+ op1 = op2
+ else:
+ op1 += op2
+ return op1
+
+# Emulate wtperf's table_count option. Spread the given operations over
+# a set of tables.
+def op_multi_table(ops_arg, tables):
+ ops = None
+ for op in _op_multi_table_as_list(ops_arg, tables):
+ ops = op_append(ops, op)
+ return ops
+
+# should be 8 bytes format 'Q'
+_logkey = Key(Key.KEYGEN_APPEND, 8)
+def _op_log_op(op, log_table):
+ keysize = op._key._size
+ if keysize == 0:
+ keysize = op._table.options.key_size
+ valuesize = op._value._size
+ if valuesize == 0:
+ valuesize = op._table.options.value_size
+ v = Value(keysize + valuesize)
+ return Operation(Operation.OP_INSERT, log_table, _logkey, v)
+
+def _optype_is_write(optype):
+ return optype == Operation.OP_INSERT or optype == Operation.OP_UPDATE or \
+ optype == Operation.OP_REMOVE
+
+# Emulate wtperf's log_like option. For all operations, add a second
+# insert operation going to a log table.
+def op_log_like(op, log_table, ops_per_txn):
+ if op._optype != Operation.OP_NONE:
+ if _optype_is_write(op._optype):
+ op += _op_log_op(op, log_table)
+ if ops_per_txn == 0:
+ op = txn(op) # txn for each action.
+ else:
+ oplist = []
+ for op2 in op._group:
+ if op2._optype == Operation.OP_NONE:
+ oplist.append(op_log_like(op2, log_table))
+ elif ops_per_txn == 0 and _optype_is_write(op2._optype):
+ op2 += _op_log_op(op2, log_table)
+ oplist.append(txn(op2)) # txn for each action.
+ else:
+ oplist.append(op2)
+ if _optype_is_write(op2._optype):
+ oplist.append(_op_log_op(op2, log_table))
+ op._group = OpList(oplist)
+ return op
+
+def _op_transaction_list(oplist, txn_config):
+ result = None
+ for op in oplist:
+ result = op_append(result, op)
+ return txn(result, txn_config)
+
+# Emulate wtperf's ops_per_txn option. Create transactions around
+# groups of operations of the indicated size.
+def op_group_transaction(ops_arg, ops_per_txn, txn_config):
+ if ops_arg != Operation.OP_NONE:
+ return txn(ops_arg, txn_config)
+ if ops_arg._transaction != None:
+ raise Exception('nested transactions not supported')
+ if ops_arg._repeatgroup != None:
+ raise Exception('grouping transactions with multipliers not supported')
+
+ oplist = []
+ ops = None
+ nops = 0
+ txgroup = []
+ for op in ops_arg._group:
+ if op.optype == Operation.OP_NONE:
+ oplist.append(_op_transaction_list(txgroup, txn_config))
+ txgroup = []
+ oplist.append(op)
+ else:
+ txgroup.append(op)
+ if len(txgroup) >= ops_per_txn:
+ oplist.append(_op_transaction_list(txgroup, txn_config))
+ txgroup = []
+ if len(txgroup) > 0:
+ oplist.append(_op_transaction_list(txgroup, txn_config))
+ ops_arg._group = OpList(oplist)
+ return ops_arg
diff --git a/src/third_party/wiredtiger/bench/workgen/runner/workgen_stat.sh b/src/third_party/wiredtiger/bench/workgen/runner/workgen_stat.sh
new file mode 100755
index 00000000000..1739c29859e
--- /dev/null
+++ b/src/third_party/wiredtiger/bench/workgen/runner/workgen_stat.sh
@@ -0,0 +1,75 @@
+#!/bin/bash
+#
+# workgen_stat.sh - combine JSON time series output from WT and workgen.
+#
+Usage() {
+ cat <<EOF
+Usage: $0 [ options ]
+Options:
+ -h <WT_home_directory> # set the WiredTiger home directory
+ -e <analyzer_name> # run analyzer on the combined files
+ -o <output_file> # output file for result
+
+At least one of '-t2' or '-o' must be selected.
+EOF
+ exit 1
+}
+
+Filter() {
+ sed -e 's/"version" *: *"[^"]*",//' "$@"
+}
+
+wthome=.
+outfile=
+analyze=
+
+while [ "$#" != 0 ]; do
+ arg="$1"
+ shift
+ case "$arg" in
+ -h )
+ if [ $# = 0 ]; then
+ Usage
+ fi
+ wthome="$1"
+ shift
+ ;;
+ -o )
+ if [ $# = 0 ]; then
+ Usage
+ fi
+ outfile="$1"
+ shift
+ ;;
+ -e )
+ if [ $# = 0 ]; then
+ Usage
+ fi
+ analyze="$1"
+ shift
+ ;;
+ esac
+done
+if [ ! -d "$wthome" ]; then
+ echo "$wthome: WT home directory does not exist"
+ exit 1
+fi
+if [ ! -f "$wthome/WiredTiger.wt" ]; then
+ echo "$wthome: directory is not a WiredTiger home directory"
+ exit 1
+fi
+if [ "$outfile" = '' ]; then
+ if [ "$analyze" = false ]; then
+ Usage
+ fi
+ outfile="$wthome/stat_tmp.json"
+fi
+(cd $wthome; Filter WiredTigerStat.* sample.json) | sort > $outfile
+if [ "$analyze" != '' ]; then
+ sysname=`uname -s`
+ if [ "$sysname" = Darwin ]; then
+ open -a "$analyze" "$outfile"
+ else
+ "$analyze" "$outfile"
+ fi
+fi
diff --git a/src/third_party/wiredtiger/bench/workgen/workgen.cxx b/src/third_party/wiredtiger/bench/workgen/workgen.cxx
index c56acfd2989..880b8ca6467 100644
--- a/src/third_party/wiredtiger/bench/workgen/workgen.cxx
+++ b/src/third_party/wiredtiger/bench/workgen/workgen.cxx
@@ -267,16 +267,18 @@ int ContextInternal::create_all() {
}
Monitor::Monitor(WorkloadRunner &wrunner) :
- _errno(0), _exception(), _wrunner(wrunner), _stop(false), _handle() {}
+ _errno(0), _exception(), _wrunner(wrunner), _stop(false), _handle(),
+ _out(NULL), _json(NULL) {}
Monitor::~Monitor() {}
int Monitor::run() {
struct timespec t;
struct tm *tm, _tm;
- char time_buf[64];
+ char time_buf[64], version[100];
Stats prev_totals;
WorkloadOptions *options = &_wrunner._workload->options;
uint64_t latency_max = (uint64_t)options->max_latency;
+ bool first;
(*_out) << "#time,"
<< "totalsec,"
@@ -295,6 +297,8 @@ int Monitor::run() {
<< "update maximum latency(uS)"
<< std::endl;
+ first = true;
+ workgen_version(version, sizeof(version));
Stats prev_interval;
while (!_stop) {
for (int i = 0; i < options->sample_interval && !_stop; i++)
@@ -337,6 +341,32 @@ int Monitor::run() {
<< "," << interval.update.max_latency
<< std::endl;
+ if (_json != NULL) {
+#define WORKGEN_TIMESTAMP_JSON "%Y-%m-%dT%H:%M:%S.000Z"
+ (void)strftime(time_buf, sizeof(time_buf),
+ WORKGEN_TIMESTAMP_JSON, tm);
+
+#define TRACK_JSON(name, t) \
+ "\"" << (name) << "\":{" \
+ << "\"ops per sec\":" << ((t).ops / interval_secs) \
+ << ",\"average latency\":" << (t).average_latency() \
+ << ",\"min latency\":" << (t).min_latency \
+ << ",\"max latency\":" << (t).max_latency \
+ << "}"
+
+ (*_json) << "{";
+ if (first) {
+ (*_json) << "\"version\":\"" << version << "\",";
+ first = false;
+ }
+ (*_json) << "\"localTime\":\"" << time_buf
+ << "\",\"workgen\":{"
+ << TRACK_JSON("read", interval.read) << ","
+ << TRACK_JSON("insert", interval.insert) << ","
+ << TRACK_JSON("update", interval.update)
+ << "}}" << std::endl;
+ }
+
uint64_t read_max = interval.read.max_latency;
uint64_t insert_max = interval.read.max_latency;
uint64_t update_max = interval.read.max_latency;
@@ -1315,8 +1345,8 @@ TableInternal::TableInternal(const TableInternal &other) : _tint(other._tint),
TableInternal::~TableInternal() {}
WorkloadOptions::WorkloadOptions() : max_latency(0),
- report_file("workload.stat"), report_interval(0),
- run_time(0), sample_interval(0), sample_rate(1),
+ report_file("workload.stat"), report_interval(0), run_time(0),
+ sample_file("sample.json"), sample_interval(0), sample_rate(1),
_options() {
_options.add_int("max_latency", max_latency,
"prints warning if any latency measured exceeds this number of "
@@ -1329,6 +1359,11 @@ WorkloadOptions::WorkloadOptions() : max_latency(0),
"The file name is relative to the connection's home directory. "
"When set to the empty string, stdout is used.");
_options.add_int("run_time", run_time, "total workload seconds");
+ _options.add_string("sample_file", sample_file,
+ "file name for collecting latency output in a JSON-like format, "
+ "enabled by the report_interval option. "
+ "The file name is relative to the connection's home directory. "
+ "When set to the empty string, no JSON is emitted.");
_options.add_int("sample_interval", sample_interval,
"performance logging every interval seconds, 0 to disable");
_options.add_int("sample_rate", sample_rate,
@@ -1492,6 +1527,7 @@ int WorkloadRunner::run_all() {
WorkloadOptions *options = &_workload->options;
Monitor monitor(*this);
std::ofstream monitor_out;
+ std::ofstream monitor_json;
std::ostream &out = *_report_out;
WT_DECL_RET;
@@ -1510,6 +1546,12 @@ int WorkloadRunner::run_all() {
open_report_file(monitor_out, "monitor", "monitor output file");
monitor._out = &monitor_out;
+ if (!options->sample_file.empty()) {
+ open_report_file(monitor_json, options->sample_file.c_str(),
+ "sample JSON output file");
+ monitor._json = &monitor_json;
+ }
+
if ((ret = pthread_create(&monitor._handle, NULL, monitor_main,
&monitor)) != 0) {
std::cerr << "monitor thread failed err=" << ret << std::endl;
@@ -1588,6 +1630,10 @@ int WorkloadRunner::run_all() {
<< std::endl;
if (exception == NULL && !monitor._exception._str.empty())
exception = &monitor._exception;
+
+ monitor_out.close();
+ if (!options->sample_file.empty())
+ monitor_json.close();
}
// issue the final report
diff --git a/src/third_party/wiredtiger/bench/workgen/workgen.h b/src/third_party/wiredtiger/bench/workgen/workgen.h
index c1ae01ed5a4..c7be8ee0035 100644
--- a/src/third_party/wiredtiger/bench/workgen/workgen.h
+++ b/src/third_party/wiredtiger/bench/workgen/workgen.h
@@ -358,6 +358,7 @@ struct WorkloadOptions {
int run_time;
int sample_interval;
int sample_rate;
+ std::string sample_file;
WorkloadOptions();
WorkloadOptions(const WorkloadOptions &other);
diff --git a/src/third_party/wiredtiger/bench/workgen/workgen_func.c b/src/third_party/wiredtiger/bench/workgen/workgen_func.c
index 2e1271a515e..5ce2146a8e4 100644
--- a/src/third_party/wiredtiger/bench/workgen/workgen_func.c
+++ b/src/third_party/wiredtiger/bench/workgen/workgen_func.c
@@ -87,3 +87,16 @@ workgen_u64_to_string_zf(uint64_t n, char *buf, size_t len)
{
u64_to_string_zf(n, buf, len);
}
+
+#define WORKGEN_VERSION_PREFIX "workgen-"
+extern void
+workgen_version(char *buf, size_t len)
+{
+ size_t prefix_len;
+
+ prefix_len = strlen(WORKGEN_VERSION_PREFIX);
+ (void)strncpy(buf, WORKGEN_VERSION_PREFIX, len);
+ if (len > prefix_len)
+ (void)strncpy(&buf[prefix_len], WIREDTIGER_VERSION_STRING,
+ len - prefix_len);
+}
diff --git a/src/third_party/wiredtiger/bench/workgen/workgen_func.h b/src/third_party/wiredtiger/bench/workgen/workgen_func.h
index 20ebf2632cc..ec7ecf0a504 100644
--- a/src/third_party/wiredtiger/bench/workgen/workgen_func.h
+++ b/src/third_party/wiredtiger/bench/workgen/workgen_func.h
@@ -42,3 +42,5 @@ extern void
workgen_random_free(struct workgen_random_state *rnd_state);
extern void
workgen_u64_to_string_zf(uint64_t n, char *buf, size_t len);
+extern void
+workgen_version(char *buf, size_t len);
diff --git a/src/third_party/wiredtiger/bench/workgen/workgen_int.h b/src/third_party/wiredtiger/bench/workgen/workgen_int.h
index 01fb727691b..9283aea1d7b 100644
--- a/src/third_party/wiredtiger/bench/workgen/workgen_int.h
+++ b/src/third_party/wiredtiger/bench/workgen/workgen_int.h
@@ -146,6 +146,7 @@ struct Monitor {
volatile bool _stop;
pthread_t _handle;
std::ostream *_out;
+ std::ostream *_json;
Monitor(WorkloadRunner &wrunner);
~Monitor();
diff --git a/src/third_party/wiredtiger/bench/workgen/wtperf.py b/src/third_party/wiredtiger/bench/workgen/wtperf.py
new file mode 100644
index 00000000000..3a196fe7b57
--- /dev/null
+++ b/src/third_party/wiredtiger/bench/workgen/wtperf.py
@@ -0,0 +1,440 @@
+#!/usr/bin/env python
+#
+# Public Domain 2014-2017 MongoDB, Inc.
+# Public Domain 2008-2014 WiredTiger, Inc.
+#
+# This is free and unencumbered software released into the public domain.
+#
+# Anyone is free to copy, modify, publish, use, compile, sell, or
+# distribute this software, either in source code form or as a compiled
+# binary, for any purpose, commercial or non-commercial, and by any
+# means.
+#
+# In jurisdictions that recognize copyright laws, the author or authors
+# of this software dedicate any and all copyright interest in the
+# software to the public domain. We make this dedication for the benefit
+# of the public at large and to the detriment of our heirs and
+# successors. We intend this dedication to be an overt act of
+# relinquishment in perpetuity of all present and future rights to this
+# software under copyright law.
+#
+# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
+# EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
+# MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT.
+# IN NO EVENT SHALL THE AUTHORS BE LIABLE FOR ANY CLAIM, DAMAGES OR
+# OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE,
+# ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR
+# OTHER DEALINGS IN THE SOFTWARE.
+#
+
+# wtperf.py
+# A partial emulation of wtperf. Translates a .wtperf file into a Python
+# script that uses the workgen module, and runs the script. Errors are
+# issued for any .wtperf directives that are not known.
+# See also the usage() function.
+#
+from __future__ import print_function
+import os, sys, tempfile
+
+def eprint(*args, **kwargs):
+ print(*args, file=sys.stderr, **kwargs)
+
+class OptionValue:
+ def __init__(self, value, filename, linenum):
+ self.value = value
+ self.filename = filename
+ self.linenum = linenum
+
+class TranslateException(Exception):
+ pass
+
+class Options(object):
+ pass
+
+class Translator:
+ def __init__(self, filename, prefix, verbose):
+ self.filename = filename
+ self.prefix = prefix
+ self.verbose = verbose
+ self.linenum = 0
+ self.opts = {}
+ self.used_opts = {}
+ self.has_error = False
+
+ def error_file_line(self, fname, linenum, msg):
+ self.has_error = True
+ eprint(fname + ':' + str(linenum) + ': error: ' + msg)
+
+ # Report an error and continue
+ def error(self, msg):
+ self.error_file_line(self.filename, self.linenum, msg)
+
+ # Report an error and unwind the stack
+ def fatal_error(self, msg, errtype):
+ self.error(msg)
+ raise TranslateException(errtype)
+
+ supported_opt_list = [ 'compression', 'conn_config', 'icount',
+ 'key_sz', 'log_like_table',
+ 'populate_ops_per_txn', 'populate_threads',
+ 'reopen_connection',
+ 'table_config', 'table_count',
+ 'threads', 'transaction_config', 'value_sz' ]
+
+ def set_opt(self, optname, val):
+ if optname not in self.supported_opt_list:
+ self.error("unknown option: " + optname)
+ return
+ elif val[0] == '"' and val[-1] == '"':
+ v = val[1:-1]
+ elif val == 'true':
+ v = True
+ elif val == 'false':
+ v = False
+ elif val[0] == '(':
+ v = val # config string stored as is
+ else:
+ try:
+ v = int(val) # it might be an integer
+ except ValueError:
+ v = val # it's a string after all
+ self.opts[optname] = OptionValue(v, self.filename, self.linenum)
+
+ def get_opt(self, optname, dfault):
+ if optname in self.opts:
+ ret = self.opts[optname]
+ self.filename = ret.filename
+ self.linenum = ret.linenum
+ self.used_opts[optname] = 1
+ return ret.value
+ else:
+ return dfault
+
+ def get_int_opt(self, optname, dfault):
+ return self.get_opt(optname, dfault) + 0
+
+ def get_boolean_opt(self, optname, dfault):
+ return not not self.get_opt(optname, dfault)
+
+ # Split a string 'left_side=right_side' into two parts
+ def split_assign(self, s):
+ equalpos = s.find('=')
+ if equalpos < 0:
+ self.error("missing '=' for line: " + line)
+ return (None, None)
+ else:
+ return s.split('=', 1)
+
+ # Split a config string honoring nesting e.g.
+ # "(abc=123,def=234,ghi=(hi=1,bye=2))" would return 3 items.
+ def split_config_parens(self, s):
+ if s[0:1] != '(':
+ import pdb
+ pdb.set_trace()
+ self.fatal_error('missing left paren', 'config parse error')
+ if s[-1:] != ')':
+ self.fatal_error('missing right paren', 'config parse error')
+ s = s[1:-1]
+ result = []
+ level = 0
+ cur = ''
+ for ch in s:
+ if ch == ',' and level == 0:
+ result.append(cur)
+ cur = ''
+ else:
+ cur += ch
+ if ch == '(':
+ level += 1
+ elif ch == ')':
+ level -= 1
+ if level < 0:
+ self.fatal_error('unbalanced paren', 'config parse error')
+ if level != 0:
+ self.fatal_error('unbalanced paren', 'config parse error')
+ if len(cur) != 0:
+ result.append(cur)
+ return result
+
+ def assign_str(self, left, right):
+ return left + '=' + str(right) + '\n'
+
+ def add_operation_str(self, count, opname, multi):
+ result = ''
+ tablename = 'tables[0]' if multi else 'table'
+ if count > 1:
+ result += str(count) + ' * '
+ if count > 0:
+ result += 'Operation(Operation.' + opname + ', ' + \
+ tablename + ') + \\\n'
+ result += ' '
+ return result
+
+ # Wtperf's throttle is based on the number of regular operations,
+ # not including log_like operations. Workgen counts all operations,
+ # it doesn't treat log operations any differently. Adjust the throttle
+ # number to account for the difference.
+ def calc_throttle(self, thread_opts, log_like_table):
+ throttle = thread_opts.throttle
+ if not log_like_table:
+ return (throttle, '')
+ modify = thread_opts.inserts + thread_opts.updates
+ regular = modify + thread_opts.reads
+ total = regular + modify
+ factor = (total + 0.0) / regular
+ new_throttle = int(throttle * factor)
+ if new_throttle == throttle:
+ comment = ''
+ else:
+ comment = '# wtperf throttle=' + str(throttle) + ' adjusted by ' + \
+ str(factor) + ' to compensate for log_like operations.\n'
+ return (new_throttle, comment)
+
+ def parse_threads(self, threads_config):
+ tdecls = ''
+ tlist = self.split_config_parens(threads_config)
+ table_count = self.get_int_opt('table_count', 1)
+ log_like_table = self.get_boolean_opt('log_like_table', False)
+ txn_config = self.get_opt('transaction_config', '')
+ if log_like_table:
+ tdecls += 'log_name = "table:log"\n'
+ tdecls += 's.create(log_name, "key_format=S,value_format=S," +' + \
+ ' compress_table_config)\n'
+ tdecls += 'log_table = Table(log_name)\n\n'
+ thread_count = 0
+ tnames = ''
+ multi = (table_count > 1)
+ for t in tlist:
+ thread_name = 'thread' + str(thread_count)
+ thread_count += 1
+
+ # For wtperf compatibility, we allow both 'insert/inserts' etc.
+ topts = Options()
+ topts.count = 1
+ topts.insert = 0
+ topts.inserts = 0
+ topts.ops_per_txn = 0
+ topts.read = 0
+ topts.reads = 0
+ topts.throttle = 0
+ topts.update = 0
+ topts.updates = 0
+
+ for o in self.split_config_parens(t):
+ (k, v) = self.split_assign(o)
+ if hasattr(topts, k):
+ try:
+ setattr(topts, k, int(v))
+ except ValueError:
+ self.error('thread option ' + k + ': integer expected')
+ else:
+ self.error('unknown thread option: ' + k)
+
+ topts.inserts += topts.insert; topts.insert = 0
+ topts.updates += topts.update; topts.update = 0
+ topts.reads += topts.read; topts.read = 0
+ if topts.count == 0:
+ continue
+
+ if topts.inserts + topts.reads + topts.updates == 0:
+ self.fatal_error('need read/insert/update/...',
+ 'thread config error')
+ tdecls += 'ops = '
+ tdecls += self.add_operation_str(topts.inserts, 'OP_INSERT', multi)
+ tdecls += self.add_operation_str(topts.reads, 'OP_SEARCH', multi)
+ tdecls += self.add_operation_str(topts.updates, 'OP_UPDATE', multi)
+ tdecls = tdecls.rstrip(' \n\\+') + '\n'
+ if multi:
+ tdecls += 'ops = op_multi_table(ops, tables)\n'
+ if topts.ops_per_txn > 0:
+ tdecls += 'ops = op_group_transaction(ops, ' + \
+ str(topts.ops_per_txn) + ', "' + txn_config + '")\n'
+ if log_like_table:
+ tdecls += 'ops = op_log_like(ops, log_table, ' + \
+ str(topts.ops_per_txn) + ')\n'
+ tdecls += thread_name + ' = Thread(ops)\n'
+ if topts.throttle > 0:
+ (throttle, comment) = self.calc_throttle(topts, log_like_table)
+ tdecls += comment
+ tdecls += self.assign_str(thread_name + '.options.throttle',
+ throttle)
+ tdecls += '\n'
+ if topts.count > 1:
+ tnames += str(topts.count) + ' * '
+ tnames += thread_name + ' + '
+
+ tnames = tnames.rstrip(' +')
+ return (tdecls, tnames)
+
+ def translate(self):
+ try:
+ return self.translate_inner()
+ except TranslateException:
+ # An error has already been reported
+ return None
+
+ def translate_inner(self):
+ workloadopts = ''
+ with open(self.filename) as fin:
+ for line in fin:
+ self.linenum += 1
+ commentpos = line.find('#')
+ if commentpos >= 0:
+ line = line[0:commentpos]
+ line = line.strip()
+ if len(line) == 0:
+ continue
+ (key, val) = self.split_assign(line)
+ if key in [ 'max_latency', 'report_file', 'report_interval',
+ 'run_time', 'sample_interval', 'sample_rate' ]:
+ workloadopts += 'workload.options.' + key + '=' + val + '\n'
+ else:
+ self.set_opt(key, val)
+
+ table_count = self.get_int_opt('table_count', 1)
+ conn_config = self.get_opt('conn_config', '')
+ table_config = self.get_opt('table_config', '')
+ key_sz = self.get_int_opt('key_sz', 20)
+ value_sz = self.get_int_opt('value_sz', 100)
+ reopen = self.get_boolean_opt('reopen_connection', False)
+ compression = self.get_opt('compression', '')
+ txn_config = self.get_opt('transaction_config', '')
+
+ s = '#/usr/bin/env python\n'
+ s += '# generated from ' + self.filename + '\n'
+ s += self.prefix
+ s += 'from runner import *\n'
+ s += 'from wiredtiger import *\n'
+ s += 'from workgen import *\n'
+ s += '\n'
+ s += 'context = Context()\n'
+ s += 'conn_config = "' + conn_config + '"\n'
+ if compression != '':
+ s += 'conn_config += extensions_config(["compressors/' + \
+ compression + '"])\n'
+ compression = 'block_compressor=' + compression + ','
+ s += 'conn = wiredtiger_open("WT_TEST", "create," + conn_config)\n'
+ s += 's = conn.open_session()\n'
+ s += '\n'
+ s += 'wtperf_table_config = "key_format=S,value_format=S,type=lsm," +\\\n'
+ s += ' "exclusive=true,allocation_size=4kb," +\\\n'
+ s += ' "internal_page_max=64kb,leaf_page_max=4kb,split_pct=100,"\n'
+ s += 'compress_table_config = "' + compression + '"\n'
+ s += 'table_config = "' + table_config + '"\n'
+ if table_count == 1:
+ s += 'tname = "file:test.wt"\n'
+ s += 's.create(tname, wtperf_table_config +\\\n'
+ s += ' compress_table_config + table_config)\n'
+ s += 'table = Table(tname)\n'
+ s += 'table.options.key_size = ' + str(key_sz) + '\n'
+ s += 'table.options.value_size = ' + str(value_sz) + '\n'
+ else:
+ s += 'table_count = ' + str(table_count) + '\n'
+ s += 'tables = []\n'
+ s += 'for i in range(0, table_count):\n'
+ s += ' tname = "file:test" + str(i) + ".wt"\n'
+ s += ' s.create(tname, ' + \
+ 'wtperf_table_config + ' + \
+ 'compress_table_config + table_config)\n'
+ s += ' t = Table(tname)\n'
+ s += ' t.options.key_size = ' + str(key_sz) + '\n'
+ s += ' t.options.value_size = ' + str(value_sz) + '\n'
+ s += ' tables.append(t)\n'
+ s += '\n'
+
+ icount = self.get_int_opt('icount', 0)
+ pop_thread = self.get_int_opt('populate_threads', 1)
+ pop_per_txn = self.get_int_opt('populate_ops_per_txn', 0)
+ if icount != 0:
+ if pop_thread == 0:
+ self.fatal_error('icount != 0 and populate_threads == 0: ' +\
+ 'cannot populate entries with no threads')
+ elif pop_thread == 1:
+ mult = ''
+ else:
+ mult = str(pop_thread) + ' * '
+
+ # if there are multiple tables to be filled during populate,
+ # the icount is split between them all.
+ nops_per_thread = icount / (pop_thread * table_count)
+ if table_count == 1:
+ s += 'pop_ops = Operation(Operation.OP_INSERT, table)\n'
+ else:
+ s += 'pop_ops = Operation(Operation.OP_INSERT, tables[0])\n'
+ s += 'pop_ops = op_multi_table(pop_ops, tables)\n'
+ if pop_per_txn > 0:
+ s += 'pop_ops = op_group_transaction(pop_ops, ' + \
+ str(pop_per_txn) + ', "' + txn_config + '")\n'
+ s += 'pop_thread = Thread(pop_ops * ' + str(nops_per_thread) + ')\n'
+ s += 'pop_workload = Workload(context, ' + mult + 'pop_thread)\n'
+ if self.verbose > 0:
+ s += 'print("populate:")\n'
+ s += 'pop_workload.run(conn)\n'
+ else:
+ if self.get_int_opt('populate_threads', 0) != 0:
+ self.error("populate_threads > 0, icount == 0")
+
+ thread_config = self.get_opt('threads', '')
+ if thread_config != '':
+ (t_create, t_var) = self.parse_threads(thread_config)
+ s += '\n' + t_create
+ if reopen:
+ s += '\n# reopen the connection\n'
+ s += 'conn.close()\n'
+ s += 'conn = wiredtiger_open(' + \
+ '"WT_TEST", "create," + conn_config)\n'
+ s += '\n'
+ s += 'workload = Workload(context, ' + t_var + ')\n'
+ s += workloadopts
+ if self.verbose > 0:
+ s += 'print("workload:")\n'
+ s += 'workload.run(conn)\n'
+
+ for o in self.used_opts:
+ del self.opts[o]
+ if len(self.opts) != 0:
+ self.error('internal error, options not handled: ' + str(self.opts))
+ return s
+
+def usage():
+ eprint((
+ 'Usage: python wtperf.py [ options ] file.wtperf ...\n'
+ '\n'
+ 'Options:\n'
+ ' --python Python output generated on stdout\n'
+ ' -v --verbose Verbose output\n'
+ '\n'
+ 'If --python is not specified, the resulting workload is run.'))
+
+verbose = 0
+py_out = False
+workgen_dir = os.path.dirname(os.path.abspath(__file__))
+runner_dir = os.path.join(workgen_dir, 'runner')
+prefix = (
+ '# The next lines are unneeded if this script is in the runner directory.\n'
+ 'import sys\n'
+ 'sys.path.append("' + runner_dir + '")\n\n')
+
+exit_status = 0
+for arg in sys.argv[1:]:
+ if arg == '--python':
+ py_out = True
+ elif arg == '--verbose' or arg == '-v':
+ verbose += 1
+ elif arg.endswith('.wtperf'):
+ translator = Translator(arg, prefix, verbose)
+ pysrc = translator.translate()
+ if translator.has_error:
+ exit_status = 1
+ elif py_out:
+ print(pysrc)
+ else:
+ (outfd, tmpfile) = tempfile.mkstemp(suffix='.py')
+ os.write(outfd, pysrc)
+ os.close(outfd)
+ execfile(tmpfile)
+ os.remove(tmpfile)
+ else:
+ usage()
+ sys.exit(1)
+sys.exit(exit_status)