summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorAlex Gorrod <alexander.gorrod@mongodb.com>2017-05-15 16:24:39 -0400
committerGitHub <noreply@github.com>2017-05-15 16:24:39 -0400
commit0a0d1562708f78da356f5452d6ec7dbc203d1c2a (patch)
tree036499284f22bea534ccfda48b73d54f8c5ac18b
parentd2db615982db186e0291f05da9dbe758d36b20d3 (diff)
downloadmongo-0a0d1562708f78da356f5452d6ec7dbc203d1c2a.tar.gz
WT-3142 Add a workload generator application (#3408)
-rw-r--r--bench/workgen/Makefile.am30
-rwxr-xr-xbench/workgen/runner/example_simple.py31
-rw-r--r--bench/workgen/runner/example_txn.py29
-rw-r--r--bench/workgen/runner/insert_test.py94
-rw-r--r--bench/workgen/runner/multi_btree_heavy_stress.py102
-rw-r--r--bench/workgen/runner/runner/__init__.py92
-rw-r--r--bench/workgen/runner/runner/core.py101
-rw-r--r--bench/workgen/runner/runner/latency.py122
-rw-r--r--bench/workgen/runner/small_btree.py27
-rw-r--r--bench/workgen/setup.py70
-rw-r--r--bench/workgen/workgen.cxx1605
-rw-r--r--bench/workgen/workgen.h410
-rw-r--r--bench/workgen/workgen.swig233
-rw-r--r--bench/workgen/workgen/__init__.py42
-rw-r--r--bench/workgen/workgen_func.c89
-rw-r--r--bench/workgen/workgen_func.h44
-rw-r--r--bench/workgen/workgen_int.h205
-rw-r--r--bench/workgen/workgen_time.h201
-rw-r--r--build_posix/Make.subdirs6
-rw-r--r--build_posix/configure.ac.in1
-rwxr-xr-xbuild_posix/makemake16
-rw-r--r--dist/s_string.ok4
-rwxr-xr-xdist/s_whitespace1
23 files changed, 3550 insertions, 5 deletions
diff --git a/bench/workgen/Makefile.am b/bench/workgen/Makefile.am
new file mode 100644
index 00000000000..61512d65319
--- /dev/null
+++ b/bench/workgen/Makefile.am
@@ -0,0 +1,30 @@
+AM_CPPFLAGS = -I$(top_builddir)
+AM_CPPFLAGS += -I$(top_srcdir)/src/include
+AM_CPPFLAGS += -I$(top_srcdir)/test/utility
+
+PYSRC = $(top_srcdir)/bench/workgen
+PYDIRS = -t $(abs_builddir) -I $(abs_top_srcdir):$(abs_top_builddir) -L $(abs_top_builddir)/.libs:$(abs_top_builddir)/bench/workgen/.libs
+all-local: _workgen.so libworkgen.la
+libworkgen_la_SOURCES = workgen.cxx workgen_func.c
+noinst_LTLIBRARIES = libworkgen.la
+
+# We keep generated Python sources under bench/workgen.
+$(PYSRC)/workgen_wrap.cxx: $(PYSRC)/workgen.h $(PYSRC)/workgen.swig
+ (cd $(PYSRC) && \
+ $(SWIG) -c++ -python -threads -O -Wall -I$(abs_top_builddir) -outdir ./workgen workgen.swig)
+
+_workgen.so: $(top_builddir)/libwiredtiger.la $(PYSRC)/workgen_wrap.cxx libworkgen.la $(PYSRC)/workgen.h $(PYSRC)/workgen_time.h
+ (cd $(PYSRC) && \
+ $(PYTHON) setup.py build_ext -f -b $(abs_builddir) $(PYDIRS))
+
+install-exec-local:
+ (cd $(PYSRC) && \
+ $(PYTHON) setup.py build_py -d $(abs_builddir)/build && \
+ $(PYTHON) setup.py build_ext -f -b $(abs_builddir)/build $(PYDIRS) && \
+ $(PYTHON) setup.py install_lib -b $(abs_builddir)/build --skip-build $(PYTHON_INSTALL_ARG))
+
+# We build in different places for an install vs running from the tree:
+# clean up both. Don't rely on "setup.py clean" -- everything that should
+# be removed is created under the build directory.
+clean-local:
+ rm -rf build _workgen.so workgen_wrap.o WT_TEST
diff --git a/bench/workgen/runner/example_simple.py b/bench/workgen/runner/example_simple.py
new file mode 100755
index 00000000000..de944cbe29e
--- /dev/null
+++ b/bench/workgen/runner/example_simple.py
@@ -0,0 +1,31 @@
+#!/usr/bin/python
+from runner import *
+from wiredtiger import *
+from workgen import *
+
+def show(tname):
+ print('')
+ print('<><><><> ' + tname + ' <><><><>')
+ c = s.open_cursor(tname, None)
+ for k,v in c:
+ print('key: ' + k)
+ print('value: ' + v)
+ print('<><><><><><><><><><><><>')
+ c.close()
+
+context = Context()
+conn = wiredtiger_open("WT_TEST", "create,cache_size=1G")
+s = conn.open_session()
+tname = 'table:simple'
+s.create(tname, 'key_format=S,value_format=S')
+
+ops = Operation(Operation.OP_INSERT, Table(tname), Key(Key.KEYGEN_APPEND, 10), Value(40))
+thread = Thread(ops)
+workload = Workload(context, thread)
+workload.run(conn)
+show(tname)
+
+thread = Thread(ops * 5)
+workload = Workload(context, thread)
+workload.run(conn)
+show(tname)
diff --git a/bench/workgen/runner/example_txn.py b/bench/workgen/runner/example_txn.py
new file mode 100644
index 00000000000..ef1d7a93941
--- /dev/null
+++ b/bench/workgen/runner/example_txn.py
@@ -0,0 +1,29 @@
+#!/usr/bin/python
+from runner import *
+from wiredtiger import *
+from workgen import *
+
+conn = wiredtiger_open("WT_TEST", "create,cache_size=500MB")
+s = conn.open_session()
+tname = "table:test"
+s.create(tname, 'key_format=S,value_format=S')
+table = Table(tname)
+table.options.key_size = 20
+table.options.value_size = 100
+
+context = Context()
+op = Operation(Operation.OP_INSERT, table)
+thread = Thread(op * 500000)
+pop_workload = Workload(context, thread)
+print('populate:')
+pop_workload.run(conn)
+
+opread = Operation(Operation.OP_SEARCH, table)
+opwrite = Operation(Operation.OP_INSERT, table)
+treader = Thread(opread)
+twriter = Thread(txn(opwrite * 2))
+workload = Workload(context, treader * 8 + twriter * 2)
+workload.options.run_time = 10
+workload.options.report_interval = 5
+print('transactional write workload:')
+workload.run(conn)
diff --git a/bench/workgen/runner/insert_test.py b/bench/workgen/runner/insert_test.py
new file mode 100644
index 00000000000..30f2818e91e
--- /dev/null
+++ b/bench/workgen/runner/insert_test.py
@@ -0,0 +1,94 @@
+#!/usr/bin/python
+from runner import *
+from wiredtiger import *
+from workgen import *
+
+def tablename(id):
+ return "table:test%06d" % id
+
+def show(tname):
+ print('')
+ print('<><><><> ' + tname + ' <><><><>')
+ c = s.open_cursor(tname, None)
+ for k,v in c:
+ print('key: ' + k)
+ print('value: ' + v)
+ print('<><><><><><><><><><><><>')
+ c.close()
+
+def expectException(expr):
+ gotit = False
+ try:
+ expr()
+ except BaseException as e:
+ print('got expected exception: ' + str(e))
+ gotit = True
+ if not gotit:
+ raise Exception("missing expected exception")
+
+context = Context()
+conn = wiredtiger_open("WT_TEST", "create,cache_size=1G")
+s = conn.open_session()
+tname0 = tablename(0)
+tname1 = tablename(1)
+s.create(tname0, 'key_format=S,value_format=S')
+s.create(tname1, 'key_format=S,value_format=S')
+
+ops = Operation(Operation.OP_INSERT, Table(tname0), Key(Key.KEYGEN_APPEND, 10), Value(100))
+workload = Workload(context, Thread(ops))
+
+print('RUN1')
+workload.run(conn)
+show(tname0)
+
+# The context has memory of how many keys are in all the tables.
+# truncate goes behind context's back, but it doesn't matter for
+# an insert-only test.
+s.truncate(tname0, None, None)
+
+# Show how to 'multiply' operations
+op = Operation(Operation.OP_INSERT, Table(tname0), Key(Key.KEYGEN_APPEND, 10), Value(100))
+op2 = Operation(Operation.OP_INSERT, Table(tname1), Key(Key.KEYGEN_APPEND, 20), Value(30))
+o = op2 * 10
+print('op is: ' + str(op))
+print('multiplying op is: ' + str(o))
+thread0 = Thread(o + op + op)
+workload = Workload(context, thread0)
+print('RUN2')
+workload.run(conn)
+show(tname0)
+show(tname1)
+
+s.truncate(tname0, None, None)
+s.truncate(tname1, None, None)
+
+# operations can be multiplied, added in any combination.
+op += Operation(Operation.OP_INSERT, Table(tname0), Key(Key.KEYGEN_APPEND, 10), Value(10))
+op *= 2
+op += Operation(Operation.OP_INSERT, Table(tname0), Key(Key.KEYGEN_APPEND, 10), Value(10))
+thread0 = Thread(op * 10 + op2 * 20)
+workload = Workload(context, thread0)
+print('RUN3')
+workload.run(conn)
+show(tname0)
+show(tname1)
+
+print('workload is ' + str(workload))
+print('thread0 is ' + str(thread0))
+
+def assignit(k, n):
+ k._size = n
+
+expectException(lambda: Operation(
+ Operation.OP_INSERT, Table('foo'), Key(Key.KEYGEN_APPEND, 10)))
+# we don't catch this exception here, but in Workload.run()
+k = Key(Key.KEYGEN_APPEND, 1)
+assignit(k, 30)
+assignit(k, 1) # we don't catch this exception here, but in Workload.run()
+op = Operation(Operation.OP_INSERT, Table(tname0), k, Value(10))
+workload = Workload(context, Thread(op))
+print('RUN4')
+expectException(lambda: workload.run(conn))
+
+print('HELP:')
+print(workload.options.help())
diff --git a/bench/workgen/runner/multi_btree_heavy_stress.py b/bench/workgen/runner/multi_btree_heavy_stress.py
new file mode 100644
index 00000000000..0993f60248d
--- /dev/null
+++ b/bench/workgen/runner/multi_btree_heavy_stress.py
@@ -0,0 +1,102 @@
+#!/usr/bin/python
+# Drive a constant high workload through, even if WiredTiger isn't keeping
+# up by dividing the workload across a lot of threads. This needs to be
+# tuned to the particular machine so the workload is close to capacity in the
+# steady state, but not overwhelming.
+#
+################
+# Note: as a proof of concept for workgen, this matches closely
+# bench/wtperf/runner/multi-btree-read-heavy-stress.wtperf .
+# Run time, #ops, #threads are ratcheted way down for testing.
+#
+from runner import *
+from wiredtiger import *
+from workgen import *
+
+def op_append(ops, op):
+ if ops == None:
+ ops = op
+ else:
+ ops += op
+ return ops
+
+def make_op(optype, table, key, value = None):
+ if value == None:
+ return Operation(optype, table, key)
+ else:
+ return Operation(optype, table, key, value)
+
+logkey = Key(Key.KEYGEN_APPEND, 8) ## should be 8 bytes format 'Q'
+def operations(optype, tables, key, value = None, ops_per_txn = 0, logtable = None):
+ txn_list = []
+ ops = None
+ nops = 0
+ for table in tables:
+ ops = op_append(ops, make_op(optype, table, key, value))
+ if logtable != None:
+ ops = op_append(ops, make_op(optype, logtable, logkey, value))
+ nops += 1
+ if ops_per_txn > 0 and nops % ops_per_txn == 0:
+ txn_list.append(txn(ops))
+ ops = None
+ if ops_per_txn > 0:
+ if ops != None:
+ txn_list.append(txn(ops))
+ ops = None
+ for t in txn_list:
+ ops = op_append(ops, t)
+ return ops
+
+context = Context()
+## cache_size=20GB
+conn_config="create,cache_size=1GB,session_max=1000,eviction=(threads_min=4,threads_max=8),log=(enabled=false),transaction_sync=(enabled=false),checkpoint_sync=true,checkpoint=(wait=60),statistics=(fast),statistics_log=(json,wait=1)"
+table_config="allocation_size=4k,memory_page_max=10MB,prefix_compression=false,split_pct=90,leaf_page_max=32k,internal_page_max=16k,type=file,block_compressor=snappy"
+conn_config += extensions_config(['compressors/snappy'])
+conn = wiredtiger_open("WT_TEST", conn_config)
+s = conn.open_session()
+
+tables = []
+for i in range(0, 8):
+ tname = "table:test" + str(i)
+ s.create(tname, 'key_format=S,value_format=S,' + table_config)
+ tables.append(Table(tname))
+tname = "table:log"
+# TODO: use table_config for the log file?
+s.create(tname, 'key_format=S,value_format=S,' + table_config)
+logtable = Table(tname)
+
+##icount=200000000 / 8
+icount=20000
+ins_ops = operations(Operation.OP_INSERT, tables, Key(Key.KEYGEN_APPEND, 20), Value(500))
+thread = Thread(ins_ops * icount)
+pop_workload = Workload(context, thread)
+print('populate:')
+pop_workload.run(conn)
+
+ins_ops = operations(Operation.OP_INSERT, tables, Key(Key.KEYGEN_APPEND, 20), Value(500), 0, logtable)
+upd_ops = operations(Operation.OP_UPDATE, tables, Key(Key.KEYGEN_UNIFORM, 20), Value(500), 0, logtable)
+read_ops = operations(Operation.OP_SEARCH, tables, Key(Key.KEYGEN_UNIFORM, 20), None, 3)
+
+ins_thread = Thread(ins_ops)
+upd_thread = Thread(upd_ops)
+read_thread = Thread(read_ops)
+ins_thread.options.throttle = 250
+ins_thread.options.name = "Insert"
+upd_thread.options.throttle = 250
+upd_thread.options.name = "Update"
+read_thread.options.throttle = 1000
+read_thread.options.name = "Read"
+##threads = [ins_thread] * 10 + [upd_thread] * 10 + [read_thread] * 80
+threads = ins_thread * 1 + upd_thread * 1 + read_thread * 2
+workload = Workload(context, threads)
+##workload.options.run_time = 3600
+workload.options.run_time = 30
+workload.options.report_interval = 1
+workload.options.sample_interval = 5
+workload.options.sample_rate = 1
+print('heavy stress workload:')
+workload.run(conn)
+
+latency_filename = conn.get_home() + '/latency.out'
+print('for latency output, see: ' + latency_filename)
+latency.workload_latency(workload, latency_filename)
diff --git a/bench/workgen/runner/runner/__init__.py b/bench/workgen/runner/runner/__init__.py
new file mode 100644
index 00000000000..67b547bc51b
--- /dev/null
+++ b/bench/workgen/runner/runner/__init__.py
@@ -0,0 +1,92 @@
+#!/usr/bin/env python
+#
+# Public Domain 2014-2017 MongoDB, Inc.
+# Public Domain 2008-2014 WiredTiger, Inc.
+#
+# This is free and unencumbered software released into the public domain.
+#
+# Anyone is free to copy, modify, publish, use, compile, sell, or
+# distribute this software, either in source code form or as a compiled
+# binary, for any purpose, commercial or non-commercial, and by any
+# means.
+#
+# In jurisdictions that recognize copyright laws, the author or authors
+# of this software dedicate any and all copyright interest in the
+# software to the public domain. We make this dedication for the benefit
+# of the public at large and to the detriment of our heirs and
+# successors. We intend this dedication to be an overt act of
+# relinquishment in perpetuity of all present and future rights to this
+# software under copyright law.
+#
+# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
+# EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
+# MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT.
+# IN NO EVENT SHALL THE AUTHORS BE LIABLE FOR ANY CLAIM, DAMAGES OR
+# OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE,
+# ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR
+# OTHER DEALINGS IN THE SOFTWARE.
+#
+# runner/__init__.py
+# Used as a first import by runners, does any common initialization.
+from __future__ import print_function
+
+import os, shutil, sys
+thisdir = os.path.dirname(os.path.abspath(__file__))
+workgen_src = os.path.dirname(os.path.dirname(thisdir))
+wt_dir = os.path.dirname(os.path.dirname(workgen_src))
+wt_builddir = os.path.join(wt_dir, 'build_posix')
+
+def _prepend_env_path(pathvar, s):
+ last = ''
+ try:
+ last = ':' + os.environ[pathvar]
+ except:
+ pass
+ os.environ[pathvar] = s + last
+
+# Initialize the python path so needed modules can be imported.
+# If the path already works, don't change it.
+try:
+ import wiredtiger
+except:
+ # We'll try hard to make the importing work, we'd like to runners
+ # to be executable directly without having to set environment variables.
+ sys.path.insert(0, os.path.join(wt_dir, 'lang', 'python'))
+ sys.path.insert(0, os.path.join(wt_builddir, 'lang', 'python'))
+ try:
+ import wiredtiger
+ except:
+ # If the .libs directory is not in our library search path,
+ # we need to set it and retry. However, the dynamic link
+ # library has already cached its value, our only option is
+ # to restart the Python interpreter.
+ if '_workgen_init' not in os.environ:
+ os.environ['_workgen_init'] = 'true'
+ dotlibs = os.path.join(wt_builddir, '.libs')
+ _prepend_env_path('LD_LIBRARY_PATH', dotlibs)
+ _prepend_env_path('DYLD_LIBRARY_PATH', dotlibs)
+ py_args = sys.argv
+ py_args.insert(0, sys.executable)
+ try:
+ os.execv(sys.executable, py_args)
+        except Exception as exception:
+ print('re-exec failed: ' + str(exception), file=sys.stderr)
+ print(' exec(' + sys.executable + ', ' + str(py_args) + ')')
+ print('Try adding "' + dotlibs + '" to the', file=sys.stderr)
+ print('LD_LIBRARY_PATH environment variable before running ' + \
+ 'this program again.', file=sys.stderr)
+ sys.exit(1)
+
+try:
+ import workgen
+except:
+ sys.path.insert(0, os.path.join(workgen_src, 'workgen'))
+ sys.path.insert(0, os.path.join(wt_builddir, 'bench', 'workgen'))
+ import workgen
+
+# Clear out the WT_TEST directory.
+shutil.rmtree('WT_TEST', True)
+os.mkdir('WT_TEST')
+
+from .core import txn, extensions_config
+from .latency import workload_latency
diff --git a/bench/workgen/runner/runner/core.py b/bench/workgen/runner/runner/core.py
new file mode 100644
index 00000000000..a0f0d4d77cd
--- /dev/null
+++ b/bench/workgen/runner/runner/core.py
@@ -0,0 +1,101 @@
+#!/usr/bin/env python
+#
+# Public Domain 2014-2017 MongoDB, Inc.
+# Public Domain 2008-2014 WiredTiger, Inc.
+#
+# This is free and unencumbered software released into the public domain.
+#
+# Anyone is free to copy, modify, publish, use, compile, sell, or
+# distribute this software, either in source code form or as a compiled
+# binary, for any purpose, commercial or non-commercial, and by any
+# means.
+#
+# In jurisdictions that recognize copyright laws, the author or authors
+# of this software dedicate any and all copyright interest in the
+# software to the public domain. We make this dedication for the benefit
+# of the public at large and to the detriment of our heirs and
+# successors. We intend this dedication to be an overt act of
+# relinquishment in perpetuity of all present and future rights to this
+# software under copyright law.
+#
+# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
+# EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
+# MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT.
+# IN NO EVENT SHALL THE AUTHORS BE LIABLE FOR ANY CLAIM, DAMAGES OR
+# OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE,
+# ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR
+# OTHER DEALINGS IN THE SOFTWARE.
+#
+# runner/core.py
+# Core functions available to all runners
+import glob, os
+import workgen
+
+# txn --
+# Put the operation (and any suboperations) within a transaction.
+def txn(op, config=None):
+ t = workgen.Transaction(config)
+ op._transaction = t
+ return op
+
+# Check for a local build that contains the wt utility. First check in
+# current working directory, then in build_posix and finally in the disttop
+# directory. This isn't ideal - if a user has multiple builds in a tree we
+# could pick the wrong one.
+def _wiredtiger_builddir():
+ if os.path.isfile(os.path.join(os.getcwd(), 'wt')):
+ return os.getcwd()
+
+ # The directory of this file should be within the distribution tree.
+ thisdir = os.path.dirname(os.path.abspath(__file__))
+ wt_disttop = os.path.join(\
+ thisdir, os.pardir, os.pardir, os.pardir, os.pardir)
+ if os.path.isfile(os.path.join(wt_disttop, 'wt')):
+ return wt_disttop
+ if os.path.isfile(os.path.join(wt_disttop, 'build_posix', 'wt')):
+ return os.path.join(wt_disttop, 'build_posix')
+ if os.path.isfile(os.path.join(wt_disttop, 'wt.exe')):
+ return wt_disttop
+ raise Exception('Unable to find useable WiredTiger build')
+
+# Return the wiredtiger_open extension argument for any needed shared library.
+# Called with a list of extensions, e.g.
+# [ 'compressors/snappy', 'encryptors/rotn=config_string' ]
+def extensions_config(exts):
+ result = ''
+ extfiles = {}
+ errpfx = 'extensions_config'
+ builddir = _wiredtiger_builddir()
+ for ext in exts:
+ extconf = ''
+ if '=' in ext:
+ splits = ext.split('=', 1)
+ ext = splits[0]
+ extconf = '=' + splits[1]
+ splits = ext.split('/')
+ if len(splits) != 2:
+ raise Exception(errpfx + ": " + ext +
+ ": extension is not named <dir>/<name>")
+ libname = splits[1]
+ dirname = splits[0]
+ pat = os.path.join(builddir, 'ext',
+ dirname, libname, '.libs', 'libwiredtiger_*.so')
+ filenames = glob.glob(pat)
+ if len(filenames) == 0:
+ raise Exception(errpfx +
+ ": " + ext +
+ ": no extensions library found matching: " + pat)
+ elif len(filenames) > 1:
+ raise Exception(errpfx + ": " + ext +
+ ": multiple extensions libraries found matching: " + pat)
+ complete = '"' + filenames[0] + '"' + extconf
+ if ext in extfiles:
+ if extfiles[ext] != complete:
+ raise Exception(errpfx +
+ ": non-matching extension arguments in " +
+ str(exts))
+ else:
+ extfiles[ext] = complete
+ if len(extfiles) != 0:
+ result = ',extensions=[' + ','.join(extfiles.values()) + ']'
+ return result
diff --git a/bench/workgen/runner/runner/latency.py b/bench/workgen/runner/runner/latency.py
new file mode 100644
index 00000000000..46d9be9bad8
--- /dev/null
+++ b/bench/workgen/runner/runner/latency.py
@@ -0,0 +1,122 @@
+#!/usr/bin/env python
+#
+# Public Domain 2014-2017 MongoDB, Inc.
+# Public Domain 2008-2014 WiredTiger, Inc.
+#
+# This is free and unencumbered software released into the public domain.
+#
+# Anyone is free to copy, modify, publish, use, compile, sell, or
+# distribute this software, either in source code form or as a compiled
+# binary, for any purpose, commercial or non-commercial, and by any
+# means.
+#
+# In jurisdictions that recognize copyright laws, the author or authors
+# of this software dedicate any and all copyright interest in the
+# software to the public domain. We make this dedication for the benefit
+# of the public at large and to the detriment of our heirs and
+# successors. We intend this dedication to be an overt act of
+# relinquishment in perpetuity of all present and future rights to this
+# software under copyright law.
+#
+# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
+# EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
+# MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT.
+# IN NO EVENT SHALL THE AUTHORS BE LIABLE FOR ANY CLAIM, DAMAGES OR
+# OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE,
+# ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR
+# OTHER DEALINGS IN THE SOFTWARE.
+#
+# runner/latency.py
+# Utility functions for showing latency statistics
+from __future__ import print_function
+import sys
+
+def _show_buckets(fh, title, mult, buckets, n):
+ shown = False
+ s = title + ': '
+ for count in range(0, n):
+ val = buckets[count]
+ if val != 0:
+ if shown:
+ s += ','
+ s += str(count*mult) + '=' + str(val)
+ shown = True
+ print(s, file=fh)
+
+def _latency_preprocess(arr, merge):
+ mx = 0
+ cur = 0
+ # SWIG arrays have a clunky interface
+ for i in range(0, arr.__len__()):
+ if i % merge == 0:
+ cur = 0
+ cur += arr[i]
+ if cur > mx:
+ mx = cur
+ arr.height = mx
+
+def _latency_plot(box, ch, left, width, arr, merge, scale):
+ pos = 0
+ for x in range(0, width):
+ t = 0
+ for i in range(0, merge):
+ t += arr[pos]
+ pos += 1
+ nch = scale * t
+ y = 0
+ while nch > 0.0:
+ box[y][left + x] = ch
+ nch -= 1.0
+ y += 1
+
+def _latency_optype(fh, name, ch, t):
+ if t.ops == 0:
+ return
+ if t.latency_ops == 0:
+ print('**** ' + name + ' operations: ' + str(t.ops), file=fh)
+ return
+ print('**** ' + name + ' operations: ' + str(t.ops) + \
+ ', latency operations: ' + str(t.latency_ops), file=fh)
+ print(' avg: ' + str(t.latency/t.latency_ops) + \
+ ', min: ' + str(t.min_latency) + ', max: ' + str(t.max_latency),
+ file=fh)
+ us = t.us()
+ ms = t.ms()
+ sec = t.sec()
+ _latency_preprocess(us, 40)
+ _latency_preprocess(ms, 40)
+ _latency_preprocess(sec, 4)
+ max_height = max(us.height, ms.height, sec.height)
+ if max_height == 0:
+ return
+ height = 20 # 20 chars high
+ # a list of a list of characters
+ box = [list(' ' * 80) for x in range(height)]
+ scale = (1.0 / (max_height + 1)) * height
+ _latency_plot(box, ch, 0, 25, us, 40, scale)
+ _latency_plot(box, ch, 27, 25, ms, 40, scale)
+ _latency_plot(box, ch, 54, 25, sec, 4, scale)
+ box.reverse()
+ for line in box:
+ print(''.join(line), file=fh)
+ dash25 = '-' * 25
+ print(' '.join([dash25] * 3), file=fh)
+ print(' 0 - 999 us (40/bucket) 1 - 999 ms (40/bucket) ' + \
+ '1 - 99 sec (4/bucket)', file=fh)
+ print('', file=fh)
+ _show_buckets(fh, name + ' us', 1, us, 1000)
+ _show_buckets(fh, name + ' ms', 1000, ms, 1000)
+ _show_buckets(fh, name + ' sec', 1000000, sec, 100)
+ print('', file=fh)
+
+def workload_latency(workload, outfilename = None):
+ if outfilename:
+ fh = open(outfilename, 'w')
+ else:
+ fh = sys.stdout
+ _latency_optype(fh, 'insert', 'I', workload.stats.insert)
+ _latency_optype(fh, 'read', 'R', workload.stats.read)
+ _latency_optype(fh, 'remove', 'X', workload.stats.remove)
+ _latency_optype(fh, 'update', 'U', workload.stats.update)
+ _latency_optype(fh, 'truncate', 'T', workload.stats.truncate)
+ _latency_optype(fh, 'not found', 'N', workload.stats.not_found)
diff --git a/bench/workgen/runner/small_btree.py b/bench/workgen/runner/small_btree.py
new file mode 100644
index 00000000000..d70f0d9e693
--- /dev/null
+++ b/bench/workgen/runner/small_btree.py
@@ -0,0 +1,27 @@
+#!/usr/bin/python
+from runner import *
+from wiredtiger import *
+from workgen import *
+
+context = Context()
+conn = wiredtiger_open("WT_TEST", "create,cache_size=500MB")
+s = conn.open_session()
+tname = "file:test.wt"
+s.create(tname, 'key_format=S,value_format=S')
+table = Table(tname)
+table.options.key_size = 20
+table.options.value_size = 100
+
+op = Operation(Operation.OP_INSERT, table)
+thread = Thread(op * 500000)
+pop_workload = Workload(context, thread)
+print('populate:')
+pop_workload.run(conn)
+
+op = Operation(Operation.OP_SEARCH, table)
+t = Thread(op)
+workload = Workload(context, t * 8)
+workload.options.run_time = 120
+workload.options.report_interval = 5
+print('read workload:')
+workload.run(conn)
diff --git a/bench/workgen/setup.py b/bench/workgen/setup.py
new file mode 100644
index 00000000000..9fb5fa7b73a
--- /dev/null
+++ b/bench/workgen/setup.py
@@ -0,0 +1,70 @@
+#!/usr/bin/env python
+#
+# Public Domain 2014-2017 MongoDB, Inc.
+# Public Domain 2008-2014 WiredTiger, Inc.
+#
+# This is free and unencumbered software released into the public domain.
+#
+# Anyone is free to copy, modify, publish, use, compile, sell, or
+# distribute this software, either in source code form or as a compiled
+# binary, for any purpose, commercial or non-commercial, and by any
+# means.
+#
+# In jurisdictions that recognize copyright laws, the author or authors
+# of this software dedicate any and all copyright interest in the
+# software to the public domain. We make this dedication for the benefit
+# of the public at large and to the detriment of our heirs and
+# successors. We intend this dedication to be an overt act of
+# relinquishment in perpetuity of all present and future rights to this
+# software under copyright law.
+#
+# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
+# EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
+# MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT.
+# IN NO EVENT SHALL THE AUTHORS BE LIABLE FOR ANY CLAIM, DAMAGES OR
+# OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE,
+# ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR
+# OTHER DEALINGS IN THE SOFTWARE.
+#
+
+from __future__ import print_function
+import re, os, sys
+from distutils.core import setup, Extension
+
+# OS X hack: turn off the Universal binary support that is built into the
+# Python build machinery, just build for the default CPU architecture.
+if not 'ARCHFLAGS' in os.environ:
+ os.environ['ARCHFLAGS'] = ''
+
+# Suppress warnings building SWIG generated code
+extra_cflags = [ '-w', '-Wno-sign-conversion', '-I../../src/include', \
+ '-I../../test/utility']
+
+dir = os.path.dirname(__file__)
+abs_dir = os.path.dirname(os.path.abspath(__file__))
+
+if abs_dir.endswith(os.sep + os.path.join('bench', 'workgen')):
+ wt_dir = os.path.dirname(os.path.dirname(abs_dir))
+else:
+ print(os.path.basename(__file__) + ": running from unknown dir", \
+ file=sys.stderr)
+ sys.exit(1)
+
+build_dir = os.path.join(wt_dir, 'build_posix')
+
+# Read the version information from the RELEASE_INFO file
+for l in open(os.path.join(dir, '..', '..', 'RELEASE_INFO')):
+ if re.match(r'WIREDTIGER_VERSION_(?:MAJOR|MINOR|PATCH)=', l):
+ exec(l)
+
+wt_ver = '%d.%d' % (WIREDTIGER_VERSION_MAJOR, WIREDTIGER_VERSION_MINOR)
+
+setup(name='workgen', version=wt_ver,
+ ext_modules=[Extension('_workgen',
+ [os.path.join(dir, 'workgen_wrap.cxx')],
+ libraries=['wiredtiger', 'pthread', 'workgen'],
+ extra_compile_args=extra_cflags,
+ )],
+ package_dir={'' : dir},
+ packages=['workgen'],
+)
diff --git a/bench/workgen/workgen.cxx b/bench/workgen/workgen.cxx
new file mode 100644
index 00000000000..c56acfd2989
--- /dev/null
+++ b/bench/workgen/workgen.cxx
@@ -0,0 +1,1605 @@
+/*-
+ * Public Domain 2014-2017 MongoDB, Inc.
+ * Public Domain 2008-2014 WiredTiger, Inc.
+ *
+ * This is free and unencumbered software released into the public domain.
+ *
+ * Anyone is free to copy, modify, publish, use, compile, sell, or
+ * distribute this software, either in source code form or as a compiled
+ * binary, for any purpose, commercial or non-commercial, and by any
+ * means.
+ *
+ * In jurisdictions that recognize copyright laws, the author or authors
+ * of this software dedicate any and all copyright interest in the
+ * software to the public domain. We make this dedication for the benefit
+ * of the public at large and to the detriment of our heirs and
+ * successors. We intend this dedication to be an overt act of
+ * relinquishment in perpetuity of all present and future rights to this
+ * software under copyright law.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
+ * EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
+ * MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT.
+ * IN NO EVENT SHALL THE AUTHORS BE LIABLE FOR ANY CLAIM, DAMAGES OR
+ * OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE,
+ * ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR
+ * OTHER DEALINGS IN THE SOFTWARE.
+ */
+
+#define __STDC_LIMIT_MACROS // needed to get UINT64_MAX in C++
+#include <iomanip>
+#include <iostream>
+#include <fstream>
+#include <sstream>
+#include "wiredtiger.h"
+#include "workgen.h"
+#include "workgen_int.h"
+#include "workgen_time.h"
+extern "C" {
+// Include some specific WT files, as some files included by wt_internal.h
+// have some C-ism's that don't work in C++.
+#include <pthread.h>
+#include <string.h>
+#include <stdint.h>
+#include <stdlib.h>
+#include <unistd.h>
+#include <errno.h>
+#include <math.h>
+#include "error.h"
+#include "misc.h"
+}
+
+// Latency histogram sizes: per-microsecond buckets up to 1ms, per-millisecond
+// buckets up to 1s, per-second buckets up to 100s.
+#define LATENCY_US_BUCKETS 1000
+#define LATENCY_MS_BUCKETS 1000
+#define LATENCY_SEC_BUCKETS 100
+
+#define THROTTLE_PER_SEC 20 // times per sec we will throttle
+
+#define MIN(a, b) ((a) < (b) ? (a) : (b))
+#define MAX(a, b) ((a) < (b) ? (b) : (a))
+#define TIMESPEC_DOUBLE(ts) ((double)(ts).tv_sec + ts.tv_nsec * 0.000000001)
+#define PCT(n, total) ((total) == 0 ? 0 : ((n) * 100) / (total))
+#define OPS_PER_SEC(ops, ts) (int) ((ts) == 0 ? 0.0 : \
+ (ops) / TIMESPEC_DOUBLE(ts))
+
+// Get the value of a STL container, even if it is not present
+#define CONTAINER_VALUE(container, idx, dfault) \
+ (((container).count(idx) > 0) ? (container)[idx] : (dfault))
+
+// True if one usage mask has the READ bit and the other has the WRITE bit,
+// i.e. two threads would use the same table in conflicting ways.
+#define CROSS_USAGE(a, b) \
+ (((a & USAGE_READ) != 0 && (b & USAGE_WRITE) != 0) || \
+ ((a & USAGE_WRITE) != 0 && (b & USAGE_READ) != 0))
+
+// Internal invariant check: print file/line and abort on failure.
+#define ASSERT(cond) \
+ do { \
+ if (!(cond)) { \
+ fprintf(stderr, "%s:%d: ASSERT failed: %s\n", \
+ __FILE__, __LINE__, #cond); \
+ abort(); \
+ } \
+ } while(0)
+
+// Build a message with stream syntax and throw it as a WorkgenException,
+// carrying an errno-style code (0 when there is none).
+#define THROW_ERRNO(e, args) \
+ do { \
+ std::stringstream __sstm; \
+ __sstm << args; \
+ WorkgenException __wge(e, __sstm.str().c_str()); \
+ throw(__wge); \
+ } while(0)
+
+#define THROW(args) THROW_ERRNO(0, args)
+
+// Stream a message to stdout, but only when the runner's Context has
+// verbose enabled.
+#define VERBOSE(runner, args) \
+ do { \
+ if ((runner)._context->_verbose) \
+ std::cout << args << std::endl; \
+ } while(0)
+
+// Insert and update are the only operation types that carry a value.
+#define OP_HAS_VALUE(op) \
+ ((op)->_optype == Operation::OP_INSERT || \
+ (op)->_optype == Operation::OP_UPDATE)
+
+namespace workgen {
+
+// The number of contexts. Normally there is one context created, but it will
+// be possible to use several eventually. More than one is not yet
+// implemented, but we must at least guard against the caller creating more
+// than one.
+static uint32_t context_count = 0;
+
+// pthread entry point for a workload thread. C++ exceptions must not cross
+// the pthread boundary, so any WorkgenException is captured into the runner
+// for the controlling thread to examine after join.
+static void *thread_runner_main(void *arg) {
+ ThreadRunner *runner = (ThreadRunner *)arg;
+ try {
+ runner->_errno = runner->run();
+ } catch (WorkgenException &wge) {
+ runner->_exception = wge;
+ }
+ return (NULL);
+}
+
+// pthread entry point for the statistics monitor thread; same
+// exception-capture convention as thread_runner_main.
+static void *monitor_main(void *arg) {
+ Monitor *monitor = (Monitor *)arg;
+ try {
+ monitor->_errno = monitor->run();
+ } catch (WorkgenException &wge) {
+ monitor->_exception = wge;
+ }
+ return (NULL);
+}
+
+// Exponentiate (like the pow function), except that it returns an exact
+// integral 64 bit value, and if it overflows, returns the maximum possible
+// value for the return type.
+static uint64_t power64(int base, int exp) {
+ uint64_t last, result;
+
+ result = 1;
+ for (int i = 0; i < exp; i++) {
+ last = result;
+ result *= base;
+ // Unsigned wraparound means the product shrank: overflow.
+ if (result < last)
+ return UINT64_MAX;
+ }
+ return result;
+}
+
+OptionsList::OptionsList() : _option_map() {}
+OptionsList::OptionsList(const OptionsList &other) :
+ _option_map(other._option_map) {}
+
+// Register an option: maps the option name to a (type-description, help)
+// pair used by help() and help_type().
+void OptionsList::add_option(const char *name, const std::string typestr,
+ const char *desc) {
+ TypeDescPair pair(typestr, desc);
+ _option_map[name] = pair;
+}
+
+void OptionsList::add_int(const char *name, int default_value,
+ const char *desc) {
+ std::stringstream sstm;
+ sstm << "int, default=" << default_value;
+ add_option(name, sstm.str(), desc);
+}
+
+void OptionsList::add_bool(const char *name, bool default_value,
+ const char *desc) {
+ std::stringstream sstm;
+ sstm << "boolean, default=" << (default_value ? "true" : "false");
+ add_option(name, sstm.str(), desc);
+}
+
+void OptionsList::add_double(const char *name, double default_value,
+ const char *desc) {
+ std::stringstream sstm;
+ sstm << "double, default=" << default_value;
+ add_option(name, sstm.str(), desc);
+}
+
+void OptionsList::add_string(const char *name,
+ const std::string &default_value, const char *desc) {
+ std::stringstream sstm;
+ sstm << "string, default=\"" << default_value << "\"";
+ add_option(name, sstm.str(), desc);
+}
+
+// Word-wrap text at roughly 70 columns, breaking only at spaces, writing
+// each line (optionally preceded by indent) to the stream.
+static void
+pretty_print(const char *p, const char *indent, std::stringstream &sstm)
+{
+ const char *t;
+
+ for (;; p = t + 1) {
+ if (strlen(p) <= 70)
+ break;
+ // Scan backwards from column 70 for the last space.
+ for (t = p + 70; t > p && *t != ' '; --t)
+ ;
+ if (t == p) /* No spaces? */
+ break;
+ if (indent != NULL)
+ sstm << indent;
+ std::string line(p, (size_t)(t - p));
+ sstm << line << std::endl;
+ }
+ // Emit the final (short) remainder, if any.
+ if (*p != '\0') {
+ if (indent != NULL)
+ sstm << indent;
+ sstm << p << std::endl;
+ }
+}
+
+// Return help text for all registered options, one entry per option with
+// its type line followed by a wrapped, tab-indented description.
+std::string OptionsList::help() const {
+ std::stringstream sstm;
+ for (std::map<std::string, TypeDescPair>::const_iterator i =
+ _option_map.begin(); i != _option_map.end(); i++) {
+ sstm << i->first << " (" << i->second.first << ")" << std::endl;
+ pretty_print(i->second.second.c_str(), "\t", sstm);
+ }
+ return sstm.str();
+}
+
+// Return the description for one option, or "" if it is unknown.
+std::string OptionsList::help_description(const char *option_name) const {
+ const std::string key(option_name);
+ if (_option_map.count(key) == 0)
+ return (std::string(""));
+ else
+ return (_option_map.find(key)->second.second);
+}
+
+// Return the type string for one option, or "" if it is unknown.
+std::string OptionsList::help_type(const char *option_name) const {
+ const std::string key(option_name);
+ if (_option_map.count(key) == 0)
+ return std::string("");
+ else
+ return (_option_map.find(key)->second.first);
+}
+
+// Context owns its ContextInternal; assignment copies the internal state
+// into the existing internal object rather than sharing the pointer.
+Context::Context() : _verbose(false), _internal(new ContextInternal()) {}
+Context::~Context() { delete _internal; }
+Context& Context::operator=(const Context &other) {
+ _verbose = other._verbose;
+ *_internal = *other._internal;
+ return (*this);
+}
+
+// Only one Context is supported; use an atomic increment of the global
+// counter to detect (and reject) a second construction.
+ContextInternal::ContextInternal() : _tint(), _table_names(),
+ _recno(NULL), _recno_alloced(0), _tint_last(0), _context_count(0) {
+ uint32_t count;
+ if ((count = workgen_atomic_add32(&context_count, 1)) != 1)
+ THROW("multiple Contexts not supported");
+ _context_count = count;
+}
+
+ContextInternal::~ContextInternal() {
+ // _recno is allocated with new[] (see create_all), so it must be
+ // released with delete[], not scalar delete.
+ if (_recno != NULL)
+ delete [] _recno;
+}
+
+// Grow the per-table recno array to cover all table ids seen so far.
+// The array references are 1-based, so one extra entry is allocated.
+int ContextInternal::create_all() {
+ if (_recno_alloced != _tint_last) {
+ uint64_t *new_recno = new uint64_t[_tint_last + 1];
+ // On first allocation _recno is NULL; don't memcpy from it.
+ if (_recno != NULL)
+ memcpy(new_recno, _recno, sizeof(uint64_t) * _recno_alloced);
+ memset(&new_recno[_recno_alloced], 0,
+ sizeof(uint64_t) * (_tint_last - _recno_alloced + 1));
+ // Array allocated with new[]; free with delete[].
+ delete [] _recno;
+ _recno = new_recno;
+ _recno_alloced = _tint_last;
+ }
+ return (0);
+}
+
+// The monitor periodically samples per-thread stats for the given runner.
+Monitor::Monitor(WorkloadRunner &wrunner) :
+ _errno(0), _exception(), _wrunner(wrunner), _stop(false), _handle() {}
+Monitor::~Monitor() {}
+
+// Monitor main loop: every sample_interval seconds, aggregate the stats of
+// all thread runners, emit one CSV line of rates and latencies to *_out
+// (NOTE(review): _out is assumed to be set by the workload runner before
+// the monitor thread starts — confirm against caller), and warn when any
+// per-operation max latency exceeds the configured threshold.
+int Monitor::run() {
+ struct timespec t;
+ struct tm *tm, _tm;
+ char time_buf[64];
+ Stats prev_totals;
+ WorkloadOptions *options = &_wrunner._workload->options;
+ uint64_t latency_max = (uint64_t)options->max_latency;
+
+ (*_out) << "#time,"
+ << "totalsec,"
+ << "read ops per second,"
+ << "insert ops per second,"
+ << "update ops per second,"
+ << "checkpoints,"
+ << "read average latency(uS),"
+ << "read minimum latency(uS),"
+ << "read maximum latency(uS),"
+ << "insert average latency(uS),"
+ << "insert min latency(uS),"
+ << "insert maximum latency(uS),"
+ << "update average latency(uS),"
+ << "update min latency(uS),"
+ << "update maximum latency(uS)"
+ << std::endl;
+
+ Stats prev_interval;
+ while (!_stop) {
+ // Sleep in one second chunks so a stop request is noticed quickly.
+ for (int i = 0; i < options->sample_interval && !_stop; i++)
+ sleep(1);
+ if (_stop)
+ break;
+
+ workgen_epoch(&t);
+ tm = localtime_r(&t.tv_sec, &_tm);
+ (void)strftime(time_buf, sizeof(time_buf), "%b %d %H:%M:%S", tm);
+
+ // Sum all runner stats, then convert to per-interval numbers by
+ // subtracting the previous totals.
+ Stats new_totals(true);
+ for (std::vector<ThreadRunner>::iterator tr =
+ _wrunner._trunners.begin(); tr != _wrunner._trunners.end(); tr++)
+ new_totals.add(tr->_stats, true);
+ Stats interval(new_totals);
+ interval.subtract(prev_totals);
+ interval.smooth(prev_interval);
+
+ int interval_secs = options->sample_interval;
+ uint64_t cur_reads = interval.read.ops / interval_secs;
+ uint64_t cur_inserts = interval.insert.ops / interval_secs;
+ uint64_t cur_updates = interval.update.ops / interval_secs;
+
+ uint64_t totalsec = ts_sec(t - _wrunner._start);
+ (*_out) << time_buf
+ << "," << totalsec
+ << "," << cur_reads
+ << "," << cur_inserts
+ << "," << cur_updates
+ << "," << 'N' // checkpoint in progress
+ << "," << interval.read.average_latency()
+ << "," << interval.read.min_latency
+ << "," << interval.read.max_latency
+ << "," << interval.insert.average_latency()
+ << "," << interval.insert.min_latency
+ << "," << interval.insert.max_latency
+ << "," << interval.update.average_latency()
+ << "," << interval.update.min_latency
+ << "," << interval.update.max_latency
+ << std::endl;
+
+ // Bug fix: insert_max/update_max previously copied the *read* max
+ // latency, so the threshold warning below misreported inserts and
+ // updates.
+ uint64_t read_max = interval.read.max_latency;
+ uint64_t insert_max = interval.insert.max_latency;
+ uint64_t update_max = interval.update.max_latency;
+
+ if (latency_max != 0 &&
+ (read_max > latency_max || insert_max > latency_max ||
+ update_max > latency_max)) {
+ std::cerr << "WARNING: max latency exceeded:"
+ << " threshold " << latency_max
+ << " read max " << read_max
+ << " insert max " << insert_max
+ << " update max " << update_max << std::endl;
+ }
+
+ prev_interval.assign(interval);
+ prev_totals.assign(new_totals);
+ }
+ return (0);
+}
+
+ThreadRunner::ThreadRunner() :
+ _errno(0), _exception(), _thread(NULL), _context(NULL), _icontext(NULL),
+ _workload(NULL), _wrunner(NULL), _rand_state(NULL),
+ _throttle(NULL), _throttle_ops(0), _throttle_limit(0),
+ _in_transaction(false), _number(0), _stats(false), _table_usage(),
+ _cursors(NULL), _stop(false), _session(NULL), _keybuf(NULL),
+ _valuebuf(NULL), _repeat(false) {
+}
+
+ThreadRunner::~ThreadRunner() {
+ free_all();
+}
+
+// Per-thread setup: open a WT session, size and allocate the key/value
+// buffers by walking the thread's operation tree, and reset throttling
+// and transaction state. Any prior resources are released first.
+int ThreadRunner::create_all(WT_CONNECTION *conn) {
+ size_t keysize, valuesize;
+
+ WT_RET(close_all());
+ ASSERT(_session == NULL);
+ WT_RET(conn->open_session(conn, NULL, NULL, &_session));
+ _table_usage.clear();
+ // Latency tracking is enabled only when the monitor is sampling.
+ _stats.track_latency(_workload->options.sample_interval > 0);
+ WT_RET(workgen_random_alloc(_session, &_rand_state));
+ _throttle_ops = 0;
+ _throttle_limit = 0;
+ _in_transaction = 0;
+ // Sizes start at 1 (for the NUL); op_create_all grows them to the
+ // largest key/value needed by any operation in the tree.
+ keysize = 1;
+ valuesize = 1;
+ op_create_all(&_thread->_op, keysize, valuesize);
+ _keybuf = new char[keysize];
+ _valuebuf = new char[valuesize];
+ _keybuf[keysize - 1] = '\0';
+ _valuebuf[valuesize - 1] = '\0';
+ return (0);
+}
+
+// Open one cursor per table this thread uses, indexed by table id.
+int ThreadRunner::open_all() {
+ typedef WT_CURSOR *WT_CURSOR_PTR;
+ // The cursor array is allocated with new[]; release with delete[].
+ if (_cursors != NULL)
+ delete [] _cursors;
+ _cursors = new WT_CURSOR_PTR[_icontext->_tint_last + 1];
+ memset(_cursors, 0, sizeof (WT_CURSOR *) * (_icontext->_tint_last + 1));
+ for (std::map<uint32_t, uint32_t>::iterator i = _table_usage.begin();
+ i != _table_usage.end(); i++) {
+ uint32_t tindex = i->first;
+ const char *uri = _icontext->_table_names[tindex].c_str();
+ WT_RET(_session->open_cursor(_session, uri, NULL, NULL,
+ &_cursors[tindex]));
+ }
+ return (0);
+}
+
+// Tear down per-thread state: throttle object, WT session (which closes
+// its cursors), then all locally allocated buffers.
+int ThreadRunner::close_all() {
+ if (_throttle != NULL) {
+ delete _throttle;
+ _throttle = NULL;
+ }
+ if (_session != NULL) {
+ WT_RET(_session->close(_session, NULL));
+ _session = NULL;
+ }
+ free_all();
+ return (0);
+}
+
+// Release all heap allocations owned by this runner. Idempotent: every
+// pointer is NULLed after release so repeated calls are safe.
+void ThreadRunner::free_all() {
+ if (_rand_state != NULL) {
+ workgen_random_free(_rand_state);
+ _rand_state = NULL;
+ }
+ // _cursors, _keybuf and _valuebuf are all allocated with new[]
+ // (open_all/create_all), so they must be freed with delete[].
+ if (_cursors != NULL) {
+ delete [] _cursors;
+ _cursors = NULL;
+ }
+ if (_keybuf != NULL) {
+ delete [] _keybuf;
+ _keybuf = NULL;
+ }
+ if (_valuebuf != NULL) {
+ delete [] _valuebuf;
+ _valuebuf = NULL;
+ }
+}
+
+// Mark tables that are both read and written by different threads with
+// USAGE_MIXED, in every runner's usage map, so runtime code can tell
+// which tables need cross-thread care.
+int ThreadRunner::cross_check(std::vector<ThreadRunner> &runners) {
+ std::map<uint32_t, uint32_t> usage;
+
+ // Determine which tables have cross usage
+ for (std::vector<ThreadRunner>::iterator r = runners.begin();
+ r != runners.end(); r++) {
+ for (std::map<uint32_t, uint32_t>::iterator i = r->_table_usage.begin();
+ i != r->_table_usage.end(); i++) {
+ uint32_t tindex = i->first;
+ uint32_t thisusage = i->second;
+ uint32_t curusage = CONTAINER_VALUE(usage, tindex, 0);
+ if (CROSS_USAGE(curusage, thisusage))
+ curusage |= USAGE_MIXED;
+ usage[tindex] = curusage;
+ }
+ }
+ // Propagate the MIXED flag back into each runner that uses the table.
+ for (std::map<uint32_t, uint32_t>::iterator i = usage.begin();
+ i != usage.end(); i++) {
+ if ((i->second & USAGE_MIXED) != 0) {
+ for (std::vector<ThreadRunner>::iterator r = runners.begin();
+ r != runners.end(); r++) {
+ r->_table_usage[i->first] |= USAGE_MIXED;
+ }
+ }
+ }
+ return (0);
+}
+
+// Thread main loop: set up throttling if configured, then execute the
+// thread's operation tree, repeating while _repeat is set and no stop was
+// requested. Returns the first WT error code, or 0.
+int ThreadRunner::run() {
+ WT_DECL_RET;
+ ThreadOptions *options = &_thread->options;
+ std::string name = options->name;
+
+ VERBOSE(*this, "thread " << name << " running");
+ if (options->throttle != 0) {
+ _throttle = new Throttle(*this, options->throttle,
+ options->throttle_burst);
+ }
+ for (int cnt = 0; !_stop && (_repeat || cnt < 1) && ret == 0; cnt++)
+ WT_ERR(op_run(&_thread->_op));
+
+err:
+#ifdef _DEBUG
+ {
+ std::string messages = this->get_debug();
+ if (!messages.empty())
+ std::cerr << "DEBUG (thread " << name << "): "
+ << messages << std::endl;
+ }
+#endif
+ if (ret != 0)
+ std::cerr << "thread " << name << " failed err=" << ret << std::endl;
+ // Fix: the message previously lacked the space before "finished",
+ // printing e.g. "thread foofinished".
+ VERBOSE(*this, "thread " << name << " finished");
+ return (ret);
+}
+
+// Accumulate this thread's statically-known operation counts into stats.
+void ThreadRunner::get_static_counts(Stats &stats) {
+ _thread->_op.get_static_counts(stats, 1);
+}
+
+// Recursively prepare an operation tree: validate sizes, compute maximum
+// key/value widths (growing keysize/valuesize to the largest needed),
++// assign a table id (tint) to each table on first sight, and record
+// per-table READ/WRITE usage flags for cross_check().
+void ThreadRunner::op_create_all(Operation *op, size_t &keysize,
+ size_t &valuesize) {
+ tint_t tint;
+
+ op->size_check();
+ if (op->_optype != Operation::OP_NONE) {
+ op->kv_compute_max(true);
+ if (OP_HAS_VALUE(op))
+ op->kv_compute_max(false);
+ op->kv_size_buffer(true, keysize);
+ op->kv_size_buffer(false, valuesize);
+
+ // Note: to support multiple contexts we'd need a generation
+ // count whenever we execute.
+ if (op->_table._internal->_context_count != 0 &&
+ op->_table._internal->_context_count != _icontext->_context_count)
+ THROW("multiple Contexts not supported");
+ if ((tint = op->_table._internal->_tint) == 0) {
+ std::string uri = op->_table._uri;
+
+ // We are single threaded in this function, so do not have
+ // to worry about locking.
+ if (_icontext->_tint.count(uri) == 0) {
+ // TODO: don't use atomic add, it's overkill.
+ tint = workgen_atomic_add32(&_icontext->_tint_last, 1);
+ _icontext->_tint[uri] = tint;
+ _icontext->_table_names[tint] = uri;
+ } else
+ tint = _icontext->_tint[uri];
+ op->_table._internal->_tint = tint;
+ }
+ uint32_t usage_flags = CONTAINER_VALUE(_table_usage,
+ op->_table._internal->_tint, 0);
+ if (op->_optype == Operation::OP_SEARCH)
+ usage_flags |= ThreadRunner::USAGE_READ;
+ else
+ usage_flags |= ThreadRunner::USAGE_WRITE;
+ _table_usage[op->_table._internal->_tint] = usage_flags;
+ }
+ if (op->_group != NULL)
+ for (std::vector<Operation>::iterator i = op->_group->begin();
+ i != op->_group->end(); i++)
+ op_create_all(&*i, keysize, valuesize);
+}
+
+// Pick a random existing record number (1-based) for the table; returns 0
+// when the table is empty, which callers rely on to force WT_NOTFOUND.
+uint64_t ThreadRunner::op_get_key_recno(Operation *op, tint_t tint) {
+ uint64_t recno_count;
+ uint32_t rand;
+
+ recno_count = _icontext->_recno[tint];
+ if (recno_count == 0)
+ // The file has no entries, returning 0 forces a WT_NOTFOUND return.
+ return (0);
+ rand = workgen_random(_rand_state);
+ return (rand % recno_count + 1); // recnos are one-based.
+}
+
+// Execute one operation (and, recursively, its group): apply throttling,
+// pick a record number, optionally begin/commit a transaction, perform the
+// cursor call, and record per-operation statistics with sampled latency.
+// The exact ordering here (throttle -> recno -> timing -> txn -> cursor)
+// is load-bearing; do not reorder.
+int ThreadRunner::op_run(Operation *op) {
+ Track *track;
+ tint_t tint = op->_table._internal->_tint;
+ WT_CURSOR *cursor = _cursors[tint];
+ WT_DECL_RET;
+ uint64_t recno;
+ bool measure_latency;
+
+ recno = 0;
+ track = NULL;
+ // Throttle only between transactions, counting real (non-NONE) ops.
+ if (_throttle != NULL) {
+ if (_throttle_ops >= _throttle_limit && !_in_transaction) {
+ WT_ERR(_throttle->throttle(_throttle_ops,
+ &_throttle_limit));
+ _throttle_ops = 0;
+ }
+ if (op->_optype != Operation::OP_NONE)
+ ++_throttle_ops;
+ }
+
+ // A potential race: thread1 is inserting, and increments
+ // Context->_recno[] for fileX.wt. thread2 is doing one of
+ // remove/search/update and grabs the new value of Context->_recno[]
+ // for fileX.wt. thread2 randomly chooses the highest recno (which
+ // has not yet been inserted by thread1), and when it accesses
+ // the record will get WT_NOTFOUND. It should be somewhat rare
+ // (and most likely when the threads are first beginning). Any
+ // WT_NOTFOUND returns are allowed and get their own statistic bumped.
+ switch (op->_optype) {
+ case Operation::OP_INSERT:
+ track = &_stats.insert;
+ recno = workgen_atomic_add64(&_icontext->_recno[tint], 1);
+ break;
+ case Operation::OP_REMOVE:
+ track = &_stats.remove;
+ recno = op_get_key_recno(op, tint);
+ break;
+ case Operation::OP_SEARCH:
+ track = &_stats.read;
+ recno = op_get_key_recno(op, tint);
+ break;
+ case Operation::OP_UPDATE:
+ track = &_stats.update;
+ recno = op_get_key_recno(op, tint);
+ break;
+ case Operation::OP_NONE:
+ recno = 0;
+ break;
+ }
+
+ // Sample latency once every sample_rate ops on this track.
+ measure_latency = track != NULL && track->ops != 0 &&
+ track->track_latency() &&
+ (track->ops % _workload->options.sample_rate == 0);
+
+ timespec start;
+ if (measure_latency)
+ workgen_epoch(&start);
+
+ if (op->_transaction != NULL) {
+ if (_in_transaction)
+ THROW("nested transactions not supported");
+ _session->begin_transaction(_session,
+ op->_transaction->_begin_config.c_str());
+ _in_transaction = true;
+ }
+ if (op->_optype != Operation::OP_NONE) {
+ op->kv_gen(true, recno, _keybuf);
+ cursor->set_key(cursor, _keybuf);
+ if (OP_HAS_VALUE(op)) {
+ op->kv_gen(false, recno, _valuebuf);
+ cursor->set_value(cursor, _valuebuf);
+ }
+ switch (op->_optype) {
+ case Operation::OP_INSERT:
+ WT_ERR(cursor->insert(cursor));
+ break;
+ case Operation::OP_REMOVE:
+ WT_ERR_NOTFOUND_OK(cursor->remove(cursor));
+ break;
+ case Operation::OP_SEARCH:
+ ret = cursor->search(cursor);
+ break;
+ case Operation::OP_UPDATE:
+ WT_ERR_NOTFOUND_OK(cursor->update(cursor));
+ break;
+ default:
+ ASSERT(false);
+ }
+ // Any leftover nonzero ret here is a search WT_NOTFOUND (see the
+ // race comment above); count it separately and continue.
+ if (ret != 0) {
+ track = &_stats.not_found;
+ ret = 0; // WT_NOTFOUND allowed.
+ }
+ cursor->reset(cursor);
+ }
+ if (measure_latency) {
+ timespec stop;
+ workgen_epoch(&stop);
+ track->incr_with_latency(ts_us(stop - start));
+ } else if (track != NULL)
+ track->incr();
+
+ // Run the nested group (if any) _repeatgroup times.
+ if (op->_group != NULL)
+ for (int count = 0; !_stop && count < op->_repeatgroup; count++)
+ for (std::vector<Operation>::iterator i = op->_group->begin();
+ i != op->_group->end(); i++)
+ WT_ERR(op_run(&*i));
+err:
+ // Close out the transaction: roll back on error or when the workload
+ // asked for rollback, commit otherwise.
+ if (op->_transaction != NULL) {
+ if (ret != 0 || op->_transaction->_rollback)
+ WT_TRET(_session->rollback_transaction(_session, NULL));
+ else
+ ret = _session->commit_transaction(_session,
+ op->_transaction->_commit_config.c_str());
+ _in_transaction = false;
+ }
+ return (ret);
+}
+
+#ifdef _DEBUG
+// Return (without clearing) the debug messages captured via DEBUG_CAPTURE.
+std::string ThreadRunner::get_debug() {
+ return (_debug_messages.str());
+}
+#endif
+
+// Divide each second into THROTTLE_PER_SEC slices; precompute the
+// milliseconds and operation budget per slice.
+Throttle::Throttle(ThreadRunner &runner, double throttle,
+ double throttle_burst) : _runner(runner), _throttle(throttle),
+ _burst(throttle_burst), _next_div(), _ops_delta(0), _ops_prev(0),
+ _ops_per_div(0), _ms_per_div(0), _started(false) {
+ ts_clear(_next_div);
+ _ms_per_div = ceill(1000.0 / THROTTLE_PER_SEC);
+ _ops_per_div = ceill(_throttle / THROTTLE_PER_SEC);
+}
+
+Throttle::~Throttle() {}
+
+// Given a random 32-bit value, return a float value equally distributed
+// between -1.0 and 1.0.
+static float rand_signed(uint32_t r) {
+ // Low bit chooses the sign; magnitude is r scaled into [0, 1].
+ int sign = ((r & 0x1) == 0 ? 1 : -1);
+ return (((float)r * sign) / UINT32_MAX);
+}
+
+// Each time throttle is called, we sleep and return a number of operations to
+// perform next. To implement this we keep a time calculation in _next_div set
+// initially to the current time + 1/THROTTLE_PER_SEC. Each call to throttle
+// advances _next_div by 1/THROTTLE_PER_SEC, and if _next_div is in the future,
+// we sleep for the difference between the _next_div and the current_time. We
+// always return (Thread.options.throttle / THROTTLE_PER_SEC) as the number of
+// operations.
+//
+// The only variation is that the amount of individual sleeps is modified by a
+// random amount (which varies more widely as Thread.options.throttle_burst is
+// greater). This has the effect of randomizing how much clumping happens, and
+// ensures that multiple threads aren't executing in lock step.
+//
+int Throttle::throttle(uint64_t op_count, uint64_t *op_limit) {
+ uint64_t ops;
+ int64_t sleep_ms;
+ timespec now;
+
+ workgen_epoch(&now);
+ DEBUG_CAPTURE(_runner, "throttle: ops=" << op_count);
+ if (!_started) {
+ _next_div = ts_add_ms(now, _ms_per_div);
+ _started = true;
+ } else {
+ // Track how far actual ops ran ahead of the budget; the excess
+ // (_ops_delta) is paid back out of future budgets below.
+ _ops_delta += (op_count - _ops_prev);
+ if (now < _next_div) {
+ sleep_ms = ts_ms(_next_div - now);
+ // Perturb the sleep by up to +/- (_burst * slice length).
+ sleep_ms += (_ms_per_div * _burst *
+ rand_signed(workgen_random(_runner._rand_state)));
+ if (sleep_ms > 0) {
+ DEBUG_CAPTURE(_runner, ", sleep=" << sleep_ms);
+ usleep((useconds_t)ms_to_us(sleep_ms));
+ }
+ }
+ _next_div = ts_add_ms(_next_div, _ms_per_div);
+ }
+ ops = _ops_per_div;
+ if (_ops_delta < (int64_t)ops) {
+ ops -= _ops_delta;
+ _ops_delta = 0;
+ } else {
+ _ops_delta -= ops;
+ ops = 0;
+ }
+ *op_limit = ops;
+ _ops_prev = ops;
+ DEBUG_CAPTURE(_runner, ", return=" << ops << std::endl);
+ return (0);
+}
+
+// Thread-level options exposed to Python; each option is registered with
+// its type and help text so the options system can document itself.
+ThreadOptions::ThreadOptions() : name(), throttle(0.0), throttle_burst(1.0),
+ _options() {
+ _options.add_string("name", name, "name of the thread");
+ _options.add_double("throttle", throttle,
+ "Limit to this number of operations per second");
+ _options.add_double("throttle_burst", throttle_burst,
+ "Changes characteristic of throttling from smooth (0.0) "
+ "to having large bursts with lulls (10.0 or larger)");
+}
+ThreadOptions::ThreadOptions(const ThreadOptions &other) :
+ name(other.name), throttle(other.throttle),
+ throttle_burst(other.throttle_burst), _options(other._options) {}
+ThreadOptions::~ThreadOptions() {}
+
+// Append copies of all threads in another wrapper to this one.
+void
+ThreadListWrapper::extend(const ThreadListWrapper &other) {
+ for (std::vector<Thread>::const_iterator i = other._threads.begin();
+ i != other._threads.end(); i++)
+ _threads.push_back(*i);
+}
+
+void
+ThreadListWrapper::append(const Thread &t) {
+ _threads.push_back(t);
+}
+
+// Replicate the current thread list n times (n == 0 empties the list,
+// n == 1 leaves it unchanged).
+void
+ThreadListWrapper::multiply(const int n) {
+ if (n == 0) {
+ _threads.clear();
+ } else {
+ std::vector<Thread> copy(_threads);
+ for (int cnt = 1; cnt < n; cnt++)
+ extend(copy);
+ }
+}
+
+Thread::Thread() : options(), _op() {
+}
+
+Thread::Thread(const Operation &op) : options(), _op(op) {
+}
+
+Thread::Thread(const Thread &other) : options(other.options), _op(other._op) {
+}
+
+Thread::~Thread() {
+}
+
+// Human-readable dump of the thread's operation tree.
+void Thread::describe(std::ostream &os) const {
+ os << "Thread: [" << std::endl;
+ _op.describe(os); os << std::endl;
+ os << "]";
+}
+
+Operation::Operation() :
+ _optype(OP_NONE), _table(), _key(), _value(), _transaction(NULL),
+ _group(NULL), _repeatgroup(0),
+ _keysize(0), _valuesize(0), _keymax(0), _valuemax(0) {
+}
+
+// The _keysize/_valuesize/_keymax/_valuemax fields are derived later by
+// kv_compute_max(); constructors only validate via size_check().
+Operation::Operation(OpType optype, Table table, Key key, Value value) :
+ _optype(optype), _table(table), _key(key), _value(value),
+ _transaction(NULL), _group(NULL), _repeatgroup(0),
+ _keysize(0), _valuesize(0), _keymax(0), _valuemax(0) {
+ size_check();
+}
+
+Operation::Operation(OpType optype, Table table, Key key) :
+ _optype(optype), _table(table), _key(key), _value(), _transaction(NULL),
+ _group(NULL), _repeatgroup(0),
+ _keysize(0), _valuesize(0), _keymax(0), _valuemax(0) {
+ size_check();
+}
+
+Operation::Operation(OpType optype, Table table) :
+ _optype(optype), _table(table), _key(), _value(), _transaction(NULL),
+ _group(NULL), _repeatgroup(0),
+ _keysize(0), _valuesize(0), _keymax(0), _valuemax(0) {
+ size_check();
+}
+
+Operation::Operation(const Operation &other) :
+ _optype(other._optype), _table(other._table), _key(other._key),
+ _value(other._value), _transaction(other._transaction),
+ _group(other._group), _repeatgroup(other._repeatgroup),
+ _keysize(other._keysize), _valuesize(other._valuesize),
+ _keymax(other._keymax), _valuemax(other._valuemax) {
+ // Creation and destruction of _group and _transaction is managed
+ // by Python.
+}
+
+Operation::~Operation() {
+ // Creation and destruction of _group, _transaction is managed by Python.
+}
+
+// Shallow-copies _transaction and _group pointers (see ownership note in
+// the copy constructor).
+Operation& Operation::operator=(const Operation &other) {
+ _optype = other._optype;
+ _table = other._table;
+ _key = other._key;
+ _value = other._value;
+ _transaction = other._transaction;
+ _group = other._group;
+ _repeatgroup = other._repeatgroup;
+ _keysize = other._keysize;
+ _valuesize = other._valuesize;
+ _keymax = other._keymax;
+ _valuemax = other._valuemax;
+ return (*this);
+}
+
+// Human-readable dump of this operation, its transaction config and any
+// nested group.
+void Operation::describe(std::ostream &os) const {
+ os << "Operation: " << _optype;
+ if (_optype != OP_NONE) {
+ os << ", "; _table.describe(os);
+ os << ", "; _key.describe(os);
+ os << ", "; _value.describe(os);
+ }
+ if (_transaction != NULL) {
+ os << ", ["; _transaction->describe(os); os << "]";
+ }
+ if (_group != NULL) {
+ os << ", group[" << _repeatgroup << "]: {";
+ bool first = true;
+ for (std::vector<Operation>::const_iterator i = _group->begin();
+ i != _group->end(); i++) {
+ if (!first)
+ os << "}, {";
+ i->describe(os);
+ first = false;
+ }
+ os << "}";
+ }
+}
+
+// Count how many ops of each type this subtree will issue per iteration;
+// nested groups multiply by their repeat count.
+void Operation::get_static_counts(Stats &stats, int multiplier) {
+ switch (_optype) {
+ case OP_NONE:
+ break;
+ case OP_INSERT:
+ stats.insert.ops += multiplier;
+ break;
+ case OP_REMOVE:
+ stats.remove.ops += multiplier;
+ break;
+ case OP_SEARCH:
+ stats.read.ops += multiplier;
+ break;
+ case OP_UPDATE:
+ stats.update.ops += multiplier;
+ break;
+ default:
+ ASSERT(false);
+ }
+ if (_group != NULL)
+ for (std::vector<Operation>::iterator i = _group->begin();
+ i != _group->end(); i++)
+ i->get_static_counts(stats, multiplier * _repeatgroup);
+}
+
+// Resolve the effective key or value size (operation setting first, table
+// option as fallback) and the largest record number representable in
+// size - 1 decimal digits (the last byte is the NUL terminator).
+void Operation::kv_compute_max(bool iskey) {
+ uint64_t max;
+ int size;
+
+ size = iskey ? _key._size : _value._size;
+ if (size == 0)
+ size = iskey ? _table.options.key_size : _table.options.value_size;
+
+ if (iskey && size < 2)
+ THROW("Key.size too small for table '" << _table._uri << "'");
+ if (!iskey && size < 1)
+ THROW("Value.size too small for table '" << _table._uri << "'");
+
+ if (size > 1)
+ max = power64(10, (size - 1)) - 1;
+ else
+ max = 0;
+
+ if (iskey) {
+ _keysize = size;
+ _keymax = max;
+ } else {
+ _valuesize = size;
+ _valuemax = max;
+ }
+}
+
+// Grow maxsize to hold this operation's key (or value) if it is larger.
+void Operation::kv_size_buffer(bool iskey, size_t &maxsize) const {
+ if (iskey) {
+ if ((size_t)_keysize > maxsize)
+ maxsize = _keysize;
+ } else {
+ if ((size_t)_valuesize > maxsize)
+ maxsize = _valuesize;
+ }
+}
+
+// Render record number n as a zero-filled decimal string of the configured
+// size into result; throws if n has more digits than fit.
+void Operation::kv_gen(bool iskey, uint64_t n, char *result) const {
+ uint64_t max;
+ int size;
+
+ size = iskey ? _keysize : _valuesize;
+ max = iskey ? _keymax : _valuemax;
+ if (n > max)
+ THROW((iskey ? "Key" : "Value") << " (" << n
+ << ") too large for size (" << size << ")");
+ workgen_u64_to_string_zf(n, result, size);
+}
+
+// Validate that a real operation has a key size, and a value-carrying
+// operation has a value size, from either the op or the table options.
+void Operation::size_check() const {
+ if (_optype != OP_NONE && _key._size == 0 && _table.options.key_size == 0)
+ THROW("operation requires a key size");
+ if (OP_HAS_VALUE(this) && _value._size == 0 &&
+ _table.options.value_size == 0)
+ THROW("operation requires a value size");
+}
+
+// A Track accumulates counts and (optionally) latency histograms for one
+// operation type. The us/ms/sec buckets exist only when latency tracking
+// is enabled.
+Track::Track(bool latency_tracking) : ops(0), latency_ops(0), latency(0),
+ min_latency(0), max_latency(0), us(NULL), ms(NULL), sec(NULL) {
+ track_latency(latency_tracking);
+}
+
+Track::Track(const Track &other) : ops(other.ops),
+ latency_ops(other.latency_ops), latency(other.latency),
+ min_latency(other.min_latency), max_latency(other.max_latency),
+ us(NULL), ms(NULL), sec(NULL) {
+ // The three histogram arrays are allocated (and NULLed) together, so
+ // checking us alone suffices.
+ if (other.us != NULL) {
+ us = new uint32_t[LATENCY_US_BUCKETS];
+ ms = new uint32_t[LATENCY_MS_BUCKETS];
+ sec = new uint32_t[LATENCY_SEC_BUCKETS];
+ memcpy(us, other.us, sizeof(uint32_t) * LATENCY_US_BUCKETS);
+ memcpy(ms, other.ms, sizeof(uint32_t) * LATENCY_MS_BUCKETS);
+ memcpy(sec, other.sec, sizeof(uint32_t) * LATENCY_SEC_BUCKETS);
+ }
+}
+
+Track::~Track() {
+ // Histograms are new[] arrays; release with delete[].
+ if (us != NULL) {
+ delete [] us;
+ delete [] ms;
+ delete [] sec;
+ }
+}
+
+// Merge another Track's counts (and histograms, when both have them) into
+// this one; with reset, the other's min/max are cleared for the next
+// sampling interval.
+// NOTE(review): MIN() with a fresh (0) min_latency always yields 0, so a
+// merged minimum can be lost — confirm whether 0 is meant as "no sample".
+void Track::add(Track &other, bool reset) {
+ ops += other.ops;
+ latency_ops += other.latency_ops;
+ latency += other.latency;
+
+ min_latency = MIN(min_latency, other.min_latency);
+ if (reset)
+ other.min_latency = 0;
+ max_latency = MAX(max_latency, other.max_latency);
+ if (reset)
+ other.max_latency = 0;
+
+ if (us != NULL && other.us != NULL) {
+ for (int i = 0; i < LATENCY_US_BUCKETS; i++)
+ us[i] += other.us[i];
+ for (int i = 0; i < LATENCY_MS_BUCKETS; i++)
+ ms[i] += other.ms[i];
+ for (int i = 0; i < LATENCY_SEC_BUCKETS; i++)
+ sec[i] += other.sec[i];
+ }
+}
+
+// Copy another Track into this one, allocating or releasing the latency
+// histograms so that this Track's latency-tracking state matches other's.
+void Track::assign(const Track &other) {
+ ops = other.ops;
+ latency_ops = other.latency_ops;
+ latency = other.latency;
+ min_latency = other.min_latency;
+ max_latency = other.max_latency;
+
+ if (other.us == NULL && us != NULL) {
+ // Histograms are new[] arrays; release with delete[].
+ delete [] us;
+ delete [] ms;
+ delete [] sec;
+ us = NULL;
+ ms = NULL;
+ sec = NULL;
+ }
+ else if (other.us != NULL && us == NULL) {
+ us = new uint32_t[LATENCY_US_BUCKETS];
+ ms = new uint32_t[LATENCY_MS_BUCKETS];
+ sec = new uint32_t[LATENCY_SEC_BUCKETS];
+ }
+ if (us != NULL) {
+ memcpy(us, other.us, sizeof(uint32_t) * LATENCY_US_BUCKETS);
+ memcpy(ms, other.ms, sizeof(uint32_t) * LATENCY_MS_BUCKETS);
+ memcpy(sec, other.sec, sizeof(uint32_t) * LATENCY_SEC_BUCKETS);
+ }
+}
+
+// Mean latency in microseconds over the sampled (latency_ops) operations.
+uint64_t Track::average_latency() const {
+ if (latency_ops == 0)
+ return (0);
+ else
+ return (latency / latency_ops);
+}
+
+// Zero all counters and histogram buckets (histograms stay allocated).
+void Track::clear() {
+ ops = 0;
+ latency_ops = 0;
+ latency = 0;
+ min_latency = 0;
+ max_latency = 0;
+ if (us != NULL) {
+ memset(us, 0, sizeof(uint32_t) * LATENCY_US_BUCKETS);
+ memset(ms, 0, sizeof(uint32_t) * LATENCY_MS_BUCKETS);
+ memset(sec, 0, sizeof(uint32_t) * LATENCY_SEC_BUCKETS);
+ }
+}
+
+// Count one operation without a latency sample.
+void Track::incr() {
+ ops++;
+}
+
+// Count one operation with a latency sample of usecs microseconds,
+// updating min/max and the appropriate histogram bucket.
+void Track::incr_with_latency(uint64_t usecs) {
+ ASSERT(us != NULL);
+
+ ops++;
+ latency_ops++;
+ latency += usecs;
+ if (usecs > max_latency)
+ max_latency = (uint32_t)usecs;
+ // min_latency == 0 means "no sample yet"; previously the comparison
+ // against the initial 0 could never succeed, so the reported minimum
+ // latency was always 0.
+ if (min_latency == 0 || usecs < min_latency)
+ min_latency = (uint32_t)usecs;
+
+ // Update a latency bucket.
+ // First buckets: single microseconds up to 1000us, at 1us each.
+ if (usecs < LATENCY_US_BUCKETS)
+ us[usecs]++;
+
+ // Second buckets: milliseconds from 1ms to 1000ms, at 1ms each.
+ else if (usecs < ms_to_us(LATENCY_MS_BUCKETS))
+ ms[us_to_ms(usecs)]++;
+
+ // Third buckets are seconds from 1s to 100s, at 1s each.
+ else if (usecs < sec_to_us(LATENCY_SEC_BUCKETS))
+ sec[us_to_sec(usecs)]++;
+
+ // >100 seconds, accumulate in the biggest bucket. */
+ else
+ sec[LATENCY_SEC_BUCKETS - 1]++;
+}
+
+// Subtract other's counters and buckets from this Track, used to
+// compute per-interval deltas from cumulative totals.
+void Track::subtract(const Track &other) {
+ ops -= other.ops;
+ latency_ops -= other.latency_ops;
+ latency -= other.latency;
+
+ // There's no sensible thing to be done for min/max_latency.
+
+ if (us != NULL && other.us != NULL) {
+ for (int i = 0; i < LATENCY_US_BUCKETS; i++)
+ us[i] -= other.us[i];
+ for (int i = 0; i < LATENCY_MS_BUCKETS; i++)
+ ms[i] -= other.ms[i];
+ for (int i = 0; i < LATENCY_SEC_BUCKETS; i++)
+ sec[i] -= other.sec[i];
+ }
+}
+
+// If there are no entries in this Track, take them from
+// a previous Track. Used to smooth graphs. We don't worry
+// about latency buckets here.
+void Track::smooth(const Track &other) {
+ if (latency_ops == 0) {
+ ops = other.ops;
+ latency = other.latency;
+ latency_ops = other.latency_ops;
+ min_latency = other.min_latency;
+ max_latency = other.max_latency;
+ }
+}
+
+// Enable or disable latency bucket tracking. Buckets are allocated
+// lazily on enable and freed on disable; enabling twice is a no-op.
+void Track::track_latency(bool newval) {
+ if (newval) {
+ if (us == NULL) {
+ us = new uint32_t[LATENCY_US_BUCKETS];
+ ms = new uint32_t[LATENCY_MS_BUCKETS];
+ sec = new uint32_t[LATENCY_SEC_BUCKETS];
+ memset(us, 0, sizeof(uint32_t) * LATENCY_US_BUCKETS);
+ memset(ms, 0, sizeof(uint32_t) * LATENCY_MS_BUCKETS);
+ memset(sec, 0, sizeof(uint32_t) * LATENCY_SEC_BUCKETS);
+ }
+ } else {
+ if (us != NULL) {
+ // These were allocated with new[]; plain delete on an
+ // array is undefined behavior, use delete [].
+ delete [] us;
+ delete [] ms;
+ delete [] sec;
+ us = NULL;
+ ms = NULL;
+ sec = NULL;
+ }
+ }
+}
+
+// Copy the microsecond latency buckets into result (an array of
+// LATENCY_US_BUCKETS longs); zero-filled when latency tracking is off.
+// Backs the Python-side Track.us() accessor.
+void Track::_get_us(long *result) {
+ if (us != NULL) {
+ for (int i = 0; i < LATENCY_US_BUCKETS; i++)
+ result[i] = (long)us[i];
+ } else
+ memset(result, 0, sizeof(long) * LATENCY_US_BUCKETS);
+}
+// As _get_us, for the millisecond buckets (Python Track.ms()).
+void Track::_get_ms(long *result) {
+ if (ms != NULL) {
+ for (int i = 0; i < LATENCY_MS_BUCKETS; i++)
+ result[i] = (long)ms[i];
+ } else
+ memset(result, 0, sizeof(long) * LATENCY_MS_BUCKETS);
+}
+// As _get_us, for the second buckets (Python Track.sec()).
+void Track::_get_sec(long *result) {
+ if (sec != NULL) {
+ for (int i = 0; i < LATENCY_SEC_BUCKETS; i++)
+ result[i] = (long)sec[i];
+ } else
+ memset(result, 0, sizeof(long) * LATENCY_SEC_BUCKETS);
+}
+
+// Construct per-operation-type Tracks; latency selects whether each
+// Track allocates latency buckets.
+Stats::Stats(bool latency) : insert(latency), not_found(latency),
+ read(latency), remove(latency), update(latency), truncate(latency) {
+}
+
+Stats::Stats(const Stats &other) : insert(other.insert),
+ not_found(other.not_found), read(other.read), remove(other.remove),
+ update(other.update), truncate(other.truncate) {
+}
+
+Stats::~Stats() {}
+
+// Merge other's counts into this Stats; when reset is true the source
+// is cleared as it is consumed (hence other is non-const).
+void Stats::add(Stats &other, bool reset) {
+ insert.add(other.insert, reset);
+ not_found.add(other.not_found, reset);
+ read.add(other.read, reset);
+ remove.add(other.remove, reset);
+ update.add(other.update, reset);
+ truncate.add(other.truncate, reset);
+}
+
+// Copy all Tracks from other; the explicit replacement for operator=,
+// which is intentionally private.
+void Stats::assign(const Stats &other) {
+ insert.assign(other.insert);
+ not_found.assign(other.not_found);
+ read.assign(other.read);
+ remove.assign(other.remove);
+ update.assign(other.update);
+ truncate.assign(other.truncate);
+}
+
+// Zero every Track's counters and latency buckets.
+void Stats::clear() {
+ insert.clear();
+ not_found.clear();
+ read.clear();
+ remove.clear();
+ update.clear();
+ truncate.clear();
+}
+
+// One-line human-readable summary of operation counts; backs the
+// Python-side __str__.
+void Stats::describe(std::ostream &os) const {
+ os << "Stats: reads " << read.ops;
+ if (not_found.ops > 0) {
+ os << " (" << not_found.ops << " not found)";
+ }
+ os << ", inserts " << insert.ops;
+ os << ", updates " << update.ops;
+ os << ", truncates " << truncate.ops;
+ os << ", removes " << remove.ops;
+}
+
+// Write the end-of-run summary: one line per operation type with its
+// percentage of the grand total and its rate over totalsecs.
+// NOTE(review): PCT and OPS_PER_SEC are macros defined elsewhere in
+// this file (not visible in this hunk) -- confirm their semantics.
+void Stats::final_report(std::ostream &os, timespec &totalsecs) const {
+ uint64_t ops = 0;
+ ops += read.ops;
+ ops += not_found.ops;
+ ops += insert.ops;
+ ops += update.ops;
+ ops += truncate.ops;
+ ops += remove.ops;
+
+#define FINAL_OUTPUT(os, field, singular, ops, totalsecs) \
+ os << "Executed " << field << " " #singular " operations (" \
+ << PCT(field, ops) << "%) " << OPS_PER_SEC(field, totalsecs) \
+ << " ops/sec" << std::endl
+
+ FINAL_OUTPUT(os, read.ops, read, ops, totalsecs);
+ FINAL_OUTPUT(os, not_found.ops, not found, ops, totalsecs);
+ FINAL_OUTPUT(os, insert.ops, insert, ops, totalsecs);
+ FINAL_OUTPUT(os, update.ops, update, ops, totalsecs);
+ FINAL_OUTPUT(os, truncate.ops, truncate, ops, totalsecs);
+ FINAL_OUTPUT(os, remove.ops, remove, ops, totalsecs);
+}
+
+// Compact per-interval report of operation counts (no latency data).
+void Stats::report(std::ostream &os) const {
+ os << read.ops << " reads";
+ if (not_found.ops > 0) {
+ os << " (" << not_found.ops << " not found)";
+ }
+ os << ", " << insert.ops << " inserts, ";
+ os << update.ops << " updates, ";
+ os << truncate.ops << " truncates, ";
+ os << remove.ops << " removes";
+}
+
+// Fill empty Tracks from a previous sample; see Track::smooth.
+void Stats::smooth(const Stats &other) {
+ insert.smooth(other.insert);
+ not_found.smooth(other.not_found);
+ read.smooth(other.read);
+ remove.smooth(other.remove);
+ update.smooth(other.update);
+ truncate.smooth(other.truncate);
+}
+
+// Subtract other's counts to obtain an interval delta; see
+// Track::subtract.
+void Stats::subtract(const Stats &other) {
+ insert.subtract(other.insert);
+ not_found.subtract(other.not_found);
+ read.subtract(other.read);
+ remove.subtract(other.remove);
+ update.subtract(other.update);
+ truncate.subtract(other.truncate);
+}
+
+// Enable/disable latency bucket tracking on every Track.
+void Stats::track_latency(bool latency) {
+ insert.track_latency(latency);
+ not_found.track_latency(latency);
+ read.track_latency(latency);
+ remove.track_latency(latency);
+ update.track_latency(latency);
+ truncate.track_latency(latency);
+}
+
+// Register the per-table options with their help text; defaults of 0
+// mean "use the per-Key/per-Value size instead".
+TableOptions::TableOptions() : key_size(0), value_size(0), _options() {
+ _options.add_int("key_size", key_size,
+ "default size of the key, unless overridden by Key.size");
+ _options.add_int("value_size", value_size,
+ "default size of the value, unless overridden by Value.size");
+}
+TableOptions::TableOptions(const TableOptions &other) :
+ key_size(other.key_size), value_size(other.value_size),
+ _options(other._options) {}
+TableOptions::~TableOptions() {}
+
+// A Table owns its TableInternal; copies deep-copy it, and assignment
+// copies its contents in place so existing pointers stay valid.
+Table::Table() : options(), _uri(), _internal(new TableInternal()) {
+}
+Table::Table(const char *uri) : options(), _uri(uri),
+ _internal(new TableInternal()) {
+}
+Table::Table(const Table &other) : options(other.options), _uri(other._uri),
+ _internal(new TableInternal(*other._internal)) {
+}
+Table::~Table() { delete _internal; }
+Table& Table::operator=(const Table &other) {
+ options = other.options;
+ _uri = other._uri;
+ *_internal = *other._internal;
+ return (*this);
+}
+
+// One-line summary; backs the Python-side __str__.
+void Table::describe(std::ostream &os) const {
+ os << "Table: " << _uri;
+}
+
+// Internal per-table state: the table id (_tint) and a count used by
+// the owning Context.
+TableInternal::TableInternal() : _tint(0), _context_count(0) {}
+TableInternal::TableInternal(const TableInternal &other) : _tint(other._tint),
+ _context_count(other._context_count) {}
+TableInternal::~TableInternal() {}
+
+// Register the workload-level options with their defaults and help
+// text; the option descriptions double as the Python-side docs.
+WorkloadOptions::WorkloadOptions() : max_latency(0),
+ report_file("workload.stat"), report_interval(0),
+ run_time(0), sample_interval(0), sample_rate(1),
+ _options() {
+ _options.add_int("max_latency", max_latency,
+ "prints warning if any latency measured exceeds this number of "
+ "milliseconds. Requires sample_interval to be configured.");
+ _options.add_int("report_interval", report_interval,
+ "output throughput information every interval seconds, 0 to disable");
+ _options.add_string("report_file", report_file,
+ "file name for collecting run output, "
+ "including output from the report_interval option. "
+ "The file name is relative to the connection's home directory. "
+ "When set to the empty string, stdout is used.");
+ _options.add_int("run_time", run_time, "total workload seconds");
+ _options.add_int("sample_interval", sample_interval,
+ "performance logging every interval seconds, 0 to disable");
+ _options.add_int("sample_rate", sample_rate,
+ "how often the latency of operations is measured. 1 for every operation, "
+ "2 for every second operation, 3 for every third operation etc.");
+}
+
+// Copy constructor: copies every option. report_file must be copied
+// explicitly here; omitting it left copies with an empty report file
+// name instead of the source's value.
+WorkloadOptions::WorkloadOptions(const WorkloadOptions &other) :
+ max_latency(other.max_latency), report_file(other.report_file),
+ report_interval(other.report_interval), run_time(other.run_time),
+ sample_interval(other.sample_interval),
+ sample_rate(other.sample_rate), _options(other._options) {}
+WorkloadOptions::~WorkloadOptions() {}
+
+// Construct a Workload from a list of threads; a non-NULL Context is
+// required.
+Workload::Workload(Context *context, const ThreadListWrapper &tlw) :
+ options(), stats(), _context(context), _threads(tlw._threads) {
+ if (context == NULL)
+ THROW("Workload constructor requires a Context");
+}
+
+// Convenience constructor for a single-thread workload.
+Workload::Workload(Context *context, const Thread &thread) :
+ options(), stats(), _context(context), _threads() {
+ if (context == NULL)
+ THROW("Workload constructor requires a Context");
+ _threads.push_back(thread);
+}
+
+Workload::Workload(const Workload &other) :
+ options(other.options), stats(other.stats), _context(other._context),
+ _threads(other._threads) {}
+Workload::~Workload() {}
+
+// Assignment copies options/stats/threads and the *contents* of the
+// pointed-to Context (the pointer itself is not rebound).
+Workload& Workload::operator=(const Workload &other) {
+ options = other.options;
+ stats.assign(other.stats);
+ *_context = *other._context;
+ _threads = other._threads;
+ return (*this);
+}
+
+// Execute this workload against an open connection; returns a
+// WiredTiger error code (0 on success).
+int Workload::run(WT_CONNECTION *conn) {
+ WorkloadRunner runner(this);
+
+ return (runner.run(conn));
+}
+
+// Per-run state for a Workload: one ThreadRunner per configured
+// thread, plus the report output stream (stdout until run() opens the
+// configured report file).
+WorkloadRunner::WorkloadRunner(Workload *workload) :
+ _workload(workload), _trunners(workload->_threads.size()),
+ _report_out(&std::cout), _start() {
+ ts_clear(_start);
+}
+WorkloadRunner::~WorkloadRunner() {}
+
+// Top-level run: validate options, open the report file, create and
+// open all per-thread state, then execute the workload.
+int WorkloadRunner::run(WT_CONNECTION *conn) {
+ WT_DECL_RET;
+ WorkloadOptions *options = &_workload->options;
+ std::ofstream report_out;
+
+ _wt_home = conn->get_home(conn);
+ if (options->sample_interval > 0 && options->sample_rate <= 0)
+ THROW("Workload.options.sample_rate must be positive");
+ if (!options->report_file.empty()) {
+ open_report_file(report_out, options->report_file.c_str(),
+ "Workload.options.report_file");
+ // _report_out aliases this local stream only for the duration
+ // of the run; it is restored to stdout before returning.
+ _report_out = &report_out;
+ }
+ WT_ERR(create_all(conn, _workload->_context));
+ WT_ERR(open_all());
+ WT_ERR(ThreadRunner::cross_check(_trunners));
+ WT_ERR(run_all());
+ err:
+ //TODO: (void)close_all();
+ _report_out = &std::cout;
+ return (ret);
+}
+
+// Open per-thread cursors/sessions; stops at the first failure.
+int WorkloadRunner::open_all() {
+ for (size_t i = 0; i < _trunners.size(); i++) {
+ WT_RET(_trunners[i].open_all());
+ }
+ return (0);
+}
+
+// Open filename (relative to the connection home, if known) for
+// appending; throws a WorkgenException mentioning desc on failure.
+void WorkloadRunner::open_report_file(std::ofstream &of, const char *filename,
+ const char *desc) {
+ std::stringstream sstm;
+
+ if (!_wt_home.empty())
+ sstm << _wt_home << "/";
+ sstm << filename;
+ of.open(sstm.str().c_str(), std::fstream::app);
+ if (!of)
+ THROW_ERRNO(errno, desc << ": \"" << sstm.str()
+ << "\" could not be opened");
+}
+
+// Wire each ThreadRunner to its Thread, the shared Context and this
+// runner, assigning default thread names, then create per-thread and
+// per-context resources.
+int WorkloadRunner::create_all(WT_CONNECTION *conn, Context *context) {
+ for (size_t i = 0; i < _trunners.size(); i++) {
+ ThreadRunner *runner = &_trunners[i];
+ std::stringstream sstm;
+ Thread *thread = &_workload->_threads[i];
+ if (thread->options.name.empty()) {
+ sstm << "thread" << i;
+ thread->options.name = sstm.str();
+ }
+ runner->_thread = thread;
+ runner->_context = context;
+ runner->_icontext = context->_internal;
+ runner->_workload = _workload;
+ runner->_wrunner = this;
+ runner->_number = (uint32_t)i;
+ // TODO: recover from partial failure here
+ WT_RET(runner->create_all(conn));
+ }
+ WT_RET(context->_internal->create_all());
+ return (0);
+}
+
+// Close every thread's resources; always returns 0 (per-thread close
+// errors are not propagated here).
+int WorkloadRunner::close_all() {
+ for (size_t i = 0; i < _trunners.size(); i++)
+ _trunners[i].close_all();
+
+ return (0);
+}
+
+// Accumulate every thread's current Stats into result.
+void WorkloadRunner::get_stats(Stats *result) {
+ for (size_t i = 0; i < _trunners.size(); i++)
+ result->add(_trunners[i]._stats);
+}
+
+// Emit a periodic progress line covering the last interval seconds:
+// the delta between the current totals and prev_totals is reported,
+// and prev_totals is advanced to the current totals.
+void WorkloadRunner::report(time_t interval, time_t totalsecs,
+ Stats *prev_totals) {
+ std::ostream &out = *_report_out;
+ Stats new_totals(prev_totals->track_latency());
+
+ get_stats(&new_totals);
+ Stats diff(new_totals);
+ diff.subtract(*prev_totals);
+ prev_totals->assign(new_totals);
+ diff.report(out);
+ out << " in " << interval << " secs ("
+ << totalsecs << " total secs)" << std::endl;
+}
+
+// Collect final totals into the Workload's public stats member and
+// write the end-of-run summary.
+void WorkloadRunner::final_report(timespec &totalsecs) {
+ std::ostream &out = *_report_out;
+ Stats *stats = &_workload->stats;
+
+ stats->clear();
+ stats->track_latency(_workload->options.sample_interval > 0);
+
+ get_stats(stats);
+ stats->final_report(out, totalsecs);
+ out << "Run completed: " << totalsecs << " seconds" << std::endl;
+}
+
+// Start the monitor (if sampling) and one pthread per ThreadRunner,
+// let the workload run for options->run_time seconds with periodic
+// reporting, then stop and join everything and issue the final report.
+// Returns the first nonzero error code seen; rethrows any exception
+// captured by a worker or the monitor.
+int WorkloadRunner::run_all() {
+ void *status;
+ std::vector<pthread_t> thread_handles;
+ Stats counts(false);
+ WorkgenException *exception;
+ WorkloadOptions *options = &_workload->options;
+ Monitor monitor(*this);
+ std::ofstream monitor_out;
+ std::ostream &out = *_report_out;
+ WT_DECL_RET;
+
+ for (size_t i = 0; i < _trunners.size(); i++)
+ _trunners[i].get_static_counts(counts);
+ out << "Starting workload: " << _trunners.size() << " threads, ";
+ counts.report(out);
+ out << std::endl;
+
+ workgen_epoch(&_start);
+ timespec end = _start + options->run_time;
+ timespec next_report = _start + options->report_interval;
+
+ // Start all threads
+ if (options->sample_interval > 0) {
+ open_report_file(monitor_out, "monitor", "monitor output file");
+ monitor._out = &monitor_out;
+
+ if ((ret = pthread_create(&monitor._handle, NULL, monitor_main,
+ &monitor)) != 0) {
+ std::cerr << "monitor thread failed err=" << ret << std::endl;
+ return (ret);
+ }
+ }
+
+ for (size_t i = 0; i < _trunners.size(); i++) {
+ pthread_t thandle;
+ ThreadRunner *runner = &_trunners[i];
+ runner->_stop = false;
+ runner->_repeat = (options->run_time != 0);
+ if ((ret = pthread_create(&thandle, NULL, thread_runner_main,
+ runner)) != 0) {
+ std::cerr << "pthread_create failed err=" << ret << std::endl;
+ std::cerr << "Stopping all threads." << std::endl;
+ // Unwind: stop and join any threads already started.
+ for (size_t j = 0; j < thread_handles.size(); j++) {
+ _trunners[j]._stop = true;
+ (void)pthread_join(thread_handles[j], &status);
+ _trunners[j].close_all();
+ }
+ return (ret);
+ }
+ thread_handles.push_back(thandle);
+ runner->_stats.clear();
+ }
+
+ // Let the test run, reporting as needed.
+ Stats curstats(false);
+ timespec now = _start;
+ while (now < end) {
+ timespec sleep_amt;
+
+ sleep_amt = end - now;
+ if (next_report != 0) {
+ timespec next_diff = next_report - now;
+ // Sleep only until the earlier of the end time and the
+ // next report time. The previous comparison against
+ // next_report (an absolute time) was effectively always
+ // true, ignoring the end time.
+ if (next_diff < sleep_amt)
+ sleep_amt = next_diff;
+ }
+ if (sleep_amt.tv_sec > 0)
+ sleep((unsigned int)sleep_amt.tv_sec);
+ else
+ usleep((useconds_t)((sleep_amt.tv_nsec + 999)/ 1000));
+
+ workgen_epoch(&now);
+ if (now >= next_report && now < end && options->report_interval != 0) {
+ report(options->report_interval, (now - _start).tv_sec, &curstats);
+ while (now >= next_report)
+ next_report += options->report_interval;
+ }
+ }
+
+ // signal all threads to stop
+ if (options->run_time != 0)
+ for (size_t i = 0; i < _trunners.size(); i++)
+ _trunners[i]._stop = true;
+ if (options->sample_interval > 0)
+ monitor._stop = true;
+
+ // wait for all threads; remember the first captured exception so
+ // it can be rethrown after cleanup.
+ exception = NULL;
+ for (size_t i = 0; i < _trunners.size(); i++) {
+ WT_TRET(pthread_join(thread_handles[i], &status));
+ if (_trunners[i]._errno != 0)
+ VERBOSE(_trunners[i],
+ "Thread " << i << " has errno " << _trunners[i]._errno);
+ WT_TRET(_trunners[i]._errno);
+ _trunners[i].close_all();
+ if (exception == NULL && !_trunners[i]._exception._str.empty())
+ exception = &_trunners[i]._exception;
+ }
+ if (options->sample_interval > 0) {
+ WT_TRET(pthread_join(monitor._handle, &status));
+ if (monitor._errno != 0)
+ std::cerr << "Monitor thread has errno " << monitor._errno
+ << std::endl;
+ if (exception == NULL && !monitor._exception._str.empty())
+ exception = &monitor._exception;
+ }
+
+ // issue the final report
+ timespec finalsecs = now - _start;
+ final_report(finalsecs);
+
+ if (ret != 0)
+ std::cerr << "run_all failed err=" << ret << std::endl;
+ (*_report_out) << std::endl;
+ if (exception != NULL)
+ throw *exception;
+ return (ret);
+}
+
+};
diff --git a/bench/workgen/workgen.h b/bench/workgen/workgen.h
new file mode 100644
index 00000000000..c1ae01ed5a4
--- /dev/null
+++ b/bench/workgen/workgen.h
@@ -0,0 +1,410 @@
+/*-
+ * Public Domain 2014-2017 MongoDB, Inc.
+ * Public Domain 2008-2014 WiredTiger, Inc.
+ *
+ * This is free and unencumbered software released into the public domain.
+ *
+ * Anyone is free to copy, modify, publish, use, compile, sell, or
+ * distribute this software, either in source code form or as a compiled
+ * binary, for any purpose, commercial or non-commercial, and by any
+ * means.
+ *
+ * In jurisdictions that recognize copyright laws, the author or authors
+ * of this software dedicate any and all copyright interest in the
+ * software to the public domain. We make this dedication for the benefit
+ * of the public at large and to the detriment of our heirs and
+ * successors. We intend this dedication to be an overt act of
+ * relinquishment in perpetuity of all present and future rights to this
+ * software under copyright law.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
+ * EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
+ * MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT.
+ * IN NO EVENT SHALL THE AUTHORS BE LIABLE FOR ANY CLAIM, DAMAGES OR
+ * OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE,
+ * ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR
+ * OTHER DEALINGS IN THE SOFTWARE.
+ */
+#include <ostream>
+#include <string>
+#include <vector>
+#include <map>
+
+namespace workgen {
+
+struct ContextInternal;
+struct TableInternal;
+struct Thread;
+struct Transaction;
+
+#ifndef SWIG
+struct OptionsList {
+ OptionsList();
+ OptionsList(const OptionsList &other);
+
+ void add_int(const char *name, int default_value, const char *desc);
+ void add_bool(const char *name, bool default_value, const char *desc);
+ void add_double(const char *name, double default_value, const char *desc);
+ void add_string(const char *name, const std::string &default_value,
+ const char *desc);
+
+ std::string help() const;
+ std::string help_description(const char *option_name) const;
+ std::string help_type(const char *option_name) const;
+
+private:
+ void add_option(const char *name, const std::string typestr,
+ const char *desc);
+ typedef std::pair<std::string, std::string> TypeDescPair;
+ std::map<std::string, TypeDescPair> _option_map;
+};
+#endif
+
+// These classes are all exposed to Python via SWIG. While they may contain
+// data that is private to C++, such data must not prevent the objects from
+// being shared. Tables, Keys, Values, Operations and Threads can be shared: a
+// single Key object might appear in many operations; Operations may appear
+// multiple times in a Thread or in different Threads; the same Thread may
+// appear multiple times in a Workload list, etc.
+//
+// Certain kinds of state are allowed: A Table contains a unique pointer that
+// is used within the internal part of the Context. Stats contain lots
+// of state, but is made available after a Workload.run().
+//
+// Python controls the lifetime of (nearly) all objects of these classes.
+// The exception is Stat/Track objects, which are also created/used
+// internally to calculate and show statistics during a run.
+//
+struct Track {
+ // Threads maintain the total thread operation and total latency they've
+ // experienced.
+
+ uint64_t ops; // Total operations
+ uint64_t latency_ops; // Total ops sampled for latency
+ uint64_t latency; // Total latency (usecs)
+
+ // Minimum/maximum latency, shared with the monitor thread, that is, the
+ // monitor thread clears it so it's recalculated again for each period.
+
+ uint32_t min_latency; // Minimum latency (uS)
+ uint32_t max_latency; // Maximum latency (uS)
+
+ Track(bool latency_tracking = false);
+ Track(const Track &other);
+ ~Track();
+
+ void add(Track&, bool reset = false);
+ void assign(const Track&);
+ uint64_t average_latency() const;
+ void clear();
+ void incr();
+ void incr_with_latency(uint64_t usecs);
+ void smooth(const Track&);
+ void subtract(const Track&);
+ void track_latency(bool);
+ bool track_latency() const { return (us != NULL); }
+
+ void _get_us(long *);
+ void _get_ms(long *);
+ void _get_sec(long *);
+
+private:
+ // Latency buckets. From python, accessed via methods us(), ms(), sec()
+ uint32_t *us; // < 1us ... 1000us
+ uint32_t *ms; // < 1ms ... 1000ms
+ uint32_t *sec; // < 1s 2s ... 100s
+
+ Track & operator=(const Track &other); // use explicit assign method
+};
+
+struct Stats {
+ Track insert;
+ Track not_found;
+ Track read;
+ Track remove;
+ Track update;
+ Track truncate;
+
+ Stats(bool latency = false);
+ Stats(const Stats &other);
+ ~Stats();
+
+ void add(Stats&, bool reset = false);
+ void assign(const Stats&);
+ void clear();
+ void describe(std::ostream &os) const;
+#ifndef SWIG
+ void final_report(std::ostream &os, timespec &totalsecs) const;
+ void report(std::ostream &os) const;
+#endif
+ void smooth(const Stats&);
+ void subtract(const Stats&);
+ void track_latency(bool);
+ bool track_latency() const { return (insert.track_latency()); }
+
+private:
+ Stats & operator=(const Stats &other); // use explicit assign method
+};
+
+// A Context tracks the current record number for each uri, used
+// for key generation.
+//
+struct Context {
+ bool _verbose;
+ ContextInternal *_internal;
+
+ Context();
+ ~Context();
+ void describe(std::ostream &os) const {
+ os << "Context: verbose " << (_verbose ? "true" : "false");
+ }
+
+#ifndef SWIG
+ Context& operator=(const Context &other);
+#endif
+};
+
+// To prevent silent errors, this class is set up in Python so that new
+// properties are prevented, only existing properties can be set.
+//
+struct TableOptions {
+ int key_size;
+ int value_size;
+
+ TableOptions();
+ TableOptions(const TableOptions &other);
+ ~TableOptions();
+
+ void describe(std::ostream &os) const {
+ os << "key_size " << key_size;
+ os << ", value_size " << value_size;
+ }
+
+ std::string help() const { return _options.help(); }
+ std::string help_description(const char *option_name) const {
+ return _options.help_description(option_name); }
+ std::string help_type(const char *option_name) const {
+ return _options.help_type(option_name); }
+
+private:
+ OptionsList _options;
+};
+
+struct Table {
+ TableOptions options;
+ std::string _uri;
+ TableInternal *_internal;
+
+ /* XXX select table from range */
+
+ Table();
+ Table(const char *tablename);
+ Table(const Table &other);
+ ~Table();
+
+ void describe(std::ostream &os) const;
+
+#ifndef SWIG
+ Table& operator=(const Table &other);
+#endif
+};
+
+struct Key {
+ typedef enum {
+ KEYGEN_AUTO, KEYGEN_APPEND, KEYGEN_PARETO, KEYGEN_UNIFORM } KeyType;
+ KeyType _keytype;
+ int _size;
+
+ /* XXX specify more about key distribution */
+ Key() : _keytype(KEYGEN_AUTO), _size(0) {}
+ Key(KeyType keytype, int size) : _keytype(keytype), _size(size) {}
+ Key(const Key &other) : _keytype(other._keytype), _size(other._size) {}
+ ~Key() {}
+
+ void describe(std::ostream &os) const {
+ os << "Key: type " << _keytype << ", size " << _size; }
+};
+
+struct Value {
+ int _size;
+
+ /* XXX specify how value is calculated */
+ Value() : _size(0) {}
+ Value(int size) : _size(size) {}
+ Value(const Value &other) : _size(other._size) {}
+ ~Value() {}
+
+ void describe(std::ostream &os) const { os << "Value: size " << _size; }
+};
+
+struct Operation {
+ enum OpType {
+ OP_NONE, OP_INSERT, OP_REMOVE, OP_SEARCH, OP_UPDATE };
+ OpType _optype;
+
+ Table _table;
+ Key _key;
+ Value _value;
+ Transaction *_transaction;
+ std::vector<Operation> *_group;
+ int _repeatgroup;
+
+#ifndef SWIG
+ int _keysize; // derived from Key._size and Table.options.key_size
+ int _valuesize;
+ uint64_t _keymax;
+ uint64_t _valuemax;
+#endif
+
+ Operation();
+ Operation(OpType optype, Table table, Key key, Value value);
+ Operation(OpType optype, Table table, Key key);
+ Operation(OpType optype, Table table);
+ Operation(const Operation &other);
+ ~Operation();
+
+ void describe(std::ostream &os) const;
+#ifndef SWIG
+ Operation& operator=(const Operation &other);
+ void get_static_counts(Stats &stats, int multiplier);
+ void kv_compute_max(bool);
+ void kv_gen(bool, uint64_t, char *) const;
+ void kv_size_buffer(bool iskey, size_t &size) const;
+ void size_check() const;
+#endif
+};
+
+// To prevent silent errors, this class is set up in Python so that new
+// properties are prevented, only existing properties can be set.
+//
+struct ThreadOptions {
+ std::string name;
+ double throttle;
+ double throttle_burst;
+
+ ThreadOptions();
+ ThreadOptions(const ThreadOptions &other);
+ ~ThreadOptions();
+
+ void describe(std::ostream &os) const {
+ os << "throttle " << throttle;
+ }
+
+ std::string help() const { return _options.help(); }
+ std::string help_description(const char *option_name) const {
+ return _options.help_description(option_name); }
+ std::string help_type(const char *option_name) const {
+ return _options.help_type(option_name); }
+
+private:
+ OptionsList _options;
+};
+
+// This is a list of threads, which may be used in the Workload constructor.
+// It participates with ThreadList defined on the SWIG/Python side and
+// some Python operators added to Thread to allow Threads to be easily
+// composed using '+' and multiplied (by integer counts) using '*'.
+// Users of the workgen API in Python don't ever need to use
+// ThreadListWrapper or ThreadList.
+struct ThreadListWrapper {
+ std::vector<Thread> _threads;
+
+ ThreadListWrapper() : _threads() {}
+ ThreadListWrapper(const ThreadListWrapper &other) :
+ _threads(other._threads) {}
+ ThreadListWrapper(const std::vector<Thread> &threads) : _threads(threads) {}
+ void extend(const ThreadListWrapper &);
+ void append(const Thread &);
+ void multiply(const int);
+};
+
+struct Thread {
+ ThreadOptions options;
+ Operation _op;
+
+ Thread();
+ Thread(const Operation &op);
+ Thread(const Thread &other);
+ ~Thread();
+
+ void describe(std::ostream &os) const;
+};
+
+struct Transaction {
+ bool _rollback;
+ std::string _begin_config;
+ std::string _commit_config;
+
+ Transaction(const char *_config = NULL) : _rollback(false),
+ _begin_config(_config == NULL ? "" : _config), _commit_config() {}
+
+ void describe(std::ostream &os) const {
+ os << "Transaction: ";
+ if (_rollback)
+ os << "(rollback) ";
+ os << "begin_config: " << _begin_config;
+ if (!_commit_config.empty())
+ os << ", commit_config: " << _commit_config;
+ }
+};
+
+// To prevent silent errors, this class is set up in Python so that new
+// properties are prevented, only existing properties can be set.
+//
+struct WorkloadOptions {
+ int max_latency;
+ std::string report_file;
+ int report_interval;
+ int run_time;
+ int sample_interval;
+ int sample_rate;
+
+ WorkloadOptions();
+ WorkloadOptions(const WorkloadOptions &other);
+ ~WorkloadOptions();
+
+ void describe(std::ostream &os) const {
+ os << "run_time " << run_time;
+ os << ", report_interval " << report_interval;
+ }
+
+ std::string help() const { return _options.help(); }
+ std::string help_description(const char *option_name) const {
+ return _options.help_description(option_name); }
+ std::string help_type(const char *option_name) const {
+ return _options.help_type(option_name); }
+
+private:
+ OptionsList _options;
+};
+
+struct Workload {
+ WorkloadOptions options;
+ Stats stats;
+ Context *_context;
+ std::vector<Thread> _threads;
+
+ Workload(Context *context, const ThreadListWrapper &threadlist);
+ Workload(Context *context, const Thread &thread);
+ Workload(const Workload &other);
+ ~Workload();
+
+#ifndef SWIG
+ Workload& operator=(const Workload &other);
+#endif
+
+ void describe(std::ostream &os) const {
+ os << "Workload: ";
+ _context->describe(os);
+ os << ", ";
+ options.describe(os);
+ os << ", [" << std::endl;
+ for (std::vector<Thread>::const_iterator i = _threads.begin(); i != _threads.end(); i++) {
+ os << " "; i->describe(os); os << std::endl;
+ }
+ os << "]";
+ }
+ int run(WT_CONNECTION *conn);
+};
+
+};
diff --git a/bench/workgen/workgen.swig b/bench/workgen/workgen.swig
new file mode 100644
index 00000000000..0f74942169c
--- /dev/null
+++ b/bench/workgen/workgen.swig
@@ -0,0 +1,233 @@
+/*-
+ * Public Domain 2014-2017 MongoDB, Inc.
+ * Public Domain 2008-2014 WiredTiger, Inc.
+ *
+ * This is free and unencumbered software released into the public domain.
+ *
+ * Anyone is free to copy, modify, publish, use, compile, sell, or
+ * distribute this software, either in source code form or as a compiled
+ * binary, for any purpose, commercial or non-commercial, and by any
+ * means.
+ *
+ * In jurisdictions that recognize copyright laws, the author or authors
+ * of this software dedicate any and all copyright interest in the
+ * software to the public domain. We make this dedication for the benefit
+ * of the public at large and to the detriment of our heirs and
+ * successors. We intend this dedication to be an overt act of
+ * relinquishment in perpetuity of all present and future rights to this
+ * software under copyright law.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
+ * EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
+ * MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT.
+ * IN NO EVENT SHALL THE AUTHORS BE LIABLE FOR ANY CLAIM, DAMAGES OR
+ * OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE,
+ * ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR
+ * OTHER DEALINGS IN THE SOFTWARE.
+ */
+
+/*
+ * workgen.swig
+ * The SWIG interface file defining the workgen python API.
+ */
+
+%include "typemaps.i"
+%include "std_vector.i"
+%include "std_string.i"
+%include "stdint.i"
+%include "attribute.i"
+%include "carrays.i"
+
+/* We only need to reference WiredTiger types. */
+%import "wiredtiger.h"
+
+%{
+#include <ostream>
+#include <sstream>
+#include <signal.h>
+#include "wiredtiger.h"
+#include "workgen.h"
+#include "workgen_int.h"
+%}
+
+%pythoncode %{
+import numbers
+%}
+
+%exception {
+ try {
+ $action
+ }
+ catch (workgen::WorkgenException &wge) {
+ SWIG_exception_fail(SWIG_RuntimeError, wge._str.c_str());
+ }
+}
+
+/*
+ * Some functions are long running, turn off signal handling that was enabled
+ * by the Python interpreter. This means that a signal handler coded in Python
+ * won't work when spanning a call to one of these long running functions, but
+ * it's doubtful our test scripts need signals at all. This could be made to
+ * work, it's just not worth the trouble.
+ */
+%define InterruptableFunction(funcname)
+%exception funcname {
+ try {
+ void (*savesig)(int) = signal(SIGINT, SIG_DFL);
+ $action
+ (void)signal(SIGINT, savesig);
+ }
+ catch (workgen::WorkgenException &wge) {
+ SWIG_exception_fail(SWIG_RuntimeError, wge._str.c_str());
+ }
+}
+%enddef
+
+/*
+ * Define a __str__ function for all public workgen classes.
+ */
+%define WorkgenClass(classname)
+%extend workgen::classname {
+ const std::string __str__() {
+ std::ostringstream out;
+ $self->describe(out);
+ return out.str();
+ }
+};
+%enddef
+
+/*
+ * To forestall errors, make it impossible to add new attributes to certain
+ * classes. This trick relies on the implementation of SWIG providing
+ * predictably named functions in the _workgen namespace to set attributes.
+ */
+%define WorkgenFrozenClass(classname)
+%extend workgen::classname {
+%pythoncode %{
+ def __setattr__(self, attr, val):
+ if getattr(self, attr) == None:
+ raise AttributeError("'" + #classname +
+ "' object has no attribute '" + attr + "'")
+ f = _workgen.__dict__[#classname + '_' + attr + '_set']
+ f(self, val)
+%}
+};
+%enddef
+
+InterruptableFunction(workgen::execute)
+InterruptableFunction(workgen::Workload::run)
+
+%module workgen
+/* Parse the header to generate wrappers. */
+%include "workgen.h"
+
+%template(OpList) std::vector<workgen::Operation>;
+%template(ThreadList) std::vector<workgen::Thread>;
+%array_class(uint32_t, uint32Array);
+%array_class(long, longArray);
+
+WorkgenClass(Key)
+WorkgenClass(Operation)
+WorkgenClass(Stats)
+WorkgenClass(Table)
+WorkgenClass(TableOptions)
+WorkgenClass(Thread)
+WorkgenClass(ThreadOptions)
+WorkgenClass(Transaction)
+WorkgenClass(Value)
+WorkgenClass(Workload)
+WorkgenClass(WorkloadOptions)
+WorkgenClass(Context)
+
+WorkgenFrozenClass(TableOptions)
+WorkgenFrozenClass(ThreadOptions)
+WorkgenFrozenClass(WorkloadOptions)
+
+%extend workgen::Operation {
+%pythoncode %{
+ def __mul__(self, other):
+ if not isinstance(other, numbers.Integral):
+ raise Exception('Operation.__mul__ requires an integral number')
+ op = Operation()
+ op._group = OpList([self])
+ op._repeatgroup = other
+ return op
+
+ __rmul__ = __mul__
+
+ def __add__(self, other):
+ if not isinstance(other, Operation):
+ raise Exception('Operation.__sum__ requires an Operation')
+ if self._group == None or self._repeatgroup != 1 or self._transaction != None:
+ op = Operation()
+ op._group = OpList([self, other])
+ op._repeatgroup = 1
+ return op
+ else:
+ self._group.append(other)
+ return self
+%}
+};
+
+%extend workgen::Thread {
+%pythoncode %{
+ def __mul__(self, other):
+ if not isinstance(other, numbers.Integral):
+ raise Exception('Thread.__mul__ requires an integral number')
+ return ThreadListWrapper(ThreadList([self] * other))
+
+ __rmul__ = __mul__
+
+ def __add__(self, other):
+ if type(self) != type(other):
+ raise Exception('Thread.__sum__ requires an Thread')
+ return ThreadListWrapper(ThreadList([self, other]))
+%}
+};
+
+%extend workgen::ThreadListWrapper {
+%pythoncode %{
+ def __mul__(self, other):
+ if not isinstance(other, numbers.Integral):
+ raise Exception('ThreadList.__mul__ requires an integral number')
+ tlw = ThreadListWrapper(self)
+ tlw.multiply(other)
+ return tlw
+
+ __rmul__ = __mul__
+
+ def __add__(self, other):
+ tlw = ThreadListWrapper(self)
+ if isinstance(other, ThreadListWrapper):
+ tlw.extend(other)
+ elif isinstance(other, Thread):
+ tlw.append(other)
+ else:
+ raise Exception('ThreadList.__sum__ requires an Thread or ThreadList')
+ return tlw
+%}
+};
+
+%extend workgen::Track {
+%pythoncode %{
+ def __longarray(self, size):
+ result = longArray(size)
+ result.__len__ = lambda: size
+ return result
+
+ def us(self):
+ result = self.__longarray(1000)
+ self._get_us(result)
+ return result
+
+ def ms(self):
+ result = self.__longarray(1000)
+ self._get_ms(result)
+ return result
+
+ def sec(self):
+ result = self.__longarray(100)
+ self._get_sec(result)
+ return result
+%}
+};
diff --git a/bench/workgen/workgen/__init__.py b/bench/workgen/workgen/__init__.py
new file mode 100644
index 00000000000..00e8f257546
--- /dev/null
+++ b/bench/workgen/workgen/__init__.py
@@ -0,0 +1,42 @@
+#!/usr/bin/env python
+#
+# Public Domain 2014-2016 MongoDB, Inc.
+# Public Domain 2008-2014 WiredTiger, Inc.
+#
+# This is free and unencumbered software released into the public domain.
+#
+# Anyone is free to copy, modify, publish, use, compile, sell, or
+# distribute this software, either in source code form or as a compiled
+# binary, for any purpose, commercial or non-commercial, and by any
+# means.
+#
+# In jurisdictions that recognize copyright laws, the author or authors
+# of this software dedicate any and all copyright interest in the
+# software to the public domain. We make this dedication for the benefit
+# of the public at large and to the detriment of our heirs and
+# successors. We intend this dedication to be an overt act of
+# relinquishment in perpetuity of all present and future rights to this
+# software under copyright law.
+#
+# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
+# EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
+# MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT.
+# IN NO EVENT SHALL THE AUTHORS BE LIABLE FOR ANY CLAIM, DAMAGES OR
+# OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE,
+# ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR
+# OTHER DEALINGS IN THE SOFTWARE.
+#
+# __init__.py
+# initialization for workgen module
+#
+import os, sys
+
+# After importing the SWIG-generated file, copy all symbols from from it
+# to this module so they will appear in the workgen namespace.
+me = sys.modules[__name__]
+sys.path.append(os.path.dirname(__file__)) # needed for Python3
+import workgen, workgen_util
+for module in workgen:
+ for name in dir(module):
+ value = getattr(module, name)
+ setattr(me, name, value)
diff --git a/bench/workgen/workgen_func.c b/bench/workgen/workgen_func.c
new file mode 100644
index 00000000000..2e1271a515e
--- /dev/null
+++ b/bench/workgen/workgen_func.c
@@ -0,0 +1,89 @@
+/*-
+ * Public Domain 2014-2017 MongoDB, Inc.
+ * Public Domain 2008-2014 WiredTiger, Inc.
+ *
+ * This is free and unencumbered software released into the public domain.
+ *
+ * Anyone is free to copy, modify, publish, use, compile, sell, or
+ * distribute this software, either in source code form or as a compiled
+ * binary, for any purpose, commercial or non-commercial, and by any
+ * means.
+ *
+ * In jurisdictions that recognize copyright laws, the author or authors
+ * of this software dedicate any and all copyright interest in the
+ * software to the public domain. We make this dedication for the benefit
+ * of the public at large and to the detriment of our heirs and
+ * successors. We intend this dedication to be an overt act of
+ * relinquishment in perpetuity of all present and future rights to this
+ * software under copyright law.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
+ * EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
+ * MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT.
+ * IN NO EVENT SHALL THE AUTHORS BE LIABLE FOR ANY CLAIM, DAMAGES OR
+ * OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE,
+ * ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR
+ * OTHER DEALINGS IN THE SOFTWARE.
+ */
+#include "wiredtiger.h"
+#include "test_util.h"
+#include "workgen_func.h"
+
+/* workgen_random_state is used as an opaque type handle. */
+typedef struct workgen_random_state {
+ WT_RAND_STATE state;
+} workgen_random_state;
+
+/*
+ * These functions call their WiredTiger equivalents.
+ */
+uint32_t
+workgen_atomic_add32(uint32_t *vp, uint32_t v)
+{
+ return (__wt_atomic_add32(vp, v));
+}
+
+uint64_t
+workgen_atomic_add64(uint64_t *vp, uint64_t v)
+{
+ return (__wt_atomic_add64(vp, v));
+}
+
+void
+workgen_epoch(struct timespec *tsp)
+{
+ __wt_epoch(NULL, tsp);
+}
+
+uint32_t
+workgen_random(workgen_random_state volatile * rnd_state)
+{
+ return (__wt_random(&rnd_state->state));
+}
+
+int
+workgen_random_alloc(WT_SESSION *session, workgen_random_state **rnd_state)
+{
+ workgen_random_state *state;
+
+ state = malloc(sizeof(workgen_random_state));
+ if (state == NULL) {
+ *rnd_state = NULL;
+ return (ENOMEM);
+ }
+ __wt_random_init_seed((WT_SESSION_IMPL *)session, &state->state);
+ *rnd_state = state;
+ return (0);
+}
+
+void
+workgen_random_free(workgen_random_state *rnd_state)
+{
+ free(rnd_state);
+}
+
+extern void
+workgen_u64_to_string_zf(uint64_t n, char *buf, size_t len)
+{
+ u64_to_string_zf(n, buf, len);
+}
diff --git a/bench/workgen/workgen_func.h b/bench/workgen/workgen_func.h
new file mode 100644
index 00000000000..20ebf2632cc
--- /dev/null
+++ b/bench/workgen/workgen_func.h
@@ -0,0 +1,44 @@
+/*-
+ * Public Domain 2014-2017 MongoDB, Inc.
+ * Public Domain 2008-2014 WiredTiger, Inc.
+ *
+ * This is free and unencumbered software released into the public domain.
+ *
+ * Anyone is free to copy, modify, publish, use, compile, sell, or
+ * distribute this software, either in source code form or as a compiled
+ * binary, for any purpose, commercial or non-commercial, and by any
+ * means.
+ *
+ * In jurisdictions that recognize copyright laws, the author or authors
+ * of this software dedicate any and all copyright interest in the
+ * software to the public domain. We make this dedication for the benefit
+ * of the public at large and to the detriment of our heirs and
+ * successors. We intend this dedication to be an overt act of
+ * relinquishment in perpetuity of all present and future rights to this
+ * software under copyright law.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
+ * EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
+ * MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT.
+ * IN NO EVENT SHALL THE AUTHORS BE LIABLE FOR ANY CLAIM, DAMAGES OR
+ * OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE,
+ * ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR
+ * OTHER DEALINGS IN THE SOFTWARE.
+ */
+struct workgen_random_state;
+
+extern uint32_t
+workgen_atomic_add32(uint32_t *vp, uint32_t v);
+extern uint64_t
+workgen_atomic_add64(uint64_t *vp, uint64_t v);
+extern void
+workgen_epoch(struct timespec *tsp);
+extern uint32_t
+workgen_random(struct workgen_random_state volatile *rnd_state);
+extern int
+workgen_random_alloc(WT_SESSION *session,
+ struct workgen_random_state **rnd_state);
+extern void
+workgen_random_free(struct workgen_random_state *rnd_state);
+extern void
+workgen_u64_to_string_zf(uint64_t n, char *buf, size_t len);
diff --git a/bench/workgen/workgen_int.h b/bench/workgen/workgen_int.h
new file mode 100644
index 00000000000..01fb727691b
--- /dev/null
+++ b/bench/workgen/workgen_int.h
@@ -0,0 +1,205 @@
+/*-
+ * Public Domain 2014-2017 MongoDB, Inc.
+ * Public Domain 2008-2014 WiredTiger, Inc.
+ *
+ * This is free and unencumbered software released into the public domain.
+ *
+ * Anyone is free to copy, modify, publish, use, compile, sell, or
+ * distribute this software, either in source code form or as a compiled
+ * binary, for any purpose, commercial or non-commercial, and by any
+ * means.
+ *
+ * In jurisdictions that recognize copyright laws, the author or authors
+ * of this software dedicate any and all copyright interest in the
+ * software to the public domain. We make this dedication for the benefit
+ * of the public at large and to the detriment of our heirs and
+ * successors. We intend this dedication to be an overt act of
+ * relinquishment in perpetuity of all present and future rights to this
+ * software under copyright law.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
+ * EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
+ * MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT.
+ * IN NO EVENT SHALL THE AUTHORS BE LIABLE FOR ANY CLAIM, DAMAGES OR
+ * OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE,
+ * ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR
+ * OTHER DEALINGS IN THE SOFTWARE.
+ */
+#include <ostream>
+#include <string>
+#include <vector>
+#include <map>
+#include <set>
+#ifndef SWIG
+extern "C" {
+#include "workgen_func.h"
+}
+#endif
+
+namespace workgen {
+
+// A 'tint' or ('table integer') is a unique small value integer
+// assigned to each table URI in use. Currently, we assign it once,
+// and its value persists through the lifetime of the Context.
+typedef uint32_t tint_t;
+
+struct ThreadRunner;
+struct WorkloadRunner;
+
+// A exception generated by the workgen classes. Methods generally return an
+// int errno, so this is useful primarily for notifying the caller about
+// failures in constructors.
+struct WorkgenException {
+ std::string _str;
+ WorkgenException() : _str() {}
+ WorkgenException(int err, const char *msg = NULL) : _str() {
+ if (err != 0)
+ _str += wiredtiger_strerror(err);
+ if (msg != NULL) {
+ if (!_str.empty())
+ _str += ": ";
+ _str += msg;
+ }
+ }
+ WorkgenException(const WorkgenException &other) : _str(other._str) {}
+ ~WorkgenException() {}
+};
+
+struct Throttle {
+ ThreadRunner &_runner;
+ double _throttle;
+ double _burst;
+ timespec _next_div;
+ int64_t _ops_delta;
+ uint64_t _ops_prev; // previously returned value
+ uint64_t _ops_per_div; // statically calculated.
+ uint64_t _ms_per_div; // statically calculated.
+ bool _started;
+
+ Throttle(ThreadRunner &runner, double throttle, double burst);
+ ~Throttle();
+
+ // Called with the number of operations since the last throttle.
+ // Sleeps for any needed amount and returns the number operations the
+ // caller should perform before the next call to throttle.
+ int throttle(uint64_t op_count, uint64_t *op_limit);
+};
+
+// There is one of these per Thread object. It exists for the duration of a
+// call to Workload::run() method.
+struct ThreadRunner {
+ int _errno;
+ WorkgenException _exception;
+ Thread *_thread;
+ Context *_context;
+ ContextInternal *_icontext;
+ Workload *_workload;
+ WorkloadRunner *_wrunner;
+ workgen_random_state *_rand_state;
+ Throttle *_throttle;
+ uint64_t _throttle_ops;
+ uint64_t _throttle_limit;
+ bool _in_transaction;
+ uint32_t _number;
+ Stats _stats;
+
+ typedef enum {
+ USAGE_READ = 0x1, USAGE_WRITE = 0x2, USAGE_MIXED = 0x4 } Usage;
+ std::map<tint_t, uint32_t> _table_usage; // value is Usage
+ WT_CURSOR **_cursors; // indexed by tint_t
+ volatile bool _stop;
+ WT_SESSION *_session;
+ char *_keybuf;
+ char *_valuebuf;
+ bool _repeat;
+
+ ThreadRunner();
+ ~ThreadRunner();
+
+ void free_all();
+ static int cross_check(std::vector<ThreadRunner> &runners);
+
+ int close_all();
+ int create_all(WT_CONNECTION *conn);
+ void get_static_counts(Stats &);
+ int open_all();
+ int run();
+
+ void op_create_all(Operation *, size_t &keysize, size_t &valuesize);
+ uint64_t op_get_key_recno(Operation *, tint_t tint);
+ void op_get_static_counts(Operation *, Stats &, int);
+ int op_run(Operation *);
+
+#ifdef _DEBUG
+ std::stringstream _debug_messages;
+ std::string get_debug();
+#define DEBUG_CAPTURE(runner, expr) runner._debug_messages << expr
+#else
+#define DEBUG_CAPTURE(runner, expr)
+#endif
+};
+
+struct Monitor {
+ int _errno;
+ WorkgenException _exception;
+ WorkloadRunner &_wrunner;
+ volatile bool _stop;
+ pthread_t _handle;
+ std::ostream *_out;
+
+ Monitor(WorkloadRunner &wrunner);
+ ~Monitor();
+ int run();
+};
+
+struct ContextInternal {
+ std::map<std::string, tint_t> _tint; // maps uri -> tint_t
+ std::map<tint_t, std::string> _table_names; // reverse mapping
+ uint64_t *_recno; // # entries per tint_t
+ uint32_t _recno_alloced; // length of allocated _recno
+ tint_t _tint_last; // last tint allocated
+ // unique id per context, to work with multiple contexts, starts at 1.
+ uint32_t _context_count;
+
+ ContextInternal();
+ ~ContextInternal();
+ int create_all();
+};
+
+struct TableInternal {
+ tint_t _tint;
+ uint32_t _context_count;
+
+ TableInternal();
+ TableInternal(const TableInternal &other);
+ ~TableInternal();
+};
+
+// An instance of this class only exists for the duration of one call to a
+// Workload::run() method.
+struct WorkloadRunner {
+ Workload *_workload;
+ std::vector<ThreadRunner> _trunners;
+ std::ostream *_report_out;
+ std::string _wt_home;
+ timespec _start;
+
+ WorkloadRunner(Workload *);
+ ~WorkloadRunner();
+ int run(WT_CONNECTION *conn);
+
+private:
+ int close_all();
+ int create_all(WT_CONNECTION *conn, Context *context);
+ void final_report(timespec &);
+ void get_stats(Stats *stats);
+ int open_all();
+ void open_report_file(std::ofstream &, const char *, const char *);
+ void report(time_t, time_t, Stats *stats);
+ int run_all();
+
+ WorkloadRunner(const WorkloadRunner &); // disallowed
+ WorkloadRunner& operator=(const WorkloadRunner &other); // disallowed
+};
+
+};
diff --git a/bench/workgen/workgen_time.h b/bench/workgen/workgen_time.h
new file mode 100644
index 00000000000..f33eb64d9c9
--- /dev/null
+++ b/bench/workgen/workgen_time.h
@@ -0,0 +1,201 @@
+/*-
+ * Public Domain 2014-2017 MongoDB, Inc.
+ * Public Domain 2008-2014 WiredTiger, Inc.
+ *
+ * This is free and unencumbered software released into the public domain.
+ *
+ * Anyone is free to copy, modify, publish, use, compile, sell, or
+ * distribute this software, either in source code form or as a compiled
+ * binary, for any purpose, commercial or non-commercial, and by any
+ * means.
+ *
+ * In jurisdictions that recognize copyright laws, the author or authors
+ * of this software dedicate any and all copyright interest in the
+ * software to the public domain. We make this dedication for the benefit
+ * of the public at large and to the detriment of our heirs and
+ * successors. We intend this dedication to be an overt act of
+ * relinquishment in perpetuity of all present and future rights to this
+ * software under copyright law.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
+ * EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
+ * MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT.
+ * IN NO EVENT SHALL THE AUTHORS BE LIABLE FOR ANY CLAIM, DAMAGES OR
+ * OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE,
+ * ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR
+ * OTHER DEALINGS IN THE SOFTWARE.
+ */
+#define THOUSAND (1000ULL)
+#define MILLION (1000000ULL)
+#define BILLION (1000000000ULL)
+
+#define NSEC_PER_SEC BILLION
+#define USEC_PER_SEC MILLION
+#define MSEC_PER_SEC THOUSAND
+
+#define ns_to_ms(v) ((v) / MILLION)
+#define ns_to_sec(v) ((v) / BILLION)
+#define ns_to_us(v) ((v) / THOUSAND)
+
+#define us_to_ms(v) ((v) / THOUSAND)
+#define us_to_ns(v) ((v) * THOUSAND)
+#define us_to_sec(v) ((v) / MILLION)
+
+#define ms_to_ns(v) ((v) * MILLION)
+#define ms_to_us(v) ((v) * THOUSAND)
+#define ms_to_sec(v) ((v) / THOUSAND)
+
+#define sec_to_ns(v) ((v) * BILLION)
+#define sec_to_us(v) ((v) * MILLION)
+#define sec_to_ms(v) ((v) * THOUSAND)
+
+inline std::ostream&
+operator<<(std::ostream &os, const timespec &ts)
+{
+ char oldfill;
+ std::streamsize oldwidth;
+
+ os << ts.tv_sec << ".";
+ oldfill = os.fill('0');
+ oldwidth = os.width(3);
+ os << (int)ns_to_ms(ts.tv_nsec);
+ os.fill(oldfill);
+ os.width(oldwidth);
+ return (os);
+}
+
+inline timespec
+operator-(const timespec &lhs, const timespec &rhs)
+{
+ timespec ts;
+
+ if (lhs.tv_nsec < rhs.tv_nsec) {
+ ts.tv_sec = lhs.tv_sec - rhs.tv_sec - 1;
+ ts.tv_nsec = lhs.tv_nsec - rhs.tv_nsec + NSEC_PER_SEC;
+ } else {
+ ts.tv_sec = lhs.tv_sec - rhs.tv_sec;
+ ts.tv_nsec = lhs.tv_nsec - rhs.tv_nsec;
+ }
+ return (ts);
+}
+
+inline timespec
+operator+(const timespec &lhs, const int n)
+{
+ timespec ts = lhs;
+ ts.tv_sec += n;
+ return (ts);
+}
+
+inline bool
+operator<(const timespec &lhs, const timespec &rhs)
+{
+ if (lhs.tv_sec == rhs.tv_sec)
+ return (lhs.tv_nsec < rhs.tv_nsec);
+ else
+ return (lhs.tv_sec < rhs.tv_sec);
+}
+
+inline bool
+operator>(const timespec &lhs, const timespec &rhs)
+{
+ if (lhs.tv_sec == rhs.tv_sec)
+ return (lhs.tv_nsec > rhs.tv_nsec);
+ else
+ return (lhs.tv_sec > rhs.tv_sec);
+}
+
+inline bool
+operator>=(const timespec &lhs, const timespec &rhs)
+{
+ return (!(lhs < rhs));
+}
+
+inline bool
+operator<=(const timespec &lhs, const timespec &rhs)
+{
+ return (!(lhs > rhs));
+}
+
+inline bool
+operator==(const timespec &lhs, int n)
+{
+ return (lhs.tv_sec == n && lhs.tv_nsec == 0);
+}
+
+inline bool
+operator!=(const timespec &lhs, int n)
+{
+ return (lhs.tv_sec != n || lhs.tv_nsec != 0);
+}
+
+inline timespec &
+operator+=(timespec &lhs, const int n)
+{
+ lhs.tv_sec += n;
+ return (lhs);
+}
+
+inline bool
+operator==(const timespec &lhs, const timespec &rhs)
+{
+ return (lhs.tv_sec == rhs.tv_sec && lhs.tv_nsec == rhs.tv_nsec);
+}
+
+inline timespec &
+operator-=(timespec &lhs, const timespec &rhs)
+{
+ lhs.tv_sec -= rhs.tv_sec;
+ lhs.tv_nsec -= rhs.tv_nsec;
+ if (lhs.tv_nsec < 0) {
+ lhs.tv_nsec += NSEC_PER_SEC;
+ lhs.tv_sec -= 1;
+ }
+ return (lhs);
+}
+
+inline timespec
+ts_add_ms(const timespec &lhs, const uint64_t n)
+{
+ timespec ts;
+
+ ts.tv_sec = lhs.tv_sec + ms_to_sec(n);
+ ts.tv_nsec = lhs.tv_nsec + ms_to_ns(n % THOUSAND);
+ while ((unsigned long)ts.tv_nsec > NSEC_PER_SEC) {
+ ts.tv_nsec -= NSEC_PER_SEC;
+ ts.tv_sec++;
+ }
+ return (ts);
+}
+
+inline void
+ts_assign(timespec &lhs, const timespec &rhs)
+{
+ lhs.tv_sec = rhs.tv_sec;
+ lhs.tv_nsec = rhs.tv_nsec;
+}
+
+inline void
+ts_clear(timespec &ts)
+{
+ ts.tv_sec = 0;
+ ts.tv_nsec = 0;
+}
+
+inline uint64_t
+ts_sec(const timespec &ts)
+{
+ return (ns_to_sec(ts.tv_nsec) + ts.tv_sec);
+}
+
+inline uint64_t
+ts_ms(const timespec &ts)
+{
+ return (ns_to_ms(ts.tv_nsec) + sec_to_ms(ts.tv_sec));
+}
+
+inline uint64_t
+ts_us(const timespec &ts)
+{
+ return (ns_to_us(ts.tv_nsec) + sec_to_us(ts.tv_sec));
+}
diff --git a/build_posix/Make.subdirs b/build_posix/Make.subdirs
index 4ecec37ca6c..ec928a9ead2 100644
--- a/build_posix/Make.subdirs
+++ b/build_posix/Make.subdirs
@@ -1,10 +1,11 @@
# List of sub-directories, used by makemake to create Makefile.am
#
# The format is:
-# <dir> [<condition>]
+# <dir> [<condition> ...]
#
# If the directory exists, it is added to AUTO_SUBDIRS.
-# If a condition is included, the subdir is made conditional via AM_CONDITIONAL
+# If condition(s) are included, the subdir is made conditional via
+# AM_CONDITIONAL. All conditions must be true to include the directory.
ext/collators/reverse
ext/collators/revint
ext/compressors/lz4 LZ4
@@ -45,4 +46,5 @@ test/syscall
test/thread
# Benchmark programs.
+bench/workgen PYTHON HAVE_CXX
bench/wtperf
diff --git a/build_posix/configure.ac.in b/build_posix/configure.ac.in
index 0fef587b4b8..68b4d84ba59 100644
--- a/build_posix/configure.ac.in
+++ b/build_posix/configure.ac.in
@@ -24,6 +24,7 @@ AM_PROG_AS(as gas)
define([AC_LIBTOOL_LANG_CXX_CONFIG], [:])dnl
define([AC_LIBTOOL_LANG_F77_CONFIG], [:])dnl
+AM_CONDITIONAL([HAVE_CXX], [test "$CXX" != ""])
LT_PREREQ(2.2.6)
LT_INIT([pic-only])
diff --git a/build_posix/makemake b/build_posix/makemake
index 506420b4aaf..73d6b6bcfb1 100755
--- a/build_posix/makemake
+++ b/build_posix/makemake
@@ -7,14 +7,24 @@
(sed -n '1,/BEGIN SUBDIRS/p' Make.base
echo "SUBDIRS ="
-sed -e 's/#.*$//' -e '/^$/d' Make.subdirs | while read dir cond ; do
+sed -e 's/#.*$//' -e '/^$/d' Make.subdirs | while read dir conds ; do
test -d ../$dir || continue
- if test -n "$cond" ; then
- cat <<END_CONDITIONAL
+ if test -n "$conds" ; then
+ # Multiple conditions are allowed, they will appear
+ # as nested 'if' statements.
+ for cond in $conds; do
+ cat <<END_CONDITIONAL
if ${cond}
+END_CONDITIONAL
+ done
+ cat <<END_CONDITIONAL
SUBDIRS += $dir
+END_CONDITIONAL
+ for cond in $conds; do
+ cat <<END_CONDITIONAL
endif
END_CONDITIONAL
+ done
else
echo "SUBDIRS += $dir"
fi
diff --git a/dist/s_string.ok b/dist/s_string.ok
index ce4e9f963b0..7c409e0e46d 100644
--- a/dist/s_string.ok
+++ b/dist/s_string.ok
@@ -353,6 +353,8 @@ TORTIOUS
TSO
TXN
TXNC
+ThreadList
+ThreadListWrapper
Timespec
Timestamp
TryCV
@@ -1194,6 +1196,7 @@ txnid
txnmin
txt
typedef
+typemaps
uB
uS
ui
@@ -1274,6 +1277,7 @@ whitespace
wiredTiger
wiredtiger
workFactor
+workgen
wrapup
writeable
writelock
diff --git a/dist/s_whitespace b/dist/s_whitespace
index 0de59bc5825..874074dfb50 100755
--- a/dist/s_whitespace
+++ b/dist/s_whitespace
@@ -8,6 +8,7 @@ trap 'rm -f $t' 0 1 2 3 13 15
# into a single line, discard trailing empty lines.
whitespace()
{
+ ! head $1 | grep -q 'automatically generated by SWIG' || return
sed -e 's/[ ][ ]*$//' < $1 | \
cat -s | \
sed -e '${' -e '/^$/d' -e '}' > $t