summaryrefslogtreecommitdiff
path: root/src/third_party/wiredtiger/bench/wtperf/wtperf.c
diff options
context:
space:
mode:
Diffstat (limited to 'src/third_party/wiredtiger/bench/wtperf/wtperf.c')
-rw-r--r--src/third_party/wiredtiger/bench/wtperf/wtperf.c5107
1 files changed, 2428 insertions, 2679 deletions
diff --git a/src/third_party/wiredtiger/bench/wtperf/wtperf.c b/src/third_party/wiredtiger/bench/wtperf/wtperf.c
index 27f9582dc25..3a076d4244a 100644
--- a/src/third_party/wiredtiger/bench/wtperf/wtperf.c
+++ b/src/third_party/wiredtiger/bench/wtperf/wtperf.c
@@ -29,88 +29,84 @@
#include "wtperf.h"
/* Default values. */
-#define DEFAULT_HOME "WT_TEST"
-#define DEFAULT_MONITOR_DIR "WT_TEST"
+#define DEFAULT_HOME "WT_TEST"
+#define DEFAULT_MONITOR_DIR "WT_TEST"
static WT_THREAD_RET checkpoint_worker(void *);
-static int drop_all_tables(WTPERF *);
-static int execute_populate(WTPERF *);
-static int execute_workload(WTPERF *);
-static int find_table_count(WTPERF *);
+static int drop_all_tables(WTPERF *);
+static int execute_populate(WTPERF *);
+static int execute_workload(WTPERF *);
+static int find_table_count(WTPERF *);
static WT_THREAD_RET monitor(void *);
static WT_THREAD_RET populate_thread(void *);
-static void randomize_value(WTPERF_THREAD *, char *);
-static void recreate_dir(const char *);
-static int start_all_runs(WTPERF *);
-static int start_run(WTPERF *);
-static void start_threads(WTPERF *, WORKLOAD *,
- WTPERF_THREAD *, u_int, WT_THREAD_CALLBACK(*)(void *));
-static void stop_threads(u_int, WTPERF_THREAD *);
+static void randomize_value(WTPERF_THREAD *, char *);
+static void recreate_dir(const char *);
+static int start_all_runs(WTPERF *);
+static int start_run(WTPERF *);
+static void start_threads(
+ WTPERF *, WORKLOAD *, WTPERF_THREAD *, u_int, WT_THREAD_CALLBACK (*)(void *));
+static void stop_threads(u_int, WTPERF_THREAD *);
static WT_THREAD_RET thread_run_wtperf(void *);
-static void update_value_delta(WTPERF_THREAD *);
+static void update_value_delta(WTPERF_THREAD *);
static WT_THREAD_RET worker(void *);
-static uint64_t wtperf_rand(WTPERF_THREAD *);
-static uint64_t wtperf_value_range(WTPERF *);
+static uint64_t wtperf_rand(WTPERF_THREAD *);
+static uint64_t wtperf_value_range(WTPERF *);
-#define INDEX_COL_NAMES "columns=(key,val)"
+#define INDEX_COL_NAMES "columns=(key,val)"
/* Retrieve an ID for the next insert operation. */
static inline uint64_t
get_next_incr(WTPERF *wtperf)
{
- return (__wt_atomic_add64(&wtperf->insert_key, 1));
+ return (__wt_atomic_add64(&wtperf->insert_key, 1));
}
/*
- * Each time this function is called we will overwrite the first and one
- * other element in the value buffer.
+ * Each time this function is called we will overwrite the first and one other element in the value
+ * buffer.
*/
static void
randomize_value(WTPERF_THREAD *thread, char *value_buf)
{
- CONFIG_OPTS *opts;
- uint8_t *vb;
- uint32_t i, max_range, rand_val;
-
- opts = thread->wtperf->opts;
-
- /*
- * Limit how much of the buffer we validate for length, this means
- * that only threads that do growing updates will ever make changes to
- * values outside of the initial value size, but that's a fair trade
- * off for avoiding figuring out how long the value is more accurately
- * in this performance sensitive function.
- */
- if (thread->workload == NULL || thread->workload->update_delta == 0)
- max_range = opts->value_sz;
- else if (thread->workload->update_delta > 0)
- max_range = opts->value_sz_max;
- else
- max_range = opts->value_sz_min;
-
- /*
- * Generate a single random value and re-use it. We generally only
- * have small ranges in this function, so avoiding a bunch of calls
- * is worthwhile.
- */
- rand_val = __wt_random(&thread->rnd);
- i = rand_val % (max_range - 1);
-
- /*
- * Ensure we don't write past the end of a value when configured for
- * randomly sized values.
- */
- while (value_buf[i] == '\0' && i > 0)
- --i;
-
- vb = (uint8_t *)value_buf;
- vb[0] = ((rand_val >> 8) % 255) + 1;
- /*
- * If i happened to be 0, we'll be re-writing the same value
- * twice, but that doesn't matter.
- */
- vb[i] = ((rand_val >> 16) % 255) + 1;
+ CONFIG_OPTS *opts;
+ uint8_t *vb;
+ uint32_t i, max_range, rand_val;
+
+ opts = thread->wtperf->opts;
+
+ /*
+ * Limit how much of the buffer we validate for length, this means that only threads that do
+ * growing updates will ever make changes to values outside of the initial value size, but
+ * that's a fair trade off for avoiding figuring out how long the value is more accurately in
+ * this performance sensitive function.
+ */
+ if (thread->workload == NULL || thread->workload->update_delta == 0)
+ max_range = opts->value_sz;
+ else if (thread->workload->update_delta > 0)
+ max_range = opts->value_sz_max;
+ else
+ max_range = opts->value_sz_min;
+
+ /*
+ * Generate a single random value and re-use it. We generally only have small ranges in this
+ * function, so avoiding a bunch of calls is worthwhile.
+ */
+ rand_val = __wt_random(&thread->rnd);
+ i = rand_val % (max_range - 1);
+
+ /*
+ * Ensure we don't write past the end of a value when configured for randomly sized values.
+ */
+ while (value_buf[i] == '\0' && i > 0)
+ --i;
+
+ vb = (uint8_t *)value_buf;
+ vb[0] = ((rand_val >> 8) % 255) + 1;
+ /*
+ * If i happened to be 0, we'll be re-writing the same value twice, but that doesn't matter.
+ */
+ vb[i] = ((rand_val >> 16) % 255) + 1;
}
/*
@@ -119,355 +115,343 @@ randomize_value(WTPERF_THREAD *thread, char *value_buf)
static uint32_t
map_key_to_table(CONFIG_OPTS *opts, uint64_t k)
{
- if (opts->range_partition) {
- /* Take care to return a result in [0..table_count-1]. */
- if (k > opts->icount + opts->random_range)
- return (0);
- return ((uint32_t)((k - 1) /
- ((opts->icount + opts->random_range +
- opts->table_count - 1) / opts->table_count)));
- } else
- return ((uint32_t)(k % opts->table_count));
+ if (opts->range_partition) {
+ /* Take care to return a result in [0..table_count-1]. */
+ if (k > opts->icount + opts->random_range)
+ return (0);
+ return ((uint32_t)((k - 1) /
+ ((opts->icount + opts->random_range + opts->table_count - 1) / opts->table_count)));
+ } else
+ return ((uint32_t)(k % opts->table_count));
}
/*
- * Figure out and extend the size of the value string, used for growing
- * updates. We know that the value to be updated is in the threads value
- * scratch buffer.
+ * Figure out and extend the size of the value string, used for growing updates. We know that the
+ * value to be updated is in the threads value scratch buffer.
*/
static inline void
update_value_delta(WTPERF_THREAD *thread)
{
- CONFIG_OPTS *opts;
- WTPERF *wtperf;
- char * value;
- int64_t delta, len, new_len;
-
- wtperf = thread->wtperf;
- opts = wtperf->opts;
- value = thread->value_buf;
- delta = thread->workload->update_delta;
- len = (int64_t)strlen(value);
-
- if (delta == INT64_MAX)
- delta = __wt_random(&thread->rnd) %
- (opts->value_sz_max - opts->value_sz);
-
- /* Ensure we aren't changing across boundaries */
- if (delta > 0 && len + delta > opts->value_sz_max)
- delta = opts->value_sz_max - len;
- else if (delta < 0 && len + delta < opts->value_sz_min)
- delta = opts->value_sz_min - len;
-
- /* Bail if there isn't anything to do */
- if (delta == 0)
- return;
-
- if (delta < 0)
- value[len + delta] = '\0';
- else {
- /* Extend the value by the configured amount. */
- for (new_len = len;
- new_len < opts->value_sz_max && new_len - len < delta;
- new_len++)
- value[new_len] = 'a';
- }
+ CONFIG_OPTS *opts;
+ WTPERF *wtperf;
+ char *value;
+ int64_t delta, len, new_len;
+
+ wtperf = thread->wtperf;
+ opts = wtperf->opts;
+ value = thread->value_buf;
+ delta = thread->workload->update_delta;
+ len = (int64_t)strlen(value);
+
+ if (delta == INT64_MAX)
+ delta = __wt_random(&thread->rnd) % (opts->value_sz_max - opts->value_sz);
+
+ /* Ensure we aren't changing across boundaries */
+ if (delta > 0 && len + delta > opts->value_sz_max)
+ delta = opts->value_sz_max - len;
+ else if (delta < 0 && len + delta < opts->value_sz_min)
+ delta = opts->value_sz_min - len;
+
+ /* Bail if there isn't anything to do */
+ if (delta == 0)
+ return;
+
+ if (delta < 0)
+ value[len + delta] = '\0';
+ else {
+ /* Extend the value by the configured amount. */
+ for (new_len = len; new_len < opts->value_sz_max && new_len - len < delta; new_len++)
+ value[new_len] = 'a';
+ }
}
static int
cb_asyncop(WT_ASYNC_CALLBACK *cb, WT_ASYNC_OP *op, int ret, uint32_t flags)
{
- TRACK *trk;
- WTPERF *wtperf;
- WTPERF_THREAD *thread;
- WT_ASYNC_OPTYPE type;
- uint32_t *tables;
- int t_ret;
- char *value;
-
- (void)cb;
- (void)flags;
-
- wtperf = NULL; /* -Wconditional-uninitialized */
- thread = NULL; /* -Wconditional-uninitialized */
-
- type = op->get_type(op);
- if (type != WT_AOP_COMPACT) {
- thread = (WTPERF_THREAD *)op->app_private;
- wtperf = thread->wtperf;
- }
-
- trk = NULL;
- switch (type) {
- case WT_AOP_COMPACT:
- tables = (uint32_t *)op->app_private;
- (void)__wt_atomic_add32(tables, (uint32_t)-1);
- break;
- case WT_AOP_INSERT:
- trk = &thread->insert;
- break;
- case WT_AOP_SEARCH:
- trk = &thread->read;
- if (ret == 0 &&
- (t_ret = op->get_value(op, &value)) != 0) {
- ret = t_ret;
- lprintf(wtperf, ret, 0, "get_value in read.");
- goto err;
- }
- break;
- case WT_AOP_UPDATE:
- trk = &thread->update;
- break;
- case WT_AOP_NONE:
- case WT_AOP_REMOVE:
- /* We never expect this type. */
- lprintf(wtperf,
- ret, 0, "No type in op %" PRIu64, op->get_id(op));
- goto err;
- }
-
- /*
- * Either we have success and we track it, or failure and panic.
- *
- * Reads and updates can fail with WT_NOTFOUND: we may be searching
- * in a random range, or an insert op might have updated the
- * last record in the table but not yet finished the actual insert.
- */
- if (type == WT_AOP_COMPACT)
- return (0);
- if (ret == 0 || (ret == WT_NOTFOUND && type != WT_AOP_INSERT)) {
- if (!wtperf->in_warmup)
- (void)__wt_atomic_add64(&trk->ops, 1);
- return (0);
- }
+ TRACK *trk;
+ WTPERF *wtperf;
+ WTPERF_THREAD *thread;
+ WT_ASYNC_OPTYPE type;
+ uint32_t *tables;
+ int t_ret;
+ char *value;
+
+ (void)cb;
+ (void)flags;
+
+ wtperf = NULL; /* -Wconditional-uninitialized */
+ thread = NULL; /* -Wconditional-uninitialized */
+
+ type = op->get_type(op);
+ if (type != WT_AOP_COMPACT) {
+ thread = (WTPERF_THREAD *)op->app_private;
+ wtperf = thread->wtperf;
+ }
+
+ trk = NULL;
+ switch (type) {
+ case WT_AOP_COMPACT:
+ tables = (uint32_t *)op->app_private;
+ (void)__wt_atomic_add32(tables, (uint32_t)-1);
+ break;
+ case WT_AOP_INSERT:
+ trk = &thread->insert;
+ break;
+ case WT_AOP_SEARCH:
+ trk = &thread->read;
+ if (ret == 0 && (t_ret = op->get_value(op, &value)) != 0) {
+ ret = t_ret;
+ lprintf(wtperf, ret, 0, "get_value in read.");
+ goto err;
+ }
+ break;
+ case WT_AOP_UPDATE:
+ trk = &thread->update;
+ break;
+ case WT_AOP_NONE:
+ case WT_AOP_REMOVE:
+ /* We never expect this type. */
+ lprintf(wtperf, ret, 0, "No type in op %" PRIu64, op->get_id(op));
+ goto err;
+ }
+
+ /*
+ * Either we have success and we track it, or failure and panic.
+ *
+ * Reads and updates can fail with WT_NOTFOUND: we may be searching
+ * in a random range, or an insert op might have updated the
+ * last record in the table but not yet finished the actual insert.
+ */
+ if (type == WT_AOP_COMPACT)
+ return (0);
+ if (ret == 0 || (ret == WT_NOTFOUND && type != WT_AOP_INSERT)) {
+ if (!wtperf->in_warmup)
+ (void)__wt_atomic_add64(&trk->ops, 1);
+ return (0);
+ }
err:
- /* Panic if error */
- lprintf(wtperf, ret, 0, "Error in op %" PRIu64, op->get_id(op));
- wtperf->error = wtperf->stop = true;
- return (1);
+ /* Panic if error */
+ lprintf(wtperf, ret, 0, "Error in op %" PRIu64, op->get_id(op));
+ wtperf->error = wtperf->stop = true;
+ return (1);
}
-static WT_ASYNC_CALLBACK cb = { cb_asyncop };
+static WT_ASYNC_CALLBACK cb = {cb_asyncop};
/*
* track_operation --
- * Update an operation's tracking structure with new latency information.
+ * Update an operation's tracking structure with new latency information.
*/
static inline void
track_operation(TRACK *trk, uint64_t usecs)
{
- uint64_t v;
-
- /* average microseconds per call */
- v = (uint64_t)usecs;
-
- trk->latency += usecs; /* track total latency */
-
- if (v > trk->max_latency) /* track max/min latency */
- trk->max_latency = (uint32_t)v;
- if (v < trk->min_latency)
- trk->min_latency = (uint32_t)v;
-
- /*
- * Update a latency bucket.
- * First buckets: usecs from 100us to 1000us at 100us each.
- */
- if (v < 1000)
- ++trk->us[v];
-
- /*
- * Second buckets: milliseconds from 1ms to 1000ms, at 1ms each.
- */
- else if (v < ms_to_us(1000))
- ++trk->ms[us_to_ms(v)];
-
- /*
- * Third buckets are seconds from 1s to 100s, at 1s each.
- */
- else if (v < sec_to_us(100))
- ++trk->sec[us_to_sec(v)];
-
- /* >100 seconds, accumulate in the biggest bucket. */
- else
- ++trk->sec[ELEMENTS(trk->sec) - 1];
+ uint64_t v;
+
+ /* average microseconds per call */
+ v = (uint64_t)usecs;
+
+ trk->latency += usecs; /* track total latency */
+
+ if (v > trk->max_latency) /* track max/min latency */
+ trk->max_latency = (uint32_t)v;
+ if (v < trk->min_latency)
+ trk->min_latency = (uint32_t)v;
+
+ /*
+ * Update a latency bucket. First buckets: usecs from 100us to 1000us at 100us each.
+ */
+ if (v < 1000)
+ ++trk->us[v];
+
+ /*
+ * Second buckets: milliseconds from 1ms to 1000ms, at 1ms each.
+ */
+ else if (v < ms_to_us(1000))
+ ++trk->ms[us_to_ms(v)];
+
+ /*
+ * Third buckets are seconds from 1s to 100s, at 1s each.
+ */
+ else if (v < sec_to_us(100))
+ ++trk->sec[us_to_sec(v)];
+
+ /* >100 seconds, accumulate in the biggest bucket. */
+ else
+ ++trk->sec[ELEMENTS(trk->sec) - 1];
}
static const char *
op_name(uint8_t *op)
{
- switch (*op) {
- case WORKER_INSERT:
- return ("insert");
- case WORKER_INSERT_RMW:
- return ("insert_rmw");
- case WORKER_READ:
- return ("read");
- case WORKER_TRUNCATE:
- return ("truncate");
- case WORKER_UPDATE:
- return ("update");
- default:
- return ("unknown");
- }
- /* NOTREACHED */
+ switch (*op) {
+ case WORKER_INSERT:
+ return ("insert");
+ case WORKER_INSERT_RMW:
+ return ("insert_rmw");
+ case WORKER_READ:
+ return ("read");
+ case WORKER_TRUNCATE:
+ return ("truncate");
+ case WORKER_UPDATE:
+ return ("update");
+ default:
+ return ("unknown");
+ }
+ /* NOTREACHED */
}
static WT_THREAD_RET
worker_async(void *arg)
{
- CONFIG_OPTS *opts;
- WTPERF *wtperf;
- WTPERF_THREAD *thread;
- WT_ASYNC_OP *asyncop;
- WT_CONNECTION *conn;
- uint64_t next_val;
- uint8_t *op, *op_end;
- int ret;
- char *key_buf, *value_buf;
-
- thread = (WTPERF_THREAD *)arg;
- wtperf = thread->wtperf;
- opts = wtperf->opts;
- conn = wtperf->conn;
-
- key_buf = thread->key_buf;
- value_buf = thread->value_buf;
-
- op = thread->workload->ops;
- op_end = op + sizeof(thread->workload->ops);
-
- while (!wtperf->stop) {
- /*
- * Generate the next key and setup operation specific
- * statistics tracking objects.
- */
- switch (*op) {
- case WORKER_INSERT:
- case WORKER_INSERT_RMW:
- if (opts->random_range)
- next_val = wtperf_rand(thread);
- else
- next_val = opts->icount + get_next_incr(wtperf);
- break;
- case WORKER_READ:
- case WORKER_UPDATE:
- next_val = wtperf_rand(thread);
-
- /*
- * If the workload is started without a populate phase
- * we rely on at least one insert to get a valid item
- * id.
- */
- if (wtperf_value_range(wtperf) < next_val)
- continue;
- break;
- default:
- goto err; /* can't happen */
- }
-
- generate_key(opts, key_buf, next_val);
-
- /*
- * Spread the data out around the multiple databases.
- * Sleep to allow workers a chance to run and process async ops.
- * Then retry to get an async op.
- */
- while ((ret = conn->async_new_op(conn,
- wtperf->uris[map_key_to_table(wtperf->opts, next_val)],
- NULL, &cb, &asyncop)) == EBUSY)
- (void)usleep(10000);
- if (ret != 0)
- goto err;
-
- asyncop->app_private = thread;
- asyncop->set_key(asyncop, key_buf);
- switch (*op) {
- case WORKER_READ:
- ret = asyncop->search(asyncop);
- if (ret == 0)
- break;
- goto op_err;
- case WORKER_INSERT:
- if (opts->random_value)
- randomize_value(thread, value_buf);
- asyncop->set_value(asyncop, value_buf);
- if ((ret = asyncop->insert(asyncop)) == 0)
- break;
- goto op_err;
- case WORKER_UPDATE:
- if (opts->random_value)
- randomize_value(thread, value_buf);
- asyncop->set_value(asyncop, value_buf);
- if ((ret = asyncop->update(asyncop)) == 0)
- break;
- goto op_err;
- default:
-op_err: lprintf(wtperf, ret, 0,
- "%s failed for: %s, range: %"PRIu64,
- op_name(op), key_buf, wtperf_value_range(wtperf));
- goto err; /* can't happen */
- }
-
- /* Schedule the next operation */
- if (++op == op_end)
- op = thread->workload->ops;
- }
-
- if (conn->async_flush(conn) != 0)
- goto err;
-
- /* Notify our caller we failed and shut the system down. */
- if (0) {
-err: wtperf->error = wtperf->stop = true;
- }
- return (WT_THREAD_RET_VALUE);
+ CONFIG_OPTS *opts;
+ WTPERF *wtperf;
+ WTPERF_THREAD *thread;
+ WT_ASYNC_OP *asyncop;
+ WT_CONNECTION *conn;
+ uint64_t next_val;
+ uint8_t *op, *op_end;
+ int ret;
+ char *key_buf, *value_buf;
+
+ thread = (WTPERF_THREAD *)arg;
+ wtperf = thread->wtperf;
+ opts = wtperf->opts;
+ conn = wtperf->conn;
+
+ key_buf = thread->key_buf;
+ value_buf = thread->value_buf;
+
+ op = thread->workload->ops;
+ op_end = op + sizeof(thread->workload->ops);
+
+ while (!wtperf->stop) {
+ /*
+ * Generate the next key and setup operation specific statistics tracking objects.
+ */
+ switch (*op) {
+ case WORKER_INSERT:
+ case WORKER_INSERT_RMW:
+ if (opts->random_range)
+ next_val = wtperf_rand(thread);
+ else
+ next_val = opts->icount + get_next_incr(wtperf);
+ break;
+ case WORKER_READ:
+ case WORKER_UPDATE:
+ next_val = wtperf_rand(thread);
+
+ /*
+ * If the workload is started without a populate phase we rely on at least one insert to
+ * get a valid item id.
+ */
+ if (wtperf_value_range(wtperf) < next_val)
+ continue;
+ break;
+ default:
+ goto err; /* can't happen */
+ }
+
+ generate_key(opts, key_buf, next_val);
+
+ /*
+ * Spread the data out around the multiple databases. Sleep to allow workers a chance to run
+ * and process async ops. Then retry to get an async op.
+ */
+ while (
+ (ret = conn->async_new_op(conn, wtperf->uris[map_key_to_table(wtperf->opts, next_val)],
+ NULL, &cb, &asyncop)) == EBUSY)
+ (void)usleep(10000);
+ if (ret != 0)
+ goto err;
+
+ asyncop->app_private = thread;
+ asyncop->set_key(asyncop, key_buf);
+ switch (*op) {
+ case WORKER_READ:
+ ret = asyncop->search(asyncop);
+ if (ret == 0)
+ break;
+ goto op_err;
+ case WORKER_INSERT:
+ if (opts->random_value)
+ randomize_value(thread, value_buf);
+ asyncop->set_value(asyncop, value_buf);
+ if ((ret = asyncop->insert(asyncop)) == 0)
+ break;
+ goto op_err;
+ case WORKER_UPDATE:
+ if (opts->random_value)
+ randomize_value(thread, value_buf);
+ asyncop->set_value(asyncop, value_buf);
+ if ((ret = asyncop->update(asyncop)) == 0)
+ break;
+ goto op_err;
+ default:
+ op_err:
+ lprintf(wtperf, ret, 0, "%s failed for: %s, range: %" PRIu64, op_name(op), key_buf,
+ wtperf_value_range(wtperf));
+ goto err; /* can't happen */
+ }
+
+ /* Schedule the next operation */
+ if (++op == op_end)
+ op = thread->workload->ops;
+ }
+
+ if (conn->async_flush(conn) != 0)
+ goto err;
+
+ /* Notify our caller we failed and shut the system down. */
+ if (0) {
+err:
+ wtperf->error = wtperf->stop = true;
+ }
+ return (WT_THREAD_RET_VALUE);
}
/*
* do_range_reads --
- * If configured to execute a sequence of next operations after each
- * search do them. Ensuring the keys we see are always in order.
+ * If configured to execute a sequence of next operations after each search do them. Ensuring
+ * the keys we see are always in order.
*/
static int
do_range_reads(WTPERF *wtperf, WT_CURSOR *cursor, int64_t read_range)
{
- uint64_t next_val, prev_val;
- int64_t range;
- char *range_key_buf;
- char buf[512];
- int ret;
-
- ret = 0;
-
- if (read_range == 0)
- return (0);
-
- memset(&buf[0], 0, 512 * sizeof(char));
- range_key_buf = &buf[0];
-
- /* Save where the first key is for comparisons. */
- testutil_check(cursor->get_key(cursor, &range_key_buf));
- extract_key(range_key_buf, &next_val);
-
- for (range = 0; range < read_range; ++range) {
- prev_val = next_val;
- ret = cursor->next(cursor);
- /* We are done if we reach the end. */
- if (ret != 0)
- break;
-
- /* Retrieve and decode the key */
- testutil_check(cursor->get_key(cursor, &range_key_buf));
- extract_key(range_key_buf, &next_val);
- if (next_val < prev_val) {
- lprintf(wtperf, EINVAL, 0,
- "Out of order keys %" PRIu64
- " came before %" PRIu64,
- prev_val, next_val);
- return (EINVAL);
- }
- }
- return (0);
+ uint64_t next_val, prev_val;
+ int64_t range;
+ char *range_key_buf;
+ char buf[512];
+ int ret;
+
+ ret = 0;
+
+ if (read_range == 0)
+ return (0);
+
+ memset(&buf[0], 0, 512 * sizeof(char));
+ range_key_buf = &buf[0];
+
+ /* Save where the first key is for comparisons. */
+ testutil_check(cursor->get_key(cursor, &range_key_buf));
+ extract_key(range_key_buf, &next_val);
+
+ for (range = 0; range < read_range; ++range) {
+ prev_val = next_val;
+ ret = cursor->next(cursor);
+ /* We are done if we reach the end. */
+ if (ret != 0)
+ break;
+
+ /* Retrieve and decode the key */
+ testutil_check(cursor->get_key(cursor, &range_key_buf));
+ extract_key(range_key_buf, &next_val);
+ if (next_val < prev_val) {
+ lprintf(wtperf, EINVAL, 0, "Out of order keys %" PRIu64 " came before %" PRIu64,
+ prev_val, next_val);
+ return (EINVAL);
+ }
+ }
+ return (0);
}
/* pre_load_data --
@@ -476,1491 +460,1337 @@ do_range_reads(WTPERF *wtperf, WT_CURSOR *cursor, int64_t read_range)
static void
pre_load_data(WTPERF *wtperf)
{
- CONFIG_OPTS *opts;
- WT_CONNECTION *conn;
- WT_CURSOR *cursor;
- WT_SESSION *session;
- size_t i;
- int ret;
- char *key;
-
- opts = wtperf->opts;
- conn = wtperf->conn;
-
- testutil_check(conn->open_session(
- conn, NULL, opts->sess_config, &session));
- for (i = 0; i < opts->table_count; i++) {
- testutil_check(session->open_cursor(
- session, wtperf->uris[i], NULL, NULL, &cursor));
- while ((ret = cursor->next(cursor)) == 0)
- testutil_check(cursor->get_key(cursor, &key));
- testutil_assert(ret == WT_NOTFOUND);
- testutil_check(cursor->close(cursor));
- }
- testutil_check(session->close(session, NULL));
+ CONFIG_OPTS *opts;
+ WT_CONNECTION *conn;
+ WT_CURSOR *cursor;
+ WT_SESSION *session;
+ size_t i;
+ int ret;
+ char *key;
+
+ opts = wtperf->opts;
+ conn = wtperf->conn;
+
+ testutil_check(conn->open_session(conn, NULL, opts->sess_config, &session));
+ for (i = 0; i < opts->table_count; i++) {
+ testutil_check(session->open_cursor(session, wtperf->uris[i], NULL, NULL, &cursor));
+ while ((ret = cursor->next(cursor)) == 0)
+ testutil_check(cursor->get_key(cursor, &key));
+ testutil_assert(ret == WT_NOTFOUND);
+ testutil_check(cursor->close(cursor));
+ }
+ testutil_check(session->close(session, NULL));
}
static WT_THREAD_RET
worker(void *arg)
{
- struct timespec start, stop;
- CONFIG_OPTS *opts;
- TRACK *trk;
- WORKLOAD *workload;
- WTPERF *wtperf;
- WTPERF_THREAD *thread;
- WT_CONNECTION *conn;
- WT_CURSOR **cursors, *cursor, *log_table_cursor, *tmp_cursor;
- WT_SESSION *session;
- size_t i;
- int64_t ops, ops_per_txn;
- uint64_t log_id, next_val, usecs;
- uint8_t *op, *op_end;
- int measure_latency, ret, truncated;
- char *value_buf, *key_buf, *value;
- char buf[512];
-
- thread = (WTPERF_THREAD *)arg;
- workload = thread->workload;
- wtperf = thread->wtperf;
- opts = wtperf->opts;
- conn = wtperf->conn;
- cursors = NULL;
- cursor = log_table_cursor = NULL; /* -Wconditional-initialized */
- ops = 0;
- ops_per_txn = workload->ops_per_txn;
- session = NULL;
- trk = NULL;
-
- if ((ret = conn->open_session(
- conn, NULL, opts->sess_config, &session)) != 0) {
- lprintf(wtperf, ret, 0, "worker: WT_CONNECTION.open_session");
- goto err;
- }
- for (i = 0; i < opts->table_count_idle; i++) {
- testutil_check(__wt_snprintf(
- buf, 512, "%s_idle%05d", wtperf->uris[0], (int)i));
- if ((ret = session->open_cursor(
- session, buf, NULL, NULL, &tmp_cursor)) != 0) {
- lprintf(wtperf, ret, 0,
- "Error opening idle table %s", buf);
- goto err;
- }
- if ((ret = tmp_cursor->close(tmp_cursor)) != 0) {
- lprintf(wtperf, ret, 0,
- "Error closing idle table %s", buf);
- goto err;
- }
- }
- if (workload->table_index != INT32_MAX) {
- if ((ret = session->open_cursor(session,
- wtperf->uris[workload->table_index],
- NULL, NULL, &cursor)) != 0) {
- lprintf(wtperf, ret, 0,
- "worker: WT_SESSION.open_cursor: %s",
- wtperf->uris[workload->table_index]);
- goto err;
- }
- if ((ret = session->open_cursor(session,
- wtperf->uris[workload->table_index],
- NULL, "next_random=true", &thread->rand_cursor)) != 0) {
- lprintf(wtperf, ret, 0,
- "worker: WT_SESSION.open_cursor: random %s",
- wtperf->uris[workload->table_index]);
- goto err;
- }
- } else {
- cursors = dcalloc(opts->table_count, sizeof(WT_CURSOR *));
- for (i = 0; i < opts->table_count; i++) {
- if ((ret = session->open_cursor(session,
- wtperf->uris[i], NULL, NULL, &cursors[i])) != 0) {
- lprintf(wtperf, ret, 0,
- "worker: WT_SESSION.open_cursor: %s",
- wtperf->uris[i]);
- goto err;
- }
- }
- }
- if (opts->log_like_table && (ret = session->open_cursor(session,
- wtperf->log_table_uri, NULL, NULL, &log_table_cursor)) != 0) {
- lprintf(wtperf, ret, 0,
- "worker: WT_SESSION.open_cursor: %s",
- wtperf->log_table_uri);
- goto err;
- }
-
- /* Setup the timer for throttling. */
- if (workload->throttle != 0)
- setup_throttle(thread);
-
- /* Setup for truncate */
- if (workload->truncate != 0)
- setup_truncate(wtperf, thread, session);
-
- key_buf = thread->key_buf;
- value_buf = thread->value_buf;
-
- op = workload->ops;
- op_end = op + sizeof(workload->ops);
-
- if ((ops_per_txn != 0 || opts->log_like_table) &&
- (ret = session->begin_transaction(session, NULL)) != 0) {
- lprintf(wtperf, ret, 0, "First transaction begin failed");
- goto err;
- }
-
- while (!wtperf->stop) {
- if (workload->pause != 0)
- (void)sleep((unsigned int)workload->pause);
- /*
- * Generate the next key and setup operation specific
- * statistics tracking objects.
- */
- switch (*op) {
- case WORKER_INSERT:
- case WORKER_INSERT_RMW:
- trk = &thread->insert;
- if (opts->random_range)
- next_val = wtperf_rand(thread);
- else
- next_val = opts->icount + get_next_incr(wtperf);
- break;
- case WORKER_READ:
- trk = &thread->read;
- /* FALLTHROUGH */
- case WORKER_UPDATE:
- if (*op == WORKER_UPDATE)
- trk = &thread->update;
- next_val = wtperf_rand(thread);
-
- /*
- * If the workload is started without a populate phase
- * we rely on at least one insert to get a valid item
- * id.
- */
- if (wtperf_value_range(wtperf) < next_val)
- continue;
- break;
- case WORKER_TRUNCATE:
- /* Required but not used. */
- next_val = wtperf_rand(thread);
- break;
- default:
- goto err; /* can't happen */
- }
-
- generate_key(opts, key_buf, next_val);
-
- if (workload->table_index == INT32_MAX)
- /*
- * Spread the data out around the multiple databases.
- */
- cursor = cursors[
- map_key_to_table(wtperf->opts, next_val)];
-
- /*
- * Skip the first time we do an operation, when trk->ops
- * is 0, to avoid first time latency spikes.
- */
- measure_latency =
- opts->sample_interval != 0 && trk != NULL &&
- trk->ops != 0 && (trk->ops % opts->sample_rate == 0);
- if (measure_latency)
- __wt_epoch(NULL, &start);
-
- cursor->set_key(cursor, key_buf);
-
- switch (*op) {
- case WORKER_READ:
- /*
- * Reads can fail with WT_NOTFOUND: we may be searching
- * in a random range, or an insert thread might have
- * updated the last record in the table but not yet
- * finished the actual insert. Count failed search in
- * a random range as a "read".
- */
- ret = cursor->search(cursor);
- if (ret == 0) {
- if ((ret = cursor->get_value(
- cursor, &value)) != 0) {
- lprintf(wtperf, ret, 0,
- "get_value in read.");
- goto err;
- }
- /*
- * If we want to read a range, then call next
- * for several operations, confirming that the
- * next key is in the correct order.
- */
- ret = do_range_reads(wtperf,
- cursor, workload->read_range);
- }
-
- if (ret == 0 || ret == WT_NOTFOUND)
- break;
- goto op_err;
- case WORKER_INSERT_RMW:
- if ((ret = cursor->search(cursor)) != WT_NOTFOUND)
- goto op_err;
-
- /* The error return reset the cursor's key. */
- cursor->set_key(cursor, key_buf);
-
- /* FALLTHROUGH */
- case WORKER_INSERT:
- if (opts->random_value)
- randomize_value(thread, value_buf);
- cursor->set_value(cursor, value_buf);
- if ((ret = cursor->insert(cursor)) == 0)
- break;
- goto op_err;
- case WORKER_TRUNCATE:
- if ((ret = run_truncate(wtperf,
- thread, cursor, session, &truncated)) == 0) {
- if (truncated)
- trk = &thread->truncate;
- else
- trk = &thread->truncate_sleep;
- /* Pause between truncate attempts */
- (void)usleep(1000);
- break;
- }
- goto op_err;
- case WORKER_UPDATE:
- if ((ret = cursor->search(cursor)) == 0) {
- if ((ret = cursor->get_value(
- cursor, &value)) != 0) {
- lprintf(wtperf, ret, 0,
- "get_value in update.");
- goto err;
- }
- /*
- * Copy as much of the previous value as is
- * safe, and be sure to NUL-terminate.
- */
- strncpy(value_buf,
- value, opts->value_sz_max - 1);
- if (workload->update_delta != 0)
- update_value_delta(thread);
- if (value_buf[0] == 'a')
- value_buf[0] = 'b';
- else
- value_buf[0] = 'a';
- if (opts->random_value)
- randomize_value(thread, value_buf);
- cursor->set_value(cursor, value_buf);
- if ((ret = cursor->update(cursor)) == 0)
- break;
- goto op_err;
- }
-
- /*
- * Reads can fail with WT_NOTFOUND: we may be searching
- * in a random range, or an insert thread might have
- * updated the last record in the table but not yet
- * finished the actual insert. Count failed search in
- * a random range as a "read".
- */
- if (ret == WT_NOTFOUND)
- break;
-
-op_err: if (ret == WT_ROLLBACK && ops_per_txn != 0) {
- /*
- * If we are running with explicit transactions
- * configured and we hit a WT_ROLLBACK, then we
- * should rollback the current transaction and
- * attempt to continue.
- * This does break the guarantee of insertion
- * order in cases of ordered inserts, as we
- * aren't retrying here.
- */
- lprintf(wtperf, ret, 1,
- "%s for: %s, range: %"PRIu64, op_name(op),
- key_buf, wtperf_value_range(wtperf));
- if ((ret = session->rollback_transaction(
- session, NULL)) != 0) {
- lprintf(wtperf, ret, 0,
- "Failed rollback_transaction");
- goto err;
- }
- if ((ret = session->begin_transaction(
- session, NULL)) != 0) {
- lprintf(wtperf, ret, 0,
- "Worker begin transaction failed");
- goto err;
- }
- break;
- }
- lprintf(wtperf, ret, 0,
- "%s failed for: %s, range: %"PRIu64,
- op_name(op), key_buf, wtperf_value_range(wtperf));
- goto err;
- default:
- goto err; /* can't happen */
- }
-
- /* Update the log-like table. */
- if (opts->log_like_table &&
- (*op != WORKER_READ && *op != WORKER_TRUNCATE)) {
- log_id =
- __wt_atomic_add64(&wtperf->log_like_table_key, 1);
- log_table_cursor->set_key(log_table_cursor, log_id);
- log_table_cursor->set_value(
- log_table_cursor, value_buf);
- if ((ret =
- log_table_cursor->insert(log_table_cursor)) != 0) {
- lprintf(wtperf, ret, 1, "Cursor insert failed");
- if (ret == WT_ROLLBACK && ops_per_txn == 0) {
- lprintf(wtperf, ret, 1,
- "log-table: ROLLBACK");
- if ((ret =
- session->rollback_transaction(
- session, NULL)) != 0) {
- lprintf(wtperf, ret, 0, "Failed"
- " rollback_transaction");
- goto err;
- }
- if ((ret = session->begin_transaction(
- session, NULL)) != 0) {
- lprintf(wtperf, ret, 0,
- "Worker begin "
- "transaction failed");
- goto err;
- }
- } else
- goto err;
- }
- }
-
- /* Release the cursor, if we have multiple tables. */
- if (opts->table_count > 1 && ret == 0 &&
- *op != WORKER_INSERT && *op != WORKER_INSERT_RMW) {
- if ((ret = cursor->reset(cursor)) != 0) {
- lprintf(wtperf, ret, 0, "Cursor reset failed");
- goto err;
- }
- }
-
- /* Gather statistics */
- if (!wtperf->in_warmup) {
- if (measure_latency) {
- __wt_epoch(NULL, &stop);
- ++trk->latency_ops;
- usecs = WT_TIMEDIFF_US(stop, start);
- track_operation(trk, usecs);
- }
- /* Increment operation count */
- ++trk->ops;
- }
-
- /*
- * Commit the transaction if grouping operations together
- * or tracking changes in our log table.
- */
- if ((opts->log_like_table && ops_per_txn == 0) ||
- (ops_per_txn != 0 && ops++ % ops_per_txn == 0)) {
- if ((ret = session->commit_transaction(
- session, NULL)) != 0) {
- lprintf(wtperf, ret, 0,
- "Worker transaction commit failed");
- goto err;
- }
- if ((ret = session->begin_transaction(
- session, NULL)) != 0) {
- lprintf(wtperf, ret, 0,
- "Worker begin transaction failed");
- goto err;
- }
- }
-
- /* Schedule the next operation */
- if (++op == op_end)
- op = workload->ops;
-
- /*
- * Decrement throttle ops and check if we should sleep
- * and then get more work to perform.
- */
- if (--thread->throttle_cfg.ops_count == 0)
- worker_throttle(thread);
-
- }
-
- if ((ret = session->close(session, NULL)) != 0) {
- lprintf(wtperf, ret, 0, "Session close in worker failed");
- goto err;
- }
-
- /* Notify our caller we failed and shut the system down. */
- if (0) {
-err: wtperf->error = wtperf->stop = true;
- }
- free(cursors);
-
- return (WT_THREAD_RET_VALUE);
+ struct timespec start, stop;
+ CONFIG_OPTS *opts;
+ TRACK *trk;
+ WORKLOAD *workload;
+ WTPERF *wtperf;
+ WTPERF_THREAD *thread;
+ WT_CONNECTION *conn;
+ WT_CURSOR **cursors, *cursor, *log_table_cursor, *tmp_cursor;
+ WT_SESSION *session;
+ size_t i;
+ int64_t ops, ops_per_txn;
+ uint64_t log_id, next_val, usecs;
+ uint8_t *op, *op_end;
+ int measure_latency, ret, truncated;
+ char *value_buf, *key_buf, *value;
+ char buf[512];
+
+ thread = (WTPERF_THREAD *)arg;
+ workload = thread->workload;
+ wtperf = thread->wtperf;
+ opts = wtperf->opts;
+ conn = wtperf->conn;
+ cursors = NULL;
+ cursor = log_table_cursor = NULL; /* -Wconditional-initialized */
+ ops = 0;
+ ops_per_txn = workload->ops_per_txn;
+ session = NULL;
+ trk = NULL;
+
+ if ((ret = conn->open_session(conn, NULL, opts->sess_config, &session)) != 0) {
+ lprintf(wtperf, ret, 0, "worker: WT_CONNECTION.open_session");
+ goto err;
+ }
+ for (i = 0; i < opts->table_count_idle; i++) {
+ testutil_check(__wt_snprintf(buf, 512, "%s_idle%05d", wtperf->uris[0], (int)i));
+ if ((ret = session->open_cursor(session, buf, NULL, NULL, &tmp_cursor)) != 0) {
+ lprintf(wtperf, ret, 0, "Error opening idle table %s", buf);
+ goto err;
+ }
+ if ((ret = tmp_cursor->close(tmp_cursor)) != 0) {
+ lprintf(wtperf, ret, 0, "Error closing idle table %s", buf);
+ goto err;
+ }
+ }
+ if (workload->table_index != INT32_MAX) {
+ if ((ret = session->open_cursor(
+ session, wtperf->uris[workload->table_index], NULL, NULL, &cursor)) != 0) {
+ lprintf(wtperf, ret, 0, "worker: WT_SESSION.open_cursor: %s",
+ wtperf->uris[workload->table_index]);
+ goto err;
+ }
+ if ((ret = session->open_cursor(session, wtperf->uris[workload->table_index], NULL,
+ "next_random=true", &thread->rand_cursor)) != 0) {
+ lprintf(wtperf, ret, 0, "worker: WT_SESSION.open_cursor: random %s",
+ wtperf->uris[workload->table_index]);
+ goto err;
+ }
+ } else {
+ cursors = dcalloc(opts->table_count, sizeof(WT_CURSOR *));
+ for (i = 0; i < opts->table_count; i++) {
+ if ((ret = session->open_cursor(session, wtperf->uris[i], NULL, NULL, &cursors[i])) !=
+ 0) {
+ lprintf(wtperf, ret, 0, "worker: WT_SESSION.open_cursor: %s", wtperf->uris[i]);
+ goto err;
+ }
+ }
+ }
+ if (opts->log_like_table &&
+ (ret = session->open_cursor(session, wtperf->log_table_uri, NULL, NULL, &log_table_cursor)) !=
+ 0) {
+ lprintf(wtperf, ret, 0, "worker: WT_SESSION.open_cursor: %s", wtperf->log_table_uri);
+ goto err;
+ }
+
+ /* Setup the timer for throttling. */
+ if (workload->throttle != 0)
+ setup_throttle(thread);
+
+ /* Setup for truncate */
+ if (workload->truncate != 0)
+ setup_truncate(wtperf, thread, session);
+
+ key_buf = thread->key_buf;
+ value_buf = thread->value_buf;
+
+ op = workload->ops;
+ op_end = op + sizeof(workload->ops);
+
+ if ((ops_per_txn != 0 || opts->log_like_table) &&
+ (ret = session->begin_transaction(session, NULL)) != 0) {
+ lprintf(wtperf, ret, 0, "First transaction begin failed");
+ goto err;
+ }
+
+ while (!wtperf->stop) {
+ if (workload->pause != 0)
+ (void)sleep((unsigned int)workload->pause);
+ /*
+ * Generate the next key and setup operation specific statistics tracking objects.
+ */
+ switch (*op) {
+ case WORKER_INSERT:
+ case WORKER_INSERT_RMW:
+ trk = &thread->insert;
+ if (opts->random_range)
+ next_val = wtperf_rand(thread);
+ else
+ next_val = opts->icount + get_next_incr(wtperf);
+ break;
+ case WORKER_READ:
+ trk = &thread->read;
+ /* FALLTHROUGH */
+ case WORKER_UPDATE:
+ if (*op == WORKER_UPDATE)
+ trk = &thread->update;
+ next_val = wtperf_rand(thread);
+
+ /*
+ * If the workload is started without a populate phase we rely on at least one insert to
+ * get a valid item id.
+ */
+ if (wtperf_value_range(wtperf) < next_val)
+ continue;
+ break;
+ case WORKER_TRUNCATE:
+ /* Required but not used. */
+ next_val = wtperf_rand(thread);
+ break;
+ default:
+ goto err; /* can't happen */
+ }
+
+ generate_key(opts, key_buf, next_val);
+
+ if (workload->table_index == INT32_MAX)
+ /*
+ * Spread the data out around the multiple databases.
+ */
+ cursor = cursors[map_key_to_table(wtperf->opts, next_val)];
+
+ /*
+ * Skip the first time we do an operation, when trk->ops is 0, to avoid first time latency
+ * spikes.
+ */
+ measure_latency = opts->sample_interval != 0 && trk != NULL && trk->ops != 0 &&
+ (trk->ops % opts->sample_rate == 0);
+ if (measure_latency)
+ __wt_epoch(NULL, &start);
+
+ cursor->set_key(cursor, key_buf);
+
+ switch (*op) {
+ case WORKER_READ:
+ /*
+ * Reads can fail with WT_NOTFOUND: we may be searching in a random range, or an insert
+ * thread might have updated the last record in the table but not yet finished the
+ * actual insert. Count failed search in a random range as a "read".
+ */
+ ret = cursor->search(cursor);
+ if (ret == 0) {
+ if ((ret = cursor->get_value(cursor, &value)) != 0) {
+ lprintf(wtperf, ret, 0, "get_value in read.");
+ goto err;
+ }
+ /*
+ * If we want to read a range, then call next for several operations, confirming
+ * that the next key is in the correct order.
+ */
+ ret = do_range_reads(wtperf, cursor, workload->read_range);
+ }
+
+ if (ret == 0 || ret == WT_NOTFOUND)
+ break;
+ goto op_err;
+ case WORKER_INSERT_RMW:
+ if ((ret = cursor->search(cursor)) != WT_NOTFOUND)
+ goto op_err;
+
+ /* The error return reset the cursor's key. */
+ cursor->set_key(cursor, key_buf);
+
+ /* FALLTHROUGH */
+ case WORKER_INSERT:
+ if (opts->random_value)
+ randomize_value(thread, value_buf);
+ cursor->set_value(cursor, value_buf);
+ if ((ret = cursor->insert(cursor)) == 0)
+ break;
+ goto op_err;
+ case WORKER_TRUNCATE:
+ if ((ret = run_truncate(wtperf, thread, cursor, session, &truncated)) == 0) {
+ if (truncated)
+ trk = &thread->truncate;
+ else
+ trk = &thread->truncate_sleep;
+ /* Pause between truncate attempts */
+ (void)usleep(1000);
+ break;
+ }
+ goto op_err;
+ case WORKER_UPDATE:
+ if ((ret = cursor->search(cursor)) == 0) {
+ if ((ret = cursor->get_value(cursor, &value)) != 0) {
+ lprintf(wtperf, ret, 0, "get_value in update.");
+ goto err;
+ }
+ /*
+ * Copy as much of the previous value as is safe, and be sure to NUL-terminate.
+ */
+ strncpy(value_buf, value, opts->value_sz_max - 1);
+ if (workload->update_delta != 0)
+ update_value_delta(thread);
+ if (value_buf[0] == 'a')
+ value_buf[0] = 'b';
+ else
+ value_buf[0] = 'a';
+ if (opts->random_value)
+ randomize_value(thread, value_buf);
+ cursor->set_value(cursor, value_buf);
+ if ((ret = cursor->update(cursor)) == 0)
+ break;
+ goto op_err;
+ }
+
+ /*
+ * Reads can fail with WT_NOTFOUND: we may be searching in a random range, or an insert
+ * thread might have updated the last record in the table but not yet finished the
+ * actual insert. Count failed search in a random range as a "read".
+ */
+ if (ret == WT_NOTFOUND)
+ break;
+
+ op_err:
+ if (ret == WT_ROLLBACK && ops_per_txn != 0) {
+ /*
+ * If we are running with explicit transactions configured and we hit a WT_ROLLBACK,
+ * then we should rollback the current transaction and attempt to continue. This
+ * does break the guarantee of insertion order in cases of ordered inserts, as we
+ * aren't retrying here.
+ */
+ lprintf(wtperf, ret, 1, "%s for: %s, range: %" PRIu64, op_name(op), key_buf,
+ wtperf_value_range(wtperf));
+ if ((ret = session->rollback_transaction(session, NULL)) != 0) {
+ lprintf(wtperf, ret, 0, "Failed rollback_transaction");
+ goto err;
+ }
+ if ((ret = session->begin_transaction(session, NULL)) != 0) {
+ lprintf(wtperf, ret, 0, "Worker begin transaction failed");
+ goto err;
+ }
+ break;
+ }
+ lprintf(wtperf, ret, 0, "%s failed for: %s, range: %" PRIu64, op_name(op), key_buf,
+ wtperf_value_range(wtperf));
+ goto err;
+ default:
+ goto err; /* can't happen */
+ }
+
+ /* Update the log-like table. */
+ if (opts->log_like_table && (*op != WORKER_READ && *op != WORKER_TRUNCATE)) {
+ log_id = __wt_atomic_add64(&wtperf->log_like_table_key, 1);
+ log_table_cursor->set_key(log_table_cursor, log_id);
+ log_table_cursor->set_value(log_table_cursor, value_buf);
+ if ((ret = log_table_cursor->insert(log_table_cursor)) != 0) {
+ lprintf(wtperf, ret, 1, "Cursor insert failed");
+ if (ret == WT_ROLLBACK && ops_per_txn == 0) {
+ lprintf(wtperf, ret, 1, "log-table: ROLLBACK");
+ if ((ret = session->rollback_transaction(session, NULL)) != 0) {
+ lprintf(wtperf, ret, 0,
+ "Failed"
+ " rollback_transaction");
+ goto err;
+ }
+ if ((ret = session->begin_transaction(session, NULL)) != 0) {
+ lprintf(wtperf, ret, 0,
+ "Worker begin "
+ "transaction failed");
+ goto err;
+ }
+ } else
+ goto err;
+ }
+ }
+
+ /* Release the cursor, if we have multiple tables. */
+ if (opts->table_count > 1 && ret == 0 && *op != WORKER_INSERT && *op != WORKER_INSERT_RMW) {
+ if ((ret = cursor->reset(cursor)) != 0) {
+ lprintf(wtperf, ret, 0, "Cursor reset failed");
+ goto err;
+ }
+ }
+
+ /* Gather statistics */
+ if (!wtperf->in_warmup) {
+ if (measure_latency) {
+ __wt_epoch(NULL, &stop);
+ ++trk->latency_ops;
+ usecs = WT_TIMEDIFF_US(stop, start);
+ track_operation(trk, usecs);
+ }
+ /* Increment operation count */
+ ++trk->ops;
+ }
+
+ /*
+ * Commit the transaction if grouping operations together or tracking changes in our log
+ * table.
+ */
+ if ((opts->log_like_table && ops_per_txn == 0) ||
+ (ops_per_txn != 0 && ops++ % ops_per_txn == 0)) {
+ if ((ret = session->commit_transaction(session, NULL)) != 0) {
+ lprintf(wtperf, ret, 0, "Worker transaction commit failed");
+ goto err;
+ }
+ if ((ret = session->begin_transaction(session, NULL)) != 0) {
+ lprintf(wtperf, ret, 0, "Worker begin transaction failed");
+ goto err;
+ }
+ }
+
+ /* Schedule the next operation */
+ if (++op == op_end)
+ op = workload->ops;
+
+ /*
+ * Decrement throttle ops and check if we should sleep and then get more work to perform.
+ */
+ if (--thread->throttle_cfg.ops_count == 0)
+ worker_throttle(thread);
+ }
+
+ if ((ret = session->close(session, NULL)) != 0) {
+ lprintf(wtperf, ret, 0, "Session close in worker failed");
+ goto err;
+ }
+
+ /* Notify our caller we failed and shut the system down. */
+ if (0) {
+err:
+ wtperf->error = wtperf->stop = true;
+ }
+ free(cursors);
+
+ return (WT_THREAD_RET_VALUE);
}
/*
* run_mix_schedule_op --
- * Replace read operations with another operation, in the configured
- * percentage.
+ * Replace read operations with another operation, in the configured percentage.
*/
static void
run_mix_schedule_op(WORKLOAD *workp, int op, int64_t op_cnt)
{
- int jump, pass;
- uint8_t *p, *end;
-
- /* Jump around the array to roughly spread out the operations. */
- jump = (int)(100 / op_cnt);
-
- /*
- * Find a read operation and replace it with another operation. This
- * is roughly n-squared, but it's an N of 100, leave it.
- */
- p = workp->ops;
- end = workp->ops + sizeof(workp->ops);
- while (op_cnt-- > 0) {
- for (pass = 0; *p != WORKER_READ; ++p)
- if (p == end) {
- /*
- * Passed a percentage of total operations and
- * should always be a read operation to replace,
- * but don't allow infinite loops.
- */
- if (++pass > 1)
- return;
- p = workp->ops;
- }
- *p = (uint8_t)op;
-
- if (end - jump < p)
- p = workp->ops;
- else
- p += jump;
- }
+ int jump, pass;
+ uint8_t *p, *end;
+
+ /* Jump around the array to roughly spread out the operations. */
+ jump = (int)(100 / op_cnt);
+
+ /*
+ * Find a read operation and replace it with another operation. This is roughly n-squared, but
+ * it's an N of 100, leave it.
+ */
+ p = workp->ops;
+ end = workp->ops + sizeof(workp->ops);
+ while (op_cnt-- > 0) {
+ for (pass = 0; *p != WORKER_READ; ++p)
+ if (p == end) {
+ /*
+ * Passed a percentage of total operations and should always be a read operation to
+ * replace, but don't allow infinite loops.
+ */
+ if (++pass > 1)
+ return;
+ p = workp->ops;
+ }
+ *p = (uint8_t)op;
+
+ if (end - jump < p)
+ p = workp->ops;
+ else
+ p += jump;
+ }
}
/*
* run_mix_schedule --
- * Schedule the mixed-run operations.
+ * Schedule the mixed-run operations.
*/
static int
run_mix_schedule(WTPERF *wtperf, WORKLOAD *workp)
{
- CONFIG_OPTS *opts;
- int64_t pct;
-
- opts = wtperf->opts;
-
- if (workp->truncate != 0) {
- if (workp->insert != 0 ||
- workp->read != 0 || workp->update != 0) {
- lprintf(wtperf, EINVAL, 0,
- "Can't configure truncate in a mixed workload");
- return (EINVAL);
- }
- memset(workp->ops, WORKER_TRUNCATE, sizeof(workp->ops));
- return (0);
- }
-
- /* Confirm reads, inserts and updates cannot all be zero. */
- if (workp->insert == 0 && workp->read == 0 && workp->update == 0) {
- lprintf(wtperf, EINVAL, 0, "no operations scheduled");
- return (EINVAL);
- }
-
- /*
- * Check for a simple case where the thread is only doing insert or
- * update operations (because the default operation for a
- * job-mix is read, the subsequent code works fine if only reads are
- * specified).
- */
- if (workp->insert != 0 && workp->read == 0 && workp->update == 0) {
- memset(workp->ops,
- opts->insert_rmw ? WORKER_INSERT_RMW : WORKER_INSERT,
- sizeof(workp->ops));
- return (0);
- }
- if (workp->insert == 0 && workp->read == 0 && workp->update != 0) {
- memset(workp->ops, WORKER_UPDATE, sizeof(workp->ops));
- return (0);
- }
-
- /*
- * The worker thread configuration is done as ratios of operations. If
- * the caller gives us something insane like "reads=77,updates=23" (do
- * 77 reads for every 23 updates), we don't want to do 77 reads followed
- * by 23 updates, we want to uniformly distribute the read and update
- * operations across the space. Convert to percentages and then lay out
- * the operations across an array.
- *
- * Percentage conversion is lossy, the application can do stupid stuff
- * here, for example, imagine a configured ratio of "reads=1,inserts=2,
- * updates=999999". First, if the percentages are skewed enough, some
- * operations might never be done. Second, we set the base operation to
- * read, which means any fractional results from percentage conversion
- * will be reads, implying read operations in some cases where reads
- * weren't configured. We should be fine if the application configures
- * something approaching a rational set of ratios.
- */
- memset(workp->ops, WORKER_READ, sizeof(workp->ops));
-
- pct = (workp->insert * 100) /
- (workp->insert + workp->read + workp->update);
- if (pct != 0)
- run_mix_schedule_op(workp,
- opts->insert_rmw ? WORKER_INSERT_RMW : WORKER_INSERT, pct);
- pct = (workp->update * 100) /
- (workp->insert + workp->read + workp->update);
- if (pct != 0)
- run_mix_schedule_op(workp, WORKER_UPDATE, pct);
- return (0);
+ CONFIG_OPTS *opts;
+ int64_t pct;
+
+ opts = wtperf->opts;
+
+ if (workp->truncate != 0) {
+ if (workp->insert != 0 || workp->read != 0 || workp->update != 0) {
+ lprintf(wtperf, EINVAL, 0, "Can't configure truncate in a mixed workload");
+ return (EINVAL);
+ }
+ memset(workp->ops, WORKER_TRUNCATE, sizeof(workp->ops));
+ return (0);
+ }
+
+ /* Confirm reads, inserts and updates cannot all be zero. */
+ if (workp->insert == 0 && workp->read == 0 && workp->update == 0) {
+ lprintf(wtperf, EINVAL, 0, "no operations scheduled");
+ return (EINVAL);
+ }
+
+ /*
+ * Check for a simple case where the thread is only doing insert or update operations (because
+ * the default operation for a job-mix is read, the subsequent code works fine if only reads are
+ * specified).
+ */
+ if (workp->insert != 0 && workp->read == 0 && workp->update == 0) {
+ memset(
+ workp->ops, opts->insert_rmw ? WORKER_INSERT_RMW : WORKER_INSERT, sizeof(workp->ops));
+ return (0);
+ }
+ if (workp->insert == 0 && workp->read == 0 && workp->update != 0) {
+ memset(workp->ops, WORKER_UPDATE, sizeof(workp->ops));
+ return (0);
+ }
+
+ /*
+ * The worker thread configuration is done as ratios of operations. If
+ * the caller gives us something insane like "reads=77,updates=23" (do
+ * 77 reads for every 23 updates), we don't want to do 77 reads followed
+ * by 23 updates, we want to uniformly distribute the read and update
+ * operations across the space. Convert to percentages and then lay out
+ * the operations across an array.
+ *
+ * Percentage conversion is lossy, the application can do stupid stuff
+ * here, for example, imagine a configured ratio of "reads=1,inserts=2,
+ * updates=999999". First, if the percentages are skewed enough, some
+ * operations might never be done. Second, we set the base operation to
+ * read, which means any fractional results from percentage conversion
+ * will be reads, implying read operations in some cases where reads
+ * weren't configured. We should be fine if the application configures
+ * something approaching a rational set of ratios.
+ */
+ memset(workp->ops, WORKER_READ, sizeof(workp->ops));
+
+ pct = (workp->insert * 100) / (workp->insert + workp->read + workp->update);
+ if (pct != 0)
+ run_mix_schedule_op(workp, opts->insert_rmw ? WORKER_INSERT_RMW : WORKER_INSERT, pct);
+ pct = (workp->update * 100) / (workp->insert + workp->read + workp->update);
+ if (pct != 0)
+ run_mix_schedule_op(workp, WORKER_UPDATE, pct);
+ return (0);
}
static WT_THREAD_RET
populate_thread(void *arg)
{
- struct timespec start, stop;
- CONFIG_OPTS *opts;
- TRACK *trk;
- WTPERF *wtperf;
- WTPERF_THREAD *thread;
- WT_CONNECTION *conn;
- WT_CURSOR **cursors, *cursor;
- WT_SESSION *session;
- size_t i;
- uint64_t op, usecs;
- uint32_t opcount;
- int intxn, measure_latency, ret, stress_checkpoint_due;
- char *value_buf, *key_buf;
- const char *cursor_config;
-
- thread = (WTPERF_THREAD *)arg;
- wtperf = thread->wtperf;
- opts = wtperf->opts;
- conn = wtperf->conn;
- session = NULL;
- cursors = NULL;
- ret = stress_checkpoint_due = 0;
- trk = &thread->insert;
-
- key_buf = thread->key_buf;
- value_buf = thread->value_buf;
-
- if ((ret = conn->open_session(
- conn, NULL, opts->sess_config, &session)) != 0) {
- lprintf(wtperf, ret, 0, "populate: WT_CONNECTION.open_session");
- goto err;
- }
-
- /* Do bulk loads if populate is single-threaded. */
- cursor_config =
- (opts->populate_threads == 1 && !opts->index) ? "bulk" : NULL;
- /* Create the cursors. */
- cursors = dcalloc(opts->table_count, sizeof(WT_CURSOR *));
- for (i = 0; i < opts->table_count; i++) {
- if ((ret = session->open_cursor(
- session, wtperf->uris[i], NULL,
- cursor_config, &cursors[i])) != 0) {
- lprintf(wtperf, ret, 0,
- "populate: WT_SESSION.open_cursor: %s",
- wtperf->uris[i]);
- goto err;
- }
- }
-
- /* Populate the databases. */
- for (intxn = 0, opcount = 0;;) {
- op = get_next_incr(wtperf);
- if (op > opts->icount)
- break;
-
- if (opts->populate_ops_per_txn != 0 && !intxn) {
- if ((ret = session->begin_transaction(
- session, opts->transaction_config)) != 0) {
- lprintf(wtperf, ret, 0,
- "Failed starting transaction.");
- goto err;
- }
- intxn = 1;
- }
- /*
- * Figure out which table this op belongs to.
- */
- cursor = cursors[map_key_to_table(wtperf->opts, op)];
- generate_key(opts, key_buf, op);
- measure_latency =
- opts->sample_interval != 0 &&
- trk->ops != 0 && (trk->ops % opts->sample_rate == 0);
- if (measure_latency)
- __wt_epoch(NULL, &start);
- cursor->set_key(cursor, key_buf);
- if (opts->random_value)
- randomize_value(thread, value_buf);
- cursor->set_value(cursor, value_buf);
- if ((ret = cursor->insert(cursor)) == WT_ROLLBACK) {
- lprintf(wtperf, ret, 0, "insert retrying");
- if ((ret = session->rollback_transaction(
- session, NULL)) != 0) {
- lprintf(wtperf, ret, 0,
- "Failed rollback_transaction");
- goto err;
- }
- intxn = 0;
- continue;
- } else if (ret != 0) {
- lprintf(wtperf, ret, 0, "Failed inserting");
- goto err;
- }
- /*
- * Gather statistics.
- * We measure the latency of inserting a single key. If there
- * are multiple tables, it is the time for insertion into all
- * of them.
- */
- if (measure_latency) {
- __wt_epoch(NULL, &stop);
- ++trk->latency_ops;
- usecs = WT_TIMEDIFF_US(stop, start);
- track_operation(trk, usecs);
- }
- ++thread->insert.ops; /* Same as trk->ops */
-
- if (opts->checkpoint_stress_rate != 0 &&
- (op % opts->checkpoint_stress_rate) == 0)
- stress_checkpoint_due = 1;
-
- if (opts->populate_ops_per_txn != 0) {
- if (++opcount < opts->populate_ops_per_txn)
- continue;
- opcount = 0;
-
- if ((ret = session->commit_transaction(
- session, NULL)) != 0)
- lprintf(wtperf, ret, 0,
- "Fail committing, transaction was aborted");
- intxn = 0;
- }
-
- if (stress_checkpoint_due && intxn == 0) {
- stress_checkpoint_due = 0;
- if ((ret = session->checkpoint(session, NULL)) != 0) {
- lprintf(wtperf, ret, 0, "Checkpoint failed");
- goto err;
- }
- }
- }
- if (intxn &&
- (ret = session->commit_transaction(session, NULL)) != 0)
- lprintf(wtperf, ret, 0,
- "Fail committing, transaction was aborted");
-
- if ((ret = session->close(session, NULL)) != 0) {
- lprintf(wtperf, ret, 0, "Error closing session in populate");
- goto err;
- }
-
- /* Notify our caller we failed and shut the system down. */
- if (0) {
-err: wtperf->error = wtperf->stop = true;
- }
- free(cursors);
-
- return (WT_THREAD_RET_VALUE);
+ struct timespec start, stop;
+ CONFIG_OPTS *opts;
+ TRACK *trk;
+ WTPERF *wtperf;
+ WTPERF_THREAD *thread;
+ WT_CONNECTION *conn;
+ WT_CURSOR **cursors, *cursor;
+ WT_SESSION *session;
+ size_t i;
+ uint64_t op, usecs;
+ uint32_t opcount;
+ int intxn, measure_latency, ret, stress_checkpoint_due;
+ char *value_buf, *key_buf;
+ const char *cursor_config;
+
+ thread = (WTPERF_THREAD *)arg;
+ wtperf = thread->wtperf;
+ opts = wtperf->opts;
+ conn = wtperf->conn;
+ session = NULL;
+ cursors = NULL;
+ ret = stress_checkpoint_due = 0;
+ trk = &thread->insert;
+
+ key_buf = thread->key_buf;
+ value_buf = thread->value_buf;
+
+ if ((ret = conn->open_session(conn, NULL, opts->sess_config, &session)) != 0) {
+ lprintf(wtperf, ret, 0, "populate: WT_CONNECTION.open_session");
+ goto err;
+ }
+
+ /* Do bulk loads if populate is single-threaded. */
+ cursor_config = (opts->populate_threads == 1 && !opts->index) ? "bulk" : NULL;
+ /* Create the cursors. */
+ cursors = dcalloc(opts->table_count, sizeof(WT_CURSOR *));
+ for (i = 0; i < opts->table_count; i++) {
+ if ((ret = session->open_cursor(
+ session, wtperf->uris[i], NULL, cursor_config, &cursors[i])) != 0) {
+ lprintf(wtperf, ret, 0, "populate: WT_SESSION.open_cursor: %s", wtperf->uris[i]);
+ goto err;
+ }
+ }
+
+ /* Populate the databases. */
+ for (intxn = 0, opcount = 0;;) {
+ op = get_next_incr(wtperf);
+ if (op > opts->icount)
+ break;
+
+ if (opts->populate_ops_per_txn != 0 && !intxn) {
+ if ((ret = session->begin_transaction(session, opts->transaction_config)) != 0) {
+ lprintf(wtperf, ret, 0, "Failed starting transaction.");
+ goto err;
+ }
+ intxn = 1;
+ }
+ /*
+ * Figure out which table this op belongs to.
+ */
+ cursor = cursors[map_key_to_table(wtperf->opts, op)];
+ generate_key(opts, key_buf, op);
+ measure_latency =
+ opts->sample_interval != 0 && trk->ops != 0 && (trk->ops % opts->sample_rate == 0);
+ if (measure_latency)
+ __wt_epoch(NULL, &start);
+ cursor->set_key(cursor, key_buf);
+ if (opts->random_value)
+ randomize_value(thread, value_buf);
+ cursor->set_value(cursor, value_buf);
+ if ((ret = cursor->insert(cursor)) == WT_ROLLBACK) {
+ lprintf(wtperf, ret, 0, "insert retrying");
+ if ((ret = session->rollback_transaction(session, NULL)) != 0) {
+ lprintf(wtperf, ret, 0, "Failed rollback_transaction");
+ goto err;
+ }
+ intxn = 0;
+ continue;
+ } else if (ret != 0) {
+ lprintf(wtperf, ret, 0, "Failed inserting");
+ goto err;
+ }
+ /*
+ * Gather statistics. We measure the latency of inserting a single key. If there are
+ * multiple tables, it is the time for insertion into all of them.
+ */
+ if (measure_latency) {
+ __wt_epoch(NULL, &stop);
+ ++trk->latency_ops;
+ usecs = WT_TIMEDIFF_US(stop, start);
+ track_operation(trk, usecs);
+ }
+ ++thread->insert.ops; /* Same as trk->ops */
+
+ if (opts->checkpoint_stress_rate != 0 && (op % opts->checkpoint_stress_rate) == 0)
+ stress_checkpoint_due = 1;
+
+ if (opts->populate_ops_per_txn != 0) {
+ if (++opcount < opts->populate_ops_per_txn)
+ continue;
+ opcount = 0;
+
+ if ((ret = session->commit_transaction(session, NULL)) != 0)
+ lprintf(wtperf, ret, 0, "Fail committing, transaction was aborted");
+ intxn = 0;
+ }
+
+ if (stress_checkpoint_due && intxn == 0) {
+ stress_checkpoint_due = 0;
+ if ((ret = session->checkpoint(session, NULL)) != 0) {
+ lprintf(wtperf, ret, 0, "Checkpoint failed");
+ goto err;
+ }
+ }
+ }
+ if (intxn && (ret = session->commit_transaction(session, NULL)) != 0)
+ lprintf(wtperf, ret, 0, "Fail committing, transaction was aborted");
+
+ if ((ret = session->close(session, NULL)) != 0) {
+ lprintf(wtperf, ret, 0, "Error closing session in populate");
+ goto err;
+ }
+
+ /* Notify our caller we failed and shut the system down. */
+ if (0) {
+err:
+ wtperf->error = wtperf->stop = true;
+ }
+ free(cursors);
+
+ return (WT_THREAD_RET_VALUE);
}
static WT_THREAD_RET
populate_async(void *arg)
{
- struct timespec start, stop;
- CONFIG_OPTS *opts;
- TRACK *trk;
- WTPERF *wtperf;
- WTPERF_THREAD *thread;
- WT_ASYNC_OP *asyncop;
- WT_CONNECTION *conn;
- WT_SESSION *session;
- uint64_t op, usecs;
- int measure_latency, ret;
- char *value_buf, *key_buf;
-
- thread = (WTPERF_THREAD *)arg;
- wtperf = thread->wtperf;
- opts = wtperf->opts;
- conn = wtperf->conn;
- session = NULL;
- ret = 0;
- trk = &thread->insert;
-
- key_buf = thread->key_buf;
- value_buf = thread->value_buf;
-
- if ((ret = conn->open_session(
- conn, NULL, opts->sess_config, &session)) != 0) {
- lprintf(wtperf, ret, 0, "populate: WT_CONNECTION.open_session");
- goto err;
- }
-
- /*
- * Measuring latency of one async op is not meaningful. We
- * will measure the time it takes to do all of them, including
- * the time to process by workers.
- */
- measure_latency =
- opts->sample_interval != 0 &&
- trk->ops != 0 && (trk->ops % opts->sample_rate == 0);
- if (measure_latency)
- __wt_epoch(NULL, &start);
-
- /* Populate the databases. */
- for (;;) {
- op = get_next_incr(wtperf);
- if (op > opts->icount)
- break;
- /*
- * Allocate an async op for whichever table.
- */
- while ((ret = conn->async_new_op(
- conn, wtperf->uris[map_key_to_table(wtperf->opts, op)],
- NULL, &cb, &asyncop)) == EBUSY)
- (void)usleep(10000);
- if (ret != 0)
- goto err;
-
- asyncop->app_private = thread;
- generate_key(opts, key_buf, op);
- asyncop->set_key(asyncop, key_buf);
- if (opts->random_value)
- randomize_value(thread, value_buf);
- asyncop->set_value(asyncop, value_buf);
- if ((ret = asyncop->insert(asyncop)) != 0) {
- lprintf(wtperf, ret, 0, "Failed inserting");
- goto err;
- }
- }
-
- /*
- * Gather statistics.
- * We measure the latency of inserting a single key. If there
- * are multiple tables, it is the time for insertion into all
- * of them. Note that currently every populate thread will call
- * async_flush and those calls will convoy. That is not the
- * most efficient way, but we want to flush before measuring latency.
- */
- if (conn->async_flush(conn) != 0)
- goto err;
- if (measure_latency) {
- __wt_epoch(NULL, &stop);
- ++trk->latency_ops;
- usecs = WT_TIMEDIFF_US(stop, start);
- track_operation(trk, usecs);
- }
- if ((ret = session->close(session, NULL)) != 0) {
- lprintf(wtperf, ret, 0, "Error closing session in populate");
- goto err;
- }
-
- /* Notify our caller we failed and shut the system down. */
- if (0) {
-err: wtperf->error = wtperf->stop = true;
- }
- return (WT_THREAD_RET_VALUE);
+ struct timespec start, stop;
+ CONFIG_OPTS *opts;
+ TRACK *trk;
+ WTPERF *wtperf;
+ WTPERF_THREAD *thread;
+ WT_ASYNC_OP *asyncop;
+ WT_CONNECTION *conn;
+ WT_SESSION *session;
+ uint64_t op, usecs;
+ int measure_latency, ret;
+ char *value_buf, *key_buf;
+
+ thread = (WTPERF_THREAD *)arg;
+ wtperf = thread->wtperf;
+ opts = wtperf->opts;
+ conn = wtperf->conn;
+ session = NULL;
+ ret = 0;
+ trk = &thread->insert;
+
+ key_buf = thread->key_buf;
+ value_buf = thread->value_buf;
+
+ if ((ret = conn->open_session(conn, NULL, opts->sess_config, &session)) != 0) {
+ lprintf(wtperf, ret, 0, "populate: WT_CONNECTION.open_session");
+ goto err;
+ }
+
+ /*
+ * Measuring latency of one async op is not meaningful. We will measure the time it takes to do
+ * all of them, including the time to process by workers.
+ */
+ measure_latency =
+ opts->sample_interval != 0 && trk->ops != 0 && (trk->ops % opts->sample_rate == 0);
+ if (measure_latency)
+ __wt_epoch(NULL, &start);
+
+ /* Populate the databases. */
+ for (;;) {
+ op = get_next_incr(wtperf);
+ if (op > opts->icount)
+ break;
+ /*
+ * Allocate an async op for whichever table.
+ */
+ while ((ret = conn->async_new_op(conn, wtperf->uris[map_key_to_table(wtperf->opts, op)],
+ NULL, &cb, &asyncop)) == EBUSY)
+ (void)usleep(10000);
+ if (ret != 0)
+ goto err;
+
+ asyncop->app_private = thread;
+ generate_key(opts, key_buf, op);
+ asyncop->set_key(asyncop, key_buf);
+ if (opts->random_value)
+ randomize_value(thread, value_buf);
+ asyncop->set_value(asyncop, value_buf);
+ if ((ret = asyncop->insert(asyncop)) != 0) {
+ lprintf(wtperf, ret, 0, "Failed inserting");
+ goto err;
+ }
+ }
+
+ /*
+ * Gather statistics. We measure the latency of inserting a single key. If there are multiple
+ * tables, it is the time for insertion into all of them. Note that currently every populate
+ * thread will call async_flush and those calls will convoy. That is not the most efficient way,
+ * but we want to flush before measuring latency.
+ */
+ if (conn->async_flush(conn) != 0)
+ goto err;
+ if (measure_latency) {
+ __wt_epoch(NULL, &stop);
+ ++trk->latency_ops;
+ usecs = WT_TIMEDIFF_US(stop, start);
+ track_operation(trk, usecs);
+ }
+ if ((ret = session->close(session, NULL)) != 0) {
+ lprintf(wtperf, ret, 0, "Error closing session in populate");
+ goto err;
+ }
+
+ /* Notify our caller we failed and shut the system down. */
+ if (0) {
+err:
+ wtperf->error = wtperf->stop = true;
+ }
+ return (WT_THREAD_RET_VALUE);
}
static WT_THREAD_RET
monitor(void *arg)
{
- struct timespec t;
- struct tm localt;
- CONFIG_OPTS *opts;
- FILE *fp, *jfp;
- WTPERF *wtperf;
- size_t len;
- uint64_t min_thr, reads, inserts, updates;
- uint64_t cur_reads, cur_inserts, cur_updates;
- uint64_t last_reads, last_inserts, last_updates;
- uint32_t read_avg, read_min, read_max;
- uint32_t insert_avg, insert_min, insert_max;
- uint32_t update_avg, update_min, update_max;
- uint32_t latency_max, level;
- u_int i;
- size_t buf_size;
- int msg_err;
- const char *str;
- char buf[64], *path;
- bool first;
-
- wtperf = (WTPERF *)arg;
- opts = wtperf->opts;
- assert(opts->sample_interval != 0);
-
- fp = jfp = NULL;
- first = true;
- path = NULL;
-
- min_thr = (uint64_t)opts->min_throughput;
- latency_max = (uint32_t)ms_to_us(opts->max_latency);
-
- /* Open the logging file. */
- len = strlen(wtperf->monitor_dir) + 100;
- path = dmalloc(len);
- testutil_check(__wt_snprintf(
- path, len, "%s/monitor", wtperf->monitor_dir));
- if ((fp = fopen(path, "w")) == NULL) {
- lprintf(wtperf, errno, 0, "%s", path);
- goto err;
- }
- testutil_check(__wt_snprintf(
- path, len, "%s/monitor.json", wtperf->monitor_dir));
- if ((jfp = fopen(path, "w")) == NULL) {
- lprintf(wtperf, errno, 0, "%s", path);
- goto err;
- }
- /* Set line buffering for monitor file. */
- __wt_stream_set_line_buffer(fp);
- __wt_stream_set_line_buffer(jfp);
- fprintf(fp,
- "#time,"
- "totalsec,"
- "read ops per second,"
- "insert ops per second,"
- "update ops per second,"
- "checkpoints,"
- "read average latency(uS),"
- "read minimum latency(uS),"
- "read maximum latency(uS),"
- "insert average latency(uS),"
- "insert min latency(uS),"
- "insert maximum latency(uS),"
- "update average latency(uS),"
- "update min latency(uS),"
- "update maximum latency(uS)"
- "\n");
- last_reads = last_inserts = last_updates = 0;
- while (!wtperf->stop) {
- for (i = 0; i < opts->sample_interval; i++) {
- sleep(1);
- if (wtperf->stop)
- break;
- }
- /* If the workers are done, don't bother with a final call. */
- if (wtperf->stop)
- break;
- if (wtperf->in_warmup)
- continue;
-
- __wt_epoch(NULL, &t);
- testutil_check(__wt_localtime(NULL, &t.tv_sec, &localt));
- testutil_assert(
- strftime(buf, sizeof(buf), "%b %d %H:%M:%S", &localt) != 0);
-
- reads = sum_read_ops(wtperf);
- inserts = sum_insert_ops(wtperf);
- updates = sum_update_ops(wtperf);
- latency_read(wtperf, &read_avg, &read_min, &read_max);
- latency_insert(wtperf, &insert_avg, &insert_min, &insert_max);
- latency_update(wtperf, &update_avg, &update_min, &update_max);
-
- cur_reads = (reads - last_reads) / opts->sample_interval;
- cur_updates = (updates - last_updates) / opts->sample_interval;
- /*
- * For now the only item we need to worry about changing is
- * inserts when we transition from the populate phase to
- * workload phase.
- */
- if (inserts < last_inserts)
- cur_inserts = 0;
- else
- cur_inserts =
- (inserts - last_inserts) / opts->sample_interval;
-
- (void)fprintf(fp,
- "%s,%" PRIu32
- ",%" PRIu64 ",%" PRIu64 ",%" PRIu64
- ",%c"
- ",%" PRIu32 ",%" PRIu32 ",%" PRIu32
- ",%" PRIu32 ",%" PRIu32 ",%" PRIu32
- ",%" PRIu32 ",%" PRIu32 ",%" PRIu32
- "\n",
- buf, wtperf->totalsec,
- cur_reads, cur_inserts, cur_updates,
- wtperf->ckpt ? 'Y' : 'N',
- read_avg, read_min, read_max,
- insert_avg, insert_min, insert_max,
- update_avg, update_min, update_max);
- if (jfp != NULL) {
- buf_size = strftime(buf,
- sizeof(buf), "%Y-%m-%dT%H:%M:%S", &localt);
- testutil_assert(buf_size != 0);
- testutil_check(__wt_snprintf(&buf[buf_size],
- sizeof(buf) - buf_size,
- ".%3.3" PRIu64 "Z",
- ns_to_ms((uint64_t)t.tv_nsec)));
- (void)fprintf(jfp, "{");
- if (first) {
- (void)fprintf(jfp, "\"version\":\"%s\",",
- WIREDTIGER_VERSION_STRING);
- first = false;
- }
- (void)fprintf(jfp,
- "\"localTime\":\"%s\",\"wtperf\":{", buf);
- /* Note does not have initial comma before "read" */
- (void)fprintf(jfp,
- "\"read\":{\"ops per sec\":%" PRIu64
- ",\"average latency\":%" PRIu32
- ",\"min latency\":%" PRIu32
- ",\"max latency\":%" PRIu32 "}",
- cur_reads, read_avg, read_min, read_max);
- (void)fprintf(jfp,
- ",\"insert\":{\"ops per sec\":%" PRIu64
- ",\"average latency\":%" PRIu32
- ",\"min latency\":%" PRIu32
- ",\"max latency\":%" PRIu32 "}",
- cur_inserts, insert_avg, insert_min, insert_max);
- (void)fprintf(jfp,
- ",\"update\":{\"ops per sec\":%" PRIu64
- ",\"average latency\":%" PRIu32
- ",\"min latency\":%" PRIu32
- ",\"max latency\":%" PRIu32 "}",
- cur_updates, update_avg, update_min, update_max);
- fprintf(jfp, "}}\n");
- }
-
- if (latency_max != 0 &&
- (read_max > latency_max || insert_max > latency_max ||
- update_max > latency_max)) {
- if (opts->max_latency_fatal) {
- level = 1;
- msg_err = WT_PANIC;
- str = "ERROR";
- } else {
- level = 0;
- msg_err = 0;
- str = "WARNING";
- }
- lprintf(wtperf, msg_err, level,
- "%s: max latency exceeded: threshold %" PRIu32
- " read max %" PRIu32 " insert max %" PRIu32
- " update max %" PRIu32, str, latency_max,
- read_max, insert_max, update_max);
- }
- if (min_thr != 0 &&
- ((cur_reads != 0 && cur_reads < min_thr) ||
- (cur_inserts != 0 && cur_inserts < min_thr) ||
- (cur_updates != 0 && cur_updates < min_thr))) {
- if (opts->min_throughput_fatal) {
- level = 1;
- msg_err = WT_PANIC;
- str = "ERROR";
- } else {
- level = 0;
- msg_err = 0;
- str = "WARNING";
- }
- lprintf(wtperf, msg_err, level,
- "%s: minimum throughput not met: threshold %" PRIu64
- " reads %" PRIu64 " inserts %" PRIu64
- " updates %" PRIu64, str, min_thr, cur_reads,
- cur_inserts, cur_updates);
- }
- last_reads = reads;
- last_inserts = inserts;
- last_updates = updates;
- }
-
- /* Notify our caller we failed and shut the system down. */
- if (0) {
-err: wtperf->error = wtperf->stop = true;
- }
-
- if (fp != NULL)
- (void)fclose(fp);
- if (jfp != NULL)
- (void)fclose(jfp);
- free(path);
-
- return (WT_THREAD_RET_VALUE);
+ struct timespec t;
+ struct tm localt;
+ CONFIG_OPTS *opts;
+ FILE *fp, *jfp;
+ WTPERF *wtperf;
+ size_t len;
+ uint64_t min_thr, reads, inserts, updates;
+ uint64_t cur_reads, cur_inserts, cur_updates;
+ uint64_t last_reads, last_inserts, last_updates;
+ uint32_t read_avg, read_min, read_max;
+ uint32_t insert_avg, insert_min, insert_max;
+ uint32_t update_avg, update_min, update_max;
+ uint32_t latency_max, level;
+ u_int i;
+ size_t buf_size;
+ int msg_err;
+ const char *str;
+ char buf[64], *path;
+ bool first;
+
+ wtperf = (WTPERF *)arg;
+ opts = wtperf->opts;
+ assert(opts->sample_interval != 0);
+
+ fp = jfp = NULL;
+ first = true;
+ path = NULL;
+
+ min_thr = (uint64_t)opts->min_throughput;
+ latency_max = (uint32_t)ms_to_us(opts->max_latency);
+
+ /* Open the logging file. */
+ len = strlen(wtperf->monitor_dir) + 100;
+ path = dmalloc(len);
+ testutil_check(__wt_snprintf(path, len, "%s/monitor", wtperf->monitor_dir));
+ if ((fp = fopen(path, "w")) == NULL) {
+ lprintf(wtperf, errno, 0, "%s", path);
+ goto err;
+ }
+ testutil_check(__wt_snprintf(path, len, "%s/monitor.json", wtperf->monitor_dir));
+ if ((jfp = fopen(path, "w")) == NULL) {
+ lprintf(wtperf, errno, 0, "%s", path);
+ goto err;
+ }
+ /* Set line buffering for monitor file. */
+ __wt_stream_set_line_buffer(fp);
+ __wt_stream_set_line_buffer(jfp);
+ fprintf(fp,
+ "#time,"
+ "totalsec,"
+ "read ops per second,"
+ "insert ops per second,"
+ "update ops per second,"
+ "checkpoints,"
+ "read average latency(uS),"
+ "read minimum latency(uS),"
+ "read maximum latency(uS),"
+ "insert average latency(uS),"
+ "insert min latency(uS),"
+ "insert maximum latency(uS),"
+ "update average latency(uS),"
+ "update min latency(uS),"
+ "update maximum latency(uS)"
+ "\n");
+ last_reads = last_inserts = last_updates = 0;
+ while (!wtperf->stop) {
+ for (i = 0; i < opts->sample_interval; i++) {
+ sleep(1);
+ if (wtperf->stop)
+ break;
+ }
+ /* If the workers are done, don't bother with a final call. */
+ if (wtperf->stop)
+ break;
+ if (wtperf->in_warmup)
+ continue;
+
+ __wt_epoch(NULL, &t);
+ testutil_check(__wt_localtime(NULL, &t.tv_sec, &localt));
+ testutil_assert(strftime(buf, sizeof(buf), "%b %d %H:%M:%S", &localt) != 0);
+
+ reads = sum_read_ops(wtperf);
+ inserts = sum_insert_ops(wtperf);
+ updates = sum_update_ops(wtperf);
+ latency_read(wtperf, &read_avg, &read_min, &read_max);
+ latency_insert(wtperf, &insert_avg, &insert_min, &insert_max);
+ latency_update(wtperf, &update_avg, &update_min, &update_max);
+
+ cur_reads = (reads - last_reads) / opts->sample_interval;
+ cur_updates = (updates - last_updates) / opts->sample_interval;
+ /*
+ * For now the only item we need to worry about changing is inserts when we transition from
+ * the populate phase to workload phase.
+ */
+ if (inserts < last_inserts)
+ cur_inserts = 0;
+ else
+ cur_inserts = (inserts - last_inserts) / opts->sample_interval;
+
+ (void)fprintf(fp, "%s,%" PRIu32 ",%" PRIu64 ",%" PRIu64 ",%" PRIu64
+ ",%c"
+ ",%" PRIu32 ",%" PRIu32 ",%" PRIu32 ",%" PRIu32 ",%" PRIu32 ",%" PRIu32
+ ",%" PRIu32 ",%" PRIu32 ",%" PRIu32 "\n",
+ buf, wtperf->totalsec, cur_reads, cur_inserts, cur_updates, wtperf->ckpt ? 'Y' : 'N',
+ read_avg, read_min, read_max, insert_avg, insert_min, insert_max, update_avg, update_min,
+ update_max);
+ if (jfp != NULL) {
+ buf_size = strftime(buf, sizeof(buf), "%Y-%m-%dT%H:%M:%S", &localt);
+ testutil_assert(buf_size != 0);
+ testutil_check(__wt_snprintf(&buf[buf_size], sizeof(buf) - buf_size, ".%3.3" PRIu64 "Z",
+ ns_to_ms((uint64_t)t.tv_nsec)));
+ (void)fprintf(jfp, "{");
+ if (first) {
+ (void)fprintf(jfp, "\"version\":\"%s\",", WIREDTIGER_VERSION_STRING);
+ first = false;
+ }
+ (void)fprintf(jfp, "\"localTime\":\"%s\",\"wtperf\":{", buf);
+ /* Note does not have initial comma before "read" */
+ (void)fprintf(jfp, "\"read\":{\"ops per sec\":%" PRIu64 ",\"average latency\":%" PRIu32
+ ",\"min latency\":%" PRIu32 ",\"max latency\":%" PRIu32 "}",
+ cur_reads, read_avg, read_min, read_max);
+ (void)fprintf(jfp,
+ ",\"insert\":{\"ops per sec\":%" PRIu64 ",\"average latency\":%" PRIu32
+ ",\"min latency\":%" PRIu32 ",\"max latency\":%" PRIu32 "}",
+ cur_inserts, insert_avg, insert_min, insert_max);
+ (void)fprintf(jfp,
+ ",\"update\":{\"ops per sec\":%" PRIu64 ",\"average latency\":%" PRIu32
+ ",\"min latency\":%" PRIu32 ",\"max latency\":%" PRIu32 "}",
+ cur_updates, update_avg, update_min, update_max);
+ fprintf(jfp, "}}\n");
+ }
+
+ if (latency_max != 0 &&
+ (read_max > latency_max || insert_max > latency_max || update_max > latency_max)) {
+ if (opts->max_latency_fatal) {
+ level = 1;
+ msg_err = WT_PANIC;
+ str = "ERROR";
+ } else {
+ level = 0;
+ msg_err = 0;
+ str = "WARNING";
+ }
+ lprintf(wtperf, msg_err, level,
+ "%s: max latency exceeded: threshold %" PRIu32 " read max %" PRIu32
+ " insert max %" PRIu32 " update max %" PRIu32,
+ str, latency_max, read_max, insert_max, update_max);
+ }
+ if (min_thr != 0 &&
+ ((cur_reads != 0 && cur_reads < min_thr) || (cur_inserts != 0 && cur_inserts < min_thr) ||
+ (cur_updates != 0 && cur_updates < min_thr))) {
+ if (opts->min_throughput_fatal) {
+ level = 1;
+ msg_err = WT_PANIC;
+ str = "ERROR";
+ } else {
+ level = 0;
+ msg_err = 0;
+ str = "WARNING";
+ }
+ lprintf(wtperf, msg_err, level,
+ "%s: minimum throughput not met: threshold %" PRIu64 " reads %" PRIu64
+ " inserts %" PRIu64 " updates %" PRIu64,
+ str, min_thr, cur_reads, cur_inserts, cur_updates);
+ }
+ last_reads = reads;
+ last_inserts = inserts;
+ last_updates = updates;
+ }
+
+ /* Notify our caller we failed and shut the system down. */
+ if (0) {
+err:
+ wtperf->error = wtperf->stop = true;
+ }
+
+ if (fp != NULL)
+ (void)fclose(fp);
+ if (jfp != NULL)
+ (void)fclose(jfp);
+ free(path);
+
+ return (WT_THREAD_RET_VALUE);
}
static WT_THREAD_RET
checkpoint_worker(void *arg)
{
- CONFIG_OPTS *opts;
- WTPERF *wtperf;
- WTPERF_THREAD *thread;
- WT_CONNECTION *conn;
- WT_SESSION *session;
- struct timespec e, s;
- uint32_t i;
- int ret;
-
- thread = (WTPERF_THREAD *)arg;
- wtperf = thread->wtperf;
- opts = wtperf->opts;
- conn = wtperf->conn;
- session = NULL;
-
- if ((ret = conn->open_session(
- conn, NULL, opts->sess_config, &session)) != 0) {
- lprintf(wtperf, ret, 0,
- "open_session failed in checkpoint thread.");
- goto err;
- }
-
- while (!wtperf->stop) {
- /* Break the sleep up, so we notice interrupts faster. */
- for (i = 0; i < opts->checkpoint_interval; i++) {
- sleep(1);
- if (wtperf->stop)
- break;
- }
- /* If the workers are done, don't bother with a final call. */
- if (wtperf->stop)
- break;
-
- __wt_epoch(NULL, &s);
-
- wtperf->ckpt = true;
- if ((ret = session->checkpoint(session, NULL)) != 0) {
- lprintf(wtperf, ret, 0, "Checkpoint failed.");
- goto err;
- }
- wtperf->ckpt = false;
- ++thread->ckpt.ops;
-
- __wt_epoch(NULL, &e);
- }
-
- if (session != NULL &&
- ((ret = session->close(session, NULL)) != 0)) {
- lprintf(wtperf, ret, 0,
- "Error closing session in checkpoint worker.");
- goto err;
- }
-
- /* Notify our caller we failed and shut the system down. */
- if (0) {
-err: wtperf->error = wtperf->stop = true;
- }
-
- return (WT_THREAD_RET_VALUE);
+ CONFIG_OPTS *opts;
+ WTPERF *wtperf;
+ WTPERF_THREAD *thread;
+ WT_CONNECTION *conn;
+ WT_SESSION *session;
+ struct timespec e, s;
+ uint32_t i;
+ int ret;
+
+ thread = (WTPERF_THREAD *)arg;
+ wtperf = thread->wtperf;
+ opts = wtperf->opts;
+ conn = wtperf->conn;
+ session = NULL;
+
+ if ((ret = conn->open_session(conn, NULL, opts->sess_config, &session)) != 0) {
+ lprintf(wtperf, ret, 0, "open_session failed in checkpoint thread.");
+ goto err;
+ }
+
+ while (!wtperf->stop) {
+ /* Break the sleep up, so we notice interrupts faster. */
+ for (i = 0; i < opts->checkpoint_interval; i++) {
+ sleep(1);
+ if (wtperf->stop)
+ break;
+ }
+ /* If the workers are done, don't bother with a final call. */
+ if (wtperf->stop)
+ break;
+
+ __wt_epoch(NULL, &s);
+
+ wtperf->ckpt = true;
+ if ((ret = session->checkpoint(session, NULL)) != 0) {
+ lprintf(wtperf, ret, 0, "Checkpoint failed.");
+ goto err;
+ }
+ wtperf->ckpt = false;
+ ++thread->ckpt.ops;
+
+ __wt_epoch(NULL, &e);
+ }
+
+ if (session != NULL && ((ret = session->close(session, NULL)) != 0)) {
+ lprintf(wtperf, ret, 0, "Error closing session in checkpoint worker.");
+ goto err;
+ }
+
+ /* Notify our caller we failed and shut the system down. */
+ if (0) {
+err:
+ wtperf->error = wtperf->stop = true;
+ }
+
+ return (WT_THREAD_RET_VALUE);
}
static int
execute_populate(WTPERF *wtperf)
{
- struct timespec start, stop;
- CONFIG_OPTS *opts;
- WT_ASYNC_OP *asyncop;
- WTPERF_THREAD *popth;
- WT_THREAD_CALLBACK(*pfunc)(void *);
- size_t i;
- uint64_t last_ops, msecs, print_ops_sec;
- uint32_t interval, tables;
- wt_thread_t idle_table_cycle_thread;
- double print_secs;
- int elapsed, ret;
-
- opts = wtperf->opts;
-
- lprintf(wtperf, 0, 1,
- "Starting %" PRIu32
- " populate thread(s) for %" PRIu32 " items",
- opts->populate_threads, opts->icount);
-
- /* Start cycling idle tables if configured. */
- start_idle_table_cycle(wtperf, &idle_table_cycle_thread);
-
- wtperf->insert_key = 0;
-
- wtperf->popthreads =
- dcalloc(opts->populate_threads, sizeof(WTPERF_THREAD));
- if (wtperf->use_asyncops) {
- lprintf(wtperf, 0, 1, "Starting %" PRIu32 " async thread(s)",
- opts->async_threads);
- pfunc = populate_async;
- } else
- pfunc = populate_thread;
- start_threads(wtperf, NULL,
- wtperf->popthreads, opts->populate_threads, pfunc);
-
- __wt_epoch(NULL, &start);
- for (elapsed = 0, interval = 0, last_ops = 0;
- wtperf->insert_key < opts->icount && !wtperf->error;) {
- /*
- * Sleep for 100th of a second, report_interval is in second
- * granularity, each 100th increment of elapsed is a single
- * increment of interval.
- */
- (void)usleep(10000);
- if (opts->report_interval == 0 || ++elapsed < 100)
- continue;
- elapsed = 0;
- if (++interval < opts->report_interval)
- continue;
- interval = 0;
- wtperf->totalsec += opts->report_interval;
- wtperf->insert_ops = sum_pop_ops(wtperf);
- lprintf(wtperf, 0, 1,
- "%" PRIu64 " populate inserts (%" PRIu64 " of %"
- PRIu32 ") in %" PRIu32 " secs (%" PRIu32 " total secs)",
- wtperf->insert_ops - last_ops, wtperf->insert_ops,
- opts->icount, opts->report_interval, wtperf->totalsec);
- last_ops = wtperf->insert_ops;
- }
- __wt_epoch(NULL, &stop);
-
- /*
- * Move popthreads aside to narrow possible race with the monitor
- * thread. The latency tracking code also requires that popthreads be
- * NULL when the populate phase is finished, to know that the workload
- * phase has started.
- */
- popth = wtperf->popthreads;
- wtperf->popthreads = NULL;
- stop_threads(opts->populate_threads, popth);
- free(popth);
-
- /* Report if any worker threads didn't finish. */
- if (wtperf->error) {
- lprintf(wtperf, WT_ERROR, 0,
- "Populate thread(s) exited without finishing.");
- return (WT_ERROR);
- }
-
- lprintf(wtperf,
- 0, 1, "Finished load of %" PRIu32 " items", opts->icount);
- msecs = WT_TIMEDIFF_MS(stop, start);
-
- /*
- * This is needed as the divisions will fail if the insert takes no time
- * which will only be the case when there is no data to insert.
- */
- if (msecs == 0) {
- print_secs = 0;
- print_ops_sec = 0;
- } else {
- print_secs = (double)msecs / (double)MSEC_PER_SEC;
- print_ops_sec = (uint64_t)(opts->icount / print_secs);
- }
- lprintf(wtperf, 0, 1,
- "Load time: %.2f\n" "load ops/sec: %" PRIu64,
- print_secs, print_ops_sec);
-
- /*
- * If configured, compact to allow LSM merging to complete. We
- * set an unlimited timeout because if we close the connection
- * then any in-progress compact/merge is aborted.
- */
- if (opts->compact) {
- assert(opts->async_threads > 0);
- lprintf(wtperf, 0, 1, "Compact after populate");
- __wt_epoch(NULL, &start);
- tables = opts->table_count;
- for (i = 0; i < opts->table_count; i++) {
- /*
- * If no ops are available, retry. Any other error,
- * return.
- */
- while ((ret = wtperf->conn->async_new_op(
- wtperf->conn, wtperf->uris[i],
- "timeout=0", &cb, &asyncop)) == EBUSY)
- (void)usleep(10000);
- if (ret != 0)
- return (ret);
-
- asyncop->app_private = &tables;
- if ((ret = asyncop->compact(asyncop)) != 0) {
- lprintf(wtperf,
- ret, 0, "Async compact failed.");
- return (ret);
- }
- }
- if ((ret = wtperf->conn->async_flush(wtperf->conn)) != 0) {
- lprintf(wtperf, ret, 0, "Populate async flush failed.");
- return (ret);
- }
- __wt_epoch(NULL, &stop);
- lprintf(wtperf, 0, 1,
- "Compact completed in %" PRIu64 " seconds",
- (uint64_t)(WT_TIMEDIFF_SEC(stop, start)));
- assert(tables == 0);
- }
-
- /* Stop cycling idle tables. */
- stop_idle_table_cycle(wtperf, idle_table_cycle_thread);
-
- return (0);
+ struct timespec start, stop;
+ CONFIG_OPTS *opts;
+ WT_ASYNC_OP *asyncop;
+ WTPERF_THREAD *popth;
+ WT_THREAD_CALLBACK (*pfunc)(void *);
+ size_t i;
+ uint64_t last_ops, msecs, print_ops_sec;
+ uint32_t interval, tables;
+ wt_thread_t idle_table_cycle_thread;
+ double print_secs;
+ int elapsed, ret;
+
+ opts = wtperf->opts;
+
+ lprintf(wtperf, 0, 1, "Starting %" PRIu32 " populate thread(s) for %" PRIu32 " items",
+ opts->populate_threads, opts->icount);
+
+ /* Start cycling idle tables if configured. */
+ start_idle_table_cycle(wtperf, &idle_table_cycle_thread);
+
+ wtperf->insert_key = 0;
+
+ wtperf->popthreads = dcalloc(opts->populate_threads, sizeof(WTPERF_THREAD));
+ if (wtperf->use_asyncops) {
+ lprintf(wtperf, 0, 1, "Starting %" PRIu32 " async thread(s)", opts->async_threads);
+ pfunc = populate_async;
+ } else
+ pfunc = populate_thread;
+ start_threads(wtperf, NULL, wtperf->popthreads, opts->populate_threads, pfunc);
+
+ __wt_epoch(NULL, &start);
+ for (elapsed = 0, interval = 0, last_ops = 0;
+ wtperf->insert_key < opts->icount && !wtperf->error;) {
+ /*
+ * Sleep for 100th of a second, report_interval is in second granularity, each 100th
+ * increment of elapsed is a single increment of interval.
+ */
+ (void)usleep(10000);
+ if (opts->report_interval == 0 || ++elapsed < 100)
+ continue;
+ elapsed = 0;
+ if (++interval < opts->report_interval)
+ continue;
+ interval = 0;
+ wtperf->totalsec += opts->report_interval;
+ wtperf->insert_ops = sum_pop_ops(wtperf);
+ lprintf(wtperf, 0, 1, "%" PRIu64 " populate inserts (%" PRIu64 " of %" PRIu32
+ ") in %" PRIu32 " secs (%" PRIu32 " total secs)",
+ wtperf->insert_ops - last_ops, wtperf->insert_ops, opts->icount, opts->report_interval,
+ wtperf->totalsec);
+ last_ops = wtperf->insert_ops;
+ }
+ __wt_epoch(NULL, &stop);
+
+ /*
+ * Move popthreads aside to narrow possible race with the monitor thread. The latency tracking
+ * code also requires that popthreads be NULL when the populate phase is finished, to know that
+ * the workload phase has started.
+ */
+ popth = wtperf->popthreads;
+ wtperf->popthreads = NULL;
+ stop_threads(opts->populate_threads, popth);
+ free(popth);
+
+ /* Report if any worker threads didn't finish. */
+ if (wtperf->error) {
+ lprintf(wtperf, WT_ERROR, 0, "Populate thread(s) exited without finishing.");
+ return (WT_ERROR);
+ }
+
+ lprintf(wtperf, 0, 1, "Finished load of %" PRIu32 " items", opts->icount);
+ msecs = WT_TIMEDIFF_MS(stop, start);
+
+ /*
+ * This is needed as the divisions will fail if the insert takes no time which will only be the
+ * case when there is no data to insert.
+ */
+ if (msecs == 0) {
+ print_secs = 0;
+ print_ops_sec = 0;
+ } else {
+ print_secs = (double)msecs / (double)MSEC_PER_SEC;
+ print_ops_sec = (uint64_t)(opts->icount / print_secs);
+ }
+ lprintf(wtperf, 0, 1,
+ "Load time: %.2f\n"
+ "load ops/sec: %" PRIu64,
+ print_secs, print_ops_sec);
+
+ /*
+ * If configured, compact to allow LSM merging to complete. We set an unlimited timeout because
+ * if we close the connection then any in-progress compact/merge is aborted.
+ */
+ if (opts->compact) {
+ assert(opts->async_threads > 0);
+ lprintf(wtperf, 0, 1, "Compact after populate");
+ __wt_epoch(NULL, &start);
+ tables = opts->table_count;
+ for (i = 0; i < opts->table_count; i++) {
+ /*
+ * If no ops are available, retry. Any other error, return.
+ */
+ while ((ret = wtperf->conn->async_new_op(
+ wtperf->conn, wtperf->uris[i], "timeout=0", &cb, &asyncop)) == EBUSY)
+ (void)usleep(10000);
+ if (ret != 0)
+ return (ret);
+
+ asyncop->app_private = &tables;
+ if ((ret = asyncop->compact(asyncop)) != 0) {
+ lprintf(wtperf, ret, 0, "Async compact failed.");
+ return (ret);
+ }
+ }
+ if ((ret = wtperf->conn->async_flush(wtperf->conn)) != 0) {
+ lprintf(wtperf, ret, 0, "Populate async flush failed.");
+ return (ret);
+ }
+ __wt_epoch(NULL, &stop);
+ lprintf(wtperf, 0, 1, "Compact completed in %" PRIu64 " seconds",
+ (uint64_t)(WT_TIMEDIFF_SEC(stop, start)));
+ assert(tables == 0);
+ }
+
+ /* Stop cycling idle tables. */
+ stop_idle_table_cycle(wtperf, idle_table_cycle_thread);
+
+ return (0);
}
static int
close_reopen(WTPERF *wtperf)
{
- CONFIG_OPTS *opts;
- int ret;
-
- opts = wtperf->opts;
-
- if (opts->in_memory)
- return (0);
-
- if (!opts->readonly && !opts->reopen_connection)
- return (0);
- /*
- * Reopen the connection. We do this so that the workload phase always
- * starts with the on-disk files, and so that read-only workloads can
- * be identified. This is particularly important for LSM, where the
- * merge algorithm is more aggressive for read-only trees.
- */
- /* wtperf->conn is released no matter the return value from close(). */
- ret = wtperf->conn->close(wtperf->conn, NULL);
- wtperf->conn = NULL;
- if (ret != 0) {
- lprintf(wtperf, ret, 0, "Closing the connection failed");
- return (ret);
- }
- if ((ret = wiredtiger_open(
- wtperf->home, NULL, wtperf->reopen_config, &wtperf->conn)) != 0) {
- lprintf(wtperf, ret, 0, "Re-opening the connection failed");
- return (ret);
- }
- /*
- * If we started async threads only for the purposes of compact,
- * then turn it off before starting the workload so that those extra
- * threads looking for work that will never arrive don't affect
- * performance.
- */
- if (opts->compact && !wtperf->use_asyncops) {
- if ((ret = wtperf->conn->reconfigure(
- wtperf->conn, "async=(enabled=false)")) != 0) {
- lprintf(wtperf, ret, 0, "Reconfigure async off failed");
- return (ret);
- }
- }
- return (0);
+ CONFIG_OPTS *opts;
+ int ret;
+
+ opts = wtperf->opts;
+
+ if (opts->in_memory)
+ return (0);
+
+ if (!opts->readonly && !opts->reopen_connection)
+ return (0);
+ /*
+ * Reopen the connection. We do this so that the workload phase always starts with the on-disk
+ * files, and so that read-only workloads can be identified. This is particularly important for
+ * LSM, where the merge algorithm is more aggressive for read-only trees.
+ */
+ /* wtperf->conn is released no matter the return value from close(). */
+ ret = wtperf->conn->close(wtperf->conn, NULL);
+ wtperf->conn = NULL;
+ if (ret != 0) {
+ lprintf(wtperf, ret, 0, "Closing the connection failed");
+ return (ret);
+ }
+ if ((ret = wiredtiger_open(wtperf->home, NULL, wtperf->reopen_config, &wtperf->conn)) != 0) {
+ lprintf(wtperf, ret, 0, "Re-opening the connection failed");
+ return (ret);
+ }
+ /*
+ * If we started async threads only for the purposes of compact, then turn it off before
+ * starting the workload so that those extra threads looking for work that will never arrive
+ * don't affect performance.
+ */
+ if (opts->compact && !wtperf->use_asyncops) {
+ if ((ret = wtperf->conn->reconfigure(wtperf->conn, "async=(enabled=false)")) != 0) {
+ lprintf(wtperf, ret, 0, "Reconfigure async off failed");
+ return (ret);
+ }
+ }
+ return (0);
}
static int
execute_workload(WTPERF *wtperf)
{
- CONFIG_OPTS *opts;
- WORKLOAD *workp;
- WTPERF_THREAD *threads;
- WT_CONNECTION *conn;
- WT_SESSION **sessions;
- WT_THREAD_CALLBACK(*pfunc)(void *);
- wt_thread_t idle_table_cycle_thread;
- uint64_t last_ckpts, last_inserts, last_reads, last_truncates;
- uint64_t last_updates;
- uint32_t interval, run_ops, run_time;
- u_int i;
- int ret;
-
- opts = wtperf->opts;
-
- wtperf->insert_key = 0;
- wtperf->insert_ops = wtperf->read_ops = wtperf->truncate_ops = 0;
- wtperf->update_ops = 0;
-
- last_ckpts = last_inserts = last_reads = last_truncates = 0;
- last_updates = 0;
- ret = 0;
-
- sessions = NULL;
-
- /* Start cycling idle tables. */
- start_idle_table_cycle(wtperf, &idle_table_cycle_thread);
-
- if (opts->warmup != 0)
- wtperf->in_warmup = true;
-
- /* Allocate memory for the worker threads. */
- wtperf->workers =
- dcalloc((size_t)wtperf->workers_cnt, sizeof(WTPERF_THREAD));
-
- if (wtperf->use_asyncops) {
- lprintf(wtperf, 0, 1, "Starting %" PRIu32 " async thread(s)",
- opts->async_threads);
- pfunc = worker_async;
- } else
- pfunc = worker;
-
- if (opts->session_count_idle != 0) {
- sessions = dcalloc((size_t)opts->session_count_idle,
- sizeof(WT_SESSION *));
- conn = wtperf->conn;
- for (i = 0; i < opts->session_count_idle; ++i)
- if ((ret = conn->open_session(conn,
- NULL, opts->sess_config, &sessions[i])) != 0) {
- lprintf(wtperf, ret, 0,
- "execute_workload: idle open_session");
- goto err;
- }
- }
- /* Start each workload. */
- for (threads = wtperf->workers, i = 0,
- workp = wtperf->workload; i < wtperf->workload_cnt; ++i, ++workp) {
- lprintf(wtperf, 0, 1,
- "Starting workload #%u: %" PRId64 " threads, inserts=%"
- PRId64 ", reads=%" PRId64 ", updates=%" PRId64
- ", truncate=%" PRId64 ", throttle=%" PRIu64,
- i + 1, workp->threads, workp->insert,
- workp->read, workp->update, workp->truncate,
- workp->throttle);
-
- /* Figure out the workload's schedule. */
- if ((ret = run_mix_schedule(wtperf, workp)) != 0)
- goto err;
-
- /* Start the workload's threads. */
- start_threads(
- wtperf, workp, threads, (u_int)workp->threads, pfunc);
- threads += workp->threads;
- }
-
- if (opts->warmup != 0) {
- lprintf(wtperf, 0, 1,
- "Waiting for warmup duration of %" PRIu32, opts->warmup);
- sleep(opts->warmup);
- wtperf->in_warmup = false;
- }
-
- for (interval = opts->report_interval,
- run_time = opts->run_time, run_ops = opts->run_ops;
- !wtperf->error;) {
- /*
- * Sleep for one second at a time.
- * If we are tracking run time, check to see if we're done, and
- * if we're only tracking run time, go back to sleep.
- */
- sleep(1);
- if (run_time != 0) {
- if (--run_time == 0)
- break;
- if (!interval && !run_ops)
- continue;
- }
-
- /* Sum the operations we've done. */
- wtperf->ckpt_ops = sum_ckpt_ops(wtperf);
- wtperf->insert_ops = sum_insert_ops(wtperf);
- wtperf->read_ops = sum_read_ops(wtperf);
- wtperf->update_ops = sum_update_ops(wtperf);
- wtperf->truncate_ops = sum_truncate_ops(wtperf);
-
- /* If we're checking total operations, see if we're done. */
- if (run_ops != 0 && run_ops <=
- wtperf->insert_ops + wtperf->read_ops + wtperf->update_ops)
- break;
-
- /* If writing out throughput information, see if it's time. */
- if (interval == 0 || --interval > 0)
- continue;
- interval = opts->report_interval;
- wtperf->totalsec += opts->report_interval;
-
- lprintf(wtperf, 0, 1,
- "%" PRIu64 " reads, %" PRIu64 " inserts, %" PRIu64
- " updates, %" PRIu64 " truncates, %" PRIu64
- " checkpoints in %" PRIu32 " secs (%" PRIu32 " total secs)",
- wtperf->read_ops - last_reads,
- wtperf->insert_ops - last_inserts,
- wtperf->update_ops - last_updates,
- wtperf->truncate_ops - last_truncates,
- wtperf->ckpt_ops - last_ckpts,
- opts->report_interval, wtperf->totalsec);
- last_reads = wtperf->read_ops;
- last_inserts = wtperf->insert_ops;
- last_updates = wtperf->update_ops;
- last_truncates = wtperf->truncate_ops;
- last_ckpts = wtperf->ckpt_ops;
- }
-
- /* Notify the worker threads they are done. */
-err: wtperf->stop = true;
-
- /* Stop cycling idle tables. */
- stop_idle_table_cycle(wtperf, idle_table_cycle_thread);
-
- stop_threads((u_int)wtperf->workers_cnt, wtperf->workers);
-
- /* Drop tables if configured to and this isn't an error path */
- if (ret == 0 &&
- opts->drop_tables && (ret = drop_all_tables(wtperf)) != 0)
- lprintf(wtperf, ret, 0, "Drop tables failed.");
-
- free(sessions);
- /* Report if any worker threads didn't finish. */
- if (wtperf->error) {
- lprintf(wtperf, WT_ERROR, 0,
- "Worker thread(s) exited without finishing.");
- if (ret == 0)
- ret = WT_ERROR;
- }
- return (ret);
+ CONFIG_OPTS *opts;
+ WORKLOAD *workp;
+ WTPERF_THREAD *threads;
+ WT_CONNECTION *conn;
+ WT_SESSION **sessions;
+ WT_THREAD_CALLBACK (*pfunc)(void *);
+ wt_thread_t idle_table_cycle_thread;
+ uint64_t last_ckpts, last_inserts, last_reads, last_truncates;
+ uint64_t last_updates;
+ uint32_t interval, run_ops, run_time;
+ u_int i;
+ int ret;
+
+ opts = wtperf->opts;
+
+ wtperf->insert_key = 0;
+ wtperf->insert_ops = wtperf->read_ops = wtperf->truncate_ops = 0;
+ wtperf->update_ops = 0;
+
+ last_ckpts = last_inserts = last_reads = last_truncates = 0;
+ last_updates = 0;
+ ret = 0;
+
+ sessions = NULL;
+
+ /* Start cycling idle tables. */
+ start_idle_table_cycle(wtperf, &idle_table_cycle_thread);
+
+ if (opts->warmup != 0)
+ wtperf->in_warmup = true;
+
+ /* Allocate memory for the worker threads. */
+ wtperf->workers = dcalloc((size_t)wtperf->workers_cnt, sizeof(WTPERF_THREAD));
+
+ if (wtperf->use_asyncops) {
+ lprintf(wtperf, 0, 1, "Starting %" PRIu32 " async thread(s)", opts->async_threads);
+ pfunc = worker_async;
+ } else
+ pfunc = worker;
+
+ if (opts->session_count_idle != 0) {
+ sessions = dcalloc((size_t)opts->session_count_idle, sizeof(WT_SESSION *));
+ conn = wtperf->conn;
+ for (i = 0; i < opts->session_count_idle; ++i)
+ if ((ret = conn->open_session(conn, NULL, opts->sess_config, &sessions[i])) != 0) {
+ lprintf(wtperf, ret, 0, "execute_workload: idle open_session");
+ goto err;
+ }
+ }
+ /* Start each workload. */
+ for (threads = wtperf->workers, i = 0, workp = wtperf->workload; i < wtperf->workload_cnt;
+ ++i, ++workp) {
+ lprintf(wtperf, 0, 1,
+ "Starting workload #%u: %" PRId64 " threads, inserts=%" PRId64 ", reads=%" PRId64
+ ", updates=%" PRId64 ", truncate=%" PRId64 ", throttle=%" PRIu64,
+ i + 1, workp->threads, workp->insert, workp->read, workp->update, workp->truncate,
+ workp->throttle);
+
+ /* Figure out the workload's schedule. */
+ if ((ret = run_mix_schedule(wtperf, workp)) != 0)
+ goto err;
+
+ /* Start the workload's threads. */
+ start_threads(wtperf, workp, threads, (u_int)workp->threads, pfunc);
+ threads += workp->threads;
+ }
+
+ if (opts->warmup != 0) {
+ lprintf(wtperf, 0, 1, "Waiting for warmup duration of %" PRIu32, opts->warmup);
+ sleep(opts->warmup);
+ wtperf->in_warmup = false;
+ }
+
+ for (interval = opts->report_interval, run_time = opts->run_time, run_ops = opts->run_ops;
+ !wtperf->error;) {
+ /*
+ * Sleep for one second at a time. If we are tracking run time, check to see if we're done,
+ * and if we're only tracking run time, go back to sleep.
+ */
+ sleep(1);
+ if (run_time != 0) {
+ if (--run_time == 0)
+ break;
+ if (!interval && !run_ops)
+ continue;
+ }
+
+ /* Sum the operations we've done. */
+ wtperf->ckpt_ops = sum_ckpt_ops(wtperf);
+ wtperf->insert_ops = sum_insert_ops(wtperf);
+ wtperf->read_ops = sum_read_ops(wtperf);
+ wtperf->update_ops = sum_update_ops(wtperf);
+ wtperf->truncate_ops = sum_truncate_ops(wtperf);
+
+ /* If we're checking total operations, see if we're done. */
+ if (run_ops != 0 && run_ops <= wtperf->insert_ops + wtperf->read_ops + wtperf->update_ops)
+ break;
+
+ /* If writing out throughput information, see if it's time. */
+ if (interval == 0 || --interval > 0)
+ continue;
+ interval = opts->report_interval;
+ wtperf->totalsec += opts->report_interval;
+
+ lprintf(wtperf, 0, 1,
+ "%" PRIu64 " reads, %" PRIu64 " inserts, %" PRIu64 " updates, %" PRIu64
+ " truncates, %" PRIu64 " checkpoints in %" PRIu32 " secs (%" PRIu32 " total secs)",
+ wtperf->read_ops - last_reads, wtperf->insert_ops - last_inserts,
+ wtperf->update_ops - last_updates, wtperf->truncate_ops - last_truncates,
+ wtperf->ckpt_ops - last_ckpts, opts->report_interval, wtperf->totalsec);
+ last_reads = wtperf->read_ops;
+ last_inserts = wtperf->insert_ops;
+ last_updates = wtperf->update_ops;
+ last_truncates = wtperf->truncate_ops;
+ last_ckpts = wtperf->ckpt_ops;
+ }
+
+/* Notify the worker threads they are done. */
+err:
+ wtperf->stop = true;
+
+ /* Stop cycling idle tables. */
+ stop_idle_table_cycle(wtperf, idle_table_cycle_thread);
+
+ stop_threads((u_int)wtperf->workers_cnt, wtperf->workers);
+
+ /* Drop tables if configured to and this isn't an error path */
+ if (ret == 0 && opts->drop_tables && (ret = drop_all_tables(wtperf)) != 0)
+ lprintf(wtperf, ret, 0, "Drop tables failed.");
+
+ free(sessions);
+ /* Report if any worker threads didn't finish. */
+ if (wtperf->error) {
+ lprintf(wtperf, WT_ERROR, 0, "Worker thread(s) exited without finishing.");
+ if (ret == 0)
+ ret = WT_ERROR;
+ }
+ return (ret);
}
/*
- * Ensure that icount matches the number of records in the
- * existing table.
+ * Ensure that icount matches the number of records in the existing table.
*/
static int
find_table_count(WTPERF *wtperf)
{
- CONFIG_OPTS *opts;
- WT_CONNECTION *conn;
- WT_CURSOR *cursor;
- WT_SESSION *session;
- uint32_t i, max_icount, table_icount;
- int ret, t_ret;
- char *key;
-
- opts = wtperf->opts;
- conn = wtperf->conn;
-
- max_icount = 0;
- if ((ret = conn->open_session(
- conn, NULL, opts->sess_config, &session)) != 0) {
- lprintf(wtperf, ret, 0,
- "find_table_count: open_session failed");
- goto out;
- }
- for (i = 0; i < opts->table_count; i++) {
- if ((ret = session->open_cursor(session, wtperf->uris[i],
- NULL, NULL, &cursor)) != 0) {
- lprintf(wtperf, ret, 0,
- "find_table_count: open_cursor failed");
- goto err;
- }
- if ((ret = cursor->prev(cursor)) != 0) {
- lprintf(wtperf, ret, 0,
- "find_table_count: cursor prev failed");
- goto err;
- }
- if ((ret = cursor->get_key(cursor, &key)) != 0) {
- lprintf(wtperf, ret, 0,
- "find_table_count: cursor get_key failed");
- goto err;
- }
- table_icount = (uint32_t)atoi(key);
- if (table_icount > max_icount)
- max_icount = table_icount;
-
- if ((ret = cursor->close(cursor)) != 0) {
- lprintf(wtperf, ret, 0,
- "find_table_count: cursor close failed");
- goto err;
- }
- }
-err: if ((t_ret = session->close(session, NULL)) != 0) {
- if (ret == 0)
- ret = t_ret;
- lprintf(wtperf, ret, 0,
- "find_table_count: session close failed");
- }
- opts->icount = max_icount;
-out: return (ret);
+ CONFIG_OPTS *opts;
+ WT_CONNECTION *conn;
+ WT_CURSOR *cursor;
+ WT_SESSION *session;
+ uint32_t i, max_icount, table_icount;
+ int ret, t_ret;
+ char *key;
+
+ opts = wtperf->opts;
+ conn = wtperf->conn;
+
+ max_icount = 0;
+ if ((ret = conn->open_session(conn, NULL, opts->sess_config, &session)) != 0) {
+ lprintf(wtperf, ret, 0, "find_table_count: open_session failed");
+ goto out;
+ }
+ for (i = 0; i < opts->table_count; i++) {
+ if ((ret = session->open_cursor(session, wtperf->uris[i], NULL, NULL, &cursor)) != 0) {
+ lprintf(wtperf, ret, 0, "find_table_count: open_cursor failed");
+ goto err;
+ }
+ if ((ret = cursor->prev(cursor)) != 0) {
+ lprintf(wtperf, ret, 0, "find_table_count: cursor prev failed");
+ goto err;
+ }
+ if ((ret = cursor->get_key(cursor, &key)) != 0) {
+ lprintf(wtperf, ret, 0, "find_table_count: cursor get_key failed");
+ goto err;
+ }
+ table_icount = (uint32_t)atoi(key);
+ if (table_icount > max_icount)
+ max_icount = table_icount;
+
+ if ((ret = cursor->close(cursor)) != 0) {
+ lprintf(wtperf, ret, 0, "find_table_count: cursor close failed");
+ goto err;
+ }
+ }
+err:
+ if ((t_ret = session->close(session, NULL)) != 0) {
+ if (ret == 0)
+ ret = t_ret;
+ lprintf(wtperf, ret, 0, "find_table_count: session close failed");
+ }
+ opts->icount = max_icount;
+out:
+ return (ret);
}
/*
@@ -1969,460 +1799,424 @@ out: return (ret);
static void
create_uris(WTPERF *wtperf)
{
- CONFIG_OPTS *opts;
- size_t len;
- uint32_t i;
-
- opts = wtperf->opts;
-
- wtperf->uris = dcalloc(opts->table_count, sizeof(char *));
- len = strlen("table:") + strlen(opts->table_name) + 20;
- for (i = 0; i < opts->table_count; i++) {
- /* If there is only one table, just use the base name. */
- wtperf->uris[i] = dmalloc(len);
- if (opts->table_count == 1)
- testutil_check(__wt_snprintf(wtperf->uris[i],
- len, "table:%s", opts->table_name));
- else
- testutil_check(__wt_snprintf(wtperf->uris[i],
- len, "table:%s%05d", opts->table_name, i));
- }
-
- /* Create the log-like-table URI. */
- len = strlen("table:") +
- strlen(opts->table_name) + strlen("_log_table") + 1;
- wtperf->log_table_uri = dmalloc(len);
- testutil_check(__wt_snprintf(wtperf->log_table_uri,
- len, "table:%s_log_table", opts->table_name));
+ CONFIG_OPTS *opts;
+ size_t len;
+ uint32_t i;
+
+ opts = wtperf->opts;
+
+ wtperf->uris = dcalloc(opts->table_count, sizeof(char *));
+ len = strlen("table:") + strlen(opts->table_name) + 20;
+ for (i = 0; i < opts->table_count; i++) {
+ /* If there is only one table, just use the base name. */
+ wtperf->uris[i] = dmalloc(len);
+ if (opts->table_count == 1)
+ testutil_check(__wt_snprintf(wtperf->uris[i], len, "table:%s", opts->table_name));
+ else
+ testutil_check(
+ __wt_snprintf(wtperf->uris[i], len, "table:%s%05d", opts->table_name, i));
+ }
+
+ /* Create the log-like-table URI. */
+ len = strlen("table:") + strlen(opts->table_name) + strlen("_log_table") + 1;
+ wtperf->log_table_uri = dmalloc(len);
+ testutil_check(
+ __wt_snprintf(wtperf->log_table_uri, len, "table:%s_log_table", opts->table_name));
}
static int
create_tables(WTPERF *wtperf)
{
- CONFIG_OPTS *opts;
- WT_SESSION *session;
- size_t i;
- int ret;
- char buf[512];
-
- opts = wtperf->opts;
-
- if ((ret = wtperf->conn->open_session(
- wtperf->conn, NULL, opts->sess_config, &session)) != 0) {
- lprintf(wtperf, ret, 0,
- "Error opening a session on %s", wtperf->home);
- return (ret);
- }
-
- for (i = 0; i < opts->table_count_idle; i++) {
- testutil_check(__wt_snprintf(
- buf, 512, "%s_idle%05d", wtperf->uris[0], (int)i));
- if ((ret = session->create(
- session, buf, opts->table_config)) != 0) {
- lprintf(wtperf, ret, 0,
- "Error creating idle table %s", buf);
- return (ret);
- }
- }
- if (opts->log_like_table && (ret = session->create(session,
- wtperf->log_table_uri, "key_format=Q,value_format=S")) != 0) {
- lprintf(wtperf, ret, 0, "Error creating log table %s", buf);
- return (ret);
- }
-
- for (i = 0; i < opts->table_count; i++) {
- if (opts->log_partial && i > 0) {
- if (((ret = session->create(session,
- wtperf->uris[i], wtperf->partial_config)) != 0)) {
- lprintf(wtperf, ret, 0,
- "Error creating table %s", wtperf->uris[i]);
- return (ret);
- }
- } else if ((ret = session->create(
- session, wtperf->uris[i], opts->table_config)) != 0) {
- lprintf(wtperf, ret, 0,
- "Error creating table %s", wtperf->uris[i]);
- return (ret);
- }
- if (opts->index) {
- testutil_check(__wt_snprintf(buf, 512,
- "index:%s:val_idx",
- wtperf->uris[i] + strlen("table:")));
- if ((ret = session->create(
- session, buf, "columns=(val)")) != 0) {
- lprintf(wtperf, ret, 0,
- "Error creating index %s", buf);
- return (ret);
- }
- }
- }
-
- if ((ret = session->close(session, NULL)) != 0) {
- lprintf(wtperf, ret, 0, "Error closing session");
- return (ret);
- }
-
- return (0);
+ CONFIG_OPTS *opts;
+ WT_SESSION *session;
+ size_t i;
+ int ret;
+ char buf[512];
+
+ opts = wtperf->opts;
+
+ if ((ret = wtperf->conn->open_session(wtperf->conn, NULL, opts->sess_config, &session)) != 0) {
+ lprintf(wtperf, ret, 0, "Error opening a session on %s", wtperf->home);
+ return (ret);
+ }
+
+ for (i = 0; i < opts->table_count_idle; i++) {
+ testutil_check(__wt_snprintf(buf, 512, "%s_idle%05d", wtperf->uris[0], (int)i));
+ if ((ret = session->create(session, buf, opts->table_config)) != 0) {
+ lprintf(wtperf, ret, 0, "Error creating idle table %s", buf);
+ return (ret);
+ }
+ }
+ if (opts->log_like_table &&
+ (ret = session->create(session, wtperf->log_table_uri, "key_format=Q,value_format=S")) != 0) {
+ lprintf(wtperf, ret, 0, "Error creating log table %s", buf);
+ return (ret);
+ }
+
+ for (i = 0; i < opts->table_count; i++) {
+ if (opts->log_partial && i > 0) {
+ if (((ret = session->create(session, wtperf->uris[i], wtperf->partial_config)) != 0)) {
+ lprintf(wtperf, ret, 0, "Error creating table %s", wtperf->uris[i]);
+ return (ret);
+ }
+ } else if ((ret = session->create(session, wtperf->uris[i], opts->table_config)) != 0) {
+ lprintf(wtperf, ret, 0, "Error creating table %s", wtperf->uris[i]);
+ return (ret);
+ }
+ if (opts->index) {
+ testutil_check(
+ __wt_snprintf(buf, 512, "index:%s:val_idx", wtperf->uris[i] + strlen("table:")));
+ if ((ret = session->create(session, buf, "columns=(val)")) != 0) {
+ lprintf(wtperf, ret, 0, "Error creating index %s", buf);
+ return (ret);
+ }
+ }
+ }
+
+ if ((ret = session->close(session, NULL)) != 0) {
+ lprintf(wtperf, ret, 0, "Error closing session");
+ return (ret);
+ }
+
+ return (0);
}
/*
* wtperf_copy --
- * Create a new WTPERF structure as a duplicate of a previous one.
+ * Create a new WTPERF structure as a duplicate of a previous one.
*/
static void
wtperf_copy(const WTPERF *src, WTPERF **retp)
{
- CONFIG_OPTS *opts;
- WTPERF *dest;
- size_t i;
+ CONFIG_OPTS *opts;
+ WTPERF *dest;
+ size_t i;
- opts = src->opts;
+ opts = src->opts;
- dest = dcalloc(1, sizeof(WTPERF));
+ dest = dcalloc(1, sizeof(WTPERF));
- /*
- * Don't copy the home and monitor directories, they are filled in by
- * our caller, explicitly.
- */
+ /*
+ * Don't copy the home and monitor directories, they are filled in by our caller, explicitly.
+ */
- if (src->partial_config != NULL)
- dest->partial_config = dstrdup(src->partial_config);
- if (src->reopen_config != NULL)
- dest->reopen_config = dstrdup(src->reopen_config);
+ if (src->partial_config != NULL)
+ dest->partial_config = dstrdup(src->partial_config);
+ if (src->reopen_config != NULL)
+ dest->reopen_config = dstrdup(src->reopen_config);
- if (src->uris != NULL) {
- dest->uris = dcalloc(opts->table_count, sizeof(char *));
- for (i = 0; i < opts->table_count; i++)
- dest->uris[i] = dstrdup(src->uris[i]);
- }
+ if (src->uris != NULL) {
+ dest->uris = dcalloc(opts->table_count, sizeof(char *));
+ for (i = 0; i < opts->table_count; i++)
+ dest->uris[i] = dstrdup(src->uris[i]);
+ }
- if (src->async_config != NULL)
- dest->async_config = dstrdup(src->async_config);
+ if (src->async_config != NULL)
+ dest->async_config = dstrdup(src->async_config);
- dest->ckptthreads = NULL;
- dest->popthreads = NULL;
+ dest->ckptthreads = NULL;
+ dest->popthreads = NULL;
- dest->workers = NULL;
- dest->workers_cnt = src->workers_cnt;
- if (src->workload_cnt != 0) {
- dest->workload_cnt = src->workload_cnt;
- dest->workload = dcalloc(src->workload_cnt, sizeof(WORKLOAD));
- memcpy(dest->workload,
- src->workload, src->workload_cnt * sizeof(WORKLOAD));
- }
+ dest->workers = NULL;
+ dest->workers_cnt = src->workers_cnt;
+ if (src->workload_cnt != 0) {
+ dest->workload_cnt = src->workload_cnt;
+ dest->workload = dcalloc(src->workload_cnt, sizeof(WORKLOAD));
+ memcpy(dest->workload, src->workload, src->workload_cnt * sizeof(WORKLOAD));
+ }
- TAILQ_INIT(&dest->stone_head);
+ TAILQ_INIT(&dest->stone_head);
- dest->opts = src->opts;
+ dest->opts = src->opts;
- *retp = dest;
+ *retp = dest;
}
/*
* wtperf_free --
- * Free any storage allocated in the WTPERF structure.
+ * Free any storage allocated in the WTPERF structure.
*/
static void
wtperf_free(WTPERF *wtperf)
{
- CONFIG_OPTS *opts;
- size_t i;
+ CONFIG_OPTS *opts;
+ size_t i;
- opts = wtperf->opts;
+ opts = wtperf->opts;
- free(wtperf->home);
- free(wtperf->monitor_dir);
- free(wtperf->partial_config);
- free(wtperf->reopen_config);
- free(wtperf->log_table_uri);
+ free(wtperf->home);
+ free(wtperf->monitor_dir);
+ free(wtperf->partial_config);
+ free(wtperf->reopen_config);
+ free(wtperf->log_table_uri);
- if (wtperf->uris != NULL) {
- for (i = 0; i < opts->table_count; i++)
- free(wtperf->uris[i]);
- free(wtperf->uris);
- }
+ if (wtperf->uris != NULL) {
+ for (i = 0; i < opts->table_count; i++)
+ free(wtperf->uris[i]);
+ free(wtperf->uris);
+ }
- free(wtperf->async_config);
+ free(wtperf->async_config);
- free(wtperf->ckptthreads);
- free(wtperf->popthreads);
+ free(wtperf->ckptthreads);
+ free(wtperf->popthreads);
- free(wtperf->workers);
- free(wtperf->workload);
+ free(wtperf->workers);
+ free(wtperf->workload);
- cleanup_truncate_config(wtperf);
+ cleanup_truncate_config(wtperf);
}
/*
* config_compress --
- * Parse the compression configuration.
+ * Parse the compression configuration.
*/
static int
config_compress(WTPERF *wtperf)
{
- CONFIG_OPTS *opts;
- int ret;
- const char *s;
-
- opts = wtperf->opts;
- ret = 0;
-
- s = opts->compression;
- if (strcmp(s, "none") == 0) {
- wtperf->compress_ext = NULL;
- wtperf->compress_table = NULL;
- } else if (strcmp(s, "lz4") == 0) {
+ CONFIG_OPTS *opts;
+ int ret;
+ const char *s;
+
+ opts = wtperf->opts;
+ ret = 0;
+
+ s = opts->compression;
+ if (strcmp(s, "none") == 0) {
+ wtperf->compress_ext = NULL;
+ wtperf->compress_table = NULL;
+ } else if (strcmp(s, "lz4") == 0) {
#ifndef HAVE_BUILTIN_EXTENSION_LZ4
- wtperf->compress_ext = LZ4_EXT;
+ wtperf->compress_ext = LZ4_EXT;
#endif
- wtperf->compress_table = LZ4_BLK;
- } else if (strcmp(s, "snappy") == 0) {
+ wtperf->compress_table = LZ4_BLK;
+ } else if (strcmp(s, "snappy") == 0) {
#ifndef HAVE_BUILTIN_EXTENSION_SNAPPY
- wtperf->compress_ext = SNAPPY_EXT;
+ wtperf->compress_ext = SNAPPY_EXT;
#endif
- wtperf->compress_table = SNAPPY_BLK;
- } else if (strcmp(s, "zlib") == 0) {
+ wtperf->compress_table = SNAPPY_BLK;
+ } else if (strcmp(s, "zlib") == 0) {
#ifndef HAVE_BUILTIN_EXTENSION_ZLIB
- wtperf->compress_ext = ZLIB_EXT;
+ wtperf->compress_ext = ZLIB_EXT;
#endif
- wtperf->compress_table = ZLIB_BLK;
- } else if (strcmp(s, "zstd") == 0) {
+ wtperf->compress_table = ZLIB_BLK;
+ } else if (strcmp(s, "zstd") == 0) {
#ifndef HAVE_BUILTIN_EXTENSION_ZSTD
- wtperf->compress_ext = ZSTD_EXT;
+ wtperf->compress_ext = ZSTD_EXT;
#endif
- wtperf->compress_table = ZSTD_BLK;
- } else {
- fprintf(stderr,
- "invalid compression configuration: %s\n", s);
- ret = EINVAL;
- }
- return (ret);
-
+ wtperf->compress_table = ZSTD_BLK;
+ } else {
+ fprintf(stderr, "invalid compression configuration: %s\n", s);
+ ret = EINVAL;
+ }
+ return (ret);
}
static int
start_all_runs(WTPERF *wtperf)
{
- CONFIG_OPTS *opts;
- WTPERF *next_wtperf, **wtperfs;
- size_t i, len;
- wt_thread_t *threads;
- int ret;
-
- opts = wtperf->opts;
- wtperfs = NULL;
- ret = 0;
-
- if (opts->database_count == 1)
- return (start_run(wtperf));
-
- /* Allocate an array to hold our WTPERF copies. */
- wtperfs = dcalloc(opts->database_count, sizeof(WTPERF *));
-
- /* Allocate an array to hold our thread IDs. */
- threads = dcalloc(opts->database_count, sizeof(*threads));
-
- for (i = 0; i < opts->database_count; i++) {
- wtperf_copy(wtperf, &next_wtperf);
- wtperfs[i] = next_wtperf;
-
- /*
- * Set up unique home/monitor directories for each database.
- * Re-create the directories if creating the databases.
- */
- len = strlen(wtperf->home) + 5;
- next_wtperf->home = dmalloc(len);
- testutil_check(__wt_snprintf(
- next_wtperf->home, len, "%s/D%02d", wtperf->home, (int)i));
- if (opts->create != 0)
- recreate_dir(next_wtperf->home);
-
- len = strlen(wtperf->monitor_dir) + 5;
- next_wtperf->monitor_dir = dmalloc(len);
- testutil_check(__wt_snprintf(next_wtperf->monitor_dir,
- len, "%s/D%02d", wtperf->monitor_dir, (int)i));
- if (opts->create != 0 &&
- strcmp(next_wtperf->home, next_wtperf->monitor_dir) != 0)
- recreate_dir(next_wtperf->monitor_dir);
-
- testutil_check(__wt_thread_create(NULL,
- &threads[i], thread_run_wtperf, next_wtperf));
- }
-
- /* Wait for threads to finish. */
- for (i = 0; i < opts->database_count; i++)
- testutil_check(__wt_thread_join(NULL, &threads[i]));
-
- for (i = 0; i < opts->database_count && wtperfs[i] != NULL; i++) {
- wtperf_free(wtperfs[i]);
- free(wtperfs[i]);
- }
- free(wtperfs);
- free(threads);
-
- return (ret);
+ CONFIG_OPTS *opts;
+ WTPERF *next_wtperf, **wtperfs;
+ size_t i, len;
+ wt_thread_t *threads;
+ int ret;
+
+ opts = wtperf->opts;
+ wtperfs = NULL;
+ ret = 0;
+
+ if (opts->database_count == 1)
+ return (start_run(wtperf));
+
+ /* Allocate an array to hold our WTPERF copies. */
+ wtperfs = dcalloc(opts->database_count, sizeof(WTPERF *));
+
+ /* Allocate an array to hold our thread IDs. */
+ threads = dcalloc(opts->database_count, sizeof(*threads));
+
+ for (i = 0; i < opts->database_count; i++) {
+ wtperf_copy(wtperf, &next_wtperf);
+ wtperfs[i] = next_wtperf;
+
+ /*
+ * Set up unique home/monitor directories for each database. Re-create the directories if
+ * creating the databases.
+ */
+ len = strlen(wtperf->home) + 5;
+ next_wtperf->home = dmalloc(len);
+ testutil_check(__wt_snprintf(next_wtperf->home, len, "%s/D%02d", wtperf->home, (int)i));
+ if (opts->create != 0)
+ recreate_dir(next_wtperf->home);
+
+ len = strlen(wtperf->monitor_dir) + 5;
+ next_wtperf->monitor_dir = dmalloc(len);
+ testutil_check(
+ __wt_snprintf(next_wtperf->monitor_dir, len, "%s/D%02d", wtperf->monitor_dir, (int)i));
+ if (opts->create != 0 && strcmp(next_wtperf->home, next_wtperf->monitor_dir) != 0)
+ recreate_dir(next_wtperf->monitor_dir);
+
+ testutil_check(__wt_thread_create(NULL, &threads[i], thread_run_wtperf, next_wtperf));
+ }
+
+ /* Wait for threads to finish. */
+ for (i = 0; i < opts->database_count; i++)
+ testutil_check(__wt_thread_join(NULL, &threads[i]));
+
+ for (i = 0; i < opts->database_count && wtperfs[i] != NULL; i++) {
+ wtperf_free(wtperfs[i]);
+ free(wtperfs[i]);
+ }
+ free(wtperfs);
+ free(threads);
+
+ return (ret);
}
/* Run an instance of wtperf for a given configuration. */
static WT_THREAD_RET
thread_run_wtperf(void *arg)
{
- WTPERF *wtperf;
- int ret;
+ WTPERF *wtperf;
+ int ret;
- wtperf = (WTPERF *)arg;
- if ((ret = start_run(wtperf)) != 0)
- lprintf(wtperf, ret, 0, "Run failed for: %s.", wtperf->home);
- return (WT_THREAD_RET_VALUE);
+ wtperf = (WTPERF *)arg;
+ if ((ret = start_run(wtperf)) != 0)
+ lprintf(wtperf, ret, 0, "Run failed for: %s.", wtperf->home);
+ return (WT_THREAD_RET_VALUE);
}
static int
start_run(WTPERF *wtperf)
{
- CONFIG_OPTS *opts;
- wt_thread_t monitor_thread;
- uint64_t total_ops;
- uint32_t run_time;
- int monitor_created, ret, t_ret;
-
- opts = wtperf->opts;
- monitor_created = ret = 0;
- /* [-Wconditional-uninitialized] */
- memset(&monitor_thread, 0, sizeof(monitor_thread));
-
- if ((ret = setup_log_file(wtperf)) != 0)
- goto err;
-
- if ((ret = wiredtiger_open( /* Open the real connection. */
- wtperf->home, NULL, opts->conn_config, &wtperf->conn)) != 0) {
- lprintf(wtperf, ret, 0, "Error connecting to %s", wtperf->home);
- goto err;
- }
-
- create_uris(wtperf);
-
- /* If creating, create the tables. */
- if (opts->create != 0 && (ret = create_tables(wtperf)) != 0)
- goto err;
-
- /* Start the monitor thread. */
- if (opts->sample_interval != 0) {
- testutil_check(__wt_thread_create(
- NULL, &monitor_thread, monitor, wtperf));
- monitor_created = 1;
- }
-
- /* If creating, populate the table. */
- if (opts->create != 0 && execute_populate(wtperf) != 0)
- goto err;
-
- /* Optional workload. */
- if (wtperf->workers_cnt != 0 &&
- (opts->run_time != 0 || opts->run_ops != 0)) {
- /*
- * If we have a workload, close and reopen the connection so
- * that LSM can detect read-only workloads.
- */
- if (close_reopen(wtperf) != 0)
- goto err;
-
- /* Didn't create, set insert count. */
- if (opts->create == 0 &&
- opts->random_range == 0 && find_table_count(wtperf) != 0)
- goto err;
- /* Start the checkpoint thread. */
- if (opts->checkpoint_threads != 0) {
- lprintf(wtperf, 0, 1,
- "Starting %" PRIu32 " checkpoint thread(s)",
- opts->checkpoint_threads);
- wtperf->ckptthreads = dcalloc(
- opts->checkpoint_threads, sizeof(WTPERF_THREAD));
- start_threads(wtperf, NULL, wtperf->ckptthreads,
- opts->checkpoint_threads, checkpoint_worker);
- }
- if (opts->pre_load_data)
- pre_load_data(wtperf);
-
- /* Execute the workload. */
- if ((ret = execute_workload(wtperf)) != 0)
- goto err;
-
- /* One final summation of the operations we've completed. */
- wtperf->read_ops = sum_read_ops(wtperf);
- wtperf->insert_ops = sum_insert_ops(wtperf);
- wtperf->truncate_ops = sum_truncate_ops(wtperf);
- wtperf->update_ops = sum_update_ops(wtperf);
- wtperf->ckpt_ops = sum_ckpt_ops(wtperf);
- total_ops =
- wtperf->read_ops + wtperf->insert_ops + wtperf->update_ops;
-
- run_time = opts->run_time == 0 ? 1 : opts->run_time;
- lprintf(wtperf, 0, 1,
- "Executed %" PRIu64 " read operations (%" PRIu64
- "%%) %" PRIu64 " ops/sec",
- wtperf->read_ops, (wtperf->read_ops * 100) / total_ops,
- wtperf->read_ops / run_time);
- lprintf(wtperf, 0, 1,
- "Executed %" PRIu64 " insert operations (%" PRIu64
- "%%) %" PRIu64 " ops/sec",
- wtperf->insert_ops, (wtperf->insert_ops * 100) / total_ops,
- wtperf->insert_ops / run_time);
- lprintf(wtperf, 0, 1,
- "Executed %" PRIu64 " truncate operations (%" PRIu64
- "%%) %" PRIu64 " ops/sec",
- wtperf->truncate_ops,
- (wtperf->truncate_ops * 100) / total_ops,
- wtperf->truncate_ops / run_time);
- lprintf(wtperf, 0, 1,
- "Executed %" PRIu64 " update operations (%" PRIu64
- "%%) %" PRIu64 " ops/sec",
- wtperf->update_ops, (wtperf->update_ops * 100) / total_ops,
- wtperf->update_ops / run_time);
- lprintf(wtperf, 0, 1,
- "Executed %" PRIu64 " checkpoint operations",
- wtperf->ckpt_ops);
-
- latency_print(wtperf);
- }
-
- if (0) {
-err: if (ret == 0)
- ret = EXIT_FAILURE;
- }
-
- /* Notify the worker threads they are done. */
- wtperf->stop = true;
-
- stop_threads(1, wtperf->ckptthreads);
-
- if (monitor_created != 0)
- testutil_check(__wt_thread_join(NULL, &monitor_thread));
-
- if (wtperf->conn != NULL && opts->close_conn &&
- (t_ret = wtperf->conn->close(wtperf->conn, NULL)) != 0) {
- lprintf(wtperf, t_ret, 0,
- "Error closing connection to %s", wtperf->home);
- if (ret == 0)
- ret = t_ret;
- }
-
- if (ret == 0) {
- if (opts->run_time == 0 && opts->run_ops == 0)
- lprintf(wtperf, 0, 1, "Run completed");
- else
- lprintf(wtperf, 0, 1, "Run completed: %" PRIu32 " %s",
- opts->run_time == 0 ?
- opts->run_ops : opts->run_time,
- opts->run_time == 0 ? "operations" : "seconds");
- }
-
- if (wtperf->logf != NULL) {
- if ((t_ret = fflush(wtperf->logf)) != 0 && ret == 0)
- ret = t_ret;
- if ((t_ret = fclose(wtperf->logf)) != 0 && ret == 0)
- ret = t_ret;
- }
- return (ret);
+ CONFIG_OPTS *opts;
+ wt_thread_t monitor_thread;
+ uint64_t total_ops;
+ uint32_t run_time;
+ int monitor_created, ret, t_ret;
+
+ opts = wtperf->opts;
+ monitor_created = ret = 0;
+ /* [-Wconditional-uninitialized] */
+ memset(&monitor_thread, 0, sizeof(monitor_thread));
+
+ if ((ret = setup_log_file(wtperf)) != 0)
+ goto err;
+
+ if ((ret = wiredtiger_open(/* Open the real connection. */
+ wtperf->home, NULL, opts->conn_config, &wtperf->conn)) != 0) {
+ lprintf(wtperf, ret, 0, "Error connecting to %s", wtperf->home);
+ goto err;
+ }
+
+ create_uris(wtperf);
+
+ /* If creating, create the tables. */
+ if (opts->create != 0 && (ret = create_tables(wtperf)) != 0)
+ goto err;
+
+ /* Start the monitor thread. */
+ if (opts->sample_interval != 0) {
+ testutil_check(__wt_thread_create(NULL, &monitor_thread, monitor, wtperf));
+ monitor_created = 1;
+ }
+
+ /* If creating, populate the table. */
+ if (opts->create != 0 && execute_populate(wtperf) != 0)
+ goto err;
+
+ /* Optional workload. */
+ if (wtperf->workers_cnt != 0 && (opts->run_time != 0 || opts->run_ops != 0)) {
+ /*
+ * If we have a workload, close and reopen the connection so that LSM can detect read-only
+ * workloads.
+ */
+ if (close_reopen(wtperf) != 0)
+ goto err;
+
+ /* Didn't create, set insert count. */
+ if (opts->create == 0 && opts->random_range == 0 && find_table_count(wtperf) != 0)
+ goto err;
+ /* Start the checkpoint thread. */
+ if (opts->checkpoint_threads != 0) {
+ lprintf(
+ wtperf, 0, 1, "Starting %" PRIu32 " checkpoint thread(s)", opts->checkpoint_threads);
+ wtperf->ckptthreads = dcalloc(opts->checkpoint_threads, sizeof(WTPERF_THREAD));
+ start_threads(
+ wtperf, NULL, wtperf->ckptthreads, opts->checkpoint_threads, checkpoint_worker);
+ }
+ if (opts->pre_load_data)
+ pre_load_data(wtperf);
+
+ /* Execute the workload. */
+ if ((ret = execute_workload(wtperf)) != 0)
+ goto err;
+
+ /* One final summation of the operations we've completed. */
+ wtperf->read_ops = sum_read_ops(wtperf);
+ wtperf->insert_ops = sum_insert_ops(wtperf);
+ wtperf->truncate_ops = sum_truncate_ops(wtperf);
+ wtperf->update_ops = sum_update_ops(wtperf);
+ wtperf->ckpt_ops = sum_ckpt_ops(wtperf);
+ total_ops = wtperf->read_ops + wtperf->insert_ops + wtperf->update_ops;
+
+ run_time = opts->run_time == 0 ? 1 : opts->run_time;
+ lprintf(wtperf, 0, 1,
+ "Executed %" PRIu64 " read operations (%" PRIu64 "%%) %" PRIu64 " ops/sec",
+ wtperf->read_ops, (wtperf->read_ops * 100) / total_ops, wtperf->read_ops / run_time);
+ lprintf(wtperf, 0, 1,
+ "Executed %" PRIu64 " insert operations (%" PRIu64 "%%) %" PRIu64 " ops/sec",
+ wtperf->insert_ops, (wtperf->insert_ops * 100) / total_ops,
+ wtperf->insert_ops / run_time);
+ lprintf(wtperf, 0, 1,
+ "Executed %" PRIu64 " truncate operations (%" PRIu64 "%%) %" PRIu64 " ops/sec",
+ wtperf->truncate_ops, (wtperf->truncate_ops * 100) / total_ops,
+ wtperf->truncate_ops / run_time);
+ lprintf(wtperf, 0, 1,
+ "Executed %" PRIu64 " update operations (%" PRIu64 "%%) %" PRIu64 " ops/sec",
+ wtperf->update_ops, (wtperf->update_ops * 100) / total_ops,
+ wtperf->update_ops / run_time);
+ lprintf(wtperf, 0, 1, "Executed %" PRIu64 " checkpoint operations", wtperf->ckpt_ops);
+
+ latency_print(wtperf);
+ }
+
+ if (0) {
+err:
+ if (ret == 0)
+ ret = EXIT_FAILURE;
+ }
+
+ /* Notify the worker threads they are done. */
+ wtperf->stop = true;
+
+ stop_threads(1, wtperf->ckptthreads);
+
+ if (monitor_created != 0)
+ testutil_check(__wt_thread_join(NULL, &monitor_thread));
+
+ if (wtperf->conn != NULL && opts->close_conn &&
+ (t_ret = wtperf->conn->close(wtperf->conn, NULL)) != 0) {
+ lprintf(wtperf, t_ret, 0, "Error closing connection to %s", wtperf->home);
+ if (ret == 0)
+ ret = t_ret;
+ }
+
+ if (ret == 0) {
+ if (opts->run_time == 0 && opts->run_ops == 0)
+ lprintf(wtperf, 0, 1, "Run completed");
+ else
+ lprintf(wtperf, 0, 1, "Run completed: %" PRIu32 " %s",
+ opts->run_time == 0 ? opts->run_ops : opts->run_time,
+ opts->run_time == 0 ? "operations" : "seconds");
+ }
+
+ if (wtperf->logf != NULL) {
+ if ((t_ret = fflush(wtperf->logf)) != 0 && ret == 0)
+ ret = t_ret;
+ if ((t_ret = fclose(wtperf->logf)) != 0 && ret == 0)
+ ret = t_ret;
+ }
+ return (ret);
}
extern int __wt_optind, __wt_optreset;
@@ -2430,546 +2224,501 @@ extern char *__wt_optarg;
/*
* usage --
- * wtperf usage print, no error.
+ * wtperf usage print, no error.
*/
static void
usage(void)
{
- printf("wtperf [-C config] "
- "[-H mount] [-h home] [-O file] [-o option] [-T config]\n");
- printf("\t-C <string> additional connection configuration\n");
- printf("\t (added to option conn_config)\n");
- printf("\t-H <mount> configure Helium volume mount point\n");
- printf("\t-h <string> Wired Tiger home must exist, default WT_TEST\n");
- printf("\t-O <file> file contains options as listed below\n");
- printf("\t-o option=val[,option=val,...] set options listed below\n");
- printf("\t-T <string> additional table configuration\n");
- printf("\t (added to option table_config)\n");
- printf("\n");
- config_opt_usage();
+ printf(
+ "wtperf [-C config] "
+ "[-H mount] [-h home] [-O file] [-o option] [-T config]\n");
+ printf("\t-C <string> additional connection configuration\n");
+ printf("\t (added to option conn_config)\n");
+ printf("\t-H <mount> configure Helium volume mount point\n");
+ printf("\t-h <string> Wired Tiger home must exist, default WT_TEST\n");
+ printf("\t-O <file> file contains options as listed below\n");
+ printf("\t-o option=val[,option=val,...] set options listed below\n");
+ printf("\t-T <string> additional table configuration\n");
+ printf("\t (added to option table_config)\n");
+ printf("\n");
+ config_opt_usage();
}
int
main(int argc, char *argv[])
{
- CONFIG_OPTS *opts;
- WTPERF *wtperf, _wtperf;
- size_t pos, req_len, sreq_len;
- bool monitor_set;
- int ch, ret;
- const char *cmdflags = "C:h:m:O:o:T:";
- const char *append_comma, *config_opts;
- char *cc_buf, *path, *sess_cfg, *tc_buf, *user_cconfig, *user_tconfig;
-
- /* The first WTPERF structure (from which all others are derived). */
- wtperf = &_wtperf;
- memset(wtperf, 0, sizeof(*wtperf));
- wtperf->home = dstrdup(DEFAULT_HOME);
- wtperf->monitor_dir = dstrdup(DEFAULT_MONITOR_DIR);
- TAILQ_INIT(&wtperf->stone_head);
- config_opt_init(&wtperf->opts);
-
- opts = wtperf->opts;
- monitor_set = false;
- ret = 0;
- config_opts = NULL;
- cc_buf = sess_cfg = tc_buf = user_cconfig = user_tconfig = NULL;
-
- /* Do a basic validation of options, and home is needed before open. */
- while ((ch = __wt_getopt("wtperf", argc, argv, cmdflags)) != EOF)
- switch (ch) {
- case 'C':
- if (user_cconfig == NULL)
- user_cconfig = dstrdup(__wt_optarg);
- else {
- user_cconfig = drealloc(user_cconfig,
- strlen(user_cconfig) +
- strlen(__wt_optarg) + 2);
- strcat(user_cconfig, ",");
- strcat(user_cconfig, __wt_optarg);
- }
- break;
- case 'h':
- free(wtperf->home);
- wtperf->home = dstrdup(__wt_optarg);
- break;
- case 'm':
- free(wtperf->monitor_dir);
- wtperf->monitor_dir = dstrdup(__wt_optarg);
- monitor_set = true;
- break;
- case 'O':
- config_opts = __wt_optarg;
- break;
- case 'T':
- if (user_tconfig == NULL)
- user_tconfig = dstrdup(__wt_optarg);
- else {
- user_tconfig = drealloc(user_tconfig,
- strlen(user_tconfig) +
- strlen(__wt_optarg) + 2);
- strcat(user_tconfig, ",");
- strcat(user_tconfig, __wt_optarg);
- }
- break;
- case '?':
- usage();
- goto einval;
- }
-
- /*
- * If the user did not specify a monitor directory then set the
- * monitor directory to the home dir.
- */
- if (!monitor_set) {
- free(wtperf->monitor_dir);
- wtperf->monitor_dir = dstrdup(wtperf->home);
- }
-
- /* Parse configuration settings from configuration file. */
- if (config_opts != NULL && config_opt_file(wtperf, config_opts) != 0)
- goto einval;
-
- /* Parse options that override values set via a configuration file. */
- __wt_optreset = __wt_optind = 1;
- while ((ch = __wt_getopt("wtperf", argc, argv, cmdflags)) != EOF)
- switch (ch) {
- case 'o':
- /* Allow -o key=value */
- if (config_opt_str(wtperf, __wt_optarg) != 0)
- goto einval;
- break;
- }
-
- if (opts->populate_threads == 0 && opts->icount != 0) {
- lprintf(wtperf, 1, 0,
- "Cannot have 0 populate threads when icount is set\n");
- goto err;
- }
-
- wtperf->async_config = NULL;
- /*
- * If the user specified async_threads we use async for all ops.
- * If the user wants compaction, then we also enable async for
- * the compact operation, but not for the workloads.
- */
- if (opts->async_threads > 0) {
- if (F_ISSET(wtperf, CFG_TRUNCATE)) {
- lprintf(wtperf,
- 1, 0, "Cannot run truncate and async\n");
- goto err;
- }
- wtperf->use_asyncops = true;
- }
- if (opts->compact && opts->async_threads == 0)
- opts->async_threads = 2;
- if (opts->async_threads > 0) {
- /*
- * The maximum number of async threads is two digits, so just
- * use that to compute the space we need. Assume the default
- * of 1024 for the max ops. Although we could bump that up
- * to 4096 if needed.
- */
- req_len = strlen(",async=(enabled=true,threads=)") + 4;
- wtperf->async_config = dmalloc(req_len);
- testutil_check(__wt_snprintf(wtperf->async_config, req_len,
- ",async=(enabled=true,threads=%" PRIu32 ")",
- opts->async_threads));
- }
- if ((ret = config_compress(wtperf)) != 0)
- goto err;
-
- /* You can't have truncate on a random collection. */
- if (F_ISSET(wtperf, CFG_TRUNCATE) && opts->random_range) {
- lprintf(wtperf, 1, 0, "Cannot run truncate and random_range\n");
- goto err;
- }
-
- /* We can't run truncate with more than one table. */
- if (F_ISSET(wtperf, CFG_TRUNCATE) && opts->table_count > 1) {
- lprintf(wtperf, 1, 0, "Cannot truncate more than 1 table\n");
- goto err;
- }
-
- /* Make stdout line buffered, so verbose output appears quickly. */
- __wt_stream_set_line_buffer(stdout);
-
- /* Concatenate non-default configuration strings. */
- if (user_cconfig != NULL || opts->session_count_idle > 0 ||
- wtperf->compress_ext != NULL || wtperf->async_config != NULL ||
- opts->in_memory) {
- req_len = 20;
- req_len += wtperf->async_config != NULL ?
- strlen(wtperf->async_config) : 0;
- req_len += wtperf->compress_ext != NULL ?
- strlen(wtperf->compress_ext) : 0;
- if (opts->session_count_idle > 0) {
- sreq_len = strlen("session_max=") + 6;
- req_len += sreq_len;
- sess_cfg = dmalloc(sreq_len);
- testutil_check(__wt_snprintf(sess_cfg, sreq_len,
- "session_max=%" PRIu32,
- opts->session_count_idle +
- wtperf->workers_cnt + opts->populate_threads + 10));
- }
- req_len += opts->in_memory ? strlen("in_memory=true") : 0;
- req_len += user_cconfig != NULL ? strlen(user_cconfig) : 0;
- cc_buf = dmalloc(req_len);
-
- pos = 0;
- append_comma = "";
- if (wtperf->async_config != NULL &&
- strlen(wtperf->async_config) != 0) {
- testutil_check(__wt_snprintf_len_incr(
- cc_buf + pos, req_len - pos, &pos, "%s%s",
- append_comma, wtperf->async_config));
- append_comma = ",";
- }
- if (wtperf->compress_ext != NULL &&
- strlen(wtperf->compress_ext) != 0) {
- testutil_check(__wt_snprintf_len_incr(
- cc_buf + pos, req_len - pos, &pos, "%s%s",
- append_comma, wtperf->compress_ext));
- append_comma = ",";
- }
- if (opts->in_memory) {
- testutil_check(__wt_snprintf_len_incr(
- cc_buf + pos, req_len - pos, &pos, "%s%s",
- append_comma, "in_memory=true"));
- append_comma = ",";
- }
- if (sess_cfg != NULL && strlen(sess_cfg) != 0) {
- testutil_check(__wt_snprintf_len_incr(
- cc_buf + pos, req_len - pos, &pos, "%s%s",
- append_comma, sess_cfg));
- append_comma = ",";
- }
- if (user_cconfig != NULL && strlen(user_cconfig) != 0) {
- testutil_check(__wt_snprintf_len_incr(
- cc_buf + pos, req_len - pos, &pos, "%s%s",
- append_comma, user_cconfig));
- }
-
- if (strlen(cc_buf) != 0 && (ret =
- config_opt_name_value(wtperf, "conn_config", cc_buf)) != 0)
- goto err;
- }
- if (opts->index ||
- user_tconfig != NULL || wtperf->compress_table != NULL) {
- req_len = 20;
- req_len += wtperf->compress_table != NULL ?
- strlen(wtperf->compress_table) : 0;
- req_len += opts->index ? strlen(INDEX_COL_NAMES) : 0;
- req_len += user_tconfig != NULL ? strlen(user_tconfig) : 0;
- tc_buf = dmalloc(req_len);
-
- pos = 0;
- append_comma = "";
- if (wtperf->compress_table != NULL &&
- strlen(wtperf->compress_table) != 0) {
- testutil_check(__wt_snprintf_len_incr(
- tc_buf + pos, req_len - pos, &pos, "%s%s",
- append_comma, wtperf->compress_table));
- append_comma = ",";
- }
- if (opts->index) {
- testutil_check(__wt_snprintf_len_incr(
- tc_buf + pos, req_len - pos, &pos, "%s%s",
- append_comma, INDEX_COL_NAMES));
- append_comma = ",";
- }
- if (user_tconfig != NULL && strlen(user_tconfig) != 0) {
- testutil_check(__wt_snprintf_len_incr(
- tc_buf + pos, req_len - pos, &pos, "%s%s",
- append_comma, user_tconfig));
- }
-
- if (strlen(tc_buf) != 0 && (ret =
- config_opt_name_value(wtperf, "table_config", tc_buf)) != 0)
- goto err;
- }
- if (opts->log_partial && opts->table_count > 1) {
- req_len = strlen(opts->table_config) +
- strlen(LOG_PARTIAL_CONFIG) + 1;
- wtperf->partial_config = dmalloc(req_len);
- testutil_check(__wt_snprintf(
- wtperf->partial_config, req_len, "%s%s",
- opts->table_config, LOG_PARTIAL_CONFIG));
- }
- /*
- * Set the config for reopen. If readonly add in that string.
- * If not readonly then just copy the original conn_config.
- */
- if (opts->readonly)
- req_len = strlen(opts->conn_config) +
- strlen(READONLY_CONFIG) + 1;
- else
- req_len = strlen(opts->conn_config) + 1;
- wtperf->reopen_config = dmalloc(req_len);
- if (opts->readonly)
- testutil_check(__wt_snprintf(
- wtperf->reopen_config, req_len, "%s%s",
- opts->conn_config, READONLY_CONFIG));
- else
- testutil_check(__wt_snprintf(
- wtperf->reopen_config, req_len, "%s", opts->conn_config));
-
- /* Sanity-check the configuration. */
- if ((ret = config_sanity(wtperf)) != 0)
- goto err;
-
- /* If creating, remove and re-create the home directory. */
- if (opts->create != 0)
- recreate_dir(wtperf->home);
-
- /* Write a copy of the config. */
- req_len = strlen(wtperf->home) + strlen("/CONFIG.wtperf") + 1;
- path = dmalloc(req_len);
- testutil_check(__wt_snprintf(
- path, req_len, "%s/CONFIG.wtperf", wtperf->home));
- config_opt_log(opts, path);
- free(path);
-
- /* Display the configuration. */
- if (opts->verbose > 1)
- config_opt_print(wtperf);
-
- if ((ret = start_all_runs(wtperf)) != 0)
- goto err;
-
- if (0) {
-einval: ret = EINVAL;
- }
-
-err: wtperf_free(wtperf);
- config_opt_cleanup(opts);
-
- free(cc_buf);
- free(sess_cfg);
- free(tc_buf);
- free(user_cconfig);
- free(user_tconfig);
-
- return (ret == 0 ? EXIT_SUCCESS : EXIT_FAILURE);
+ CONFIG_OPTS *opts;
+ WTPERF *wtperf, _wtperf;
+ size_t pos, req_len, sreq_len;
+ bool monitor_set;
+ int ch, ret;
+ const char *cmdflags = "C:h:m:O:o:T:";
+ const char *append_comma, *config_opts;
+ char *cc_buf, *path, *sess_cfg, *tc_buf, *user_cconfig, *user_tconfig;
+
+ /* The first WTPERF structure (from which all others are derived). */
+ wtperf = &_wtperf;
+ memset(wtperf, 0, sizeof(*wtperf));
+ wtperf->home = dstrdup(DEFAULT_HOME);
+ wtperf->monitor_dir = dstrdup(DEFAULT_MONITOR_DIR);
+ TAILQ_INIT(&wtperf->stone_head);
+ config_opt_init(&wtperf->opts);
+
+ opts = wtperf->opts;
+ monitor_set = false;
+ ret = 0;
+ config_opts = NULL;
+ cc_buf = sess_cfg = tc_buf = user_cconfig = user_tconfig = NULL;
+
+ /* Do a basic validation of options, and home is needed before open. */
+ while ((ch = __wt_getopt("wtperf", argc, argv, cmdflags)) != EOF)
+ switch (ch) {
+ case 'C':
+ if (user_cconfig == NULL)
+ user_cconfig = dstrdup(__wt_optarg);
+ else {
+ user_cconfig =
+ drealloc(user_cconfig, strlen(user_cconfig) + strlen(__wt_optarg) + 2);
+ strcat(user_cconfig, ",");
+ strcat(user_cconfig, __wt_optarg);
+ }
+ break;
+ case 'h':
+ free(wtperf->home);
+ wtperf->home = dstrdup(__wt_optarg);
+ break;
+ case 'm':
+ free(wtperf->monitor_dir);
+ wtperf->monitor_dir = dstrdup(__wt_optarg);
+ monitor_set = true;
+ break;
+ case 'O':
+ config_opts = __wt_optarg;
+ break;
+ case 'T':
+ if (user_tconfig == NULL)
+ user_tconfig = dstrdup(__wt_optarg);
+ else {
+ user_tconfig =
+ drealloc(user_tconfig, strlen(user_tconfig) + strlen(__wt_optarg) + 2);
+ strcat(user_tconfig, ",");
+ strcat(user_tconfig, __wt_optarg);
+ }
+ break;
+ case '?':
+ usage();
+ goto einval;
+ }
+
+ /*
+ * If the user did not specify a monitor directory then set the monitor directory to the home
+ * dir.
+ */
+ if (!monitor_set) {
+ free(wtperf->monitor_dir);
+ wtperf->monitor_dir = dstrdup(wtperf->home);
+ }
+
+ /* Parse configuration settings from configuration file. */
+ if (config_opts != NULL && config_opt_file(wtperf, config_opts) != 0)
+ goto einval;
+
+ /* Parse options that override values set via a configuration file. */
+ __wt_optreset = __wt_optind = 1;
+ while ((ch = __wt_getopt("wtperf", argc, argv, cmdflags)) != EOF)
+ switch (ch) {
+ case 'o':
+ /* Allow -o key=value */
+ if (config_opt_str(wtperf, __wt_optarg) != 0)
+ goto einval;
+ break;
+ }
+
+ if (opts->populate_threads == 0 && opts->icount != 0) {
+ lprintf(wtperf, 1, 0, "Cannot have 0 populate threads when icount is set\n");
+ goto err;
+ }
+
+ wtperf->async_config = NULL;
+ /*
+ * If the user specified async_threads we use async for all ops. If the user wants compaction,
+ * then we also enable async for the compact operation, but not for the workloads.
+ */
+ if (opts->async_threads > 0) {
+ if (F_ISSET(wtperf, CFG_TRUNCATE)) {
+ lprintf(wtperf, 1, 0, "Cannot run truncate and async\n");
+ goto err;
+ }
+ wtperf->use_asyncops = true;
+ }
+ if (opts->compact && opts->async_threads == 0)
+ opts->async_threads = 2;
+ if (opts->async_threads > 0) {
+ /*
+ * The maximum number of async threads is two digits, so just use that to compute the space
+ * we need. Assume the default of 1024 for the max ops. Although we could bump that up to
+ * 4096 if needed.
+ */
+ req_len = strlen(",async=(enabled=true,threads=)") + 4;
+ wtperf->async_config = dmalloc(req_len);
+ testutil_check(__wt_snprintf(wtperf->async_config, req_len,
+ ",async=(enabled=true,threads=%" PRIu32 ")", opts->async_threads));
+ }
+ if ((ret = config_compress(wtperf)) != 0)
+ goto err;
+
+ /* You can't have truncate on a random collection. */
+ if (F_ISSET(wtperf, CFG_TRUNCATE) && opts->random_range) {
+ lprintf(wtperf, 1, 0, "Cannot run truncate and random_range\n");
+ goto err;
+ }
+
+ /* We can't run truncate with more than one table. */
+ if (F_ISSET(wtperf, CFG_TRUNCATE) && opts->table_count > 1) {
+ lprintf(wtperf, 1, 0, "Cannot truncate more than 1 table\n");
+ goto err;
+ }
+
+ /* Make stdout line buffered, so verbose output appears quickly. */
+ __wt_stream_set_line_buffer(stdout);
+
+ /* Concatenate non-default configuration strings. */
+ if (user_cconfig != NULL || opts->session_count_idle > 0 || wtperf->compress_ext != NULL ||
+ wtperf->async_config != NULL || opts->in_memory) {
+ req_len = 20;
+ req_len += wtperf->async_config != NULL ? strlen(wtperf->async_config) : 0;
+ req_len += wtperf->compress_ext != NULL ? strlen(wtperf->compress_ext) : 0;
+ if (opts->session_count_idle > 0) {
+ sreq_len = strlen("session_max=") + 6;
+ req_len += sreq_len;
+ sess_cfg = dmalloc(sreq_len);
+ testutil_check(__wt_snprintf(sess_cfg, sreq_len, "session_max=%" PRIu32,
+ opts->session_count_idle + wtperf->workers_cnt + opts->populate_threads + 10));
+ }
+ req_len += opts->in_memory ? strlen("in_memory=true") : 0;
+ req_len += user_cconfig != NULL ? strlen(user_cconfig) : 0;
+ cc_buf = dmalloc(req_len);
+
+ pos = 0;
+ append_comma = "";
+ if (wtperf->async_config != NULL && strlen(wtperf->async_config) != 0) {
+ testutil_check(__wt_snprintf_len_incr(
+ cc_buf + pos, req_len - pos, &pos, "%s%s", append_comma, wtperf->async_config));
+ append_comma = ",";
+ }
+ if (wtperf->compress_ext != NULL && strlen(wtperf->compress_ext) != 0) {
+ testutil_check(__wt_snprintf_len_incr(
+ cc_buf + pos, req_len - pos, &pos, "%s%s", append_comma, wtperf->compress_ext));
+ append_comma = ",";
+ }
+ if (opts->in_memory) {
+ testutil_check(__wt_snprintf_len_incr(
+ cc_buf + pos, req_len - pos, &pos, "%s%s", append_comma, "in_memory=true"));
+ append_comma = ",";
+ }
+ if (sess_cfg != NULL && strlen(sess_cfg) != 0) {
+ testutil_check(__wt_snprintf_len_incr(
+ cc_buf + pos, req_len - pos, &pos, "%s%s", append_comma, sess_cfg));
+ append_comma = ",";
+ }
+ if (user_cconfig != NULL && strlen(user_cconfig) != 0) {
+ testutil_check(__wt_snprintf_len_incr(
+ cc_buf + pos, req_len - pos, &pos, "%s%s", append_comma, user_cconfig));
+ }
+
+ if (strlen(cc_buf) != 0 &&
+ (ret = config_opt_name_value(wtperf, "conn_config", cc_buf)) != 0)
+ goto err;
+ }
+ if (opts->index || user_tconfig != NULL || wtperf->compress_table != NULL) {
+ req_len = 20;
+ req_len += wtperf->compress_table != NULL ? strlen(wtperf->compress_table) : 0;
+ req_len += opts->index ? strlen(INDEX_COL_NAMES) : 0;
+ req_len += user_tconfig != NULL ? strlen(user_tconfig) : 0;
+ tc_buf = dmalloc(req_len);
+
+ pos = 0;
+ append_comma = "";
+ if (wtperf->compress_table != NULL && strlen(wtperf->compress_table) != 0) {
+ testutil_check(__wt_snprintf_len_incr(
+ tc_buf + pos, req_len - pos, &pos, "%s%s", append_comma, wtperf->compress_table));
+ append_comma = ",";
+ }
+ if (opts->index) {
+ testutil_check(__wt_snprintf_len_incr(
+ tc_buf + pos, req_len - pos, &pos, "%s%s", append_comma, INDEX_COL_NAMES));
+ append_comma = ",";
+ }
+ if (user_tconfig != NULL && strlen(user_tconfig) != 0) {
+ testutil_check(__wt_snprintf_len_incr(
+ tc_buf + pos, req_len - pos, &pos, "%s%s", append_comma, user_tconfig));
+ }
+
+ if (strlen(tc_buf) != 0 &&
+ (ret = config_opt_name_value(wtperf, "table_config", tc_buf)) != 0)
+ goto err;
+ }
+ if (opts->log_partial && opts->table_count > 1) {
+ req_len = strlen(opts->table_config) + strlen(LOG_PARTIAL_CONFIG) + 1;
+ wtperf->partial_config = dmalloc(req_len);
+ testutil_check(__wt_snprintf(
+ wtperf->partial_config, req_len, "%s%s", opts->table_config, LOG_PARTIAL_CONFIG));
+ }
+ /*
+ * Set the config for reopen. If readonly add in that string. If not readonly then just copy the
+ * original conn_config.
+ */
+ if (opts->readonly)
+ req_len = strlen(opts->conn_config) + strlen(READONLY_CONFIG) + 1;
+ else
+ req_len = strlen(opts->conn_config) + 1;
+ wtperf->reopen_config = dmalloc(req_len);
+ if (opts->readonly)
+ testutil_check(__wt_snprintf(
+ wtperf->reopen_config, req_len, "%s%s", opts->conn_config, READONLY_CONFIG));
+ else
+ testutil_check(__wt_snprintf(wtperf->reopen_config, req_len, "%s", opts->conn_config));
+
+ /* Sanity-check the configuration. */
+ if ((ret = config_sanity(wtperf)) != 0)
+ goto err;
+
+ /* If creating, remove and re-create the home directory. */
+ if (opts->create != 0)
+ recreate_dir(wtperf->home);
+
+ /* Write a copy of the config. */
+ req_len = strlen(wtperf->home) + strlen("/CONFIG.wtperf") + 1;
+ path = dmalloc(req_len);
+ testutil_check(__wt_snprintf(path, req_len, "%s/CONFIG.wtperf", wtperf->home));
+ config_opt_log(opts, path);
+ free(path);
+
+ /* Display the configuration. */
+ if (opts->verbose > 1)
+ config_opt_print(wtperf);
+
+ if ((ret = start_all_runs(wtperf)) != 0)
+ goto err;
+
+ if (0) {
+einval:
+ ret = EINVAL;
+ }
+
+err:
+ wtperf_free(wtperf);
+ config_opt_cleanup(opts);
+
+ free(cc_buf);
+ free(sess_cfg);
+ free(tc_buf);
+ free(user_cconfig);
+ free(user_tconfig);
+
+ return (ret == 0 ? EXIT_SUCCESS : EXIT_FAILURE);
}
static void
-start_threads(WTPERF *wtperf, WORKLOAD *workp,
- WTPERF_THREAD *base, u_int num, WT_THREAD_CALLBACK(*func)(void *))
+start_threads(WTPERF *wtperf, WORKLOAD *workp, WTPERF_THREAD *base, u_int num,
+ WT_THREAD_CALLBACK (*func)(void *))
{
- CONFIG_OPTS *opts;
- WTPERF_THREAD *thread;
- u_int i;
-
- opts = wtperf->opts;
-
- /* Initialize the threads. */
- for (i = 0, thread = base; i < num; ++i, ++thread) {
- thread->wtperf = wtperf;
- thread->workload = workp;
-
- /*
- * We don't want the threads executing in lock-step, seed each
- * one differently.
- */
- __wt_random_init_seed(NULL, &thread->rnd);
-
- /*
- * Every thread gets a key/data buffer because we don't bother
- * to distinguish between threads needing them and threads that
- * don't, it's not enough memory to bother. These buffers hold
- * strings: trailing NUL is included in the size.
- */
- thread->key_buf = dcalloc(opts->key_sz, 1);
- thread->value_buf = dcalloc(opts->value_sz_max, 1);
-
- /*
- * Initialize and then toss in a bit of random values if needed.
- */
- memset(thread->value_buf, 'a', opts->value_sz - 1);
- if (opts->random_value)
- randomize_value(thread, thread->value_buf);
-
- /*
- * Every thread gets tracking information and is initialized
- * for latency measurements, for the same reason.
- */
- thread->ckpt.min_latency =
- thread->insert.min_latency = thread->read.min_latency =
- thread->update.min_latency = UINT32_MAX;
- thread->ckpt.max_latency = thread->insert.max_latency =
- thread->read.max_latency = thread->update.max_latency = 0;
- }
-
- /* Start the threads. */
- for (i = 0, thread = base; i < num; ++i, ++thread)
- testutil_check(__wt_thread_create(
- NULL, &thread->handle, func, thread));
+ CONFIG_OPTS *opts;
+ WTPERF_THREAD *thread;
+ u_int i;
+
+ opts = wtperf->opts;
+
+ /* Initialize the threads. */
+ for (i = 0, thread = base; i < num; ++i, ++thread) {
+ thread->wtperf = wtperf;
+ thread->workload = workp;
+
+ /*
+ * We don't want the threads executing in lock-step, seed each one differently.
+ */
+ __wt_random_init_seed(NULL, &thread->rnd);
+
+ /*
+ * Every thread gets a key/data buffer because we don't bother to distinguish between
+ * threads needing them and threads that don't, it's not enough memory to bother. These
+ * buffers hold strings: trailing NUL is included in the size.
+ */
+ thread->key_buf = dcalloc(opts->key_sz, 1);
+ thread->value_buf = dcalloc(opts->value_sz_max, 1);
+
+ /*
+ * Initialize and then toss in a bit of random values if needed.
+ */
+ memset(thread->value_buf, 'a', opts->value_sz - 1);
+ if (opts->random_value)
+ randomize_value(thread, thread->value_buf);
+
+ /*
+ * Every thread gets tracking information and is initialized for latency measurements, for
+ * the same reason.
+ */
+ thread->ckpt.min_latency = thread->insert.min_latency = thread->read.min_latency =
+ thread->update.min_latency = UINT32_MAX;
+ thread->ckpt.max_latency = thread->insert.max_latency = thread->read.max_latency =
+ thread->update.max_latency = 0;
+ }
+
+ /* Start the threads. */
+ for (i = 0, thread = base; i < num; ++i, ++thread)
+ testutil_check(__wt_thread_create(NULL, &thread->handle, func, thread));
}
static void
stop_threads(u_int num, WTPERF_THREAD *threads)
{
- u_int i;
-
- if (num == 0 || threads == NULL)
- return;
-
- for (i = 0; i < num; ++i, ++threads) {
- testutil_check(__wt_thread_join(NULL, &threads->handle));
-
- free(threads->key_buf);
- threads->key_buf = NULL;
- free(threads->value_buf);
- threads->value_buf = NULL;
- }
-
- /*
- * We don't free the thread structures or any memory referenced, or NULL
- * the reference when we stop the threads; the thread structure is still
- * being read by the monitor thread (among others). As a standalone
- * program, leaking memory isn't a concern, and it's simpler that way.
- */
+ u_int i;
+
+ if (num == 0 || threads == NULL)
+ return;
+
+ for (i = 0; i < num; ++i, ++threads) {
+ testutil_check(__wt_thread_join(NULL, &threads->handle));
+
+ free(threads->key_buf);
+ threads->key_buf = NULL;
+ free(threads->value_buf);
+ threads->value_buf = NULL;
+ }
+
+ /*
+ * We don't free the thread structures or any memory referenced, or NULL the reference when we
+ * stop the threads; the thread structure is still being read by the monitor thread (among
+ * others). As a standalone program, leaking memory isn't a concern, and it's simpler that way.
+ */
}
static void
recreate_dir(const char *name)
{
- char *buf;
- size_t len;
-
- len = strlen(name) * 2 + 100;
- buf = dmalloc(len);
- testutil_check(__wt_snprintf(
- buf, len, "rm -rf %s && mkdir %s", name, name));
- testutil_checkfmt(system(buf), "system: %s", buf);
- free(buf);
+ char *buf;
+ size_t len;
+
+ len = strlen(name) * 2 + 100;
+ buf = dmalloc(len);
+ testutil_check(__wt_snprintf(buf, len, "rm -rf %s && mkdir %s", name, name));
+ testutil_checkfmt(system(buf), "system: %s", buf);
+ free(buf);
}
static int
drop_all_tables(WTPERF *wtperf)
{
- struct timespec start, stop;
- CONFIG_OPTS *opts;
- WT_SESSION *session;
- size_t i;
- uint64_t msecs;
- int ret, t_ret;
-
- opts = wtperf->opts;
-
- /* Drop any tables. */
- if ((ret = wtperf->conn->open_session(
- wtperf->conn, NULL, opts->sess_config, &session)) != 0) {
- lprintf(wtperf, ret, 0,
- "Error opening a session on %s", wtperf->home);
- return (ret);
- }
- __wt_epoch(NULL, &start);
- for (i = 0; i < opts->table_count; i++) {
- if ((ret =
- session->drop(session, wtperf->uris[i], NULL)) != 0) {
- lprintf(wtperf, ret, 0,
- "Error dropping table %s", wtperf->uris[i]);
- goto err;
- }
- }
- __wt_epoch(NULL, &stop);
- msecs = WT_TIMEDIFF_MS(stop, start);
- lprintf(wtperf, 0, 1,
- "Executed %" PRIu32 " drop operations average time %" PRIu64 "ms",
- opts->table_count, msecs / opts->table_count);
-
-err: if ((t_ret = session->close(session, NULL)) != 0 && ret == 0)
- ret = t_ret;
- return (ret);
+ struct timespec start, stop;
+ CONFIG_OPTS *opts;
+ WT_SESSION *session;
+ size_t i;
+ uint64_t msecs;
+ int ret, t_ret;
+
+ opts = wtperf->opts;
+
+ /* Drop any tables. */
+ if ((ret = wtperf->conn->open_session(wtperf->conn, NULL, opts->sess_config, &session)) != 0) {
+ lprintf(wtperf, ret, 0, "Error opening a session on %s", wtperf->home);
+ return (ret);
+ }
+ __wt_epoch(NULL, &start);
+ for (i = 0; i < opts->table_count; i++) {
+ if ((ret = session->drop(session, wtperf->uris[i], NULL)) != 0) {
+ lprintf(wtperf, ret, 0, "Error dropping table %s", wtperf->uris[i]);
+ goto err;
+ }
+ }
+ __wt_epoch(NULL, &stop);
+ msecs = WT_TIMEDIFF_MS(stop, start);
+ lprintf(wtperf, 0, 1, "Executed %" PRIu32 " drop operations average time %" PRIu64 "ms",
+ opts->table_count, msecs / opts->table_count);
+
+err:
+ if ((t_ret = session->close(session, NULL)) != 0 && ret == 0)
+ ret = t_ret;
+ return (ret);
}
static uint64_t
wtperf_value_range(WTPERF *wtperf)
{
- CONFIG_OPTS *opts;
-
- opts = wtperf->opts;
-
- if (opts->random_range)
- return (opts->icount + opts->random_range);
- /*
- * It is legal to configure a zero size populate phase, hide that
- * from other code by pretending the range is 1 in that case.
- */
- if (opts->icount + wtperf->insert_key == 0)
- return (1);
- return (opts->icount +
- wtperf->insert_key - (u_int)(wtperf->workers_cnt + 1));
+ CONFIG_OPTS *opts;
+
+ opts = wtperf->opts;
+
+ if (opts->random_range)
+ return (opts->icount + opts->random_range);
+ /*
+ * It is legal to configure a zero size populate phase, hide that from other code by pretending
+ * the range is 1 in that case.
+ */
+ if (opts->icount + wtperf->insert_key == 0)
+ return (1);
+ return (opts->icount + wtperf->insert_key - (u_int)(wtperf->workers_cnt + 1));
}
static uint64_t
wtperf_rand(WTPERF_THREAD *thread)
{
- CONFIG_OPTS *opts;
- WT_CURSOR *rnd_cursor;
- WTPERF *wtperf;
- double S1, S2, U;
- uint64_t rval;
- int ret;
- char *key_buf;
-
- wtperf = thread->wtperf;
- opts = wtperf->opts;
-
- /*
- * If we have a random cursor set up then use it.
- */
- if ((rnd_cursor = thread->rand_cursor) != NULL) {
- if ((ret = rnd_cursor->next(rnd_cursor)) != 0) {
- lprintf(wtperf, ret, 0, "worker: rand next failed");
- /* 0 is outside the expected range. */
- return (0);
- }
- if ((ret = rnd_cursor->get_key(rnd_cursor, &key_buf)) != 0) {
- lprintf(wtperf, ret, 0,
- "worker: rand next key retrieval");
- return (0);
- }
- /*
- * Resetting the cursor is not fatal. We still return the
- * value we retrieved above. We do it so that we don't
- * leave a cursor positioned.
- */
- if ((ret = rnd_cursor->reset(rnd_cursor)) != 0)
- lprintf(wtperf, ret, 0,
- "worker: rand cursor reset failed");
- extract_key(key_buf, &rval);
- return (rval);
- }
-
- /*
- * Use WiredTiger's random number routine: it's lock-free and fairly
- * good.
- */
- rval = __wt_random(&thread->rnd);
-
- /* Use Pareto distribution to give 80/20 hot/cold values. */
- if (opts->pareto != 0) {
-#define PARETO_SHAPE 1.5
- S1 = (-1 / PARETO_SHAPE);
- S2 = wtperf_value_range(wtperf) *
- (opts->pareto / 100.0) * (PARETO_SHAPE - 1);
- U = 1 - (double)rval / (double)UINT32_MAX;
- rval = (uint64_t)((pow(U, S1) - 1) * S2);
- /*
- * This Pareto calculation chooses out of range values about
- * 2% of the time, from my testing. That will lead to the
- * first item in the table being "hot".
- */
- if (rval > wtperf_value_range(wtperf))
- rval = 0;
- }
- /*
- * Wrap the key to within the expected range and avoid zero: we never
- * insert that key.
- */
- rval = (rval % wtperf_value_range(wtperf)) + 1;
- return (rval);
+ CONFIG_OPTS *opts;
+ WT_CURSOR *rnd_cursor;
+ WTPERF *wtperf;
+ double S1, S2, U;
+ uint64_t rval;
+ int ret;
+ char *key_buf;
+
+ wtperf = thread->wtperf;
+ opts = wtperf->opts;
+
+ /*
+ * If we have a random cursor set up then use it.
+ */
+ if ((rnd_cursor = thread->rand_cursor) != NULL) {
+ if ((ret = rnd_cursor->next(rnd_cursor)) != 0) {
+ lprintf(wtperf, ret, 0, "worker: rand next failed");
+ /* 0 is outside the expected range. */
+ return (0);
+ }
+ if ((ret = rnd_cursor->get_key(rnd_cursor, &key_buf)) != 0) {
+ lprintf(wtperf, ret, 0, "worker: rand next key retrieval");
+ return (0);
+ }
+ /*
+ * Resetting the cursor is not fatal. We still return the value we retrieved above. We do it
+ * so that we don't leave a cursor positioned.
+ */
+ if ((ret = rnd_cursor->reset(rnd_cursor)) != 0)
+ lprintf(wtperf, ret, 0, "worker: rand cursor reset failed");
+ extract_key(key_buf, &rval);
+ return (rval);
+ }
+
+ /*
+ * Use WiredTiger's random number routine: it's lock-free and fairly good.
+ */
+ rval = __wt_random(&thread->rnd);
+
+ /* Use Pareto distribution to give 80/20 hot/cold values. */
+ if (opts->pareto != 0) {
+#define PARETO_SHAPE 1.5
+ S1 = (-1 / PARETO_SHAPE);
+ S2 = wtperf_value_range(wtperf) * (opts->pareto / 100.0) * (PARETO_SHAPE - 1);
+ U = 1 - (double)rval / (double)UINT32_MAX;
+ rval = (uint64_t)((pow(U, S1) - 1) * S2);
+ /*
+ * This Pareto calculation chooses out of range values about
+ * 2% of the time, from my testing. That will lead to the
+ * first item in the table being "hot".
+ */
+ if (rval > wtperf_value_range(wtperf))
+ rval = 0;
+ }
+ /*
+ * Wrap the key to within the expected range and avoid zero: we never insert that key.
+ */
+ rval = (rval % wtperf_value_range(wtperf)) + 1;
+ return (rval);
}