diff options
author | Alex Gorrod <alexander.gorrod@mongodb.com> | 2017-06-14 19:39:53 +0000 |
---|---|---|
committer | Alex Gorrod <alexander.gorrod@mongodb.com> | 2017-06-14 19:39:53 +0000 |
commit | 11ca50ae96399aa8b0eaeee5dc115398d78fee2b (patch) | |
tree | a54af4da3e0dc8f216c191ce23e650e318ce00ac /src/third_party/wiredtiger/bench | |
parent | a16bc9371921766d41e7a86c3ca10080c677ce6d (diff) | |
download | mongo-11ca50ae96399aa8b0eaeee5dc115398d78fee2b.tar.gz |
Import wiredtiger: 47e8c3d1d22018eaaa09f91dfd78addb49e0b49b from branch mongodb-3.6
ref: 7aaeaaa054..47e8c3d1d2
for: 3.5.9
WT-2596 Document behavior after a crash with a backup cursor open
WT-3169 Add ability to log a message when the lookaside file is used
WT-3326 workgen: run wtperf "runner" files
WT-3332 Add statistic tracking connection wide transaction conflicts
WT-3346 workgen: create JSON output for latency sampling
WT-3349 Add timing stats to rwlocks
WT-3361 Resolve Windows build warnings, build more test programs on Windows.
WT-3362 Cursor opens should never block for the duration of a checkpoint
WT-3369 WT_CURSOR->uri should always match the URI used to open the cursor
Diffstat (limited to 'src/third_party/wiredtiger/bench')
12 files changed, 770 insertions, 147 deletions
diff --git a/src/third_party/wiredtiger/bench/workgen/runner/runner/__init__.py b/src/third_party/wiredtiger/bench/workgen/runner/runner/__init__.py index 67b547bc51b..ed21fffe8dc 100644 --- a/src/third_party/wiredtiger/bench/workgen/runner/runner/__init__.py +++ b/src/third_party/wiredtiger/bench/workgen/runner/runner/__init__.py @@ -88,5 +88,5 @@ except: shutil.rmtree('WT_TEST', True) os.mkdir('WT_TEST') -from .core import txn, extensions_config +from .core import txn, extensions_config, op_group_transaction, op_log_like, op_multi_table from .latency import workload_latency diff --git a/src/third_party/wiredtiger/bench/workgen/runner/runner/core.py b/src/third_party/wiredtiger/bench/workgen/runner/runner/core.py index a0f0d4d77cd..2c8311c4ca7 100644 --- a/src/third_party/wiredtiger/bench/workgen/runner/runner/core.py +++ b/src/third_party/wiredtiger/bench/workgen/runner/runner/core.py @@ -29,12 +29,12 @@ # runner/core.py # Core functions available to all runners import glob, os -import workgen +from workgen import Key, Operation, OpList, Table, Transaction, Value # txn -- # Put the operation (and any suboperations) within a transaction. def txn(op, config=None): - t = workgen.Transaction(config) + t = Transaction(config) op._transaction = t return op @@ -99,3 +99,103 @@ def extensions_config(exts): if len(extfiles) != 0: result = ',extensions=[' + ','.join(extfiles.values()) + ']' return result + +def _op_multi_table_as_list(ops_arg, tables): + result = [] + if ops_arg._optype != Operation.OP_NONE: + for table in tables: + result.append(Operation(ops_arg._optype, table, ops_arg._key, ops_arg._value)) + else: + for op in ops._group: + result.extend(_op_multi_table_as_list(op, tables)) + return result + +# A convenient way to build a list of operations +def op_append(op1, op2): + if op1 == None: + op1 = op2 + else: + op1 += op2 + return op1 + +# Emulate wtperf's table_count option. Spread the given operations over +# a set of tables. +def op_multi_table(ops_arg, tables): + ops = None + for op in _op_multi_table_as_list(ops_arg, tables): + ops = op_append(ops, op) + return ops + +# should be 8 bytes format 'Q' +_logkey = Key(Key.KEYGEN_APPEND, 8) +def _op_log_op(op, log_table): + keysize = op._key._size + if keysize == 0: + keysize = op._table.options.key_size + valuesize = op._value._size + if valuesize == 0: + valuesize = op._table.options.value_size + v = Value(keysize + valuesize) + return Operation(Operation.OP_INSERT, log_table, _logkey, v) + +def _optype_is_write(optype): + return optype == Operation.OP_INSERT or optype == Operation.OP_UPDATE or \ + optype == Operation.OP_REMOVE + +# Emulate wtperf's log_like option. For all operations, add a second +# insert operation going to a log table. +def op_log_like(op, log_table, ops_per_txn): + if op._optype != Operation.OP_NONE: + if _optype_is_write(op._optype): + op += _op_log_op(op, log_table) + if ops_per_txn == 0: + op = txn(op) # txn for each action. + else: + oplist = [] + for op2 in op._group: + if op2._optype == Operation.OP_NONE: + oplist.append(op_log_like(op2, log_table)) + elif ops_per_txn == 0 and _optype_is_write(op2._optype): + op2 += _op_log_op(op2, log_table) + oplist.append(txn(op2)) # txn for each action. + else: + oplist.append(op2) + if _optype_is_write(op2._optype): + oplist.append(_op_log_op(op2, log_table)) + op._group = OpList(oplist) + return op + +def _op_transaction_list(oplist, txn_config): + result = None + for op in oplist: + result = op_append(result, op) + return txn(result, txn_config) + +# Emulate wtperf's ops_per_txn option. Create transactions around +# groups of operations of the indicated size. +def op_group_transaction(ops_arg, ops_per_txn, txn_config): + if ops_arg != Operation.OP_NONE: + return txn(ops_arg, txn_config) + if ops_arg._transaction != None: + raise Exception('nested transactions not supported') + if ops_arg._repeatgroup != None: + raise Exception('grouping transactions with multipliers not supported') + + oplist = [] + ops = None + nops = 0 + txgroup = [] + for op in ops_arg._group: + if op.optype == Operation.OP_NONE: + oplist.append(_op_transaction_list(txgroup, txn_config)) + txgroup = [] + oplist.append(op) + else: + txgroup.append(op) + if len(txgroup) >= ops_per_txn: + oplist.append(_op_transaction_list(txgroup, txn_config)) + txgroup = [] + if len(txgroup) > 0: + oplist.append(_op_transaction_list(txgroup, txn_config)) + ops_arg._group = OpList(oplist) + return ops_arg diff --git a/src/third_party/wiredtiger/bench/workgen/runner/workgen_stat.sh b/src/third_party/wiredtiger/bench/workgen/runner/workgen_stat.sh new file mode 100755 index 00000000000..1739c29859e --- /dev/null +++ b/src/third_party/wiredtiger/bench/workgen/runner/workgen_stat.sh @@ -0,0 +1,75 @@ +#!/bin/bash +# +# workgen_stat.sh - combine JSON time series output from WT and workgen. +# +Usage() { + cat <<EOF +Usage: $0 [ options ] +Options: + -h <WT_home_directory> # set the WiredTiger home directory + -e <analyzer_name> # run analyzer on the combined files + -o <output_file> # output file for result + +At least one of '-t2' or '-o' must be selected. +EOF + exit 1 +} + +Filter() { + sed -e 's/"version" *: *"[^"]*",//' "$@" +} + +wthome=. +outfile= +analyze= + +while [ "$#" != 0 ]; do + arg="$1" + shift + case "$arg" in + -h ) + if [ $# = 0 ]; then + Usage + fi + wthome="$1" + shift + ;; + -o ) + if [ $# = 0 ]; then + Usage + fi + outfile="$1" + shift + ;; + -e ) + if [ $# = 0 ]; then + Usage + fi + analyze="$1" + shift + ;; + esac +done +if [ ! -d "$wthome" ]; then + echo "$wthome: WT home directory does not exist" + exit 1 +fi +if [ ! -f "$wthome/WiredTiger.wt" ]; then + echo "$wthome: directory is not a WiredTiger home directory" + exit 1 +fi +if [ "$outfile" = '' ]; then + if [ "$analyze" = false ]; then + Usage + fi + outfile="$wthome/stat_tmp.json" +fi +(cd $wthome; Filter WiredTigerStat.* sample.json) | sort > $outfile +if [ "$analyze" != '' ]; then + sysname=`uname -s` + if [ "$sysname" = Darwin ]; then + open -a "$analyze" "$outfile" + else + "$analyze" "$outfile" + fi +fi diff --git a/src/third_party/wiredtiger/bench/workgen/workgen.cxx b/src/third_party/wiredtiger/bench/workgen/workgen.cxx index c56acfd2989..880b8ca6467 100644 --- a/src/third_party/wiredtiger/bench/workgen/workgen.cxx +++ b/src/third_party/wiredtiger/bench/workgen/workgen.cxx @@ -267,16 +267,18 @@ int ContextInternal::create_all() { } Monitor::Monitor(WorkloadRunner &wrunner) : - _errno(0), _exception(), _wrunner(wrunner), _stop(false), _handle() {} + _errno(0), _exception(), _wrunner(wrunner), _stop(false), _handle(), + _out(NULL), _json(NULL) {} Monitor::~Monitor() {} int Monitor::run() { struct timespec t; struct tm *tm, _tm; - char time_buf[64]; + char time_buf[64], version[100]; Stats prev_totals; WorkloadOptions *options = &_wrunner._workload->options; uint64_t latency_max = (uint64_t)options->max_latency; + bool first; (*_out) << "#time," << "totalsec," @@ -295,6 +297,8 @@ int Monitor::run() { << "update maximum latency(uS)" << std::endl; + first = true; + workgen_version(version, sizeof(version)); Stats prev_interval; while (!_stop) { for (int i = 0; i < options->sample_interval && !_stop; i++) @@ -337,6 +341,32 @@ int Monitor::run() { << "," << interval.update.max_latency << std::endl; + if (_json != NULL) { +#define WORKGEN_TIMESTAMP_JSON "%Y-%m-%dT%H:%M:%S.000Z" + (void)strftime(time_buf, sizeof(time_buf), + WORKGEN_TIMESTAMP_JSON, tm); + +#define TRACK_JSON(name, t) \ + "\"" << (name) << "\":{" \ + << "\"ops per sec\":" << ((t).ops / interval_secs) \ + << ",\"average latency\":" << (t).average_latency() \ + << ",\"min latency\":" << (t).min_latency \ + << ",\"max latency\":" << (t).max_latency \ + << "}" + + (*_json) << "{"; + if (first) { + (*_json) << "\"version\":\"" << version << "\","; + first = false; + } + (*_json) << "\"localTime\":\"" << time_buf + << "\",\"workgen\":{" + << TRACK_JSON("read", interval.read) << "," + << TRACK_JSON("insert", interval.insert) << "," + << TRACK_JSON("update", interval.update) + << "}}" << std::endl; + } + uint64_t read_max = interval.read.max_latency; uint64_t insert_max = interval.read.max_latency; uint64_t update_max = interval.read.max_latency; @@ -1315,8 +1345,8 @@ TableInternal::TableInternal(const TableInternal &other) : _tint(other._tint), TableInternal::~TableInternal() {} WorkloadOptions::WorkloadOptions() : max_latency(0), - report_file("workload.stat"), report_interval(0), - run_time(0), sample_interval(0), sample_rate(1), + report_file("workload.stat"), report_interval(0), run_time(0), + sample_file("sample.json"), sample_interval(0), sample_rate(1), _options() { _options.add_int("max_latency", max_latency, "prints warning if any latency measured exceeds this number of " @@ -1329,6 +1359,11 @@ WorkloadOptions::WorkloadOptions() : max_latency(0), "The file name is relative to the connection's home directory. " "When set to the empty string, stdout is used."); _options.add_int("run_time", run_time, "total workload seconds"); + _options.add_string("sample_file", sample_file, + "file name for collecting latency output in a JSON-like format, " + "enabled by the report_interval option. " + "The file name is relative to the connection's home directory. " + "When set to the empty string, no JSON is emitted."); _options.add_int("sample_interval", sample_interval, "performance logging every interval seconds, 0 to disable"); _options.add_int("sample_rate", sample_rate, @@ -1492,6 +1527,7 @@ int WorkloadRunner::run_all() { WorkloadOptions *options = &_workload->options; Monitor monitor(*this); std::ofstream monitor_out; + std::ofstream monitor_json; std::ostream &out = *_report_out; WT_DECL_RET; @@ -1510,6 +1546,12 @@ int WorkloadRunner::run_all() { open_report_file(monitor_out, "monitor", "monitor output file"); monitor._out = &monitor_out; + if (!options->sample_file.empty()) { + open_report_file(monitor_json, options->sample_file.c_str(), + "sample JSON output file"); + monitor._json = &monitor_json; + } + if ((ret = pthread_create(&monitor._handle, NULL, monitor_main, &monitor)) != 0) { std::cerr << "monitor thread failed err=" << ret << std::endl; @@ -1588,6 +1630,10 @@ int WorkloadRunner::run_all() { << std::endl; if (exception == NULL && !monitor._exception._str.empty()) exception = &monitor._exception; + + monitor_out.close(); + if (!options->sample_file.empty()) + monitor_json.close(); } // issue the final report diff --git a/src/third_party/wiredtiger/bench/workgen/workgen.h b/src/third_party/wiredtiger/bench/workgen/workgen.h index c1ae01ed5a4..c7be8ee0035 100644 --- a/src/third_party/wiredtiger/bench/workgen/workgen.h +++ b/src/third_party/wiredtiger/bench/workgen/workgen.h @@ -358,6 +358,7 @@ struct WorkloadOptions { int run_time; int sample_interval; int sample_rate; + std::string sample_file; WorkloadOptions(); WorkloadOptions(const WorkloadOptions &other); diff --git a/src/third_party/wiredtiger/bench/workgen/workgen_func.c b/src/third_party/wiredtiger/bench/workgen/workgen_func.c index 2e1271a515e..5ce2146a8e4 100644 --- a/src/third_party/wiredtiger/bench/workgen/workgen_func.c +++ b/src/third_party/wiredtiger/bench/workgen/workgen_func.c @@ -87,3 +87,16 @@ workgen_u64_to_string_zf(uint64_t n, char *buf, size_t len) { u64_to_string_zf(n, buf, len); } + +#define WORKGEN_VERSION_PREFIX "workgen-" +extern void +workgen_version(char *buf, size_t len) +{ + size_t prefix_len; + + prefix_len = strlen(WORKGEN_VERSION_PREFIX); + (void)strncpy(buf, WORKGEN_VERSION_PREFIX, len); + if (len > prefix_len) + (void)strncpy(&buf[prefix_len], WIREDTIGER_VERSION_STRING, + len - prefix_len); +} diff --git a/src/third_party/wiredtiger/bench/workgen/workgen_func.h b/src/third_party/wiredtiger/bench/workgen/workgen_func.h index 20ebf2632cc..ec7ecf0a504 100644 --- a/src/third_party/wiredtiger/bench/workgen/workgen_func.h +++ b/src/third_party/wiredtiger/bench/workgen/workgen_func.h @@ -42,3 +42,5 @@ extern void workgen_random_free(struct workgen_random_state *rnd_state); extern void workgen_u64_to_string_zf(uint64_t n, char *buf, size_t len); +extern void +workgen_version(char *buf, size_t len); diff --git a/src/third_party/wiredtiger/bench/workgen/workgen_int.h b/src/third_party/wiredtiger/bench/workgen/workgen_int.h index 01fb727691b..9283aea1d7b 100644 --- a/src/third_party/wiredtiger/bench/workgen/workgen_int.h +++ b/src/third_party/wiredtiger/bench/workgen/workgen_int.h @@ -146,6 +146,7 @@ struct Monitor { volatile bool _stop; pthread_t _handle; std::ostream *_out; + std::ostream *_json; Monitor(WorkloadRunner &wrunner); ~Monitor(); diff --git a/src/third_party/wiredtiger/bench/workgen/wtperf.py b/src/third_party/wiredtiger/bench/workgen/wtperf.py new file mode 100644 index 00000000000..3a196fe7b57 --- /dev/null +++ b/src/third_party/wiredtiger/bench/workgen/wtperf.py @@ -0,0 +1,440 @@ +#!/usr/bin/env python +# +# Public Domain 2014-2017 MongoDB, Inc. +# Public Domain 2008-2014 WiredTiger, Inc. +# +# This is free and unencumbered software released into the public domain. +# +# Anyone is free to copy, modify, publish, use, compile, sell, or +# distribute this software, either in source code form or as a compiled +# binary, for any purpose, commercial or non-commercial, and by any +# means. +# +# In jurisdictions that recognize copyright laws, the author or authors +# of this software dedicate any and all copyright interest in the +# software to the public domain. We make this dedication for the benefit +# of the public at large and to the detriment of our heirs and +# successors. We intend this dedication to be an overt act of +# relinquishment in perpetuity of all present and future rights to this +# software under copyright law. +# +# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, +# EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF +# MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. +# IN NO EVENT SHALL THE AUTHORS BE LIABLE FOR ANY CLAIM, DAMAGES OR +# OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, +# ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR +# OTHER DEALINGS IN THE SOFTWARE. +# + +# wtperf.py +# A partial emulation of wtperf. Translates a .wtperf file into a Python +# script that uses the workgen module, and runs the script. Errors are +# issued for any .wtperf directives that are not known. +# See also the usage() function. +# +from __future__ import print_function +import os, sys, tempfile + +def eprint(*args, **kwargs): + print(*args, file=sys.stderr, **kwargs) + +class OptionValue: + def __init__(self, value, filename, linenum): + self.value = value + self.filename = filename + self.linenum = linenum + +class TranslateException(Exception): + pass + +class Options(object): + pass + +class Translator: + def __init__(self, filename, prefix, verbose): + self.filename = filename + self.prefix = prefix + self.verbose = verbose + self.linenum = 0 + self.opts = {} + self.used_opts = {} + self.has_error = False + + def error_file_line(self, fname, linenum, msg): + self.has_error = True + eprint(fname + ':' + str(linenum) + ': error: ' + msg) + + # Report an error and continue + def error(self, msg): + self.error_file_line(self.filename, self.linenum, msg) + + # Report an error and unwind the stack + def fatal_error(self, msg, errtype): + self.error(msg) + raise TranslateException(errtype) + + supported_opt_list = [ 'compression', 'conn_config', 'icount', + 'key_sz', 'log_like_table', + 'populate_ops_per_txn', 'populate_threads', + 'reopen_connection', + 'table_config', 'table_count', + 'threads', 'transaction_config', 'value_sz' ] + + def set_opt(self, optname, val): + if optname not in self.supported_opt_list: + self.error("unknown option: " + optname) + return + elif val[0] == '"' and val[-1] == '"': + v = val[1:-1] + elif val == 'true': + v = True + elif val == 'false': + v = False + elif val[0] == '(': + v = val # config string stored as is + else: + try: + v = int(val) # it might be an integer + except ValueError: + v = val # it's a string after all + self.opts[optname] = OptionValue(v, self.filename, self.linenum) + + def get_opt(self, optname, dfault): + if optname in self.opts: + ret = self.opts[optname] + self.filename = ret.filename + self.linenum = ret.linenum + self.used_opts[optname] = 1 + return ret.value + else: + return dfault + + def get_int_opt(self, optname, dfault): + return self.get_opt(optname, dfault) + 0 + + def get_boolean_opt(self, optname, dfault): + return not not self.get_opt(optname, dfault) + + # Split a string 'left_side=right_side' into two parts + def split_assign(self, s): + equalpos = s.find('=') + if equalpos < 0: + self.error("missing '=' for line: " + line) + return (None, None) + else: + return s.split('=', 1) + + # Split a config string honoring nesting e.g. + # "(abc=123,def=234,ghi=(hi=1,bye=2))" would return 3 items. + def split_config_parens(self, s): + if s[0:1] != '(': + import pdb + pdb.set_trace() + self.fatal_error('missing left paren', 'config parse error') + if s[-1:] != ')': + self.fatal_error('missing right paren', 'config parse error') + s = s[1:-1] + result = [] + level = 0 + cur = '' + for ch in s: + if ch == ',' and level == 0: + result.append(cur) + cur = '' + else: + cur += ch + if ch == '(': + level += 1 + elif ch == ')': + level -= 1 + if level < 0: + self.fatal_error('unbalanced paren', 'config parse error') + if level != 0: + self.fatal_error('unbalanced paren', 'config parse error') + if len(cur) != 0: + result.append(cur) + return result + + def assign_str(self, left, right): + return left + '=' + str(right) + '\n' + + def add_operation_str(self, count, opname, multi): + result = '' + tablename = 'tables[0]' if multi else 'table' + if count > 1: + result += str(count) + ' * ' + if count > 0: + result += 'Operation(Operation.' + opname + ', ' + \ + tablename + ') + \\\n' + result += ' ' + return result + + # Wtperf's throttle is based on the number of regular operations, + # not including log_like operations. Workgen counts all operations, + # it doesn't treat log operations any differently. Adjust the throttle + # number to account for the difference. + def calc_throttle(self, thread_opts, log_like_table): + throttle = thread_opts.throttle + if not log_like_table: + return (throttle, '') + modify = thread_opts.inserts + thread_opts.updates + regular = modify + thread_opts.reads + total = regular + modify + factor = (total + 0.0) / regular + new_throttle = int(throttle * factor) + if new_throttle == throttle: + comment = '' + else: + comment = '# wtperf throttle=' + str(throttle) + ' adjusted by ' + \ + str(factor) + ' to compensate for log_like operations.\n' + return (new_throttle, comment) + + def parse_threads(self, threads_config): + tdecls = '' + tlist = self.split_config_parens(threads_config) + table_count = self.get_int_opt('table_count', 1) + log_like_table = self.get_boolean_opt('log_like_table', False) + txn_config = self.get_opt('transaction_config', '') + if log_like_table: + tdecls += 'log_name = "table:log"\n' + tdecls += 's.create(log_name, "key_format=S,value_format=S," +' + \ + ' compress_table_config)\n' + tdecls += 'log_table = Table(log_name)\n\n' + thread_count = 0 + tnames = '' + multi = (table_count > 1) + for t in tlist: + thread_name = 'thread' + str(thread_count) + thread_count += 1 + + # For wtperf compatibility, we allow both 'insert/inserts' etc. + topts = Options() + topts.count = 1 + topts.insert = 0 + topts.inserts = 0 + topts.ops_per_txn = 0 + topts.read = 0 + topts.reads = 0 + topts.throttle = 0 + topts.update = 0 + topts.updates = 0 + + for o in self.split_config_parens(t): + (k, v) = self.split_assign(o) + if hasattr(topts, k): + try: + setattr(topts, k, int(v)) + except ValueError: + self.error('thread option ' + k + ': integer expected') + else: + self.error('unknown thread option: ' + k) + + topts.inserts += topts.insert; topts.insert = 0 + topts.updates += topts.update; topts.update = 0 + topts.reads += topts.read; topts.read = 0 + if topts.count == 0: + continue + + if topts.inserts + topts.reads + topts.updates == 0: + self.fatal_error('need read/insert/update/...', + 'thread config error') + tdecls += 'ops = ' + tdecls += self.add_operation_str(topts.inserts, 'OP_INSERT', multi) + tdecls += self.add_operation_str(topts.reads, 'OP_SEARCH', multi) + tdecls += self.add_operation_str(topts.updates, 'OP_UPDATE', multi) + tdecls = tdecls.rstrip(' \n\\+') + '\n' + if multi: + tdecls += 'ops = op_multi_table(ops, tables)\n' + if topts.ops_per_txn > 0: + tdecls += 'ops = op_group_transaction(ops, ' + \ + str(topts.ops_per_txn) + ', "' + txn_config + '")\n' + if log_like_table: + tdecls += 'ops = op_log_like(ops, log_table, ' + \ + str(topts.ops_per_txn) + ')\n' + tdecls += thread_name + ' = Thread(ops)\n' + if topts.throttle > 0: + (throttle, comment) = self.calc_throttle(topts, log_like_table) + tdecls += comment + tdecls += self.assign_str(thread_name + '.options.throttle', + throttle) + tdecls += '\n' + if topts.count > 1: + tnames += str(topts.count) + ' * ' + tnames += thread_name + ' + ' + + tnames = tnames.rstrip(' +') + return (tdecls, tnames) + + def translate(self): + try: + return self.translate_inner() + except TranslateException: + # An error has already been reported + return None + + def translate_inner(self): + workloadopts = '' + with open(self.filename) as fin: + for line in fin: + self.linenum += 1 + commentpos = line.find('#') + if commentpos >= 0: + line = line[0:commentpos] + line = line.strip() + if len(line) == 0: + continue + (key, val) = self.split_assign(line) + if key in [ 'max_latency', 'report_file', 'report_interval', + 'run_time', 'sample_interval', 'sample_rate' ]: + workloadopts += 'workload.options.' + key + '=' + val + '\n' + else: + self.set_opt(key, val) + + table_count = self.get_int_opt('table_count', 1) + conn_config = self.get_opt('conn_config', '') + table_config = self.get_opt('table_config', '') + key_sz = self.get_int_opt('key_sz', 20) + value_sz = self.get_int_opt('value_sz', 100) + reopen = self.get_boolean_opt('reopen_connection', False) + compression = self.get_opt('compression', '') + txn_config = self.get_opt('transaction_config', '') + + s = '#/usr/bin/env python\n' + s += '# generated from ' + self.filename + '\n' + s += self.prefix + s += 'from runner import *\n' + s += 'from wiredtiger import *\n' + s += 'from workgen import *\n' + s += '\n' + s += 'context = Context()\n' + s += 'conn_config = "' + conn_config + '"\n' + if compression != '': + s += 'conn_config += extensions_config(["compressors/' + \ + compression + '"])\n' + compression = 'block_compressor=' + compression + ',' + s += 'conn = wiredtiger_open("WT_TEST", "create," + conn_config)\n' + s += 's = conn.open_session()\n' + s += '\n' + s += 'wtperf_table_config = "key_format=S,value_format=S,type=lsm," +\\\n' + s += ' "exclusive=true,allocation_size=4kb," +\\\n' + s += ' "internal_page_max=64kb,leaf_page_max=4kb,split_pct=100,"\n' + s += 'compress_table_config = "' + compression + '"\n' + s += 'table_config = "' + table_config + '"\n' + if table_count == 1: + s += 'tname = "file:test.wt"\n' + s += 's.create(tname, wtperf_table_config +\\\n' + s += ' compress_table_config + table_config)\n' + s += 'table = Table(tname)\n' + s += 'table.options.key_size = ' + str(key_sz) + '\n' + s += 'table.options.value_size = ' + str(value_sz) + '\n' + else: + s += 'table_count = ' + str(table_count) + '\n' + s += 'tables = []\n' + s += 'for i in range(0, table_count):\n' + s += ' tname = "file:test" + str(i) + ".wt"\n' + s += ' s.create(tname, ' + \ + 'wtperf_table_config + ' + \ + 'compress_table_config + table_config)\n' + s += ' t = Table(tname)\n' + s += ' t.options.key_size = ' + str(key_sz) + '\n' + s += ' t.options.value_size = ' + str(value_sz) + '\n' + s += ' tables.append(t)\n' + s += '\n' + + icount = self.get_int_opt('icount', 0) + pop_thread = self.get_int_opt('populate_threads', 1) + pop_per_txn = self.get_int_opt('populate_ops_per_txn', 0) + if icount != 0: + if pop_thread == 0: + self.fatal_error('icount != 0 and populate_threads == 0: ' +\ + 'cannot populate entries with no threads') + elif pop_thread == 1: + mult = '' + else: + mult = str(pop_thread) + ' * ' + + # if there are multiple tables to be filled during populate, + # the icount is split between them all. + nops_per_thread = icount / (pop_thread * table_count) + if table_count == 1: + s += 'pop_ops = Operation(Operation.OP_INSERT, table)\n' + else: + s += 'pop_ops = Operation(Operation.OP_INSERT, tables[0])\n' + s += 'pop_ops = op_multi_table(pop_ops, tables)\n' + if pop_per_txn > 0: + s += 'pop_ops = op_group_transaction(pop_ops, ' + \ + str(pop_per_txn) + ', "' + txn_config + '")\n' + s += 'pop_thread = Thread(pop_ops * ' + str(nops_per_thread) + ')\n' + s += 'pop_workload = Workload(context, ' + mult + 'pop_thread)\n' + if self.verbose > 0: + s += 'print("populate:")\n' + s += 'pop_workload.run(conn)\n' + else: + if self.get_int_opt('populate_threads', 0) != 0: + self.error("populate_threads > 0, icount == 0") + + thread_config = self.get_opt('threads', '') + if thread_config != '': + (t_create, t_var) = self.parse_threads(thread_config) + s += '\n' + t_create + if reopen: + s += '\n# reopen the connection\n' + s += 'conn.close()\n' + s += 'conn = wiredtiger_open(' + \ + '"WT_TEST", "create," + conn_config)\n' + s += '\n' + s += 'workload = Workload(context, ' + t_var + ')\n' + s += workloadopts + if self.verbose > 0: + s += 'print("workload:")\n' + s += 'workload.run(conn)\n' + + for o in self.used_opts: + del self.opts[o] + if len(self.opts) != 0: + self.error('internal error, options not handled: ' + str(self.opts)) + return s + +def usage(): + eprint(( + 'Usage: python wtperf.py [ options ] file.wtperf ...\n' + '\n' + 'Options:\n' + ' --python Python output generated on stdout\n' + ' -v --verbose Verbose output\n' + '\n' + 'If --python is not specified, the resulting workload is run.')) + +verbose = 0 +py_out = False +workgen_dir = os.path.dirname(os.path.abspath(__file__)) +runner_dir = os.path.join(workgen_dir, 'runner') +prefix = ( + '# The next lines are unneeded if this script is in the runner directory.\n' + 'import sys\n' + 'sys.path.append("' + runner_dir + '")\n\n') + +exit_status = 0 +for arg in sys.argv[1:]: + if arg == '--python': + py_out = True + elif arg == '--verbose' or arg == '-v': + verbose += 1 + elif arg.endswith('.wtperf'): + translator = Translator(arg, prefix, verbose) + pysrc = translator.translate() + if translator.has_error: + exit_status = 1 + elif py_out: + print(pysrc) + else: + (outfd, tmpfile) = tempfile.mkstemp(suffix='.py') + os.write(outfd, pysrc) + os.close(outfd) + execfile(tmpfile) + os.remove(tmpfile) + else: + usage() + sys.exit(1) +sys.exit(exit_status) diff --git a/src/third_party/wiredtiger/bench/wtperf/idle_table_cycle.c b/src/third_party/wiredtiger/bench/wtperf/idle_table_cycle.c index ce64049ce89..d0baa786ba9 100644 --- a/src/third_party/wiredtiger/bench/wtperf/idle_table_cycle.c +++ b/src/third_party/wiredtiger/bench/wtperf/idle_table_cycle.c @@ -57,7 +57,7 @@ check_timing(WTPERF *wtperf, * Measure how long each step takes, and flag an error if it exceeds the * configured maximum. */ -static void * +static WT_THREAD_RET cycle_idle_tables(void *arg) { struct timespec start, stop; @@ -76,7 +76,7 @@ cycle_idle_tables(void *arg) wtperf->conn, NULL, opts->sess_config, &session)) != 0) { lprintf(wtperf, ret, 0, "Error opening a session on %s", wtperf->home); - return (NULL); + return (WT_THREAD_RET_VALUE); } for (cycle_count = 0; wtperf->idle_cycle_run; ++cycle_count) { @@ -96,10 +96,10 @@ cycle_idle_tables(void *arg) lprintf(wtperf, ret, 0, "Table create failed in cycle_idle_tables."); wtperf->error = true; - return (NULL); + return (WT_THREAD_RET_VALUE); } if (check_timing(wtperf, "create", start, &stop) != 0) - return (NULL); + return (WT_THREAD_RET_VALUE); start = stop; /* Open and close cursor. */ @@ -108,16 +108,16 @@ cycle_idle_tables(void *arg) lprintf(wtperf, ret, 0, "Cursor open failed in cycle_idle_tables."); wtperf->error = true; - return (NULL); + return (WT_THREAD_RET_VALUE); } if ((ret = cursor->close(cursor)) != 0) { lprintf(wtperf, ret, 0, "Cursor close failed in cycle_idle_tables."); wtperf->error = true; - return (NULL); + return (WT_THREAD_RET_VALUE); } if (check_timing(wtperf, "cursor", start, &stop) != 0) - return (NULL); + return (WT_THREAD_RET_VALUE); start = stop; #if 1 @@ -133,14 +133,14 @@ cycle_idle_tables(void *arg) lprintf(wtperf, ret, 0, "Table drop failed in cycle_idle_tables."); wtperf->error = true; - return (NULL); + return (WT_THREAD_RET_VALUE); } if (check_timing(wtperf, "drop", start, &stop) != 0) - return (NULL); + return (WT_THREAD_RET_VALUE); #endif } - return (NULL); + return (WT_THREAD_RET_VALUE); } /* @@ -150,47 +150,33 @@ cycle_idle_tables(void *arg) * structure. Should reshuffle the configuration structure so explicit static * initialization isn't necessary. */ -int -start_idle_table_cycle(WTPERF *wtperf, pthread_t *idle_table_cycle_thread) +void +start_idle_table_cycle(WTPERF *wtperf, wt_thread_t *idle_table_cycle_thread) { CONFIG_OPTS *opts; - pthread_t thread_id; - int ret; + wt_thread_t thread_id; opts = wtperf->opts; if (opts->idle_table_cycle == 0) - return (0); + return; wtperf->idle_cycle_run = true; - if ((ret = pthread_create( - &thread_id, NULL, cycle_idle_tables, wtperf)) != 0) { - lprintf(wtperf, - ret, 0, "Error creating idle table cycle thread."); - wtperf->idle_cycle_run = false; - return (ret); - } + testutil_check(__wt_thread_create( + NULL, &thread_id, cycle_idle_tables, wtperf)); *idle_table_cycle_thread = thread_id; - - return (0); } -int -stop_idle_table_cycle(WTPERF *wtperf, pthread_t idle_table_cycle_thread) +void +stop_idle_table_cycle(WTPERF *wtperf, wt_thread_t idle_table_cycle_thread) { CONFIG_OPTS *opts; - int ret; opts = wtperf->opts; if (opts->idle_table_cycle == 0 || !wtperf->idle_cycle_run) - return (0); + return; wtperf->idle_cycle_run = false; - if ((ret = pthread_join(idle_table_cycle_thread, NULL)) != 0) { - lprintf( - wtperf, ret, 0, "Error joining idle table cycle thread."); - return (ret); - } - return (0); + testutil_check(__wt_thread_join(NULL, idle_table_cycle_thread)); } diff --git a/src/third_party/wiredtiger/bench/wtperf/wtperf.c b/src/third_party/wiredtiger/bench/wtperf/wtperf.c index 68bc08226c2..a8d3f135280 100644 --- a/src/third_party/wiredtiger/bench/wtperf/wtperf.c +++ b/src/third_party/wiredtiger/bench/wtperf/wtperf.c @@ -32,23 +32,23 @@ #define DEFAULT_HOME "WT_TEST" #define DEFAULT_MONITOR_DIR "WT_TEST" -static void *checkpoint_worker(void *); +static WT_THREAD_RET checkpoint_worker(void *); static int drop_all_tables(WTPERF *); static int execute_populate(WTPERF *); static int execute_workload(WTPERF *); static int find_table_count(WTPERF *); -static void *monitor(void *); -static void *populate_thread(void *); +static WT_THREAD_RET monitor(void *); +static WT_THREAD_RET populate_thread(void *); static void randomize_value(WTPERF_THREAD *, char *); static void recreate_dir(const char *); static int start_all_runs(WTPERF *); static int start_run(WTPERF *); -static int start_threads(WTPERF *, - WORKLOAD *, WTPERF_THREAD *, u_int, void *(*)(void *)); -static int stop_threads(WTPERF *, u_int, WTPERF_THREAD *); -static void *thread_run_wtperf(void *); +static void start_threads(WTPERF *, WORKLOAD *, + WTPERF_THREAD *, u_int, WT_THREAD_CALLBACK(*)(void *)); +static void stop_threads(u_int, WTPERF_THREAD *); +static WT_THREAD_RET thread_run_wtperf(void *); static void update_value_delta(WTPERF_THREAD *); -static void *worker(void *); +static WT_THREAD_RET worker(void *); static uint64_t wtperf_rand(WTPERF_THREAD *); static uint64_t wtperf_value_range(WTPERF *); @@ -312,7 +312,7 @@ op_name(uint8_t *op) /* NOTREACHED */ } -static void * +static WT_THREAD_RET worker_async(void *arg) { CONFIG_OPTS *opts; @@ -420,7 +420,7 @@ op_err: lprintf(wtperf, ret, 0, if (0) { err: wtperf->error = wtperf->stop = true; } - return (NULL); + return (WT_THREAD_RET_VALUE); } /* @@ -513,7 +513,7 @@ err: lprintf(wtperf, ret, 0, "Pre-workload traverse error"); return (ret); } -static void * +static WT_THREAD_RET worker(void *arg) { struct timespec start, stop; @@ -893,7 +893,7 @@ err: wtperf->error = wtperf->stop = true; } free(cursors); - return (NULL); + return (WT_THREAD_RET_VALUE); } /* @@ -1014,7 +1014,7 @@ run_mix_schedule(WTPERF *wtperf, WORKLOAD *workp) return (0); } -static void * +static WT_THREAD_RET populate_thread(void *arg) { struct timespec start, stop; @@ -1163,10 +1163,10 @@ err: wtperf->error = wtperf->stop = true; } free(cursors); - return (NULL); + return (WT_THREAD_RET_VALUE); } -static void * +static WT_THREAD_RET populate_async(void *arg) { struct timespec start, stop; @@ -1261,10 +1261,10 @@ populate_async(void *arg) if (0) { err: wtperf->error = wtperf->stop = true; } - return (NULL); + return (WT_THREAD_RET_VALUE); } -static void * +static WT_THREAD_RET monitor(void *arg) { struct timespec t; @@ -1426,10 +1426,10 @@ err: wtperf->error = wtperf->stop = true; (void)fclose(fp); free(path); - return (NULL); + return (WT_THREAD_RET_VALUE); } -static void * +static WT_THREAD_RET checkpoint_worker(void *arg) { CONFIG_OPTS *opts; @@ -1490,7 +1490,7 @@ checkpoint_worker(void *arg) err: wtperf->error = wtperf->stop = true; } - return (NULL); + return (WT_THREAD_RET_VALUE); } static int @@ -1498,15 +1498,15 @@ execute_populate(WTPERF *wtperf) { struct timespec start, stop; CONFIG_OPTS *opts; - WTPERF_THREAD *popth; WT_ASYNC_OP *asyncop; - pthread_t idle_table_cycle_thread; + WTPERF_THREAD *popth; + WT_THREAD_CALLBACK(*pfunc)(void *); size_t i; uint64_t last_ops, msecs, print_ops_sec; uint32_t interval, tables; + wt_thread_t idle_table_cycle_thread; double print_secs; int elapsed, ret; - void *(*pfunc)(void *); opts = wtperf->opts; @@ -1516,9 +1516,7 @@ execute_populate(WTPERF *wtperf) opts->populate_threads, opts->icount); /* Start cycling idle tables if configured. */ - if ((ret = - start_idle_table_cycle(wtperf, &idle_table_cycle_thread)) != 0) - return (ret); + start_idle_table_cycle(wtperf, &idle_table_cycle_thread); wtperf->insert_key = 0; @@ -1530,9 +1528,8 @@ execute_populate(WTPERF *wtperf) pfunc = populate_async; } else pfunc = populate_thread; - if ((ret = start_threads(wtperf, NULL, - wtperf->popthreads, opts->populate_threads, pfunc)) != 0) - return (ret); + start_threads(wtperf, NULL, + wtperf->popthreads, opts->populate_threads, pfunc); __wt_epoch(NULL, &start); for (elapsed = 0, interval = 0, last_ops = 0; @@ -1568,10 +1565,8 @@ execute_populate(WTPERF *wtperf) */ popth = wtperf->popthreads; wtperf->popthreads = NULL; - ret = stop_threads(wtperf, opts->populate_threads, popth); + stop_threads(opts->populate_threads, popth); free(popth); - if (ret != 0) - return (ret); /* Report if any worker threads didn't finish. */ if (wtperf->error) { @@ -1640,8 +1635,7 @@ execute_populate(WTPERF *wtperf) } /* Stop cycling idle tables. */ - if ((ret = stop_idle_table_cycle(wtperf, idle_table_cycle_thread)) != 0) - return (ret); + stop_idle_table_cycle(wtperf, idle_table_cycle_thread); return (0); } @@ -1701,13 +1695,13 @@ execute_workload(WTPERF *wtperf) WTPERF_THREAD *threads; WT_CONNECTION *conn; WT_SESSION **sessions; - pthread_t idle_table_cycle_thread; + WT_THREAD_CALLBACK(*pfunc)(void *); + wt_thread_t idle_table_cycle_thread; uint64_t last_ckpts, last_inserts, last_reads, last_truncates; uint64_t last_updates; uint32_t interval, run_ops, run_time; u_int i; - int ret, t_ret; - void *(*pfunc)(void *); + int ret; opts = wtperf->opts; @@ -1722,9 +1716,7 @@ execute_workload(WTPERF *wtperf) sessions = NULL; /* Start cycling idle tables. */ - if ((ret = - start_idle_table_cycle(wtperf, &idle_table_cycle_thread)) != 0) - return (ret); + start_idle_table_cycle(wtperf, &idle_table_cycle_thread); if (opts->warmup != 0) wtperf->in_warmup = true; @@ -1768,9 +1760,8 @@ execute_workload(WTPERF *wtperf) goto err; /* Start the workload's threads. */ - if ((ret = start_threads( - wtperf, workp, threads, (u_int)workp->threads, pfunc)) != 0) - goto err; + start_threads( + wtperf, workp, threads, (u_int)workp->threads, pfunc); threads += workp->threads; } @@ -1836,12 +1827,9 @@ execute_workload(WTPERF *wtperf) err: wtperf->stop = true; /* Stop cycling idle tables. */ - if ((ret = stop_idle_table_cycle(wtperf, idle_table_cycle_thread)) != 0) - return (ret); + stop_idle_table_cycle(wtperf, idle_table_cycle_thread); - if ((t_ret = stop_threads(wtperf, - (u_int)wtperf->workers_cnt, wtperf->workers)) != 0 && ret == 0) - ret = t_ret; + stop_threads((u_int)wtperf->workers_cnt, wtperf->workers); /* Drop tables if configured to and this isn't an error path */ if (ret == 0 && @@ -2163,9 +2151,9 @@ start_all_runs(WTPERF *wtperf) { CONFIG_OPTS *opts; WTPERF *next_wtperf, **wtperfs; - pthread_t *threads; size_t i, len; - int ret, t_ret; + wt_thread_t *threads; + int ret; opts = wtperf->opts; wtperfs = NULL; @@ -2178,7 +2166,7 @@ start_all_runs(WTPERF *wtperf) wtperfs = dcalloc(opts->database_count, sizeof(WTPERF *)); /* Allocate an array to hold our thread IDs. */ - threads = dcalloc(opts->database_count, sizeof(pthread_t)); + threads = dcalloc(opts->database_count, sizeof(*threads)); for (i = 0; i < opts->database_count; i++) { wtperf_copy(wtperf, &next_wtperf); @@ -2203,22 +2191,15 @@ start_all_runs(WTPERF *wtperf) strcmp(next_wtperf->home, next_wtperf->monitor_dir) != 0) recreate_dir(next_wtperf->monitor_dir); - if ((ret = pthread_create( - &threads[i], NULL, thread_run_wtperf, next_wtperf)) != 0) { - lprintf(wtperf, ret, 0, "Error creating thread"); - goto err; - } + testutil_check(__wt_thread_create(NULL, + &threads[i], thread_run_wtperf, next_wtperf)); } /* Wait for threads to finish. */ for (i = 0; i < opts->database_count; i++) - if ((t_ret = pthread_join(threads[i], NULL)) != 0) { - lprintf(wtperf, ret, 0, "Error joining thread"); - if (ret == 0) - ret = t_ret; - } + testutil_check(__wt_thread_join(NULL, threads[i])); -err: for (i = 0; i < opts->database_count && wtperfs[i] != NULL; i++) { + for (i = 0; i < opts->database_count && wtperfs[i] != NULL; i++) { wtperf_free(wtperfs[i]); free(wtperfs[i]); } @@ -2229,7 +2210,7 @@ err: for (i = 0; i < opts->database_count && wtperfs[i] != NULL; i++) { } /* Run an instance of wtperf for a given configuration. */ -static void * +static WT_THREAD_RET thread_run_wtperf(void *arg) { WTPERF *wtperf; @@ -2238,14 +2219,14 @@ thread_run_wtperf(void *arg) wtperf = (WTPERF *)arg; if ((ret = start_run(wtperf)) != 0) lprintf(wtperf, ret, 0, "Run failed for: %s.", wtperf->home); - return (NULL); + return (WT_THREAD_RET_VALUE); } static int start_run(WTPERF *wtperf) { CONFIG_OPTS *opts; - pthread_t monitor_thread; + wt_thread_t monitor_thread; uint64_t total_ops; uint32_t run_time; int monitor_created, ret, t_ret; @@ -2272,12 +2253,8 @@ start_run(WTPERF *wtperf) /* Start the monitor thread. */ if (opts->sample_interval != 0) { - if ((ret = pthread_create( - &monitor_thread, NULL, monitor, wtperf)) != 0) { - lprintf(wtperf, - ret, 0, "Error creating monitor thread."); - goto err; - } + testutil_check(__wt_thread_create( + NULL, &monitor_thread, monitor, wtperf)); monitor_created = 1; } @@ -2306,9 +2283,8 @@ start_run(WTPERF *wtperf) opts->checkpoint_threads); wtperf->ckptthreads = dcalloc( opts->checkpoint_threads, sizeof(WTPERF_THREAD)); - if (start_threads(wtperf, NULL, wtperf->ckptthreads, - opts->checkpoint_threads, checkpoint_worker) != 0) - goto err; + start_threads(wtperf, NULL, wtperf->ckptthreads, + opts->checkpoint_threads, checkpoint_worker); } if (opts->pre_load_data && (ret = pre_load_data(wtperf)) != 0) goto err; @@ -2362,16 +2338,10 @@ err: if (ret == 0) /* Notify the worker threads they are done. */ wtperf->stop = true; - if ((t_ret = stop_threads(wtperf, 1, wtperf->ckptthreads)) != 0) - if (ret == 0) - ret = t_ret; + stop_threads(1, wtperf->ckptthreads); - if (monitor_created != 0 && - (t_ret = pthread_join(monitor_thread, NULL)) != 0) { - lprintf(wtperf, ret, 0, "Error joining monitor thread."); - if (ret == 0) - ret = t_ret; - } + if (monitor_created != 0) + testutil_check(__wt_thread_join(NULL, monitor_thread)); if (wtperf->conn != NULL && opts->close_conn && (t_ret = wtperf->conn->close(wtperf->conn, NULL)) != 0) { @@ -2728,14 +2698,13 @@ err: wtperf_free(wtperf); return (ret == 0 ? EXIT_SUCCESS : EXIT_FAILURE); } -static int -start_threads(WTPERF *wtperf, - WORKLOAD *workp, WTPERF_THREAD *base, u_int num, void *(*func)(void *)) +static void +start_threads(WTPERF *wtperf, WORKLOAD *workp, + WTPERF_THREAD *base, u_int num, WT_THREAD_CALLBACK(*func)(void *)) { CONFIG_OPTS *opts; WTPERF_THREAD *thread; u_int i; - int ret; opts = wtperf->opts; @@ -2779,29 +2748,20 @@ start_threads(WTPERF *wtperf, /* Start the threads. */ for (i = 0, thread = base; i < num; ++i, ++thread) - if ((ret = pthread_create( - &thread->handle, NULL, func, thread)) != 0) { - lprintf(wtperf, ret, 0, "Error creating thread"); - return (ret); - } - - return (0); + testutil_check(__wt_thread_create( + NULL, &thread->handle, func, thread)); } -static int -stop_threads(WTPERF *wtperf, u_int num, WTPERF_THREAD *threads) +static void +stop_threads(u_int num, WTPERF_THREAD *threads) { u_int i; - int ret; if (num == 0 || threads == NULL) - return (0); + return; for (i = 0; i < num; ++i, ++threads) { - if ((ret = pthread_join(threads->handle, NULL)) != 0) { - lprintf(wtperf, ret, 0, "Error joining thread"); - return (ret); - } + testutil_check(__wt_thread_join(NULL, threads->handle)); free(threads->key_buf); threads->key_buf = NULL; @@ -2815,7 +2775,6 @@ stop_threads(WTPERF *wtperf, u_int num, WTPERF_THREAD *threads) * being read by the monitor thread (among others). As a standalone * program, leaking memory isn't a concern, and it's simpler that way. */ - return (0); } static void diff --git a/src/third_party/wiredtiger/bench/wtperf/wtperf.h b/src/third_party/wiredtiger/bench/wtperf/wtperf.h index bd6c1e829ba..b17d082ddcf 100644 --- a/src/third_party/wiredtiger/bench/wtperf/wtperf.h +++ b/src/third_party/wiredtiger/bench/wtperf/wtperf.h @@ -232,7 +232,7 @@ struct __wtperf_thread { /* Per-thread structure */ WT_RAND_STATE rnd; /* Random number generation state */ - pthread_t handle; /* Handle */ + wt_thread_t handle; /* Handle */ char *key_buf, *value_buf; /* Key/value memory */ @@ -269,8 +269,8 @@ int run_truncate( int setup_log_file(WTPERF *); void setup_throttle(WTPERF_THREAD *); int setup_truncate(WTPERF *, WTPERF_THREAD *, WT_SESSION *); -int start_idle_table_cycle(WTPERF *, pthread_t *); -int stop_idle_table_cycle(WTPERF *, pthread_t); +void start_idle_table_cycle(WTPERF *, wt_thread_t *); +void stop_idle_table_cycle(WTPERF *, wt_thread_t); void worker_throttle(WTPERF_THREAD *); uint64_t sum_ckpt_ops(WTPERF *); uint64_t sum_insert_ops(WTPERF *); |