summaryrefslogtreecommitdiff
path: root/bench
diff options
context:
space:
mode:
Diffstat (limited to 'bench')
-rw-r--r--bench/workgen/runner/runner/__init__.py2
-rw-r--r--bench/workgen/runner/runner/core.py104
-rwxr-xr-xbench/workgen/runner/workgen_stat.sh75
-rw-r--r--bench/workgen/workgen.cxx54
-rw-r--r--bench/workgen/workgen.h1
-rw-r--r--bench/workgen/workgen_func.c13
-rw-r--r--bench/workgen/workgen_func.h2
-rw-r--r--bench/workgen/workgen_int.h1
-rw-r--r--bench/workgen/wtperf.py440
-rw-r--r--bench/wtperf/idle_table_cycle.c54
-rw-r--r--bench/wtperf/wtperf.c165
-rw-r--r--bench/wtperf/wtperf.h6
12 files changed, 770 insertions, 147 deletions
diff --git a/bench/workgen/runner/runner/__init__.py b/bench/workgen/runner/runner/__init__.py
index 67b547bc51b..ed21fffe8dc 100644
--- a/bench/workgen/runner/runner/__init__.py
+++ b/bench/workgen/runner/runner/__init__.py
@@ -88,5 +88,5 @@ except:
shutil.rmtree('WT_TEST', True)
os.mkdir('WT_TEST')
-from .core import txn, extensions_config
+from .core import txn, extensions_config, op_group_transaction, op_log_like, op_multi_table
from .latency import workload_latency
diff --git a/bench/workgen/runner/runner/core.py b/bench/workgen/runner/runner/core.py
index a0f0d4d77cd..2c8311c4ca7 100644
--- a/bench/workgen/runner/runner/core.py
+++ b/bench/workgen/runner/runner/core.py
@@ -29,12 +29,12 @@
# runner/core.py
# Core functions available to all runners
import glob, os
-import workgen
+from workgen import Key, Operation, OpList, Table, Transaction, Value
# txn --
# Put the operation (and any suboperations) within a transaction.
def txn(op, config=None):
- t = workgen.Transaction(config)
+ t = Transaction(config)
op._transaction = t
return op
@@ -99,3 +99,103 @@ def extensions_config(exts):
if len(extfiles) != 0:
result = ',extensions=[' + ','.join(extfiles.values()) + ']'
return result
+
+def _op_multi_table_as_list(ops_arg, tables):
+ result = []
+ if ops_arg._optype != Operation.OP_NONE:
+ for table in tables:
+ result.append(Operation(ops_arg._optype, table, ops_arg._key, ops_arg._value))
+ else:
+ for op in ops._group:
+ result.extend(_op_multi_table_as_list(op, tables))
+ return result
+
+# A convenient way to build a list of operations
+def op_append(op1, op2):
+ if op1 == None:
+ op1 = op2
+ else:
+ op1 += op2
+ return op1
+
+# Emulate wtperf's table_count option. Spread the given operations over
+# a set of tables.
+def op_multi_table(ops_arg, tables):
+ ops = None
+ for op in _op_multi_table_as_list(ops_arg, tables):
+ ops = op_append(ops, op)
+ return ops
+
+# should be 8 bytes format 'Q'
+_logkey = Key(Key.KEYGEN_APPEND, 8)
+def _op_log_op(op, log_table):
+ keysize = op._key._size
+ if keysize == 0:
+ keysize = op._table.options.key_size
+ valuesize = op._value._size
+ if valuesize == 0:
+ valuesize = op._table.options.value_size
+ v = Value(keysize + valuesize)
+ return Operation(Operation.OP_INSERT, log_table, _logkey, v)
+
+def _optype_is_write(optype):
+ return optype == Operation.OP_INSERT or optype == Operation.OP_UPDATE or \
+ optype == Operation.OP_REMOVE
+
+# Emulate wtperf's log_like option. For all operations, add a second
+# insert operation going to a log table.
+def op_log_like(op, log_table, ops_per_txn):
+ if op._optype != Operation.OP_NONE:
+ if _optype_is_write(op._optype):
+ op += _op_log_op(op, log_table)
+ if ops_per_txn == 0:
+ op = txn(op) # txn for each action.
+ else:
+ oplist = []
+ for op2 in op._group:
+ if op2._optype == Operation.OP_NONE:
+ oplist.append(op_log_like(op2, log_table))
+ elif ops_per_txn == 0 and _optype_is_write(op2._optype):
+ op2 += _op_log_op(op2, log_table)
+ oplist.append(txn(op2)) # txn for each action.
+ else:
+ oplist.append(op2)
+ if _optype_is_write(op2._optype):
+ oplist.append(_op_log_op(op2, log_table))
+ op._group = OpList(oplist)
+ return op
+
+def _op_transaction_list(oplist, txn_config):
+ result = None
+ for op in oplist:
+ result = op_append(result, op)
+ return txn(result, txn_config)
+
+# Emulate wtperf's ops_per_txn option. Create transactions around
+# groups of operations of the indicated size.
+def op_group_transaction(ops_arg, ops_per_txn, txn_config):
+ if ops_arg != Operation.OP_NONE:
+ return txn(ops_arg, txn_config)
+ if ops_arg._transaction != None:
+ raise Exception('nested transactions not supported')
+ if ops_arg._repeatgroup != None:
+ raise Exception('grouping transactions with multipliers not supported')
+
+ oplist = []
+ ops = None
+ nops = 0
+ txgroup = []
+ for op in ops_arg._group:
+ if op.optype == Operation.OP_NONE:
+ oplist.append(_op_transaction_list(txgroup, txn_config))
+ txgroup = []
+ oplist.append(op)
+ else:
+ txgroup.append(op)
+ if len(txgroup) >= ops_per_txn:
+ oplist.append(_op_transaction_list(txgroup, txn_config))
+ txgroup = []
+ if len(txgroup) > 0:
+ oplist.append(_op_transaction_list(txgroup, txn_config))
+ ops_arg._group = OpList(oplist)
+ return ops_arg
diff --git a/bench/workgen/runner/workgen_stat.sh b/bench/workgen/runner/workgen_stat.sh
new file mode 100755
index 00000000000..1739c29859e
--- /dev/null
+++ b/bench/workgen/runner/workgen_stat.sh
@@ -0,0 +1,75 @@
+#!/bin/bash
+#
+# workgen_stat.sh - combine JSON time series output from WT and workgen.
+#
+Usage() {
+ cat <<EOF
+Usage: $0 [ options ]
+Options:
+ -h <WT_home_directory> # set the WiredTiger home directory
+ -e <analyzer_name> # run analyzer on the combined files
+ -o <output_file> # output file for result
+
+At least one of '-t2' or '-o' must be selected.
+EOF
+ exit 1
+}
+
+Filter() {
+ sed -e 's/"version" *: *"[^"]*",//' "$@"
+}
+
+wthome=.
+outfile=
+analyze=
+
+while [ "$#" != 0 ]; do
+ arg="$1"
+ shift
+ case "$arg" in
+ -h )
+ if [ $# = 0 ]; then
+ Usage
+ fi
+ wthome="$1"
+ shift
+ ;;
+ -o )
+ if [ $# = 0 ]; then
+ Usage
+ fi
+ outfile="$1"
+ shift
+ ;;
+ -e )
+ if [ $# = 0 ]; then
+ Usage
+ fi
+ analyze="$1"
+ shift
+ ;;
+ esac
+done
+if [ ! -d "$wthome" ]; then
+ echo "$wthome: WT home directory does not exist"
+ exit 1
+fi
+if [ ! -f "$wthome/WiredTiger.wt" ]; then
+ echo "$wthome: directory is not a WiredTiger home directory"
+ exit 1
+fi
+if [ "$outfile" = '' ]; then
+ if [ "$analyze" = false ]; then
+ Usage
+ fi
+ outfile="$wthome/stat_tmp.json"
+fi
+(cd $wthome; Filter WiredTigerStat.* sample.json) | sort > $outfile
+if [ "$analyze" != '' ]; then
+ sysname=`uname -s`
+ if [ "$sysname" = Darwin ]; then
+ open -a "$analyze" "$outfile"
+ else
+ "$analyze" "$outfile"
+ fi
+fi
diff --git a/bench/workgen/workgen.cxx b/bench/workgen/workgen.cxx
index c56acfd2989..880b8ca6467 100644
--- a/bench/workgen/workgen.cxx
+++ b/bench/workgen/workgen.cxx
@@ -267,16 +267,18 @@ int ContextInternal::create_all() {
}
Monitor::Monitor(WorkloadRunner &wrunner) :
- _errno(0), _exception(), _wrunner(wrunner), _stop(false), _handle() {}
+ _errno(0), _exception(), _wrunner(wrunner), _stop(false), _handle(),
+ _out(NULL), _json(NULL) {}
Monitor::~Monitor() {}
int Monitor::run() {
struct timespec t;
struct tm *tm, _tm;
- char time_buf[64];
+ char time_buf[64], version[100];
Stats prev_totals;
WorkloadOptions *options = &_wrunner._workload->options;
uint64_t latency_max = (uint64_t)options->max_latency;
+ bool first;
(*_out) << "#time,"
<< "totalsec,"
@@ -295,6 +297,8 @@ int Monitor::run() {
<< "update maximum latency(uS)"
<< std::endl;
+ first = true;
+ workgen_version(version, sizeof(version));
Stats prev_interval;
while (!_stop) {
for (int i = 0; i < options->sample_interval && !_stop; i++)
@@ -337,6 +341,32 @@ int Monitor::run() {
<< "," << interval.update.max_latency
<< std::endl;
+ if (_json != NULL) {
+#define WORKGEN_TIMESTAMP_JSON "%Y-%m-%dT%H:%M:%S.000Z"
+ (void)strftime(time_buf, sizeof(time_buf),
+ WORKGEN_TIMESTAMP_JSON, tm);
+
+#define TRACK_JSON(name, t) \
+ "\"" << (name) << "\":{" \
+ << "\"ops per sec\":" << ((t).ops / interval_secs) \
+ << ",\"average latency\":" << (t).average_latency() \
+ << ",\"min latency\":" << (t).min_latency \
+ << ",\"max latency\":" << (t).max_latency \
+ << "}"
+
+ (*_json) << "{";
+ if (first) {
+ (*_json) << "\"version\":\"" << version << "\",";
+ first = false;
+ }
+ (*_json) << "\"localTime\":\"" << time_buf
+ << "\",\"workgen\":{"
+ << TRACK_JSON("read", interval.read) << ","
+ << TRACK_JSON("insert", interval.insert) << ","
+ << TRACK_JSON("update", interval.update)
+ << "}}" << std::endl;
+ }
+
uint64_t read_max = interval.read.max_latency;
uint64_t insert_max = interval.read.max_latency;
uint64_t update_max = interval.read.max_latency;
@@ -1315,8 +1345,8 @@ TableInternal::TableInternal(const TableInternal &other) : _tint(other._tint),
TableInternal::~TableInternal() {}
WorkloadOptions::WorkloadOptions() : max_latency(0),
- report_file("workload.stat"), report_interval(0),
- run_time(0), sample_interval(0), sample_rate(1),
+ report_file("workload.stat"), report_interval(0), run_time(0),
+ sample_file("sample.json"), sample_interval(0), sample_rate(1),
_options() {
_options.add_int("max_latency", max_latency,
"prints warning if any latency measured exceeds this number of "
@@ -1329,6 +1359,11 @@ WorkloadOptions::WorkloadOptions() : max_latency(0),
"The file name is relative to the connection's home directory. "
"When set to the empty string, stdout is used.");
_options.add_int("run_time", run_time, "total workload seconds");
+ _options.add_string("sample_file", sample_file,
+ "file name for collecting latency output in a JSON-like format, "
+ "enabled by the report_interval option. "
+ "The file name is relative to the connection's home directory. "
+ "When set to the empty string, no JSON is emitted.");
_options.add_int("sample_interval", sample_interval,
"performance logging every interval seconds, 0 to disable");
_options.add_int("sample_rate", sample_rate,
@@ -1492,6 +1527,7 @@ int WorkloadRunner::run_all() {
WorkloadOptions *options = &_workload->options;
Monitor monitor(*this);
std::ofstream monitor_out;
+ std::ofstream monitor_json;
std::ostream &out = *_report_out;
WT_DECL_RET;
@@ -1510,6 +1546,12 @@ int WorkloadRunner::run_all() {
open_report_file(monitor_out, "monitor", "monitor output file");
monitor._out = &monitor_out;
+ if (!options->sample_file.empty()) {
+ open_report_file(monitor_json, options->sample_file.c_str(),
+ "sample JSON output file");
+ monitor._json = &monitor_json;
+ }
+
if ((ret = pthread_create(&monitor._handle, NULL, monitor_main,
&monitor)) != 0) {
std::cerr << "monitor thread failed err=" << ret << std::endl;
@@ -1588,6 +1630,10 @@ int WorkloadRunner::run_all() {
<< std::endl;
if (exception == NULL && !monitor._exception._str.empty())
exception = &monitor._exception;
+
+ monitor_out.close();
+ if (!options->sample_file.empty())
+ monitor_json.close();
}
// issue the final report
diff --git a/bench/workgen/workgen.h b/bench/workgen/workgen.h
index c1ae01ed5a4..c7be8ee0035 100644
--- a/bench/workgen/workgen.h
+++ b/bench/workgen/workgen.h
@@ -358,6 +358,7 @@ struct WorkloadOptions {
int run_time;
int sample_interval;
int sample_rate;
+ std::string sample_file;
WorkloadOptions();
WorkloadOptions(const WorkloadOptions &other);
diff --git a/bench/workgen/workgen_func.c b/bench/workgen/workgen_func.c
index 2e1271a515e..5ce2146a8e4 100644
--- a/bench/workgen/workgen_func.c
+++ b/bench/workgen/workgen_func.c
@@ -87,3 +87,16 @@ workgen_u64_to_string_zf(uint64_t n, char *buf, size_t len)
{
u64_to_string_zf(n, buf, len);
}
+
+#define WORKGEN_VERSION_PREFIX "workgen-"
+extern void
+workgen_version(char *buf, size_t len)
+{
+ size_t prefix_len;
+
+ prefix_len = strlen(WORKGEN_VERSION_PREFIX);
+ (void)strncpy(buf, WORKGEN_VERSION_PREFIX, len);
+ if (len > prefix_len)
+ (void)strncpy(&buf[prefix_len], WIREDTIGER_VERSION_STRING,
+ len - prefix_len);
+}
diff --git a/bench/workgen/workgen_func.h b/bench/workgen/workgen_func.h
index 20ebf2632cc..ec7ecf0a504 100644
--- a/bench/workgen/workgen_func.h
+++ b/bench/workgen/workgen_func.h
@@ -42,3 +42,5 @@ extern void
workgen_random_free(struct workgen_random_state *rnd_state);
extern void
workgen_u64_to_string_zf(uint64_t n, char *buf, size_t len);
+extern void
+workgen_version(char *buf, size_t len);
diff --git a/bench/workgen/workgen_int.h b/bench/workgen/workgen_int.h
index 01fb727691b..9283aea1d7b 100644
--- a/bench/workgen/workgen_int.h
+++ b/bench/workgen/workgen_int.h
@@ -146,6 +146,7 @@ struct Monitor {
volatile bool _stop;
pthread_t _handle;
std::ostream *_out;
+ std::ostream *_json;
Monitor(WorkloadRunner &wrunner);
~Monitor();
diff --git a/bench/workgen/wtperf.py b/bench/workgen/wtperf.py
new file mode 100644
index 00000000000..3a196fe7b57
--- /dev/null
+++ b/bench/workgen/wtperf.py
@@ -0,0 +1,440 @@
+#!/usr/bin/env python
+#
+# Public Domain 2014-2017 MongoDB, Inc.
+# Public Domain 2008-2014 WiredTiger, Inc.
+#
+# This is free and unencumbered software released into the public domain.
+#
+# Anyone is free to copy, modify, publish, use, compile, sell, or
+# distribute this software, either in source code form or as a compiled
+# binary, for any purpose, commercial or non-commercial, and by any
+# means.
+#
+# In jurisdictions that recognize copyright laws, the author or authors
+# of this software dedicate any and all copyright interest in the
+# software to the public domain. We make this dedication for the benefit
+# of the public at large and to the detriment of our heirs and
+# successors. We intend this dedication to be an overt act of
+# relinquishment in perpetuity of all present and future rights to this
+# software under copyright law.
+#
+# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
+# EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
+# MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT.
+# IN NO EVENT SHALL THE AUTHORS BE LIABLE FOR ANY CLAIM, DAMAGES OR
+# OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE,
+# ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR
+# OTHER DEALINGS IN THE SOFTWARE.
+#
+
+# wtperf.py
+# A partial emulation of wtperf. Translates a .wtperf file into a Python
+# script that uses the workgen module, and runs the script. Errors are
+# issued for any .wtperf directives that are not known.
+# See also the usage() function.
+#
+from __future__ import print_function
+import os, sys, tempfile
+
+def eprint(*args, **kwargs):
+ print(*args, file=sys.stderr, **kwargs)
+
+class OptionValue:
+ def __init__(self, value, filename, linenum):
+ self.value = value
+ self.filename = filename
+ self.linenum = linenum
+
+class TranslateException(Exception):
+ pass
+
+class Options(object):
+ pass
+
+class Translator:
+ def __init__(self, filename, prefix, verbose):
+ self.filename = filename
+ self.prefix = prefix
+ self.verbose = verbose
+ self.linenum = 0
+ self.opts = {}
+ self.used_opts = {}
+ self.has_error = False
+
+ def error_file_line(self, fname, linenum, msg):
+ self.has_error = True
+ eprint(fname + ':' + str(linenum) + ': error: ' + msg)
+
+ # Report an error and continue
+ def error(self, msg):
+ self.error_file_line(self.filename, self.linenum, msg)
+
+ # Report an error and unwind the stack
+ def fatal_error(self, msg, errtype):
+ self.error(msg)
+ raise TranslateException(errtype)
+
+ supported_opt_list = [ 'compression', 'conn_config', 'icount',
+ 'key_sz', 'log_like_table',
+ 'populate_ops_per_txn', 'populate_threads',
+ 'reopen_connection',
+ 'table_config', 'table_count',
+ 'threads', 'transaction_config', 'value_sz' ]
+
+ def set_opt(self, optname, val):
+ if optname not in self.supported_opt_list:
+ self.error("unknown option: " + optname)
+ return
+ elif val[0] == '"' and val[-1] == '"':
+ v = val[1:-1]
+ elif val == 'true':
+ v = True
+ elif val == 'false':
+ v = False
+ elif val[0] == '(':
+ v = val # config string stored as is
+ else:
+ try:
+ v = int(val) # it might be an integer
+ except ValueError:
+ v = val # it's a string after all
+ self.opts[optname] = OptionValue(v, self.filename, self.linenum)
+
+ def get_opt(self, optname, dfault):
+ if optname in self.opts:
+ ret = self.opts[optname]
+ self.filename = ret.filename
+ self.linenum = ret.linenum
+ self.used_opts[optname] = 1
+ return ret.value
+ else:
+ return dfault
+
+ def get_int_opt(self, optname, dfault):
+ return self.get_opt(optname, dfault) + 0
+
+ def get_boolean_opt(self, optname, dfault):
+ return not not self.get_opt(optname, dfault)
+
+ # Split a string 'left_side=right_side' into two parts
+ def split_assign(self, s):
+ equalpos = s.find('=')
+ if equalpos < 0:
+ self.error("missing '=' for line: " + line)
+ return (None, None)
+ else:
+ return s.split('=', 1)
+
+ # Split a config string honoring nesting e.g.
+ # "(abc=123,def=234,ghi=(hi=1,bye=2))" would return 3 items.
+ def split_config_parens(self, s):
+ if s[0:1] != '(':
+ import pdb
+ pdb.set_trace()
+ self.fatal_error('missing left paren', 'config parse error')
+ if s[-1:] != ')':
+ self.fatal_error('missing right paren', 'config parse error')
+ s = s[1:-1]
+ result = []
+ level = 0
+ cur = ''
+ for ch in s:
+ if ch == ',' and level == 0:
+ result.append(cur)
+ cur = ''
+ else:
+ cur += ch
+ if ch == '(':
+ level += 1
+ elif ch == ')':
+ level -= 1
+ if level < 0:
+ self.fatal_error('unbalanced paren', 'config parse error')
+ if level != 0:
+ self.fatal_error('unbalanced paren', 'config parse error')
+ if len(cur) != 0:
+ result.append(cur)
+ return result
+
+ def assign_str(self, left, right):
+ return left + '=' + str(right) + '\n'
+
+ def add_operation_str(self, count, opname, multi):
+ result = ''
+ tablename = 'tables[0]' if multi else 'table'
+ if count > 1:
+ result += str(count) + ' * '
+ if count > 0:
+ result += 'Operation(Operation.' + opname + ', ' + \
+ tablename + ') + \\\n'
+ result += ' '
+ return result
+
+ # Wtperf's throttle is based on the number of regular operations,
+ # not including log_like operations. Workgen counts all operations,
+ # it doesn't treat log operations any differently. Adjust the throttle
+ # number to account for the difference.
+ def calc_throttle(self, thread_opts, log_like_table):
+ throttle = thread_opts.throttle
+ if not log_like_table:
+ return (throttle, '')
+ modify = thread_opts.inserts + thread_opts.updates
+ regular = modify + thread_opts.reads
+ total = regular + modify
+ factor = (total + 0.0) / regular
+ new_throttle = int(throttle * factor)
+ if new_throttle == throttle:
+ comment = ''
+ else:
+ comment = '# wtperf throttle=' + str(throttle) + ' adjusted by ' + \
+ str(factor) + ' to compensate for log_like operations.\n'
+ return (new_throttle, comment)
+
+ def parse_threads(self, threads_config):
+ tdecls = ''
+ tlist = self.split_config_parens(threads_config)
+ table_count = self.get_int_opt('table_count', 1)
+ log_like_table = self.get_boolean_opt('log_like_table', False)
+ txn_config = self.get_opt('transaction_config', '')
+ if log_like_table:
+ tdecls += 'log_name = "table:log"\n'
+ tdecls += 's.create(log_name, "key_format=S,value_format=S," +' + \
+ ' compress_table_config)\n'
+ tdecls += 'log_table = Table(log_name)\n\n'
+ thread_count = 0
+ tnames = ''
+ multi = (table_count > 1)
+ for t in tlist:
+ thread_name = 'thread' + str(thread_count)
+ thread_count += 1
+
+ # For wtperf compatibility, we allow both 'insert/inserts' etc.
+ topts = Options()
+ topts.count = 1
+ topts.insert = 0
+ topts.inserts = 0
+ topts.ops_per_txn = 0
+ topts.read = 0
+ topts.reads = 0
+ topts.throttle = 0
+ topts.update = 0
+ topts.updates = 0
+
+ for o in self.split_config_parens(t):
+ (k, v) = self.split_assign(o)
+ if hasattr(topts, k):
+ try:
+ setattr(topts, k, int(v))
+ except ValueError:
+ self.error('thread option ' + k + ': integer expected')
+ else:
+ self.error('unknown thread option: ' + k)
+
+ topts.inserts += topts.insert; topts.insert = 0
+ topts.updates += topts.update; topts.update = 0
+ topts.reads += topts.read; topts.read = 0
+ if topts.count == 0:
+ continue
+
+ if topts.inserts + topts.reads + topts.updates == 0:
+ self.fatal_error('need read/insert/update/...',
+ 'thread config error')
+ tdecls += 'ops = '
+ tdecls += self.add_operation_str(topts.inserts, 'OP_INSERT', multi)
+ tdecls += self.add_operation_str(topts.reads, 'OP_SEARCH', multi)
+ tdecls += self.add_operation_str(topts.updates, 'OP_UPDATE', multi)
+ tdecls = tdecls.rstrip(' \n\\+') + '\n'
+ if multi:
+ tdecls += 'ops = op_multi_table(ops, tables)\n'
+ if topts.ops_per_txn > 0:
+ tdecls += 'ops = op_group_transaction(ops, ' + \
+ str(topts.ops_per_txn) + ', "' + txn_config + '")\n'
+ if log_like_table:
+ tdecls += 'ops = op_log_like(ops, log_table, ' + \
+ str(topts.ops_per_txn) + ')\n'
+ tdecls += thread_name + ' = Thread(ops)\n'
+ if topts.throttle > 0:
+ (throttle, comment) = self.calc_throttle(topts, log_like_table)
+ tdecls += comment
+ tdecls += self.assign_str(thread_name + '.options.throttle',
+ throttle)
+ tdecls += '\n'
+ if topts.count > 1:
+ tnames += str(topts.count) + ' * '
+ tnames += thread_name + ' + '
+
+ tnames = tnames.rstrip(' +')
+ return (tdecls, tnames)
+
+ def translate(self):
+ try:
+ return self.translate_inner()
+ except TranslateException:
+ # An error has already been reported
+ return None
+
+ def translate_inner(self):
+ workloadopts = ''
+ with open(self.filename) as fin:
+ for line in fin:
+ self.linenum += 1
+ commentpos = line.find('#')
+ if commentpos >= 0:
+ line = line[0:commentpos]
+ line = line.strip()
+ if len(line) == 0:
+ continue
+ (key, val) = self.split_assign(line)
+ if key in [ 'max_latency', 'report_file', 'report_interval',
+ 'run_time', 'sample_interval', 'sample_rate' ]:
+ workloadopts += 'workload.options.' + key + '=' + val + '\n'
+ else:
+ self.set_opt(key, val)
+
+ table_count = self.get_int_opt('table_count', 1)
+ conn_config = self.get_opt('conn_config', '')
+ table_config = self.get_opt('table_config', '')
+ key_sz = self.get_int_opt('key_sz', 20)
+ value_sz = self.get_int_opt('value_sz', 100)
+ reopen = self.get_boolean_opt('reopen_connection', False)
+ compression = self.get_opt('compression', '')
+ txn_config = self.get_opt('transaction_config', '')
+
+ s = '#/usr/bin/env python\n'
+ s += '# generated from ' + self.filename + '\n'
+ s += self.prefix
+ s += 'from runner import *\n'
+ s += 'from wiredtiger import *\n'
+ s += 'from workgen import *\n'
+ s += '\n'
+ s += 'context = Context()\n'
+ s += 'conn_config = "' + conn_config + '"\n'
+ if compression != '':
+ s += 'conn_config += extensions_config(["compressors/' + \
+ compression + '"])\n'
+ compression = 'block_compressor=' + compression + ','
+ s += 'conn = wiredtiger_open("WT_TEST", "create," + conn_config)\n'
+ s += 's = conn.open_session()\n'
+ s += '\n'
+ s += 'wtperf_table_config = "key_format=S,value_format=S,type=lsm," +\\\n'
+ s += ' "exclusive=true,allocation_size=4kb," +\\\n'
+ s += ' "internal_page_max=64kb,leaf_page_max=4kb,split_pct=100,"\n'
+ s += 'compress_table_config = "' + compression + '"\n'
+ s += 'table_config = "' + table_config + '"\n'
+ if table_count == 1:
+ s += 'tname = "file:test.wt"\n'
+ s += 's.create(tname, wtperf_table_config +\\\n'
+ s += ' compress_table_config + table_config)\n'
+ s += 'table = Table(tname)\n'
+ s += 'table.options.key_size = ' + str(key_sz) + '\n'
+ s += 'table.options.value_size = ' + str(value_sz) + '\n'
+ else:
+ s += 'table_count = ' + str(table_count) + '\n'
+ s += 'tables = []\n'
+ s += 'for i in range(0, table_count):\n'
+ s += ' tname = "file:test" + str(i) + ".wt"\n'
+ s += ' s.create(tname, ' + \
+ 'wtperf_table_config + ' + \
+ 'compress_table_config + table_config)\n'
+ s += ' t = Table(tname)\n'
+ s += ' t.options.key_size = ' + str(key_sz) + '\n'
+ s += ' t.options.value_size = ' + str(value_sz) + '\n'
+ s += ' tables.append(t)\n'
+ s += '\n'
+
+ icount = self.get_int_opt('icount', 0)
+ pop_thread = self.get_int_opt('populate_threads', 1)
+ pop_per_txn = self.get_int_opt('populate_ops_per_txn', 0)
+ if icount != 0:
+ if pop_thread == 0:
+ self.fatal_error('icount != 0 and populate_threads == 0: ' +\
+ 'cannot populate entries with no threads')
+ elif pop_thread == 1:
+ mult = ''
+ else:
+ mult = str(pop_thread) + ' * '
+
+ # if there are multiple tables to be filled during populate,
+ # the icount is split between them all.
+ nops_per_thread = icount / (pop_thread * table_count)
+ if table_count == 1:
+ s += 'pop_ops = Operation(Operation.OP_INSERT, table)\n'
+ else:
+ s += 'pop_ops = Operation(Operation.OP_INSERT, tables[0])\n'
+ s += 'pop_ops = op_multi_table(pop_ops, tables)\n'
+ if pop_per_txn > 0:
+ s += 'pop_ops = op_group_transaction(pop_ops, ' + \
+ str(pop_per_txn) + ', "' + txn_config + '")\n'
+ s += 'pop_thread = Thread(pop_ops * ' + str(nops_per_thread) + ')\n'
+ s += 'pop_workload = Workload(context, ' + mult + 'pop_thread)\n'
+ if self.verbose > 0:
+ s += 'print("populate:")\n'
+ s += 'pop_workload.run(conn)\n'
+ else:
+ if self.get_int_opt('populate_threads', 0) != 0:
+ self.error("populate_threads > 0, icount == 0")
+
+ thread_config = self.get_opt('threads', '')
+ if thread_config != '':
+ (t_create, t_var) = self.parse_threads(thread_config)
+ s += '\n' + t_create
+ if reopen:
+ s += '\n# reopen the connection\n'
+ s += 'conn.close()\n'
+ s += 'conn = wiredtiger_open(' + \
+ '"WT_TEST", "create," + conn_config)\n'
+ s += '\n'
+ s += 'workload = Workload(context, ' + t_var + ')\n'
+ s += workloadopts
+ if self.verbose > 0:
+ s += 'print("workload:")\n'
+ s += 'workload.run(conn)\n'
+
+ for o in self.used_opts:
+ del self.opts[o]
+ if len(self.opts) != 0:
+ self.error('internal error, options not handled: ' + str(self.opts))
+ return s
+
+def usage():
+ eprint((
+ 'Usage: python wtperf.py [ options ] file.wtperf ...\n'
+ '\n'
+ 'Options:\n'
+ ' --python Python output generated on stdout\n'
+ ' -v --verbose Verbose output\n'
+ '\n'
+ 'If --python is not specified, the resulting workload is run.'))
+
+verbose = 0
+py_out = False
+workgen_dir = os.path.dirname(os.path.abspath(__file__))
+runner_dir = os.path.join(workgen_dir, 'runner')
+prefix = (
+ '# The next lines are unneeded if this script is in the runner directory.\n'
+ 'import sys\n'
+ 'sys.path.append("' + runner_dir + '")\n\n')
+
+exit_status = 0
+for arg in sys.argv[1:]:
+ if arg == '--python':
+ py_out = True
+ elif arg == '--verbose' or arg == '-v':
+ verbose += 1
+ elif arg.endswith('.wtperf'):
+ translator = Translator(arg, prefix, verbose)
+ pysrc = translator.translate()
+ if translator.has_error:
+ exit_status = 1
+ elif py_out:
+ print(pysrc)
+ else:
+ (outfd, tmpfile) = tempfile.mkstemp(suffix='.py')
+ os.write(outfd, pysrc)
+ os.close(outfd)
+ execfile(tmpfile)
+ os.remove(tmpfile)
+ else:
+ usage()
+ sys.exit(1)
+sys.exit(exit_status)
diff --git a/bench/wtperf/idle_table_cycle.c b/bench/wtperf/idle_table_cycle.c
index ce64049ce89..d0baa786ba9 100644
--- a/bench/wtperf/idle_table_cycle.c
+++ b/bench/wtperf/idle_table_cycle.c
@@ -57,7 +57,7 @@ check_timing(WTPERF *wtperf,
* Measure how long each step takes, and flag an error if it exceeds the
* configured maximum.
*/
-static void *
+static WT_THREAD_RET
cycle_idle_tables(void *arg)
{
struct timespec start, stop;
@@ -76,7 +76,7 @@ cycle_idle_tables(void *arg)
wtperf->conn, NULL, opts->sess_config, &session)) != 0) {
lprintf(wtperf, ret, 0,
"Error opening a session on %s", wtperf->home);
- return (NULL);
+ return (WT_THREAD_RET_VALUE);
}
for (cycle_count = 0; wtperf->idle_cycle_run; ++cycle_count) {
@@ -96,10 +96,10 @@ cycle_idle_tables(void *arg)
lprintf(wtperf, ret, 0,
"Table create failed in cycle_idle_tables.");
wtperf->error = true;
- return (NULL);
+ return (WT_THREAD_RET_VALUE);
}
if (check_timing(wtperf, "create", start, &stop) != 0)
- return (NULL);
+ return (WT_THREAD_RET_VALUE);
start = stop;
/* Open and close cursor. */
@@ -108,16 +108,16 @@ cycle_idle_tables(void *arg)
lprintf(wtperf, ret, 0,
"Cursor open failed in cycle_idle_tables.");
wtperf->error = true;
- return (NULL);
+ return (WT_THREAD_RET_VALUE);
}
if ((ret = cursor->close(cursor)) != 0) {
lprintf(wtperf, ret, 0,
"Cursor close failed in cycle_idle_tables.");
wtperf->error = true;
- return (NULL);
+ return (WT_THREAD_RET_VALUE);
}
if (check_timing(wtperf, "cursor", start, &stop) != 0)
- return (NULL);
+ return (WT_THREAD_RET_VALUE);
start = stop;
#if 1
@@ -133,14 +133,14 @@ cycle_idle_tables(void *arg)
lprintf(wtperf, ret, 0,
"Table drop failed in cycle_idle_tables.");
wtperf->error = true;
- return (NULL);
+ return (WT_THREAD_RET_VALUE);
}
if (check_timing(wtperf, "drop", start, &stop) != 0)
- return (NULL);
+ return (WT_THREAD_RET_VALUE);
#endif
}
- return (NULL);
+ return (WT_THREAD_RET_VALUE);
}
/*
@@ -150,47 +150,33 @@ cycle_idle_tables(void *arg)
* structure. Should reshuffle the configuration structure so explicit static
* initialization isn't necessary.
*/
-int
-start_idle_table_cycle(WTPERF *wtperf, pthread_t *idle_table_cycle_thread)
+void
+start_idle_table_cycle(WTPERF *wtperf, wt_thread_t *idle_table_cycle_thread)
{
CONFIG_OPTS *opts;
- pthread_t thread_id;
- int ret;
+ wt_thread_t thread_id;
opts = wtperf->opts;
if (opts->idle_table_cycle == 0)
- return (0);
+ return;
wtperf->idle_cycle_run = true;
- if ((ret = pthread_create(
- &thread_id, NULL, cycle_idle_tables, wtperf)) != 0) {
- lprintf(wtperf,
- ret, 0, "Error creating idle table cycle thread.");
- wtperf->idle_cycle_run = false;
- return (ret);
- }
+ testutil_check(__wt_thread_create(
+ NULL, &thread_id, cycle_idle_tables, wtperf));
*idle_table_cycle_thread = thread_id;
-
- return (0);
}
-int
-stop_idle_table_cycle(WTPERF *wtperf, pthread_t idle_table_cycle_thread)
+void
+stop_idle_table_cycle(WTPERF *wtperf, wt_thread_t idle_table_cycle_thread)
{
CONFIG_OPTS *opts;
- int ret;
opts = wtperf->opts;
if (opts->idle_table_cycle == 0 || !wtperf->idle_cycle_run)
- return (0);
+ return;
wtperf->idle_cycle_run = false;
- if ((ret = pthread_join(idle_table_cycle_thread, NULL)) != 0) {
- lprintf(
- wtperf, ret, 0, "Error joining idle table cycle thread.");
- return (ret);
- }
- return (0);
+ testutil_check(__wt_thread_join(NULL, idle_table_cycle_thread));
}
diff --git a/bench/wtperf/wtperf.c b/bench/wtperf/wtperf.c
index 68bc08226c2..a8d3f135280 100644
--- a/bench/wtperf/wtperf.c
+++ b/bench/wtperf/wtperf.c
@@ -32,23 +32,23 @@
#define DEFAULT_HOME "WT_TEST"
#define DEFAULT_MONITOR_DIR "WT_TEST"
-static void *checkpoint_worker(void *);
+static WT_THREAD_RET checkpoint_worker(void *);
static int drop_all_tables(WTPERF *);
static int execute_populate(WTPERF *);
static int execute_workload(WTPERF *);
static int find_table_count(WTPERF *);
-static void *monitor(void *);
-static void *populate_thread(void *);
+static WT_THREAD_RET monitor(void *);
+static WT_THREAD_RET populate_thread(void *);
static void randomize_value(WTPERF_THREAD *, char *);
static void recreate_dir(const char *);
static int start_all_runs(WTPERF *);
static int start_run(WTPERF *);
-static int start_threads(WTPERF *,
- WORKLOAD *, WTPERF_THREAD *, u_int, void *(*)(void *));
-static int stop_threads(WTPERF *, u_int, WTPERF_THREAD *);
-static void *thread_run_wtperf(void *);
+static void start_threads(WTPERF *, WORKLOAD *,
+ WTPERF_THREAD *, u_int, WT_THREAD_CALLBACK(*)(void *));
+static void stop_threads(u_int, WTPERF_THREAD *);
+static WT_THREAD_RET thread_run_wtperf(void *);
static void update_value_delta(WTPERF_THREAD *);
-static void *worker(void *);
+static WT_THREAD_RET worker(void *);
static uint64_t wtperf_rand(WTPERF_THREAD *);
static uint64_t wtperf_value_range(WTPERF *);
@@ -312,7 +312,7 @@ op_name(uint8_t *op)
/* NOTREACHED */
}
-static void *
+static WT_THREAD_RET
worker_async(void *arg)
{
CONFIG_OPTS *opts;
@@ -420,7 +420,7 @@ op_err: lprintf(wtperf, ret, 0,
if (0) {
err: wtperf->error = wtperf->stop = true;
}
- return (NULL);
+ return (WT_THREAD_RET_VALUE);
}
/*
@@ -513,7 +513,7 @@ err: lprintf(wtperf, ret, 0, "Pre-workload traverse error");
return (ret);
}
-static void *
+static WT_THREAD_RET
worker(void *arg)
{
struct timespec start, stop;
@@ -893,7 +893,7 @@ err: wtperf->error = wtperf->stop = true;
}
free(cursors);
- return (NULL);
+ return (WT_THREAD_RET_VALUE);
}
/*
@@ -1014,7 +1014,7 @@ run_mix_schedule(WTPERF *wtperf, WORKLOAD *workp)
return (0);
}
-static void *
+static WT_THREAD_RET
populate_thread(void *arg)
{
struct timespec start, stop;
@@ -1163,10 +1163,10 @@ err: wtperf->error = wtperf->stop = true;
}
free(cursors);
- return (NULL);
+ return (WT_THREAD_RET_VALUE);
}
-static void *
+static WT_THREAD_RET
populate_async(void *arg)
{
struct timespec start, stop;
@@ -1261,10 +1261,10 @@ populate_async(void *arg)
if (0) {
err: wtperf->error = wtperf->stop = true;
}
- return (NULL);
+ return (WT_THREAD_RET_VALUE);
}
-static void *
+static WT_THREAD_RET
monitor(void *arg)
{
struct timespec t;
@@ -1426,10 +1426,10 @@ err: wtperf->error = wtperf->stop = true;
(void)fclose(fp);
free(path);
- return (NULL);
+ return (WT_THREAD_RET_VALUE);
}
-static void *
+static WT_THREAD_RET
checkpoint_worker(void *arg)
{
CONFIG_OPTS *opts;
@@ -1490,7 +1490,7 @@ checkpoint_worker(void *arg)
err: wtperf->error = wtperf->stop = true;
}
- return (NULL);
+ return (WT_THREAD_RET_VALUE);
}
static int
@@ -1498,15 +1498,15 @@ execute_populate(WTPERF *wtperf)
{
struct timespec start, stop;
CONFIG_OPTS *opts;
- WTPERF_THREAD *popth;
WT_ASYNC_OP *asyncop;
- pthread_t idle_table_cycle_thread;
+ WTPERF_THREAD *popth;
+ WT_THREAD_CALLBACK(*pfunc)(void *);
size_t i;
uint64_t last_ops, msecs, print_ops_sec;
uint32_t interval, tables;
+ wt_thread_t idle_table_cycle_thread;
double print_secs;
int elapsed, ret;
- void *(*pfunc)(void *);
opts = wtperf->opts;
@@ -1516,9 +1516,7 @@ execute_populate(WTPERF *wtperf)
opts->populate_threads, opts->icount);
/* Start cycling idle tables if configured. */
- if ((ret =
- start_idle_table_cycle(wtperf, &idle_table_cycle_thread)) != 0)
- return (ret);
+ start_idle_table_cycle(wtperf, &idle_table_cycle_thread);
wtperf->insert_key = 0;
@@ -1530,9 +1528,8 @@ execute_populate(WTPERF *wtperf)
pfunc = populate_async;
} else
pfunc = populate_thread;
- if ((ret = start_threads(wtperf, NULL,
- wtperf->popthreads, opts->populate_threads, pfunc)) != 0)
- return (ret);
+ start_threads(wtperf, NULL,
+ wtperf->popthreads, opts->populate_threads, pfunc);
__wt_epoch(NULL, &start);
for (elapsed = 0, interval = 0, last_ops = 0;
@@ -1568,10 +1565,8 @@ execute_populate(WTPERF *wtperf)
*/
popth = wtperf->popthreads;
wtperf->popthreads = NULL;
- ret = stop_threads(wtperf, opts->populate_threads, popth);
+ stop_threads(opts->populate_threads, popth);
free(popth);
- if (ret != 0)
- return (ret);
/* Report if any worker threads didn't finish. */
if (wtperf->error) {
@@ -1640,8 +1635,7 @@ execute_populate(WTPERF *wtperf)
}
/* Stop cycling idle tables. */
- if ((ret = stop_idle_table_cycle(wtperf, idle_table_cycle_thread)) != 0)
- return (ret);
+ stop_idle_table_cycle(wtperf, idle_table_cycle_thread);
return (0);
}
@@ -1701,13 +1695,13 @@ execute_workload(WTPERF *wtperf)
WTPERF_THREAD *threads;
WT_CONNECTION *conn;
WT_SESSION **sessions;
- pthread_t idle_table_cycle_thread;
+ WT_THREAD_CALLBACK(*pfunc)(void *);
+ wt_thread_t idle_table_cycle_thread;
uint64_t last_ckpts, last_inserts, last_reads, last_truncates;
uint64_t last_updates;
uint32_t interval, run_ops, run_time;
u_int i;
- int ret, t_ret;
- void *(*pfunc)(void *);
+ int ret;
opts = wtperf->opts;
@@ -1722,9 +1716,7 @@ execute_workload(WTPERF *wtperf)
sessions = NULL;
/* Start cycling idle tables. */
- if ((ret =
- start_idle_table_cycle(wtperf, &idle_table_cycle_thread)) != 0)
- return (ret);
+ start_idle_table_cycle(wtperf, &idle_table_cycle_thread);
if (opts->warmup != 0)
wtperf->in_warmup = true;
@@ -1768,9 +1760,8 @@ execute_workload(WTPERF *wtperf)
goto err;
/* Start the workload's threads. */
- if ((ret = start_threads(
- wtperf, workp, threads, (u_int)workp->threads, pfunc)) != 0)
- goto err;
+ start_threads(
+ wtperf, workp, threads, (u_int)workp->threads, pfunc);
threads += workp->threads;
}
@@ -1836,12 +1827,9 @@ execute_workload(WTPERF *wtperf)
err: wtperf->stop = true;
/* Stop cycling idle tables. */
- if ((ret = stop_idle_table_cycle(wtperf, idle_table_cycle_thread)) != 0)
- return (ret);
+ stop_idle_table_cycle(wtperf, idle_table_cycle_thread);
- if ((t_ret = stop_threads(wtperf,
- (u_int)wtperf->workers_cnt, wtperf->workers)) != 0 && ret == 0)
- ret = t_ret;
+ stop_threads((u_int)wtperf->workers_cnt, wtperf->workers);
/* Drop tables if configured to and this isn't an error path */
if (ret == 0 &&
@@ -2163,9 +2151,9 @@ start_all_runs(WTPERF *wtperf)
{
CONFIG_OPTS *opts;
WTPERF *next_wtperf, **wtperfs;
- pthread_t *threads;
size_t i, len;
- int ret, t_ret;
+ wt_thread_t *threads;
+ int ret;
opts = wtperf->opts;
wtperfs = NULL;
@@ -2178,7 +2166,7 @@ start_all_runs(WTPERF *wtperf)
wtperfs = dcalloc(opts->database_count, sizeof(WTPERF *));
/* Allocate an array to hold our thread IDs. */
- threads = dcalloc(opts->database_count, sizeof(pthread_t));
+ threads = dcalloc(opts->database_count, sizeof(*threads));
for (i = 0; i < opts->database_count; i++) {
wtperf_copy(wtperf, &next_wtperf);
@@ -2203,22 +2191,15 @@ start_all_runs(WTPERF *wtperf)
strcmp(next_wtperf->home, next_wtperf->monitor_dir) != 0)
recreate_dir(next_wtperf->monitor_dir);
- if ((ret = pthread_create(
- &threads[i], NULL, thread_run_wtperf, next_wtperf)) != 0) {
- lprintf(wtperf, ret, 0, "Error creating thread");
- goto err;
- }
+ testutil_check(__wt_thread_create(NULL,
+ &threads[i], thread_run_wtperf, next_wtperf));
}
/* Wait for threads to finish. */
for (i = 0; i < opts->database_count; i++)
- if ((t_ret = pthread_join(threads[i], NULL)) != 0) {
- lprintf(wtperf, ret, 0, "Error joining thread");
- if (ret == 0)
- ret = t_ret;
- }
+ testutil_check(__wt_thread_join(NULL, threads[i]));
-err: for (i = 0; i < opts->database_count && wtperfs[i] != NULL; i++) {
+ for (i = 0; i < opts->database_count && wtperfs[i] != NULL; i++) {
wtperf_free(wtperfs[i]);
free(wtperfs[i]);
}
@@ -2229,7 +2210,7 @@ err: for (i = 0; i < opts->database_count && wtperfs[i] != NULL; i++) {
}
/* Run an instance of wtperf for a given configuration. */
-static void *
+static WT_THREAD_RET
thread_run_wtperf(void *arg)
{
WTPERF *wtperf;
@@ -2238,14 +2219,14 @@ thread_run_wtperf(void *arg)
wtperf = (WTPERF *)arg;
if ((ret = start_run(wtperf)) != 0)
lprintf(wtperf, ret, 0, "Run failed for: %s.", wtperf->home);
- return (NULL);
+ return (WT_THREAD_RET_VALUE);
}
static int
start_run(WTPERF *wtperf)
{
CONFIG_OPTS *opts;
- pthread_t monitor_thread;
+ wt_thread_t monitor_thread;
uint64_t total_ops;
uint32_t run_time;
int monitor_created, ret, t_ret;
@@ -2272,12 +2253,8 @@ start_run(WTPERF *wtperf)
/* Start the monitor thread. */
if (opts->sample_interval != 0) {
- if ((ret = pthread_create(
- &monitor_thread, NULL, monitor, wtperf)) != 0) {
- lprintf(wtperf,
- ret, 0, "Error creating monitor thread.");
- goto err;
- }
+ testutil_check(__wt_thread_create(
+ NULL, &monitor_thread, monitor, wtperf));
monitor_created = 1;
}
@@ -2306,9 +2283,8 @@ start_run(WTPERF *wtperf)
opts->checkpoint_threads);
wtperf->ckptthreads = dcalloc(
opts->checkpoint_threads, sizeof(WTPERF_THREAD));
- if (start_threads(wtperf, NULL, wtperf->ckptthreads,
- opts->checkpoint_threads, checkpoint_worker) != 0)
- goto err;
+ start_threads(wtperf, NULL, wtperf->ckptthreads,
+ opts->checkpoint_threads, checkpoint_worker);
}
if (opts->pre_load_data && (ret = pre_load_data(wtperf)) != 0)
goto err;
@@ -2362,16 +2338,10 @@ err: if (ret == 0)
/* Notify the worker threads they are done. */
wtperf->stop = true;
- if ((t_ret = stop_threads(wtperf, 1, wtperf->ckptthreads)) != 0)
- if (ret == 0)
- ret = t_ret;
+ stop_threads(1, wtperf->ckptthreads);
- if (monitor_created != 0 &&
- (t_ret = pthread_join(monitor_thread, NULL)) != 0) {
- lprintf(wtperf, ret, 0, "Error joining monitor thread.");
- if (ret == 0)
- ret = t_ret;
- }
+ if (monitor_created != 0)
+ testutil_check(__wt_thread_join(NULL, monitor_thread));
if (wtperf->conn != NULL && opts->close_conn &&
(t_ret = wtperf->conn->close(wtperf->conn, NULL)) != 0) {
@@ -2728,14 +2698,13 @@ err: wtperf_free(wtperf);
return (ret == 0 ? EXIT_SUCCESS : EXIT_FAILURE);
}
-static int
-start_threads(WTPERF *wtperf,
- WORKLOAD *workp, WTPERF_THREAD *base, u_int num, void *(*func)(void *))
+static void
+start_threads(WTPERF *wtperf, WORKLOAD *workp,
+ WTPERF_THREAD *base, u_int num, WT_THREAD_CALLBACK(*func)(void *))
{
CONFIG_OPTS *opts;
WTPERF_THREAD *thread;
u_int i;
- int ret;
opts = wtperf->opts;
@@ -2779,29 +2748,20 @@ start_threads(WTPERF *wtperf,
/* Start the threads. */
for (i = 0, thread = base; i < num; ++i, ++thread)
- if ((ret = pthread_create(
- &thread->handle, NULL, func, thread)) != 0) {
- lprintf(wtperf, ret, 0, "Error creating thread");
- return (ret);
- }
-
- return (0);
+ testutil_check(__wt_thread_create(
+ NULL, &thread->handle, func, thread));
}
-static int
-stop_threads(WTPERF *wtperf, u_int num, WTPERF_THREAD *threads)
+static void
+stop_threads(u_int num, WTPERF_THREAD *threads)
{
u_int i;
- int ret;
if (num == 0 || threads == NULL)
- return (0);
+ return;
for (i = 0; i < num; ++i, ++threads) {
- if ((ret = pthread_join(threads->handle, NULL)) != 0) {
- lprintf(wtperf, ret, 0, "Error joining thread");
- return (ret);
- }
+ testutil_check(__wt_thread_join(NULL, threads->handle));
free(threads->key_buf);
threads->key_buf = NULL;
@@ -2815,7 +2775,6 @@ stop_threads(WTPERF *wtperf, u_int num, WTPERF_THREAD *threads)
* being read by the monitor thread (among others). As a standalone
* program, leaking memory isn't a concern, and it's simpler that way.
*/
- return (0);
}
static void
diff --git a/bench/wtperf/wtperf.h b/bench/wtperf/wtperf.h
index bd6c1e829ba..b17d082ddcf 100644
--- a/bench/wtperf/wtperf.h
+++ b/bench/wtperf/wtperf.h
@@ -232,7 +232,7 @@ struct __wtperf_thread { /* Per-thread structure */
WT_RAND_STATE rnd; /* Random number generation state */
- pthread_t handle; /* Handle */
+ wt_thread_t handle; /* Handle */
char *key_buf, *value_buf; /* Key/value memory */
@@ -269,8 +269,8 @@ int run_truncate(
int setup_log_file(WTPERF *);
void setup_throttle(WTPERF_THREAD *);
int setup_truncate(WTPERF *, WTPERF_THREAD *, WT_SESSION *);
-int start_idle_table_cycle(WTPERF *, pthread_t *);
-int stop_idle_table_cycle(WTPERF *, pthread_t);
+void start_idle_table_cycle(WTPERF *, wt_thread_t *);
+void stop_idle_table_cycle(WTPERF *, wt_thread_t);
void worker_throttle(WTPERF_THREAD *);
uint64_t sum_ckpt_ops(WTPERF *);
uint64_t sum_insert_ops(WTPERF *);