summaryrefslogtreecommitdiff
path: root/bench/wtperf/wtperf.c
diff options
context:
space:
mode:
Diffstat (limited to 'bench/wtperf/wtperf.c')
-rw-r--r--bench/wtperf/wtperf.c1630
1 files changed, 883 insertions, 747 deletions
diff --git a/bench/wtperf/wtperf.c b/bench/wtperf/wtperf.c
index bf6b156bb69..a7618b19707 100644
--- a/bench/wtperf/wtperf.c
+++ b/bench/wtperf/wtperf.c
@@ -31,78 +31,38 @@
/* Default values. */
#define DEFAULT_HOME "WT_TEST"
#define DEFAULT_MONITOR_DIR "WT_TEST"
-static const CONFIG default_cfg = {
- NULL, /* home */
- NULL, /* monitor dir */
- NULL, /* partial logging */
- NULL, /* reopen config */
- NULL, /* base_uri */
- NULL, /* log_table_uri */
- NULL, /* uris */
- NULL, /* conn */
- NULL, /* logf */
- NULL, /* async */
- NULL, NULL, /* compressor ext, blk */
- NULL, NULL, /* populate, checkpoint threads */
-
- NULL, /* worker threads */
- 0, /* worker thread count */
- NULL, /* workloads */
- 0, /* workload count */
- 0, /* use_asyncops */
- 0, /* checkpoint operations */
- 0, /* insert operations */
- 0, /* read operations */
- 0, /* truncate operations */
- 0, /* update operations */
- 0, /* insert key */
- 0, /* log like table key */
- 0, /* checkpoint in progress */
- 0, /* thread error */
- 0, /* notify threads to stop */
- 0, /* in warmup phase */
- false, /* Signal for idle cycle thread */
- 0, /* total seconds running */
- 0, /* flags */
- {NULL, NULL}, /* the truncate queue */
- {NULL, NULL}, /* the config queue */
-
-#define OPT_DEFINE_DEFAULT
-#include "wtperf_opt.i"
-#undef OPT_DEFINE_DEFAULT
-};
static const char * const debug_cconfig = "";
static const char * const debug_tconfig = "";
static void *checkpoint_worker(void *);
-static int drop_all_tables(CONFIG *);
-static int execute_populate(CONFIG *);
-static int execute_workload(CONFIG *);
-static int find_table_count(CONFIG *);
+static int drop_all_tables(WTPERF *);
+static int execute_populate(WTPERF *);
+static int execute_workload(WTPERF *);
+static int find_table_count(WTPERF *);
static void *monitor(void *);
static void *populate_thread(void *);
-static void randomize_value(CONFIG_THREAD *, char *);
+static void randomize_value(WTPERF_THREAD *, char *);
static void recreate_dir(const char *);
-static int start_all_runs(CONFIG *);
-static int start_run(CONFIG *);
-static int start_threads(CONFIG *,
- WORKLOAD *, CONFIG_THREAD *, u_int, void *(*)(void *));
-static int stop_threads(CONFIG *, u_int, CONFIG_THREAD *);
+static int start_all_runs(WTPERF *);
+static int start_run(WTPERF *);
+static int start_threads(WTPERF *,
+ WORKLOAD *, WTPERF_THREAD *, u_int, void *(*)(void *));
+static int stop_threads(WTPERF *, u_int, WTPERF_THREAD *);
static void *thread_run_wtperf(void *);
-static void update_value_delta(CONFIG_THREAD *);
+static void update_value_delta(WTPERF_THREAD *);
static void *worker(void *);
-static uint64_t wtperf_rand(CONFIG_THREAD *);
-static uint64_t wtperf_value_range(CONFIG *);
+static uint64_t wtperf_rand(WTPERF_THREAD *);
+static uint64_t wtperf_value_range(WTPERF *);
-#define INDEX_COL_NAMES ",columns=(key,val)"
+#define INDEX_COL_NAMES "columns=(key,val)"
/* Retrieve an ID for the next insert operation. */
static inline uint64_t
-get_next_incr(CONFIG *cfg)
+get_next_incr(WTPERF *wtperf)
{
- return (__wt_atomic_add64(&cfg->insert_key, 1));
+ return (__wt_atomic_add64(&wtperf->insert_key, 1));
}
/*
@@ -110,11 +70,14 @@ get_next_incr(CONFIG *cfg)
* other element in the value buffer.
*/
static void
-randomize_value(CONFIG_THREAD *thread, char *value_buf)
+randomize_value(WTPERF_THREAD *thread, char *value_buf)
{
+ CONFIG_OPTS *opts;
uint8_t *vb;
uint32_t i, max_range, rand_val;
+ opts = thread->wtperf->opts;
+
/*
* Limit how much of the buffer we validate for length, this means
* that only threads that do growing updates will ever make changes to
@@ -123,11 +86,11 @@ randomize_value(CONFIG_THREAD *thread, char *value_buf)
* in this performance sensitive function.
*/
if (thread->workload == NULL || thread->workload->update_delta == 0)
- max_range = thread->cfg->value_sz;
+ max_range = opts->value_sz;
else if (thread->workload->update_delta > 0)
- max_range = thread->cfg->value_sz_max;
+ max_range = opts->value_sz_max;
else
- max_range = thread->cfg->value_sz_min;
+ max_range = opts->value_sz_min;
/*
* Generate a single random value and re-use it. We generally only
@@ -157,17 +120,17 @@ randomize_value(CONFIG_THREAD *thread, char *value_buf)
* Partition data by key ranges.
*/
static uint32_t
-map_key_to_table(CONFIG *cfg, uint64_t k)
+map_key_to_table(CONFIG_OPTS *opts, uint64_t k)
{
- if (cfg->range_partition) {
+ if (opts->range_partition) {
/* Take care to return a result in [0..table_count-1]. */
- if (k > cfg->icount + cfg->random_range)
+ if (k > opts->icount + opts->random_range)
return (0);
return ((uint32_t)((k - 1) /
- ((cfg->icount + cfg->random_range + cfg->table_count - 1) /
- cfg->table_count)));
+ ((opts->icount + opts->random_range +
+ opts->table_count - 1) / opts->table_count)));
} else
- return ((uint32_t)(k % cfg->table_count));
+ return ((uint32_t)(k % opts->table_count));
}
/*
@@ -176,26 +139,28 @@ map_key_to_table(CONFIG *cfg, uint64_t k)
* scratch buffer.
*/
static inline void
-update_value_delta(CONFIG_THREAD *thread)
+update_value_delta(WTPERF_THREAD *thread)
{
- CONFIG *cfg;
+ CONFIG_OPTS *opts;
+ WTPERF *wtperf;
char * value;
int64_t delta, len, new_len;
- cfg = thread->cfg;
+ wtperf = thread->wtperf;
+ opts = wtperf->opts;
value = thread->value_buf;
delta = thread->workload->update_delta;
len = (int64_t)strlen(value);
if (delta == INT64_MAX)
delta = __wt_random(&thread->rnd) %
- (cfg->value_sz_max - cfg->value_sz);
+ (opts->value_sz_max - opts->value_sz);
/* Ensure we aren't changing across boundaries */
- if (delta > 0 && len + delta > cfg->value_sz_max)
- delta = cfg->value_sz_max - len;
- else if (delta < 0 && len + delta < cfg->value_sz_min)
- delta = cfg->value_sz_min - len;
+ if (delta > 0 && len + delta > opts->value_sz_max)
+ delta = opts->value_sz_max - len;
+ else if (delta < 0 && len + delta < opts->value_sz_min)
+ delta = opts->value_sz_min - len;
/* Bail if there isn't anything to do */
if (delta == 0)
@@ -206,7 +171,7 @@ update_value_delta(CONFIG_THREAD *thread)
else {
/* Extend the value by the configured amount. */
for (new_len = len;
- new_len < cfg->value_sz_max && new_len - len < delta;
+ new_len < opts->value_sz_max && new_len - len < delta;
new_len++)
value[new_len] = 'a';
}
@@ -215,24 +180,24 @@ update_value_delta(CONFIG_THREAD *thread)
static int
cb_asyncop(WT_ASYNC_CALLBACK *cb, WT_ASYNC_OP *op, int ret, uint32_t flags)
{
- CONFIG *cfg;
- CONFIG_THREAD *thread;
TRACK *trk;
+ WTPERF *wtperf;
+ WTPERF_THREAD *thread;
WT_ASYNC_OPTYPE type;
- char *value;
uint32_t *tables;
int t_ret;
+ char *value;
(void)cb;
(void)flags;
- cfg = NULL; /* -Wconditional-uninitialized */
+ wtperf = NULL; /* -Wconditional-uninitialized */
thread = NULL; /* -Wconditional-uninitialized */
type = op->get_type(op);
if (type != WT_AOP_COMPACT) {
- thread = (CONFIG_THREAD *)op->app_private;
- cfg = thread->cfg;
+ thread = (WTPERF_THREAD *)op->app_private;
+ wtperf = thread->wtperf;
}
trk = NULL;
@@ -249,7 +214,7 @@ cb_asyncop(WT_ASYNC_CALLBACK *cb, WT_ASYNC_OP *op, int ret, uint32_t flags)
if (ret == 0 &&
(t_ret = op->get_value(op, &value)) != 0) {
ret = t_ret;
- lprintf(cfg, ret, 0, "get_value in read.");
+ lprintf(wtperf, ret, 0, "get_value in read.");
goto err;
}
break;
@@ -259,7 +224,8 @@ cb_asyncop(WT_ASYNC_CALLBACK *cb, WT_ASYNC_OP *op, int ret, uint32_t flags)
case WT_AOP_NONE:
case WT_AOP_REMOVE:
/* We never expect this type. */
- lprintf(cfg, ret, 0, "No type in op %" PRIu64, op->get_id(op));
+ lprintf(wtperf,
+ ret, 0, "No type in op %" PRIu64, op->get_id(op));
goto err;
}
@@ -273,15 +239,14 @@ cb_asyncop(WT_ASYNC_CALLBACK *cb, WT_ASYNC_OP *op, int ret, uint32_t flags)
if (type == WT_AOP_COMPACT)
return (0);
if (ret == 0 || (ret == WT_NOTFOUND && type != WT_AOP_INSERT)) {
- if (!cfg->in_warmup)
+ if (!wtperf->in_warmup)
(void)__wt_atomic_add64(&trk->ops, 1);
return (0);
}
err:
/* Panic if error */
- lprintf(cfg, ret, 0, "Error in op %" PRIu64,
- op->get_id(op));
- cfg->error = cfg->stop = 1;
+ lprintf(wtperf, ret, 0, "Error in op %" PRIu64, op->get_id(op));
+ wtperf->error = wtperf->stop = true;
return (1);
}
@@ -353,8 +318,9 @@ op_name(uint8_t *op)
static void *
worker_async(void *arg)
{
- CONFIG *cfg;
- CONFIG_THREAD *thread;
+ CONFIG_OPTS *opts;
+ WTPERF *wtperf;
+ WTPERF_THREAD *thread;
WT_ASYNC_OP *asyncop;
WT_CONNECTION *conn;
uint64_t next_val;
@@ -362,9 +328,10 @@ worker_async(void *arg)
int ret;
char *key_buf, *value_buf;
- thread = (CONFIG_THREAD *)arg;
- cfg = thread->cfg;
- conn = cfg->conn;
+ thread = (WTPERF_THREAD *)arg;
+ wtperf = thread->wtperf;
+ opts = wtperf->opts;
+ conn = wtperf->conn;
key_buf = thread->key_buf;
value_buf = thread->value_buf;
@@ -372,7 +339,7 @@ worker_async(void *arg)
op = thread->workload->ops;
op_end = op + sizeof(thread->workload->ops);
- while (!cfg->stop) {
+ while (!wtperf->stop) {
/*
* Generate the next key and setup operation specific
* statistics tracking objects.
@@ -380,10 +347,10 @@ worker_async(void *arg)
switch (*op) {
case WORKER_INSERT:
case WORKER_INSERT_RMW:
- if (cfg->random_range)
+ if (opts->random_range)
next_val = wtperf_rand(thread);
else
- next_val = cfg->icount + get_next_incr(cfg);
+ next_val = opts->icount + get_next_incr(wtperf);
break;
case WORKER_READ:
case WORKER_UPDATE:
@@ -394,22 +361,22 @@ worker_async(void *arg)
* we rely on at least one insert to get a valid item
* id.
*/
- if (wtperf_value_range(cfg) < next_val)
+ if (wtperf_value_range(wtperf) < next_val)
continue;
break;
default:
goto err; /* can't happen */
}
- generate_key(cfg, key_buf, next_val);
+ generate_key(opts, key_buf, next_val);
/*
* Spread the data out around the multiple databases.
* Sleep to allow workers a chance to run and process async ops.
* Then retry to get an async op.
*/
- while ((ret = conn->async_new_op(
- conn, cfg->uris[map_key_to_table(cfg, next_val)],
+ while ((ret = conn->async_new_op(conn,
+ wtperf->uris[map_key_to_table(wtperf->opts, next_val)],
NULL, &cb, &asyncop)) == EBUSY)
(void)usleep(10000);
if (ret != 0)
@@ -424,23 +391,23 @@ worker_async(void *arg)
break;
goto op_err;
case WORKER_INSERT:
- if (cfg->random_value)
+ if (opts->random_value)
randomize_value(thread, value_buf);
asyncop->set_value(asyncop, value_buf);
if ((ret = asyncop->insert(asyncop)) == 0)
break;
goto op_err;
case WORKER_UPDATE:
- if (cfg->random_value)
+ if (opts->random_value)
randomize_value(thread, value_buf);
asyncop->set_value(asyncop, value_buf);
if ((ret = asyncop->update(asyncop)) == 0)
break;
goto op_err;
default:
-op_err: lprintf(cfg, ret, 0,
+op_err: lprintf(wtperf, ret, 0,
"%s failed for: %s, range: %"PRIu64,
- op_name(op), key_buf, wtperf_value_range(cfg));
+ op_name(op), key_buf, wtperf_value_range(wtperf));
goto err; /* can't happen */
}
@@ -454,7 +421,7 @@ op_err: lprintf(cfg, ret, 0,
/* Notify our caller we failed and shut the system down. */
if (0) {
-err: cfg->error = cfg->stop = 1;
+err: wtperf->error = wtperf->stop = true;
}
return (NULL);
}
@@ -465,17 +432,19 @@ err: cfg->error = cfg->stop = 1;
* search do them. Ensuring the keys we see are always in order.
*/
static int
-do_range_reads(CONFIG *cfg, WT_CURSOR *cursor)
+do_range_reads(WTPERF *wtperf, WT_CURSOR *cursor)
{
+ CONFIG_OPTS *opts;
size_t range;
uint64_t next_val, prev_val;
char *range_key_buf;
char buf[512];
int ret;
+ opts = wtperf->opts;
ret = 0;
- if (cfg->read_range == 0)
+ if (opts->read_range == 0)
return (0);
memset(&buf[0], 0, 512 * sizeof(char));
@@ -485,7 +454,7 @@ do_range_reads(CONFIG *cfg, WT_CURSOR *cursor)
testutil_check(cursor->get_key(cursor, &range_key_buf));
extract_key(range_key_buf, &next_val);
- for (range = 0; range < cfg->read_range; ++range) {
+ for (range = 0; range < opts->read_range; ++range) {
prev_val = next_val;
ret = cursor->next(cursor);
/* We are done if we reach the end. */
@@ -496,7 +465,7 @@ do_range_reads(CONFIG *cfg, WT_CURSOR *cursor)
testutil_check(cursor->get_key(cursor, &range_key_buf));
extract_key(range_key_buf, &next_val);
if (next_val < prev_val) {
- lprintf(cfg, EINVAL, 0,
+ lprintf(wtperf, EINVAL, 0,
"Out of order keys %" PRIu64
" came before %" PRIu64,
prev_val, next_val);
@@ -510,9 +479,10 @@ static void *
worker(void *arg)
{
struct timespec start, stop;
- CONFIG *cfg;
- CONFIG_THREAD *thread;
+ CONFIG_OPTS *opts;
TRACK *trk;
+ WTPERF *wtperf;
+ WTPERF_THREAD *thread;
WT_CONNECTION *conn;
WT_CURSOR **cursors, *cursor, *log_table_cursor, *tmp_cursor;
WT_SESSION *session;
@@ -524,9 +494,10 @@ worker(void *arg)
char *value_buf, *key_buf, *value;
char buf[512];
- thread = (CONFIG_THREAD *)arg;
- cfg = thread->cfg;
- conn = cfg->conn;
+ thread = (WTPERF_THREAD *)arg;
+ wtperf = thread->wtperf;
+ opts = wtperf->opts;
+ conn = wtperf->conn;
cursors = NULL;
log_table_cursor = NULL; /* -Wconditional-initialized */
ops = 0;
@@ -535,42 +506,40 @@ worker(void *arg)
trk = NULL;
if ((ret = conn->open_session(
- conn, NULL, cfg->sess_config, &session)) != 0) {
- lprintf(cfg, ret, 0, "worker: WT_CONNECTION.open_session");
+ conn, NULL, opts->sess_config, &session)) != 0) {
+ lprintf(wtperf, ret, 0, "worker: WT_CONNECTION.open_session");
goto err;
}
- cursors = dcalloc(cfg->table_count, sizeof(WT_CURSOR *));
- for (i = 0; i < cfg->table_count_idle; i++) {
- snprintf(buf, 512, "%s_idle%05d", cfg->uris[0], (int)i);
+ cursors = dcalloc(opts->table_count, sizeof(WT_CURSOR *));
+ for (i = 0; i < opts->table_count_idle; i++) {
+ snprintf(buf, 512, "%s_idle%05d", wtperf->uris[0], (int)i);
if ((ret = session->open_cursor(
session, buf, NULL, NULL, &tmp_cursor)) != 0) {
- lprintf(cfg, ret, 0,
+ lprintf(wtperf, ret, 0,
"Error opening idle table %s", buf);
goto err;
}
if ((ret = tmp_cursor->close(tmp_cursor)) != 0) {
- lprintf(cfg, ret, 0,
+ lprintf(wtperf, ret, 0,
"Error closing idle table %s", buf);
goto err;
}
}
- for (i = 0; i < cfg->table_count; i++) {
+ for (i = 0; i < opts->table_count; i++) {
if ((ret = session->open_cursor(session,
- cfg->uris[i], NULL, NULL, &cursors[i])) != 0) {
- lprintf(cfg, ret, 0,
+ wtperf->uris[i], NULL, NULL, &cursors[i])) != 0) {
+ lprintf(wtperf, ret, 0,
"worker: WT_SESSION.open_cursor: %s",
- cfg->uris[i]);
+ wtperf->uris[i]);
goto err;
}
}
- if (cfg->log_like_table) {
- if ((ret = session->open_cursor(session,
- cfg->log_table_uri, NULL, NULL, &log_table_cursor)) != 0) {
- lprintf(cfg, ret, 0,
- "worker: WT_SESSION.open_cursor: %s",
- cfg->log_table_uri);
- goto err;
- }
+ if (opts->log_like_table && (ret = session->open_cursor(session,
+ wtperf->log_table_uri, NULL, NULL, &log_table_cursor)) != 0) {
+ lprintf(wtperf, ret, 0,
+ "worker: WT_SESSION.open_cursor: %s",
+ wtperf->log_table_uri);
+ goto err;
}
/* Setup the timer for throttling. */
@@ -579,7 +548,7 @@ worker(void *arg)
/* Setup for truncate */
if (thread->workload->truncate != 0)
- if ((ret = setup_truncate(cfg, thread, session)) != 0)
+ if ((ret = setup_truncate(wtperf, thread, session)) != 0)
goto err;
key_buf = thread->key_buf;
@@ -588,13 +557,13 @@ worker(void *arg)
op = thread->workload->ops;
op_end = op + sizeof(thread->workload->ops);
- if ((ops_per_txn != 0 || cfg->log_like_table) &&
+ if ((ops_per_txn != 0 || opts->log_like_table) &&
(ret = session->begin_transaction(session, NULL)) != 0) {
- lprintf(cfg, ret, 0, "First transaction begin failed");
+ lprintf(wtperf, ret, 0, "First transaction begin failed");
goto err;
}
- while (!cfg->stop) {
+ while (!wtperf->stop) {
/*
* Generate the next key and setup operation specific
* statistics tracking objects.
@@ -603,10 +572,10 @@ worker(void *arg)
case WORKER_INSERT:
case WORKER_INSERT_RMW:
trk = &thread->insert;
- if (cfg->random_range)
+ if (opts->random_range)
next_val = wtperf_rand(thread);
else
- next_val = cfg->icount + get_next_incr(cfg);
+ next_val = opts->icount + get_next_incr(wtperf);
break;
case WORKER_READ:
trk = &thread->read;
@@ -621,7 +590,7 @@ worker(void *arg)
* we rely on at least one insert to get a valid item
* id.
*/
- if (wtperf_value_range(cfg) < next_val)
+ if (wtperf_value_range(wtperf) < next_val)
continue;
break;
case WORKER_TRUNCATE:
@@ -632,24 +601,22 @@ worker(void *arg)
goto err; /* can't happen */
}
- generate_key(cfg, key_buf, next_val);
+ generate_key(opts, key_buf, next_val);
/*
* Spread the data out around the multiple databases.
*/
- cursor = cursors[map_key_to_table(cfg, next_val)];
+ cursor = cursors[map_key_to_table(wtperf->opts, next_val)];
/*
* Skip the first time we do an operation, when trk->ops
* is 0, to avoid first time latency spikes.
*/
measure_latency =
- cfg->sample_interval != 0 && trk != NULL &&
- trk->ops != 0 && (trk->ops % cfg->sample_rate == 0);
- if (measure_latency && (ret = __wt_epoch(NULL, &start)) != 0) {
- lprintf(cfg, ret, 0, "Get time call failed");
- goto err;
- }
+ opts->sample_interval != 0 && trk != NULL &&
+ trk->ops != 0 && (trk->ops % opts->sample_rate == 0);
+ if (measure_latency)
+ __wt_epoch(NULL, &start);
cursor->set_key(cursor, key_buf);
@@ -666,7 +633,7 @@ worker(void *arg)
if (ret == 0) {
if ((ret = cursor->get_value(
cursor, &value)) != 0) {
- lprintf(cfg, ret, 0,
+ lprintf(wtperf, ret, 0,
"get_value in read.");
goto err;
}
@@ -675,7 +642,7 @@ worker(void *arg)
* for several operations, confirming that the
* next key is in the correct order.
*/
- ret = do_range_reads(cfg, cursor);
+ ret = do_range_reads(wtperf, cursor);
}
if (ret == 0 || ret == WT_NOTFOUND)
@@ -690,15 +657,15 @@ worker(void *arg)
/* FALLTHROUGH */
case WORKER_INSERT:
- if (cfg->random_value)
+ if (opts->random_value)
randomize_value(thread, value_buf);
cursor->set_value(cursor, value_buf);
if ((ret = cursor->insert(cursor)) == 0)
break;
goto op_err;
case WORKER_TRUNCATE:
- if ((ret = run_truncate(
- cfg, thread, cursor, session, &truncated)) == 0) {
+ if ((ret = run_truncate(wtperf,
+ thread, cursor, session, &truncated)) == 0) {
if (truncated)
trk = &thread->truncate;
else
@@ -712,7 +679,7 @@ worker(void *arg)
if ((ret = cursor->search(cursor)) == 0) {
if ((ret = cursor->get_value(
cursor, &value)) != 0) {
- lprintf(cfg, ret, 0,
+ lprintf(wtperf, ret, 0,
"get_value in update.");
goto err;
}
@@ -721,14 +688,14 @@ worker(void *arg)
* safe, and be sure to NUL-terminate.
*/
strncpy(value_buf,
- value, cfg->value_sz_max - 1);
+ value, opts->value_sz_max - 1);
if (thread->workload->update_delta != 0)
update_value_delta(thread);
if (value_buf[0] == 'a')
value_buf[0] = 'b';
else
value_buf[0] = 'a';
- if (cfg->random_value)
+ if (opts->random_value)
randomize_value(thread, value_buf);
cursor->set_value(cursor, value_buf);
if ((ret = cursor->update(cursor)) == 0)
@@ -756,62 +723,59 @@ op_err: if (ret == WT_ROLLBACK && ops_per_txn != 0) {
* order in cases of ordered inserts, as we
* aren't retrying here.
*/
- lprintf(cfg, ret, 1,
+ lprintf(wtperf, ret, 1,
"%s for: %s, range: %"PRIu64, op_name(op),
- key_buf, wtperf_value_range(cfg));
+ key_buf, wtperf_value_range(wtperf));
if ((ret = session->rollback_transaction(
session, NULL)) != 0) {
- lprintf(cfg, ret, 0,
+ lprintf(wtperf, ret, 0,
"Failed rollback_transaction");
goto err;
}
if ((ret = session->begin_transaction(
session, NULL)) != 0) {
- lprintf(cfg, ret, 0,
+ lprintf(wtperf, ret, 0,
"Worker begin transaction failed");
goto err;
}
break;
}
- lprintf(cfg, ret, 0,
+ lprintf(wtperf, ret, 0,
"%s failed for: %s, range: %"PRIu64,
- op_name(op), key_buf, wtperf_value_range(cfg));
+ op_name(op), key_buf, wtperf_value_range(wtperf));
goto err;
default:
goto err; /* can't happen */
}
/* Update the log-like table. */
- if (cfg->log_like_table &&
+ if (opts->log_like_table &&
(*op != WORKER_READ && *op != WORKER_TRUNCATE)) {
- log_id = __wt_atomic_add64(&cfg->log_like_table_key, 1);
+ log_id =
+ __wt_atomic_add64(&wtperf->log_like_table_key, 1);
log_table_cursor->set_key(log_table_cursor, log_id);
log_table_cursor->set_value(
log_table_cursor, value_buf);
if ((ret =
log_table_cursor->insert(log_table_cursor)) != 0) {
- lprintf(cfg, ret, 0, "Cursor insert failed");
+ lprintf(wtperf, ret, 0, "Cursor insert failed");
goto err;
}
}
/* Release the cursor, if we have multiple tables. */
- if (cfg->table_count > 1 && ret == 0 &&
+ if (opts->table_count > 1 && ret == 0 &&
*op != WORKER_INSERT && *op != WORKER_INSERT_RMW) {
if ((ret = cursor->reset(cursor)) != 0) {
- lprintf(cfg, ret, 0, "Cursor reset failed");
+ lprintf(wtperf, ret, 0, "Cursor reset failed");
goto err;
}
}
/* Gather statistics */
- if (!cfg->in_warmup) {
+ if (!wtperf->in_warmup) {
if (measure_latency) {
- if ((ret = __wt_epoch(NULL, &stop)) != 0) {
- lprintf(cfg, ret, 0,
- "Get time call failed");
- goto err;
- }
+ __wt_epoch(NULL, &stop);
++trk->latency_ops;
usecs = WT_TIMEDIFF_US(stop, start);
track_operation(trk, usecs);
@@ -824,17 +788,17 @@ op_err: if (ret == WT_ROLLBACK && ops_per_txn != 0) {
* Commit the transaction if grouping operations together
* or tracking changes in our log table.
*/
- if ((cfg->log_like_table && ops_per_txn == 0) ||
+ if ((opts->log_like_table && ops_per_txn == 0) ||
(ops_per_txn != 0 && ops++ % ops_per_txn == 0)) {
if ((ret = session->commit_transaction(
session, NULL)) != 0) {
- lprintf(cfg, ret, 0,
+ lprintf(wtperf, ret, 0,
"Worker transaction commit failed");
goto err;
}
if ((ret = session->begin_transaction(
session, NULL)) != 0) {
- lprintf(cfg, ret, 0,
+ lprintf(wtperf, ret, 0,
"Worker begin transaction failed");
goto err;
}
@@ -854,13 +818,13 @@ op_err: if (ret == WT_ROLLBACK && ops_per_txn != 0) {
}
if ((ret = session->close(session, NULL)) != 0) {
- lprintf(cfg, ret, 0, "Session close in worker failed");
+ lprintf(wtperf, ret, 0, "Session close in worker failed");
goto err;
}
/* Notify our caller we failed and shut the system down. */
if (0) {
-err: cfg->error = cfg->stop = 1;
+err: wtperf->error = wtperf->stop = true;
}
free(cursors);
@@ -913,14 +877,17 @@ run_mix_schedule_op(WORKLOAD *workp, int op, int64_t op_cnt)
* Schedule the mixed-run operations.
*/
static int
-run_mix_schedule(CONFIG *cfg, WORKLOAD *workp)
+run_mix_schedule(WTPERF *wtperf, WORKLOAD *workp)
{
+ CONFIG_OPTS *opts;
int64_t pct;
+ opts = wtperf->opts;
+
/* Confirm reads, inserts, truncates and updates cannot all be zero. */
if (workp->insert == 0 && workp->read == 0 &&
workp->truncate == 0 && workp->update == 0) {
- lprintf(cfg, EINVAL, 0, "no operations scheduled");
+ lprintf(wtperf, EINVAL, 0, "no operations scheduled");
return (EINVAL);
}
@@ -931,7 +898,7 @@ run_mix_schedule(CONFIG *cfg, WORKLOAD *workp)
if (workp->truncate != 0) {
if (workp->insert != 0 ||
workp->read != 0 || workp->update != 0) {
- lprintf(cfg, EINVAL, 0,
+ lprintf(wtperf, EINVAL, 0,
"Can't configure truncate in a mixed workload");
return (EINVAL);
}
@@ -947,7 +914,7 @@ run_mix_schedule(CONFIG *cfg, WORKLOAD *workp)
*/
if (workp->insert != 0 && workp->read == 0 && workp->update == 0) {
memset(workp->ops,
- cfg->insert_rmw ? WORKER_INSERT_RMW : WORKER_INSERT,
+ opts->insert_rmw ? WORKER_INSERT_RMW : WORKER_INSERT,
sizeof(workp->ops));
return (0);
}
@@ -979,7 +946,7 @@ run_mix_schedule(CONFIG *cfg, WORKLOAD *workp)
(workp->insert + workp->read + workp->update);
if (pct != 0)
run_mix_schedule_op(workp,
- cfg->insert_rmw ? WORKER_INSERT_RMW : WORKER_INSERT, pct);
+ opts->insert_rmw ? WORKER_INSERT_RMW : WORKER_INSERT, pct);
pct = (workp->update * 100) /
(workp->insert + workp->read + workp->update);
if (pct != 0)
@@ -991,9 +958,10 @@ static void *
populate_thread(void *arg)
{
struct timespec start, stop;
- CONFIG *cfg;
- CONFIG_THREAD *thread;
+ CONFIG_OPTS *opts;
TRACK *trk;
+ WTPERF *wtperf;
+ WTPERF_THREAD *thread;
WT_CONNECTION *conn;
WT_CURSOR **cursors, *cursor;
WT_SESSION *session;
@@ -1004,9 +972,10 @@ populate_thread(void *arg)
char *value_buf, *key_buf;
const char *cursor_config;
- thread = (CONFIG_THREAD *)arg;
- cfg = thread->cfg;
- conn = cfg->conn;
+ thread = (WTPERF_THREAD *)arg;
+ wtperf = thread->wtperf;
+ opts = wtperf->opts;
+ conn = wtperf->conn;
session = NULL;
cursors = NULL;
ret = stress_checkpoint_due = 0;
@@ -1016,37 +985,37 @@ populate_thread(void *arg)
value_buf = thread->value_buf;
if ((ret = conn->open_session(
- conn, NULL, cfg->sess_config, &session)) != 0) {
- lprintf(cfg, ret, 0, "populate: WT_CONNECTION.open_session");
+ conn, NULL, opts->sess_config, &session)) != 0) {
+ lprintf(wtperf, ret, 0, "populate: WT_CONNECTION.open_session");
goto err;
}
/* Do bulk loads if populate is single-threaded. */
cursor_config =
- (cfg->populate_threads == 1 && !cfg->index) ? "bulk" : NULL;
+ (opts->populate_threads == 1 && !opts->index) ? "bulk" : NULL;
/* Create the cursors. */
- cursors = dcalloc(cfg->table_count, sizeof(WT_CURSOR *));
- for (i = 0; i < cfg->table_count; i++) {
+ cursors = dcalloc(opts->table_count, sizeof(WT_CURSOR *));
+ for (i = 0; i < opts->table_count; i++) {
if ((ret = session->open_cursor(
- session, cfg->uris[i], NULL,
+ session, wtperf->uris[i], NULL,
cursor_config, &cursors[i])) != 0) {
- lprintf(cfg, ret, 0,
+ lprintf(wtperf, ret, 0,
"populate: WT_SESSION.open_cursor: %s",
- cfg->uris[i]);
+ wtperf->uris[i]);
goto err;
}
}
/* Populate the databases. */
for (intxn = 0, opcount = 0;;) {
- op = get_next_incr(cfg);
- if (op > cfg->icount)
+ op = get_next_incr(wtperf);
+ if (op > opts->icount)
break;
- if (cfg->populate_ops_per_txn != 0 && !intxn) {
+ if (opts->populate_ops_per_txn != 0 && !intxn) {
if ((ret = session->begin_transaction(
- session, cfg->transaction_config)) != 0) {
- lprintf(cfg, ret, 0,
+ session, opts->transaction_config)) != 0) {
+ lprintf(wtperf, ret, 0,
"Failed starting transaction.");
goto err;
}
@@ -1055,31 +1024,29 @@ populate_thread(void *arg)
/*
* Figure out which table this op belongs to.
*/
- cursor = cursors[map_key_to_table(cfg, op)];
- generate_key(cfg, key_buf, op);
+ cursor = cursors[map_key_to_table(wtperf->opts, op)];
+ generate_key(opts, key_buf, op);
measure_latency =
- cfg->sample_interval != 0 &&
- trk->ops != 0 && (trk->ops % cfg->sample_rate == 0);
- if (measure_latency && (ret = __wt_epoch(NULL, &start)) != 0) {
- lprintf(cfg, ret, 0, "Get time call failed");
- goto err;
- }
+ opts->sample_interval != 0 &&
+ trk->ops != 0 && (trk->ops % opts->sample_rate == 0);
+ if (measure_latency)
+ __wt_epoch(NULL, &start);
cursor->set_key(cursor, key_buf);
- if (cfg->random_value)
+ if (opts->random_value)
randomize_value(thread, value_buf);
cursor->set_value(cursor, value_buf);
if ((ret = cursor->insert(cursor)) == WT_ROLLBACK) {
- lprintf(cfg, ret, 0, "insert retrying");
+ lprintf(wtperf, ret, 0, "insert retrying");
if ((ret = session->rollback_transaction(
session, NULL)) != 0) {
- lprintf(cfg, ret, 0,
+ lprintf(wtperf, ret, 0,
"Failed rollback_transaction");
goto err;
}
intxn = 0;
continue;
} else if (ret != 0) {
- lprintf(cfg, ret, 0, "Failed inserting");
+ lprintf(wtperf, ret, 0, "Failed inserting");
goto err;
}
/*
@@ -1089,28 +1056,25 @@ populate_thread(void *arg)
* of them.
*/
if (measure_latency) {
- if ((ret = __wt_epoch(NULL, &stop)) != 0) {
- lprintf(cfg, ret, 0, "Get time call failed");
- goto err;
- }
+ __wt_epoch(NULL, &stop);
++trk->latency_ops;
usecs = WT_TIMEDIFF_US(stop, start);
track_operation(trk, usecs);
}
++thread->insert.ops; /* Same as trk->ops */
- if (cfg->checkpoint_stress_rate != 0 &&
- (op % cfg->checkpoint_stress_rate) == 0)
+ if (opts->checkpoint_stress_rate != 0 &&
+ (op % opts->checkpoint_stress_rate) == 0)
stress_checkpoint_due = 1;
- if (cfg->populate_ops_per_txn != 0) {
- if (++opcount < cfg->populate_ops_per_txn)
+ if (opts->populate_ops_per_txn != 0) {
+ if (++opcount < opts->populate_ops_per_txn)
continue;
opcount = 0;
if ((ret = session->commit_transaction(
session, NULL)) != 0)
- lprintf(cfg, ret, 0,
+ lprintf(wtperf, ret, 0,
"Fail committing, transaction was aborted");
intxn = 0;
}
@@ -1118,24 +1082,24 @@ populate_thread(void *arg)
if (stress_checkpoint_due && intxn == 0) {
stress_checkpoint_due = 0;
if ((ret = session->checkpoint(session, NULL)) != 0) {
- lprintf(cfg, ret, 0, "Checkpoint failed");
+ lprintf(wtperf, ret, 0, "Checkpoint failed");
goto err;
}
}
}
if (intxn &&
(ret = session->commit_transaction(session, NULL)) != 0)
- lprintf(cfg, ret, 0,
+ lprintf(wtperf, ret, 0,
"Fail committing, transaction was aborted");
if ((ret = session->close(session, NULL)) != 0) {
- lprintf(cfg, ret, 0, "Error closing session in populate");
+ lprintf(wtperf, ret, 0, "Error closing session in populate");
goto err;
}
/* Notify our caller we failed and shut the system down. */
if (0) {
-err: cfg->error = cfg->stop = 1;
+err: wtperf->error = wtperf->stop = true;
}
free(cursors);
@@ -1146,9 +1110,10 @@ static void *
populate_async(void *arg)
{
struct timespec start, stop;
- CONFIG *cfg;
- CONFIG_THREAD *thread;
+ CONFIG_OPTS *opts;
TRACK *trk;
+ WTPERF *wtperf;
+ WTPERF_THREAD *thread;
WT_ASYNC_OP *asyncop;
WT_CONNECTION *conn;
WT_SESSION *session;
@@ -1156,9 +1121,10 @@ populate_async(void *arg)
int measure_latency, ret;
char *value_buf, *key_buf;
- thread = (CONFIG_THREAD *)arg;
- cfg = thread->cfg;
- conn = cfg->conn;
+ thread = (WTPERF_THREAD *)arg;
+ wtperf = thread->wtperf;
+ opts = wtperf->opts;
+ conn = wtperf->conn;
session = NULL;
ret = 0;
trk = &thread->insert;
@@ -1167,8 +1133,8 @@ populate_async(void *arg)
value_buf = thread->value_buf;
if ((ret = conn->open_session(
- conn, NULL, cfg->sess_config, &session)) != 0) {
- lprintf(cfg, ret, 0, "populate: WT_CONNECTION.open_session");
+ conn, NULL, opts->sess_config, &session)) != 0) {
+ lprintf(wtperf, ret, 0, "populate: WT_CONNECTION.open_session");
goto err;
}
@@ -1178,38 +1144,38 @@ populate_async(void *arg)
* the time to process by workers.
*/
measure_latency =
- cfg->sample_interval != 0 &&
- trk->ops != 0 && (trk->ops % cfg->sample_rate == 0);
- if (measure_latency && (ret = __wt_epoch(NULL, &start)) != 0) {
- lprintf(cfg, ret, 0, "Get time call failed");
- goto err;
- }
+ opts->sample_interval != 0 &&
+ trk->ops != 0 && (trk->ops % opts->sample_rate == 0);
+ if (measure_latency)
+ __wt_epoch(NULL, &start);
+
/* Populate the databases. */
for (;;) {
- op = get_next_incr(cfg);
- if (op > cfg->icount)
+ op = get_next_incr(wtperf);
+ if (op > opts->icount)
break;
/*
* Allocate an async op for whichever table.
*/
while ((ret = conn->async_new_op(
- conn, cfg->uris[map_key_to_table(cfg, op)],
+ conn, wtperf->uris[map_key_to_table(wtperf->opts, op)],
NULL, &cb, &asyncop)) == EBUSY)
(void)usleep(10000);
if (ret != 0)
goto err;
asyncop->app_private = thread;
- generate_key(cfg, key_buf, op);
+ generate_key(opts, key_buf, op);
asyncop->set_key(asyncop, key_buf);
- if (cfg->random_value)
+ if (opts->random_value)
randomize_value(thread, value_buf);
asyncop->set_value(asyncop, value_buf);
if ((ret = asyncop->insert(asyncop)) != 0) {
- lprintf(cfg, ret, 0, "Failed inserting");
+ lprintf(wtperf, ret, 0, "Failed inserting");
goto err;
}
}
+
/*
* Gather statistics.
* We measure the latency of inserting a single key. If there
@@ -1221,22 +1187,19 @@ populate_async(void *arg)
if (conn->async_flush(conn) != 0)
goto err;
if (measure_latency) {
- if ((ret = __wt_epoch(NULL, &stop)) != 0) {
- lprintf(cfg, ret, 0, "Get time call failed");
- goto err;
- }
+ __wt_epoch(NULL, &stop);
++trk->latency_ops;
usecs = WT_TIMEDIFF_US(stop, start);
track_operation(trk, usecs);
}
if ((ret = session->close(session, NULL)) != 0) {
- lprintf(cfg, ret, 0, "Error closing session in populate");
+ lprintf(wtperf, ret, 0, "Error closing session in populate");
goto err;
}
/* Notify our caller we failed and shut the system down. */
if (0) {
-err: cfg->error = cfg->stop = 1;
+err: wtperf->error = wtperf->stop = true;
}
return (NULL);
}
@@ -1246,8 +1209,9 @@ monitor(void *arg)
{
struct timespec t;
struct tm *tm, _tm;
- CONFIG *cfg;
+ CONFIG_OPTS *opts;
FILE *fp;
+ WTPERF *wtperf;
size_t len;
uint64_t min_thr, reads, inserts, updates;
uint64_t cur_reads, cur_inserts, cur_updates;
@@ -1257,24 +1221,26 @@ monitor(void *arg)
uint32_t update_avg, update_min, update_max;
uint32_t latency_max, level;
u_int i;
- int msg_err, ret;
+ int msg_err;
const char *str;
char buf[64], *path;
- cfg = (CONFIG *)arg;
- assert(cfg->sample_interval != 0);
+ wtperf = (WTPERF *)arg;
+ opts = wtperf->opts;
+ assert(opts->sample_interval != 0);
+
fp = NULL;
path = NULL;
- min_thr = (uint64_t)cfg->min_throughput;
- latency_max = (uint32_t)ms_to_us(cfg->max_latency);
+ min_thr = (uint64_t)opts->min_throughput;
+ latency_max = (uint32_t)ms_to_us(opts->max_latency);
/* Open the logging file. */
- len = strlen(cfg->monitor_dir) + 100;
+ len = strlen(wtperf->monitor_dir) + 100;
path = dmalloc(len);
- snprintf(path, len, "%s/monitor", cfg->monitor_dir);
+ snprintf(path, len, "%s/monitor", wtperf->monitor_dir);
if ((fp = fopen(path, "w")) == NULL) {
- lprintf(cfg, errno, 0, "%s", path);
+ lprintf(wtperf, errno, 0, "%s", path);
goto err;
}
/* Set line buffering for monitor file. */
@@ -1297,34 +1263,31 @@ monitor(void *arg)
"update maximum latency(uS)"
"\n");
last_reads = last_inserts = last_updates = 0;
- while (!cfg->stop) {
- for (i = 0; i < cfg->sample_interval; i++) {
+ while (!wtperf->stop) {
+ for (i = 0; i < opts->sample_interval; i++) {
sleep(1);
- if (cfg->stop)
+ if (wtperf->stop)
break;
}
/* If the workers are done, don't bother with a final call. */
- if (cfg->stop)
+ if (wtperf->stop)
break;
- if (cfg->in_warmup)
+ if (wtperf->in_warmup)
continue;
- if ((ret = __wt_epoch(NULL, &t)) != 0) {
- lprintf(cfg, ret, 0, "Get time call failed");
- goto err;
- }
+ __wt_epoch(NULL, &t);
tm = localtime_r(&t.tv_sec, &_tm);
(void)strftime(buf, sizeof(buf), "%b %d %H:%M:%S", tm);
- reads = sum_read_ops(cfg);
- inserts = sum_insert_ops(cfg);
- updates = sum_update_ops(cfg);
- latency_read(cfg, &read_avg, &read_min, &read_max);
- latency_insert(cfg, &insert_avg, &insert_min, &insert_max);
- latency_update(cfg, &update_avg, &update_min, &update_max);
+ reads = sum_read_ops(wtperf);
+ inserts = sum_insert_ops(wtperf);
+ updates = sum_update_ops(wtperf);
+ latency_read(wtperf, &read_avg, &read_min, &read_max);
+ latency_insert(wtperf, &insert_avg, &insert_min, &insert_max);
+ latency_update(wtperf, &update_avg, &update_min, &update_max);
- cur_reads = (reads - last_reads) / cfg->sample_interval;
- cur_updates = (updates - last_updates) / cfg->sample_interval;
+ cur_reads = (reads - last_reads) / opts->sample_interval;
+ cur_updates = (updates - last_updates) / opts->sample_interval;
/*
* For now the only item we need to worry about changing is
* inserts when we transition from the populate phase to
@@ -1334,7 +1297,7 @@ monitor(void *arg)
cur_inserts = 0;
else
cur_inserts =
- (inserts - last_inserts) / cfg->sample_interval;
+ (inserts - last_inserts) / opts->sample_interval;
(void)fprintf(fp,
"%s,%" PRIu32
@@ -1344,9 +1307,9 @@ monitor(void *arg)
",%" PRIu32 ",%" PRIu32 ",%" PRIu32
",%" PRIu32 ",%" PRIu32 ",%" PRIu32
"\n",
- buf, cfg->totalsec,
+ buf, wtperf->totalsec,
cur_reads, cur_inserts, cur_updates,
- cfg->ckpt ? 'Y' : 'N',
+ wtperf->ckpt ? 'Y' : 'N',
read_avg, read_min, read_max,
insert_avg, insert_min, insert_max,
update_avg, update_min, update_max);
@@ -1354,7 +1317,7 @@ monitor(void *arg)
if (latency_max != 0 &&
(read_max > latency_max || insert_max > latency_max ||
update_max > latency_max)) {
- if (cfg->max_latency_fatal) {
+ if (opts->max_latency_fatal) {
level = 1;
msg_err = WT_PANIC;
str = "ERROR";
@@ -1363,7 +1326,7 @@ monitor(void *arg)
msg_err = 0;
str = "WARNING";
}
- lprintf(cfg, msg_err, level,
+ lprintf(wtperf, msg_err, level,
"%s: max latency exceeded: threshold %" PRIu32
" read max %" PRIu32 " insert max %" PRIu32
" update max %" PRIu32, str, latency_max,
@@ -1373,7 +1336,7 @@ monitor(void *arg)
((cur_reads != 0 && cur_reads < min_thr) ||
(cur_inserts != 0 && cur_inserts < min_thr) ||
(cur_updates != 0 && cur_updates < min_thr))) {
- if (cfg->min_throughput_fatal) {
+ if (opts->min_throughput_fatal) {
level = 1;
msg_err = WT_PANIC;
str = "ERROR";
@@ -1382,7 +1345,7 @@ monitor(void *arg)
msg_err = 0;
str = "WARNING";
}
- lprintf(cfg, msg_err, level,
+ lprintf(wtperf, msg_err, level,
"%s: minimum throughput not met: threshold %" PRIu64
" reads %" PRIu64 " inserts %" PRIu64
" updates %" PRIu64, str, min_thr, cur_reads,
@@ -1395,7 +1358,7 @@ monitor(void *arg)
/* Notify our caller we failed and shut the system down. */
if (0) {
-err: cfg->error = cfg->stop = 1;
+err: wtperf->error = wtperf->stop = true;
}
if (fp != NULL)
@@ -1408,75 +1371,73 @@ err: cfg->error = cfg->stop = 1;
static void *
checkpoint_worker(void *arg)
{
- CONFIG *cfg;
- CONFIG_THREAD *thread;
+ CONFIG_OPTS *opts;
+ WTPERF *wtperf;
+ WTPERF_THREAD *thread;
WT_CONNECTION *conn;
WT_SESSION *session;
struct timespec e, s;
uint32_t i;
int ret;
- thread = (CONFIG_THREAD *)arg;
- cfg = thread->cfg;
- conn = cfg->conn;
+ thread = (WTPERF_THREAD *)arg;
+ wtperf = thread->wtperf;
+ opts = wtperf->opts;
+ conn = wtperf->conn;
session = NULL;
if ((ret = conn->open_session(
- conn, NULL, cfg->sess_config, &session)) != 0) {
- lprintf(cfg, ret, 0,
+ conn, NULL, opts->sess_config, &session)) != 0) {
+ lprintf(wtperf, ret, 0,
"open_session failed in checkpoint thread.");
goto err;
}
- while (!cfg->stop) {
+ while (!wtperf->stop) {
/* Break the sleep up, so we notice interrupts faster. */
- for (i = 0; i < cfg->checkpoint_interval; i++) {
+ for (i = 0; i < opts->checkpoint_interval; i++) {
sleep(1);
- if (cfg->stop)
+ if (wtperf->stop)
break;
}
/* If the workers are done, don't bother with a final call. */
- if (cfg->stop)
+ if (wtperf->stop)
break;
- if ((ret = __wt_epoch(NULL, &s)) != 0) {
- lprintf(cfg, ret, 0, "Get time failed in checkpoint.");
- goto err;
- }
- cfg->ckpt = 1;
+ __wt_epoch(NULL, &s);
+
+ wtperf->ckpt = true;
if ((ret = session->checkpoint(session, NULL)) != 0) {
- lprintf(cfg, ret, 0, "Checkpoint failed.");
+ lprintf(wtperf, ret, 0, "Checkpoint failed.");
goto err;
}
- cfg->ckpt = 0;
+ wtperf->ckpt = false;
++thread->ckpt.ops;
- if ((ret = __wt_epoch(NULL, &e)) != 0) {
- lprintf(cfg, ret, 0, "Get time failed in checkpoint.");
- goto err;
- }
+ __wt_epoch(NULL, &e);
}
if (session != NULL &&
((ret = session->close(session, NULL)) != 0)) {
- lprintf(cfg, ret, 0,
+ lprintf(wtperf, ret, 0,
"Error closing session in checkpoint worker.");
goto err;
}
/* Notify our caller we failed and shut the system down. */
if (0) {
-err: cfg->error = cfg->stop = 1;
+err: wtperf->error = wtperf->stop = true;
}
return (NULL);
}
static int
-execute_populate(CONFIG *cfg)
+execute_populate(WTPERF *wtperf)
{
struct timespec start, stop;
- CONFIG_THREAD *popth;
+ CONFIG_OPTS *opts;
+ WTPERF_THREAD *popth;
WT_ASYNC_OP *asyncop;
pthread_t idle_table_cycle_thread;
size_t i;
@@ -1486,59 +1447,57 @@ execute_populate(CONFIG *cfg)
int elapsed, ret;
void *(*pfunc)(void *);
- lprintf(cfg, 0, 1,
+ opts = wtperf->opts;
+
+ lprintf(wtperf, 0, 1,
"Starting %" PRIu32
" populate thread(s) for %" PRIu32 " items",
- cfg->populate_threads, cfg->icount);
+ opts->populate_threads, opts->icount);
/* Start cycling idle tables if configured. */
- if ((ret = start_idle_table_cycle(cfg, &idle_table_cycle_thread)) != 0)
+ if ((ret =
+ start_idle_table_cycle(wtperf, &idle_table_cycle_thread)) != 0)
return (ret);
- cfg->insert_key = 0;
+ wtperf->insert_key = 0;
- cfg->popthreads = dcalloc(cfg->populate_threads, sizeof(CONFIG_THREAD));
- if (cfg->use_asyncops > 0) {
- lprintf(cfg, 0, 1, "Starting %" PRIu32 " async thread(s)",
- cfg->async_threads);
+ wtperf->popthreads =
+ dcalloc(opts->populate_threads, sizeof(WTPERF_THREAD));
+ if (wtperf->use_asyncops) {
+ lprintf(wtperf, 0, 1, "Starting %" PRIu32 " async thread(s)",
+ opts->async_threads);
pfunc = populate_async;
} else
pfunc = populate_thread;
- if ((ret = start_threads(cfg, NULL,
- cfg->popthreads, cfg->populate_threads, pfunc)) != 0)
+ if ((ret = start_threads(wtperf, NULL,
+ wtperf->popthreads, opts->populate_threads, pfunc)) != 0)
return (ret);
- if ((ret = __wt_epoch(NULL, &start)) != 0) {
- lprintf(cfg, ret, 0, "Get time failed in populate.");
- return (ret);
- }
+ __wt_epoch(NULL, &start);
for (elapsed = 0, interval = 0, last_ops = 0;
- cfg->insert_key < cfg->icount && cfg->error == 0;) {
+ wtperf->insert_key < opts->icount && !wtperf->error;) {
/*
* Sleep for 100th of a second, report_interval is in second
* granularity, each 100th increment of elapsed is a single
* increment of interval.
*/
(void)usleep(10000);
- if (cfg->report_interval == 0 || ++elapsed < 100)
+ if (opts->report_interval == 0 || ++elapsed < 100)
continue;
elapsed = 0;
- if (++interval < cfg->report_interval)
+ if (++interval < opts->report_interval)
continue;
interval = 0;
- cfg->totalsec += cfg->report_interval;
- cfg->insert_ops = sum_pop_ops(cfg);
- lprintf(cfg, 0, 1,
+ wtperf->totalsec += opts->report_interval;
+ wtperf->insert_ops = sum_pop_ops(wtperf);
+ lprintf(wtperf, 0, 1,
"%" PRIu64 " populate inserts (%" PRIu64 " of %"
PRIu32 ") in %" PRIu32 " secs (%" PRIu32 " total secs)",
- cfg->insert_ops - last_ops, cfg->insert_ops,
- cfg->icount, cfg->report_interval, cfg->totalsec);
- last_ops = cfg->insert_ops;
- }
- if ((ret = __wt_epoch(NULL, &stop)) != 0) {
- lprintf(cfg, ret, 0, "Get time failed in populate.");
- return (ret);
+ wtperf->insert_ops - last_ops, wtperf->insert_ops,
+ opts->icount, opts->report_interval, wtperf->totalsec);
+ last_ops = wtperf->insert_ops;
}
+ __wt_epoch(NULL, &stop);
/*
* Move popthreads aside to narrow possible race with the monitor
@@ -1546,21 +1505,22 @@ execute_populate(CONFIG *cfg)
* NULL when the populate phase is finished, to know that the workload
* phase has started.
*/
- popth = cfg->popthreads;
- cfg->popthreads = NULL;
- ret = stop_threads(cfg, cfg->populate_threads, popth);
+ popth = wtperf->popthreads;
+ wtperf->popthreads = NULL;
+ ret = stop_threads(wtperf, opts->populate_threads, popth);
free(popth);
if (ret != 0)
return (ret);
/* Report if any worker threads didn't finish. */
- if (cfg->error != 0) {
- lprintf(cfg, WT_ERROR, 0,
+ if (wtperf->error) {
+ lprintf(wtperf, WT_ERROR, 0,
"Populate thread(s) exited without finishing.");
return (WT_ERROR);
}
- lprintf(cfg, 0, 1, "Finished load of %" PRIu32 " items", cfg->icount);
+ lprintf(wtperf,
+ 0, 1, "Finished load of %" PRIu32 " items", opts->icount);
msecs = WT_TIMEDIFF_MS(stop, start);
/*
@@ -1572,9 +1532,9 @@ execute_populate(CONFIG *cfg)
print_ops_sec = 0;
} else {
print_secs = (double)msecs / (double)MSEC_PER_SEC;
- print_ops_sec = (uint64_t)(cfg->icount / print_secs);
+ print_ops_sec = (uint64_t)(opts->icount / print_secs);
}
- lprintf(cfg, 0, 1,
+ lprintf(wtperf, 0, 1,
"Load time: %.2f\n" "load ops/sec: %" PRIu64,
print_secs, print_ops_sec);
@@ -1583,58 +1543,57 @@ execute_populate(CONFIG *cfg)
* set an unlimited timeout because if we close the connection
* then any in-progress compact/merge is aborted.
*/
- if (cfg->compact) {
- assert(cfg->async_threads > 0);
- lprintf(cfg, 0, 1, "Compact after populate");
- if ((ret = __wt_epoch(NULL, &start)) != 0) {
- lprintf(cfg, ret, 0, "Get time failed in populate.");
- return (ret);
- }
- tables = cfg->table_count;
- for (i = 0; i < cfg->table_count; i++) {
+ if (opts->compact) {
+ assert(opts->async_threads > 0);
+ lprintf(wtperf, 0, 1, "Compact after populate");
+ __wt_epoch(NULL, &start);
+ tables = opts->table_count;
+ for (i = 0; i < opts->table_count; i++) {
/*
* If no ops are available, retry. Any other error,
* return.
*/
- while ((ret = cfg->conn->async_new_op(cfg->conn,
- cfg->uris[i], "timeout=0", &cb, &asyncop)) == EBUSY)
+ while ((ret = wtperf->conn->async_new_op(
+ wtperf->conn, wtperf->uris[i],
+ "timeout=0", &cb, &asyncop)) == EBUSY)
(void)usleep(10000);
if (ret != 0)
return (ret);
asyncop->app_private = &tables;
if ((ret = asyncop->compact(asyncop)) != 0) {
- lprintf(cfg, ret, 0, "Async compact failed.");
+ lprintf(wtperf,
+ ret, 0, "Async compact failed.");
return (ret);
}
}
- if ((ret = cfg->conn->async_flush(cfg->conn)) != 0) {
- lprintf(cfg, ret, 0, "Populate async flush failed.");
- return (ret);
- }
- if ((ret = __wt_epoch(NULL, &stop)) != 0) {
- lprintf(cfg, ret, 0, "Get time failed in populate.");
+ if ((ret = wtperf->conn->async_flush(wtperf->conn)) != 0) {
+ lprintf(wtperf, ret, 0, "Populate async flush failed.");
return (ret);
}
- lprintf(cfg, 0, 1,
+ __wt_epoch(NULL, &stop);
+ lprintf(wtperf, 0, 1,
"Compact completed in %" PRIu64 " seconds",
(uint64_t)(WT_TIMEDIFF_SEC(stop, start)));
assert(tables == 0);
}
/* Stop cycling idle tables. */
- if ((ret = stop_idle_table_cycle(cfg, idle_table_cycle_thread)) != 0)
+ if ((ret = stop_idle_table_cycle(wtperf, idle_table_cycle_thread)) != 0)
return (ret);
return (0);
}
static int
-close_reopen(CONFIG *cfg)
+close_reopen(WTPERF *wtperf)
{
+ CONFIG_OPTS *opts;
int ret;
- if (!cfg->readonly && !cfg->reopen_connection)
+ opts = wtperf->opts;
+
+ if (!opts->readonly && !opts->reopen_connection)
return (0);
/*
* Reopen the connection. We do this so that the workload phase always
@@ -1642,16 +1601,16 @@ close_reopen(CONFIG *cfg)
* be identified. This is particularly important for LSM, where the
* merge algorithm is more aggressive for read-only trees.
*/
- /* cfg->conn is released no matter the return value from close(). */
- ret = cfg->conn->close(cfg->conn, NULL);
- cfg->conn = NULL;
+ /* wtperf->conn is released no matter the return value from close(). */
+ ret = wtperf->conn->close(wtperf->conn, NULL);
+ wtperf->conn = NULL;
if (ret != 0) {
- lprintf(cfg, ret, 0, "Closing the connection failed");
+ lprintf(wtperf, ret, 0, "Closing the connection failed");
return (ret);
}
if ((ret = wiredtiger_open(
- cfg->home, NULL, cfg->reopen_config, &cfg->conn)) != 0) {
- lprintf(cfg, ret, 0, "Re-opening the connection failed");
+ wtperf->home, NULL, wtperf->reopen_config, &wtperf->conn)) != 0) {
+ lprintf(wtperf, ret, 0, "Re-opening the connection failed");
return (ret);
}
/*
@@ -1660,10 +1619,10 @@ close_reopen(CONFIG *cfg)
* threads looking for work that will never arrive don't affect
* performance.
*/
- if (cfg->compact && cfg->use_asyncops == 0) {
- if ((ret = cfg->conn->reconfigure(
- cfg->conn, "async=(enabled=false)")) != 0) {
- lprintf(cfg, ret, 0, "Reconfigure async off failed");
+ if (opts->compact && !wtperf->use_asyncops) {
+ if ((ret = wtperf->conn->reconfigure(
+ wtperf->conn, "async=(enabled=false)")) != 0) {
+ lprintf(wtperf, ret, 0, "Reconfigure async off failed");
return (ret);
}
}
@@ -1671,10 +1630,11 @@ close_reopen(CONFIG *cfg)
}
static int
-execute_workload(CONFIG *cfg)
+execute_workload(WTPERF *wtperf)
{
- CONFIG_THREAD *threads;
+ CONFIG_OPTS *opts;
WORKLOAD *workp;
+ WTPERF_THREAD *threads;
WT_CONNECTION *conn;
WT_SESSION **sessions;
pthread_t idle_table_cycle_thread;
@@ -1685,9 +1645,11 @@ execute_workload(CONFIG *cfg)
int ret, t_ret;
void *(*pfunc)(void *);
- cfg->insert_key = 0;
- cfg->insert_ops = cfg->read_ops = cfg->truncate_ops = 0;
- cfg->update_ops = 0;
+ opts = wtperf->opts;
+
+ wtperf->insert_key = 0;
+ wtperf->insert_ops = wtperf->read_ops = wtperf->truncate_ops = 0;
+ wtperf->update_ops = 0;
last_ckpts = last_inserts = last_reads = last_truncates = 0;
last_updates = 0;
@@ -1696,38 +1658,40 @@ execute_workload(CONFIG *cfg)
sessions = NULL;
/* Start cycling idle tables. */
- if ((ret = start_idle_table_cycle(cfg, &idle_table_cycle_thread)) != 0)
+ if ((ret =
+ start_idle_table_cycle(wtperf, &idle_table_cycle_thread)) != 0)
return (ret);
- if (cfg->warmup != 0)
- cfg->in_warmup = 1;
+ if (opts->warmup != 0)
+ wtperf->in_warmup = true;
/* Allocate memory for the worker threads. */
- cfg->workers = dcalloc((size_t)cfg->workers_cnt, sizeof(CONFIG_THREAD));
+ wtperf->workers =
+ dcalloc((size_t)wtperf->workers_cnt, sizeof(WTPERF_THREAD));
- if (cfg->use_asyncops > 0) {
- lprintf(cfg, 0, 1, "Starting %" PRIu32 " async thread(s)",
- cfg->async_threads);
+ if (wtperf->use_asyncops) {
+ lprintf(wtperf, 0, 1, "Starting %" PRIu32 " async thread(s)",
+ opts->async_threads);
pfunc = worker_async;
} else
pfunc = worker;
- if (cfg->session_count_idle != 0) {
- sessions = dcalloc((size_t)cfg->session_count_idle,
+ if (opts->session_count_idle != 0) {
+ sessions = dcalloc((size_t)opts->session_count_idle,
sizeof(WT_SESSION *));
- conn = cfg->conn;
- for (i = 0; i < cfg->session_count_idle; ++i)
- if ((ret = conn->open_session(
- conn, NULL, cfg->sess_config, &sessions[i])) != 0) {
- lprintf(cfg, ret, 0,
+ conn = wtperf->conn;
+ for (i = 0; i < opts->session_count_idle; ++i)
+ if ((ret = conn->open_session(conn,
+ NULL, opts->sess_config, &sessions[i])) != 0) {
+ lprintf(wtperf, ret, 0,
"execute_workload: idle open_session");
goto err;
}
}
/* Start each workload. */
- for (threads = cfg->workers, i = 0,
- workp = cfg->workload; i < cfg->workload_cnt; ++i, ++workp) {
- lprintf(cfg, 0, 1,
+ for (threads = wtperf->workers, i = 0,
+ workp = wtperf->workload; i < wtperf->workload_cnt; ++i, ++workp) {
+ lprintf(wtperf, 0, 1,
"Starting workload #%u: %" PRId64 " threads, inserts=%"
PRId64 ", reads=%" PRId64 ", updates=%" PRId64
", truncate=%" PRId64 ", throttle=%" PRId64,
@@ -1736,25 +1700,26 @@ execute_workload(CONFIG *cfg)
workp->throttle);
/* Figure out the workload's schedule. */
- if ((ret = run_mix_schedule(cfg, workp)) != 0)
+ if ((ret = run_mix_schedule(wtperf, workp)) != 0)
goto err;
/* Start the workload's threads. */
if ((ret = start_threads(
- cfg, workp, threads, (u_int)workp->threads, pfunc)) != 0)
+ wtperf, workp, threads, (u_int)workp->threads, pfunc)) != 0)
goto err;
threads += workp->threads;
}
- if (cfg->warmup != 0) {
- lprintf(cfg, 0, 1,
- "Waiting for warmup duration of %" PRIu32, cfg->warmup);
- sleep(cfg->warmup);
- cfg->in_warmup = 0;
+ if (opts->warmup != 0) {
+ lprintf(wtperf, 0, 1,
+ "Waiting for warmup duration of %" PRIu32, opts->warmup);
+ sleep(opts->warmup);
+ wtperf->in_warmup = false;
}
- for (interval = cfg->report_interval, run_time = cfg->run_time,
- run_ops = cfg->run_ops; cfg->error == 0;) {
+ for (interval = opts->report_interval,
+ run_time = opts->run_time, run_ops = opts->run_ops;
+ !wtperf->error;) {
/*
* Sleep for one second at a time.
* If we are tracking run time, check to see if we're done, and
@@ -1769,59 +1734,60 @@ execute_workload(CONFIG *cfg)
}
/* Sum the operations we've done. */
- cfg->ckpt_ops = sum_ckpt_ops(cfg);
- cfg->insert_ops = sum_insert_ops(cfg);
- cfg->read_ops = sum_read_ops(cfg);
- cfg->update_ops = sum_update_ops(cfg);
- cfg->truncate_ops = sum_truncate_ops(cfg);
+ wtperf->ckpt_ops = sum_ckpt_ops(wtperf);
+ wtperf->insert_ops = sum_insert_ops(wtperf);
+ wtperf->read_ops = sum_read_ops(wtperf);
+ wtperf->update_ops = sum_update_ops(wtperf);
+ wtperf->truncate_ops = sum_truncate_ops(wtperf);
/* If we're checking total operations, see if we're done. */
if (run_ops != 0 && run_ops <=
- cfg->insert_ops + cfg->read_ops + cfg->update_ops)
+ wtperf->insert_ops + wtperf->read_ops + wtperf->update_ops)
break;
/* If writing out throughput information, see if it's time. */
if (interval == 0 || --interval > 0)
continue;
- interval = cfg->report_interval;
- cfg->totalsec += cfg->report_interval;
+ interval = opts->report_interval;
+ wtperf->totalsec += opts->report_interval;
- lprintf(cfg, 0, 1,
+ lprintf(wtperf, 0, 1,
"%" PRIu64 " reads, %" PRIu64 " inserts, %" PRIu64
" updates, %" PRIu64 " truncates, %" PRIu64
" checkpoints in %" PRIu32 " secs (%" PRIu32 " total secs)",
- cfg->read_ops - last_reads,
- cfg->insert_ops - last_inserts,
- cfg->update_ops - last_updates,
- cfg->truncate_ops - last_truncates,
- cfg->ckpt_ops - last_ckpts,
- cfg->report_interval, cfg->totalsec);
- last_reads = cfg->read_ops;
- last_inserts = cfg->insert_ops;
- last_updates = cfg->update_ops;
- last_truncates = cfg->truncate_ops;
- last_ckpts = cfg->ckpt_ops;
+ wtperf->read_ops - last_reads,
+ wtperf->insert_ops - last_inserts,
+ wtperf->update_ops - last_updates,
+ wtperf->truncate_ops - last_truncates,
+ wtperf->ckpt_ops - last_ckpts,
+ opts->report_interval, wtperf->totalsec);
+ last_reads = wtperf->read_ops;
+ last_inserts = wtperf->insert_ops;
+ last_updates = wtperf->update_ops;
+ last_truncates = wtperf->truncate_ops;
+ last_ckpts = wtperf->ckpt_ops;
}
/* Notify the worker threads they are done. */
-err: cfg->stop = 1;
+err: wtperf->stop = true;
/* Stop cycling idle tables. */
- if ((ret = stop_idle_table_cycle(cfg, idle_table_cycle_thread)) != 0)
+ if ((ret = stop_idle_table_cycle(wtperf, idle_table_cycle_thread)) != 0)
return (ret);
- if ((t_ret = stop_threads(
- cfg, (u_int)cfg->workers_cnt, cfg->workers)) != 0 && ret == 0)
+ if ((t_ret = stop_threads(wtperf,
+ (u_int)wtperf->workers_cnt, wtperf->workers)) != 0 && ret == 0)
ret = t_ret;
/* Drop tables if configured to and this isn't an error path */
- if (ret == 0 && cfg->drop_tables && (ret = drop_all_tables(cfg)) != 0)
- lprintf(cfg, ret, 0, "Drop tables failed.");
+ if (ret == 0 &&
+ opts->drop_tables && (ret = drop_all_tables(wtperf)) != 0)
+ lprintf(wtperf, ret, 0, "Drop tables failed.");
free(sessions);
/* Report if any worker threads didn't finish. */
- if (cfg->error != 0) {
- lprintf(cfg, WT_ERROR, 0,
+ if (wtperf->error) {
+ lprintf(wtperf, WT_ERROR, 0,
"Worker thread(s) exited without finishing.");
if (ret == 0)
ret = WT_ERROR;
@@ -1834,8 +1800,9 @@ err: cfg->stop = 1;
* existing table.
*/
static int
-find_table_count(CONFIG *cfg)
+find_table_count(WTPERF *wtperf)
{
+ CONFIG_OPTS *opts;
WT_CONNECTION *conn;
WT_CURSOR *cursor;
WT_SESSION *session;
@@ -1843,29 +1810,30 @@ find_table_count(CONFIG *cfg)
int ret, t_ret;
char *key;
- conn = cfg->conn;
+ opts = wtperf->opts;
+ conn = wtperf->conn;
max_icount = 0;
if ((ret = conn->open_session(
- conn, NULL, cfg->sess_config, &session)) != 0) {
- lprintf(cfg, ret, 0,
+ conn, NULL, opts->sess_config, &session)) != 0) {
+ lprintf(wtperf, ret, 0,
"find_table_count: open_session failed");
goto out;
}
- for (i = 0; i < cfg->table_count; i++) {
- if ((ret = session->open_cursor(session, cfg->uris[i],
+ for (i = 0; i < opts->table_count; i++) {
+ if ((ret = session->open_cursor(session, wtperf->uris[i],
NULL, NULL, &cursor)) != 0) {
- lprintf(cfg, ret, 0,
+ lprintf(wtperf, ret, 0,
"find_table_count: open_cursor failed");
goto err;
}
if ((ret = cursor->prev(cursor)) != 0) {
- lprintf(cfg, ret, 0,
+ lprintf(wtperf, ret, 0,
"find_table_count: cursor prev failed");
goto err;
}
if ((ret = cursor->get_key(cursor, &key)) != 0) {
- lprintf(cfg, ret, 0,
+ lprintf(wtperf, ret, 0,
"find_table_count: cursor get_key failed");
goto err;
}
@@ -1874,7 +1842,7 @@ find_table_count(CONFIG *cfg)
max_icount = table_icount;
if ((ret = cursor->close(cursor)) != 0) {
- lprintf(cfg, ret, 0,
+ lprintf(wtperf, ret, 0,
"find_table_count: cursor close failed");
goto err;
}
@@ -1882,91 +1850,99 @@ find_table_count(CONFIG *cfg)
err: if ((t_ret = session->close(session, NULL)) != 0) {
if (ret == 0)
ret = t_ret;
- lprintf(cfg, ret, 0,
+ lprintf(wtperf, ret, 0,
"find_table_count: session close failed");
}
- cfg->icount = max_icount;
+ opts->icount = max_icount;
out: return (ret);
}
/*
- * Populate the uri array if more than one table is being used.
+ * Populate the uri array.
*/
static void
-create_uris(CONFIG *cfg)
+create_uris(WTPERF *wtperf)
{
- size_t base_uri_len;
+ CONFIG_OPTS *opts;
+ size_t len;
uint32_t i;
- char *uri;
- base_uri_len = strlen(cfg->base_uri);
- cfg->uris = dcalloc(cfg->table_count, sizeof(char *));
- for (i = 0; i < cfg->table_count; i++) {
- uri = cfg->uris[i] = dcalloc(base_uri_len + 6, 1);
- /*
- * If there is only one table, just use base name.
- */
- if (cfg->table_count == 1)
- memcpy(uri, cfg->base_uri, base_uri_len);
+ opts = wtperf->opts;
+
+ wtperf->uris = dcalloc(opts->table_count, sizeof(char *));
+ len = strlen("table:") + strlen(opts->table_name) + 20;
+ for (i = 0; i < opts->table_count; i++) {
+ /* If there is only one table, just use the base name. */
+ wtperf->uris[i] = dmalloc(len);
+ if (opts->table_count == 1)
+ snprintf(wtperf->uris[i],
+ len, "table:%s", opts->table_name);
else
- sprintf(uri, "%s%05d", cfg->base_uri, i);
+ snprintf(wtperf->uris[i],
+ len, "table:%s%05d", opts->table_name, i);
}
/* Create the log-like-table URI. */
- cfg->log_table_uri = dcalloc(base_uri_len + 11, 1);
- sprintf(cfg->log_table_uri, "%s_log_table", cfg->base_uri);
+ len = strlen("table:") +
+ strlen(opts->table_name) + strlen("_log_table") + 1;
+ wtperf->log_table_uri = dmalloc(len);
+ snprintf(
+ wtperf->log_table_uri, len, "table:%s_log_table", opts->table_name);
}
static int
-create_tables(CONFIG *cfg)
+create_tables(WTPERF *wtperf)
{
+ CONFIG_OPTS *opts;
WT_SESSION *session;
size_t i;
int ret;
char buf[512];
- if ((ret = cfg->conn->open_session(
- cfg->conn, NULL, cfg->sess_config, &session)) != 0) {
- lprintf(cfg, ret, 0,
- "Error opening a session on %s", cfg->home);
+ opts = wtperf->opts;
+
+ if ((ret = wtperf->conn->open_session(
+ wtperf->conn, NULL, opts->sess_config, &session)) != 0) {
+ lprintf(wtperf, ret, 0,
+ "Error opening a session on %s", wtperf->home);
return (ret);
}
- for (i = 0; i < cfg->table_count_idle; i++) {
- snprintf(buf, 512, "%s_idle%05d", cfg->uris[0], (int)i);
+ for (i = 0; i < opts->table_count_idle; i++) {
+ snprintf(buf, 512, "%s_idle%05d", wtperf->uris[0], (int)i);
if ((ret = session->create(
- session, buf, cfg->table_config)) != 0) {
- lprintf(cfg, ret, 0,
+ session, buf, opts->table_config)) != 0) {
+ lprintf(wtperf, ret, 0,
"Error creating idle table %s", buf);
return (ret);
}
}
- if (cfg->log_like_table && (ret = session->create(session,
- cfg->log_table_uri, "key_format=Q,value_format=S")) != 0) {
- lprintf(cfg, ret, 0, "Error creating log table %s", buf);
+ if (opts->log_like_table && (ret = session->create(session,
+ wtperf->log_table_uri, "key_format=Q,value_format=S")) != 0) {
+ lprintf(wtperf, ret, 0, "Error creating log table %s", buf);
return (ret);
}
- for (i = 0; i < cfg->table_count; i++) {
- if (cfg->log_partial && i > 0) {
+ for (i = 0; i < opts->table_count; i++) {
+ if (opts->log_partial && i > 0) {
if (((ret = session->create(session,
- cfg->uris[i], cfg->partial_config)) != 0)) {
- lprintf(cfg, ret, 0,
- "Error creating table %s", cfg->uris[i]);
+ wtperf->uris[i], wtperf->partial_config)) != 0)) {
+ lprintf(wtperf, ret, 0,
+ "Error creating table %s", wtperf->uris[i]);
return (ret);
}
} else if ((ret = session->create(
- session, cfg->uris[i], cfg->table_config)) != 0) {
- lprintf(cfg, ret, 0,
- "Error creating table %s", cfg->uris[i]);
+ session, wtperf->uris[i], opts->table_config)) != 0) {
+ lprintf(wtperf, ret, 0,
+ "Error creating table %s", wtperf->uris[i]);
return (ret);
}
- if (cfg->index) {
+ if (opts->index) {
snprintf(buf, 512, "index:%s:val_idx",
- cfg->uris[i] + strlen("table:"));
+ wtperf->uris[i] + strlen("table:"));
if ((ret = session->create(
session, buf, "columns=(val)")) != 0) {
- lprintf(cfg, ret, 0,
+ lprintf(wtperf, ret, 0,
"Error creating index %s", buf);
return (ret);
}
@@ -1974,76 +1950,208 @@ create_tables(CONFIG *cfg)
}
if ((ret = session->close(session, NULL)) != 0) {
- lprintf(cfg, ret, 0, "Error closing session");
+ lprintf(wtperf, ret, 0, "Error closing session");
return (ret);
}
return (0);
}
+/*
+ * wtperf_copy --
+ * Create a new WTPERF structure as a duplicate of a previous one.
+ */
+static void
+wtperf_copy(const WTPERF *src, WTPERF **retp)
+{
+ CONFIG_OPTS *opts;
+ WTPERF *dest;
+ size_t i;
+
+ opts = src->opts;
+
+ dest = dcalloc(1, sizeof(WTPERF));
+
+ /*
+ * Don't copy the home and monitor directories, they are filled in by
+ * our caller, explicitly.
+ */
+
+ if (src->partial_config != NULL)
+ dest->partial_config = dstrdup(src->partial_config);
+ if (src->reopen_config != NULL)
+ dest->reopen_config = dstrdup(src->reopen_config);
+
+ if (src->uris != NULL) {
+ dest->uris = dcalloc(opts->table_count, sizeof(char *));
+ for (i = 0; i < opts->table_count; i++)
+ dest->uris[i] = dstrdup(src->uris[i]);
+ }
+
+ if (src->async_config != NULL)
+ dest->async_config = dstrdup(src->async_config);
+
+ dest->ckptthreads = NULL;
+ dest->popthreads = NULL;
+
+ dest->workers = NULL;
+ dest->workers_cnt = src->workers_cnt;
+ if (src->workload_cnt != 0) {
+ dest->workload_cnt = src->workload_cnt;
+ dest->workload = dcalloc(src->workload_cnt, sizeof(WORKLOAD));
+ memcpy(dest->workload,
+ src->workload, src->workload_cnt * sizeof(WORKLOAD));
+ }
+
+ TAILQ_INIT(&dest->stone_head);
+
+ dest->opts = src->opts;
+
+ *retp = dest;
+}
+
+/*
+ * wtperf_free --
+ * Free any storage allocated in the WTPERF structure.
+ */
+static void
+wtperf_free(WTPERF *wtperf)
+{
+ CONFIG_OPTS *opts;
+ size_t i;
+
+ opts = wtperf->opts;
+
+ free(wtperf->home);
+ free(wtperf->monitor_dir);
+ free(wtperf->partial_config);
+ free(wtperf->reopen_config);
+ free(wtperf->log_table_uri);
+
+ if (wtperf->uris != NULL) {
+ for (i = 0; i < opts->table_count; i++)
+ free(wtperf->uris[i]);
+ free(wtperf->uris);
+ }
+
+ free(wtperf->async_config);
+
+ free(wtperf->ckptthreads);
+ free(wtperf->popthreads);
+
+ free(wtperf->workers);
+ free(wtperf->workload);
+
+ cleanup_truncate_config(wtperf);
+}
+
+/*
+ * config_compress --
+ * Parse the compression configuration.
+ */
+static int
+config_compress(WTPERF *wtperf)
+{
+ CONFIG_OPTS *opts;
+ int ret;
+ const char *s;
+
+ opts = wtperf->opts;
+ ret = 0;
+
+ s = opts->compression;
+ if (strcmp(s, "none") == 0) {
+ wtperf->compress_ext = NULL;
+ wtperf->compress_table = NULL;
+ } else if (strcmp(s, "lz4") == 0) {
+#ifndef HAVE_BUILTIN_EXTENSION_LZ4
+ wtperf->compress_ext = LZ4_EXT;
+#endif
+ wtperf->compress_table = LZ4_BLK;
+ } else if (strcmp(s, "snappy") == 0) {
+#ifndef HAVE_BUILTIN_EXTENSION_SNAPPY
+ wtperf->compress_ext = SNAPPY_EXT;
+#endif
+ wtperf->compress_table = SNAPPY_BLK;
+ } else if (strcmp(s, "zlib") == 0) {
+#ifndef HAVE_BUILTIN_EXTENSION_ZLIB
+ wtperf->compress_ext = ZLIB_EXT;
+#endif
+ wtperf->compress_table = ZLIB_BLK;
+ } else {
+ fprintf(stderr,
+ "invalid compression configuration: %s\n", s);
+ ret = EINVAL;
+ }
+ return (ret);
+
+}
+
static int
-start_all_runs(CONFIG *cfg)
+start_all_runs(WTPERF *wtperf)
{
- CONFIG *next_cfg, **configs;
+ CONFIG_OPTS *opts;
+ WTPERF *next_wtperf, **wtperfs;
pthread_t *threads;
- size_t home_len, i;
+ size_t i, len;
int ret, t_ret;
- char *new_home;
+ opts = wtperf->opts;
+ wtperfs = NULL;
ret = 0;
- configs = NULL;
- if (cfg->database_count == 1)
- return (start_run(cfg));
+ if (opts->database_count == 1)
+ return (start_run(wtperf));
- /* Allocate an array to hold our config struct copies. */
- configs = dcalloc(cfg->database_count, sizeof(CONFIG *));
+ /* Allocate an array to hold our WTPERF copies. */
+ wtperfs = dcalloc(opts->database_count, sizeof(WTPERF *));
/* Allocate an array to hold our thread IDs. */
- threads = dcalloc(cfg->database_count, sizeof(pthread_t));
-
- home_len = strlen(cfg->home);
- for (i = 0; i < cfg->database_count; i++) {
- next_cfg = dcalloc(1, sizeof(CONFIG));
- configs[i] = next_cfg;
- config_copy(next_cfg, cfg);
-
- /* Setup a unique home directory for each database. */
- new_home = dmalloc(home_len + 5);
- snprintf(new_home, home_len + 5, "%s/D%02d", cfg->home, (int)i);
- free(next_cfg->home);
- next_cfg->home = new_home;
-
- /* If the monitor dir is default, update it too. */
- if (strcmp(cfg->monitor_dir, cfg->home) == 0) {
- free(next_cfg->monitor_dir);
- next_cfg->monitor_dir = dstrdup(new_home);
- }
+ threads = dcalloc(opts->database_count, sizeof(pthread_t));
- /* If creating the sub-database, recreate its home */
- if (cfg->create != 0)
- recreate_dir(next_cfg->home);
+ for (i = 0; i < opts->database_count; i++) {
+ wtperf_copy(wtperf, &next_wtperf);
+ wtperfs[i] = next_wtperf;
+
+ /*
+ * Set up unique home/monitor directories for each database.
+ * Re-create the directories if creating the databases.
+ */
+ len = strlen(wtperf->home) + 5;
+ next_wtperf->home = dmalloc(len);
+ snprintf(
+ next_wtperf->home, len, "%s/D%02d", wtperf->home, (int)i);
+ if (opts->create != 0)
+ recreate_dir(next_wtperf->home);
+
+ len = strlen(wtperf->monitor_dir) + 5;
+ next_wtperf->monitor_dir = dmalloc(len);
+ snprintf(next_wtperf->monitor_dir,
+ len, "%s/D%02d", wtperf->monitor_dir, (int)i);
+ if (opts->create != 0 &&
+ strcmp(next_wtperf->home, next_wtperf->monitor_dir) != 0)
+ recreate_dir(next_wtperf->monitor_dir);
if ((ret = pthread_create(
- &threads[i], NULL, thread_run_wtperf, next_cfg)) != 0) {
- lprintf(cfg, ret, 0, "Error creating thread");
+ &threads[i], NULL, thread_run_wtperf, next_wtperf)) != 0) {
+ lprintf(wtperf, ret, 0, "Error creating thread");
goto err;
}
}
/* Wait for threads to finish. */
- for (i = 0; i < cfg->database_count; i++)
+ for (i = 0; i < opts->database_count; i++)
if ((t_ret = pthread_join(threads[i], NULL)) != 0) {
- lprintf(cfg, ret, 0, "Error joining thread");
+ lprintf(wtperf, ret, 0, "Error joining thread");
if (ret == 0)
ret = t_ret;
}
-err: for (i = 0; i < cfg->database_count && configs[i] != NULL; i++) {
- config_free(configs[i]);
- free(configs[i]);
+err: for (i = 0; i < opts->database_count && wtperfs[i] != NULL; i++) {
+ wtperf_free(wtperfs[i]);
+ free(wtperfs[i]);
}
- free(configs);
+ free(wtperfs);
free(threads);
return (ret);
@@ -2053,120 +2161,124 @@ err: for (i = 0; i < cfg->database_count && configs[i] != NULL; i++) {
static void *
thread_run_wtperf(void *arg)
{
- CONFIG *cfg;
+ WTPERF *wtperf;
int ret;
- cfg = (CONFIG *)arg;
- if ((ret = start_run(cfg)) != 0)
- lprintf(cfg, ret, 0, "Run failed for: %s.", cfg->home);
+ wtperf = (WTPERF *)arg;
+ if ((ret = start_run(wtperf)) != 0)
+ lprintf(wtperf, ret, 0, "Run failed for: %s.", wtperf->home);
return (NULL);
}
static int
-start_run(CONFIG *cfg)
+start_run(WTPERF *wtperf)
{
+ CONFIG_OPTS *opts;
pthread_t monitor_thread;
uint64_t total_ops;
uint32_t run_time;
int monitor_created, ret, t_ret;
+ opts = wtperf->opts;
monitor_created = ret = 0;
/* [-Wconditional-uninitialized] */
memset(&monitor_thread, 0, sizeof(monitor_thread));
- if ((ret = setup_log_file(cfg)) != 0)
+ if ((ret = setup_log_file(wtperf)) != 0)
goto err;
if ((ret = wiredtiger_open( /* Open the real connection. */
- cfg->home, NULL, cfg->conn_config, &cfg->conn)) != 0) {
- lprintf(cfg, ret, 0, "Error connecting to %s", cfg->home);
+ wtperf->home, NULL, opts->conn_config, &wtperf->conn)) != 0) {
+ lprintf(wtperf, ret, 0, "Error connecting to %s", wtperf->home);
goto err;
}
- create_uris(cfg);
+ create_uris(wtperf);
/* If creating, create the tables. */
- if (cfg->create != 0 && (ret = create_tables(cfg)) != 0)
+ if (opts->create != 0 && (ret = create_tables(wtperf)) != 0)
goto err;
/* Start the monitor thread. */
- if (cfg->sample_interval != 0) {
+ if (opts->sample_interval != 0) {
if ((ret = pthread_create(
- &monitor_thread, NULL, monitor, cfg)) != 0) {
- lprintf(
- cfg, ret, 0, "Error creating monitor thread.");
+ &monitor_thread, NULL, monitor, wtperf)) != 0) {
+ lprintf(wtperf,
+ ret, 0, "Error creating monitor thread.");
goto err;
}
monitor_created = 1;
}
/* If creating, populate the table. */
- if (cfg->create != 0 && execute_populate(cfg) != 0)
+ if (opts->create != 0 && execute_populate(wtperf) != 0)
goto err;
/* Optional workload. */
- if (cfg->workers_cnt != 0 &&
- (cfg->run_time != 0 || cfg->run_ops != 0)) {
+ if (wtperf->workers_cnt != 0 &&
+ (opts->run_time != 0 || opts->run_ops != 0)) {
/*
* If we have a workload, close and reopen the connection so
* that LSM can detect read-only workloads.
*/
- if (close_reopen(cfg) != 0)
+ if (close_reopen(wtperf) != 0)
goto err;
/* Didn't create, set insert count. */
- if (cfg->create == 0 && cfg->random_range == 0 &&
- find_table_count(cfg) != 0)
+ if (opts->create == 0 &&
+ opts->random_range == 0 && find_table_count(wtperf) != 0)
goto err;
/* Start the checkpoint thread. */
- if (cfg->checkpoint_threads != 0) {
- lprintf(cfg, 0, 1,
+ if (opts->checkpoint_threads != 0) {
+ lprintf(wtperf, 0, 1,
"Starting %" PRIu32 " checkpoint thread(s)",
- cfg->checkpoint_threads);
- cfg->ckptthreads = dcalloc(
- cfg->checkpoint_threads, sizeof(CONFIG_THREAD));
- if (start_threads(cfg, NULL, cfg->ckptthreads,
- cfg->checkpoint_threads, checkpoint_worker) != 0)
+ opts->checkpoint_threads);
+ wtperf->ckptthreads = dcalloc(
+ opts->checkpoint_threads, sizeof(WTPERF_THREAD));
+ if (start_threads(wtperf, NULL, wtperf->ckptthreads,
+ opts->checkpoint_threads, checkpoint_worker) != 0)
goto err;
}
/* Execute the workload. */
- if ((ret = execute_workload(cfg)) != 0)
+ if ((ret = execute_workload(wtperf)) != 0)
goto err;
/* One final summation of the operations we've completed. */
- cfg->read_ops = sum_read_ops(cfg);
- cfg->insert_ops = sum_insert_ops(cfg);
- cfg->truncate_ops = sum_truncate_ops(cfg);
- cfg->update_ops = sum_update_ops(cfg);
- cfg->ckpt_ops = sum_ckpt_ops(cfg);
- total_ops = cfg->read_ops + cfg->insert_ops + cfg->update_ops;
-
- run_time = cfg->run_time == 0 ? 1 : cfg->run_time;
- lprintf(cfg, 0, 1,
+ wtperf->read_ops = sum_read_ops(wtperf);
+ wtperf->insert_ops = sum_insert_ops(wtperf);
+ wtperf->truncate_ops = sum_truncate_ops(wtperf);
+ wtperf->update_ops = sum_update_ops(wtperf);
+ wtperf->ckpt_ops = sum_ckpt_ops(wtperf);
+ total_ops =
+ wtperf->read_ops + wtperf->insert_ops + wtperf->update_ops;
+
+ run_time = opts->run_time == 0 ? 1 : opts->run_time;
+ lprintf(wtperf, 0, 1,
"Executed %" PRIu64 " read operations (%" PRIu64
"%%) %" PRIu64 " ops/sec",
- cfg->read_ops, (cfg->read_ops * 100) / total_ops,
- cfg->read_ops / run_time);
- lprintf(cfg, 0, 1,
+ wtperf->read_ops, (wtperf->read_ops * 100) / total_ops,
+ wtperf->read_ops / run_time);
+ lprintf(wtperf, 0, 1,
"Executed %" PRIu64 " insert operations (%" PRIu64
"%%) %" PRIu64 " ops/sec",
- cfg->insert_ops, (cfg->insert_ops * 100) / total_ops,
- cfg->insert_ops / run_time);
- lprintf(cfg, 0, 1,
+ wtperf->insert_ops, (wtperf->insert_ops * 100) / total_ops,
+ wtperf->insert_ops / run_time);
+ lprintf(wtperf, 0, 1,
"Executed %" PRIu64 " truncate operations (%" PRIu64
"%%) %" PRIu64 " ops/sec",
- cfg->truncate_ops, (cfg->truncate_ops * 100) / total_ops,
- cfg->truncate_ops / run_time);
- lprintf(cfg, 0, 1,
+ wtperf->truncate_ops,
+ (wtperf->truncate_ops * 100) / total_ops,
+ wtperf->truncate_ops / run_time);
+ lprintf(wtperf, 0, 1,
"Executed %" PRIu64 " update operations (%" PRIu64
"%%) %" PRIu64 " ops/sec",
- cfg->update_ops, (cfg->update_ops * 100) / total_ops,
- cfg->update_ops / run_time);
- lprintf(cfg, 0, 1,
+ wtperf->update_ops, (wtperf->update_ops * 100) / total_ops,
+ wtperf->update_ops / run_time);
+ lprintf(wtperf, 0, 1,
"Executed %" PRIu64 " checkpoint operations",
- cfg->ckpt_ops);
+ wtperf->ckpt_ops);
- latency_print(cfg);
+ latency_print(wtperf);
}
if (0) {
@@ -2175,40 +2287,41 @@ err: if (ret == 0)
}
/* Notify the worker threads they are done. */
- cfg->stop = 1;
+ wtperf->stop = true;
- if ((t_ret = stop_threads(cfg, 1, cfg->ckptthreads)) != 0)
+ if ((t_ret = stop_threads(wtperf, 1, wtperf->ckptthreads)) != 0)
if (ret == 0)
ret = t_ret;
if (monitor_created != 0 &&
(t_ret = pthread_join(monitor_thread, NULL)) != 0) {
- lprintf(cfg, ret, 0, "Error joining monitor thread.");
+ lprintf(wtperf, ret, 0, "Error joining monitor thread.");
if (ret == 0)
ret = t_ret;
}
- if (cfg->conn != NULL &&
- (t_ret = cfg->conn->close(cfg->conn, NULL)) != 0) {
- lprintf(cfg, t_ret, 0,
- "Error closing connection to %s", cfg->home);
+ if (wtperf->conn != NULL &&
+ (t_ret = wtperf->conn->close(wtperf->conn, NULL)) != 0) {
+ lprintf(wtperf, t_ret, 0,
+ "Error closing connection to %s", wtperf->home);
if (ret == 0)
ret = t_ret;
}
if (ret == 0) {
- if (cfg->run_time == 0 && cfg->run_ops == 0)
- lprintf(cfg, 0, 1, "Run completed");
+ if (opts->run_time == 0 && opts->run_ops == 0)
+ lprintf(wtperf, 0, 1, "Run completed");
else
- lprintf(cfg, 0, 1, "Run completed: %" PRIu32 " %s",
- cfg->run_time == 0 ? cfg->run_ops : cfg->run_time,
- cfg->run_time == 0 ? "operations" : "seconds");
+ lprintf(wtperf, 0, 1, "Run completed: %" PRIu32 " %s",
+ opts->run_time == 0 ?
+ opts->run_ops : opts->run_time,
+ opts->run_time == 0 ? "operations" : "seconds");
}
- if (cfg->logf != NULL) {
- if ((t_ret = fflush(cfg->logf)) != 0 && ret == 0)
+ if (wtperf->logf != NULL) {
+ if ((t_ret = fflush(wtperf->logf)) != 0 && ret == 0)
ret = t_ret;
- if ((t_ret = fclose(cfg->logf)) != 0 && ret == 0)
+ if ((t_ret = fclose(wtperf->logf)) != 0 && ret == 0)
ret = t_ret;
}
return (ret);
@@ -2218,31 +2331,55 @@ extern int __wt_optind, __wt_optreset;
extern char *__wt_optarg;
void (*custom_die)(void) = NULL;
+/*
+ * usage --
+ * wtperf usage print, no error.
+ */
+static void
+usage(void)
+{
+ printf("wtperf [-C config] "
+ "[-H mount] [-h home] [-O file] [-o option] [-T config]\n");
+ printf("\t-C <string> additional connection configuration\n");
+ printf("\t (added to option conn_config)\n");
+ printf("\t-H <mount> configure Helium volume mount point\n");
+ printf("\t-h <string> Wired Tiger home must exist, default WT_TEST\n");
+ printf("\t-O <file> file contains options as listed below\n");
+ printf("\t-o option=val[,option=val,...] set options listed below\n");
+ printf("\t-T <string> additional table configuration\n");
+ printf("\t (added to option table_config)\n");
+ printf("\n");
+ config_opt_usage();
+}
+
int
main(int argc, char *argv[])
{
- CONFIG *cfg, _cfg;
+ CONFIG_OPTS *opts;
+ WTPERF *wtperf, _wtperf;
size_t req_len, sreq_len;
bool monitor_set;
int ch, ret;
- const char *opts = "C:h:m:O:o:T:";
+ const char *cmdflags = "C:h:m:O:o:T:";
const char *config_opts;
- char *cc_buf, *sess_cfg, *tc_buf, *user_cconfig, *user_tconfig;
+ char *cc_buf, *path, *sess_cfg, *tc_buf, *user_cconfig, *user_tconfig;
+
+ /* The first WTPERF structure (from which all others are derived). */
+ wtperf = &_wtperf;
+ memset(wtperf, 0, sizeof(*wtperf));
+ wtperf->home = dstrdup(DEFAULT_HOME);
+ wtperf->monitor_dir = dstrdup(DEFAULT_MONITOR_DIR);
+ TAILQ_INIT(&wtperf->stone_head);
+ config_opt_init(&wtperf->opts);
+ opts = wtperf->opts;
monitor_set = false;
ret = 0;
config_opts = NULL;
cc_buf = sess_cfg = tc_buf = user_cconfig = user_tconfig = NULL;
- /* Setup the default configuration values. */
- cfg = &_cfg;
- memset(cfg, 0, sizeof(*cfg));
- config_copy(cfg, &default_cfg);
- cfg->home = dstrdup(DEFAULT_HOME);
- cfg->monitor_dir = dstrdup(DEFAULT_MONITOR_DIR);
-
/* Do a basic validation of options, and home is needed before open. */
- while ((ch = __wt_getopt("wtperf", argc, argv, opts)) != EOF)
+ while ((ch = __wt_getopt("wtperf", argc, argv, cmdflags)) != EOF)
switch (ch) {
case 'C':
if (user_cconfig == NULL)
@@ -2256,12 +2393,12 @@ main(int argc, char *argv[])
}
break;
case 'h':
- free(cfg->home);
- cfg->home = dstrdup(__wt_optarg);
+ free(wtperf->home);
+ wtperf->home = dstrdup(__wt_optarg);
break;
case 'm':
- free(cfg->monitor_dir);
- cfg->monitor_dir = dstrdup(__wt_optarg);
+ free(wtperf->monitor_dir);
+ wtperf->monitor_dir = dstrdup(__wt_optarg);
monitor_set = true;
break;
case 'O':
@@ -2288,47 +2425,48 @@ main(int argc, char *argv[])
* monitor directory to the home dir.
*/
if (!monitor_set) {
- free(cfg->monitor_dir);
- cfg->monitor_dir = dstrdup(cfg->home);
+ free(wtperf->monitor_dir);
+ wtperf->monitor_dir = dstrdup(wtperf->home);
}
/* Parse configuration settings from configuration file. */
- if (config_opts != NULL && config_opt_file(cfg, config_opts) != 0)
+ if (config_opts != NULL && config_opt_file(wtperf, config_opts) != 0)
goto einval;
/* Parse options that override values set via a configuration file. */
__wt_optreset = __wt_optind = 1;
- while ((ch = __wt_getopt("wtperf", argc, argv, opts)) != EOF)
+ while ((ch = __wt_getopt("wtperf", argc, argv, cmdflags)) != EOF)
switch (ch) {
case 'o':
/* Allow -o key=value */
- if (config_opt_line(cfg, __wt_optarg) != 0)
+ if (config_opt_str(wtperf, __wt_optarg) != 0)
goto einval;
break;
}
- if (cfg->populate_threads == 0 && cfg->icount != 0) {
- lprintf(cfg, 1, 0,
+ if (opts->populate_threads == 0 && opts->icount != 0) {
+ lprintf(wtperf, 1, 0,
"Cannot have 0 populate threads when icount is set\n");
goto err;
}
- cfg->async_config = NULL;
+ wtperf->async_config = NULL;
/*
* If the user specified async_threads we use async for all ops.
* If the user wants compaction, then we also enable async for
* the compact operation, but not for the workloads.
*/
- if (cfg->async_threads > 0) {
- if (F_ISSET(cfg, CFG_TRUNCATE)) {
- lprintf(cfg, 1, 0, "Cannot run truncate and async\n");
+ if (opts->async_threads > 0) {
+ if (F_ISSET(wtperf, CFG_TRUNCATE)) {
+ lprintf(wtperf,
+ 1, 0, "Cannot run truncate and async\n");
goto err;
}
- cfg->use_asyncops = 1;
+ wtperf->use_asyncops = true;
}
- if (cfg->compact && cfg->async_threads == 0)
- cfg->async_threads = 2;
- if (cfg->async_threads > 0) {
+ if (opts->compact && opts->async_threads == 0)
+ opts->async_threads = 2;
+ if (opts->async_threads > 0) {
/*
* The maximum number of async threads is two digits, so just
* use that to compute the space we need. Assume the default
@@ -2336,145 +2474,133 @@ main(int argc, char *argv[])
* to 4096 if needed.
*/
req_len = strlen(",async=(enabled=true,threads=)") + 4;
- cfg->async_config = dmalloc(req_len);
- snprintf(cfg->async_config, req_len,
+ wtperf->async_config = dmalloc(req_len);
+ snprintf(wtperf->async_config, req_len,
",async=(enabled=true,threads=%" PRIu32 ")",
- cfg->async_threads);
+ opts->async_threads);
}
- if ((ret = config_compress(cfg)) != 0)
+ if ((ret = config_compress(wtperf)) != 0)
goto err;
/* You can't have truncate on a random collection. */
- if (F_ISSET(cfg, CFG_TRUNCATE) && cfg->random_range) {
- lprintf(cfg, 1, 0, "Cannot run truncate and random_range\n");
+ if (F_ISSET(wtperf, CFG_TRUNCATE) && opts->random_range) {
+ lprintf(wtperf, 1, 0, "Cannot run truncate and random_range\n");
goto err;
}
/* We can't run truncate with more than one table. */
- if (F_ISSET(cfg, CFG_TRUNCATE) && cfg->table_count > 1) {
- lprintf(cfg, 1, 0, "Cannot truncate more than 1 table\n");
+ if (F_ISSET(wtperf, CFG_TRUNCATE) && opts->table_count > 1) {
+ lprintf(wtperf, 1, 0, "Cannot truncate more than 1 table\n");
goto err;
}
- /* Build the URI from the table name. */
- req_len = strlen("table:") + strlen(cfg->table_name) + 2;
- cfg->base_uri = dmalloc(req_len);
- snprintf(cfg->base_uri, req_len, "table:%s", cfg->table_name);
-
/* Make stdout line buffered, so verbose output appears quickly. */
__wt_stream_set_line_buffer(stdout);
/* Concatenate non-default configuration strings. */
- if (cfg->verbose > 1 || user_cconfig != NULL ||
- cfg->session_count_idle > 0 || cfg->compress_ext != NULL ||
- cfg->async_config != NULL) {
- req_len = strlen(debug_cconfig) + 3;
+ if (opts->verbose > 1 || user_cconfig != NULL ||
+ opts->session_count_idle > 0 || wtperf->compress_ext != NULL ||
+ wtperf->async_config != NULL) {
+ req_len = strlen(debug_cconfig) + 20;
if (user_cconfig != NULL)
req_len += strlen(user_cconfig);
- if (cfg->async_config != NULL)
- req_len += strlen(cfg->async_config);
- if (cfg->compress_ext != NULL)
- req_len += strlen(cfg->compress_ext);
- if (cfg->session_count_idle > 0) {
+ if (wtperf->async_config != NULL)
+ req_len += strlen(wtperf->async_config);
+ if (wtperf->compress_ext != NULL)
+ req_len += strlen(wtperf->compress_ext);
+ if (opts->session_count_idle > 0) {
sreq_len = strlen(",session_max=") + 6;
req_len += sreq_len;
sess_cfg = dmalloc(sreq_len);
snprintf(sess_cfg, sreq_len,
",session_max=%" PRIu32,
- cfg->session_count_idle + cfg->workers_cnt +
- cfg->populate_threads + 10);
+ opts->session_count_idle +
+ wtperf->workers_cnt + opts->populate_threads + 10);
}
cc_buf = dmalloc(req_len);
- /*
- * This is getting hard to parse.
- */
- snprintf(cc_buf, req_len, "%s%s%s%s%s%s%s",
- cfg->async_config ? cfg->async_config : "",
- cfg->compress_ext ? cfg->compress_ext : "",
- cfg->verbose > 1 && strlen(debug_cconfig) ? ",": "",
- cfg->verbose > 1 &&
- strlen(debug_cconfig) ? debug_cconfig : "",
- sess_cfg ? sess_cfg : "",
- user_cconfig ? ",": "",
- user_cconfig ? user_cconfig : "");
- if (strlen(cc_buf))
- if ((ret = config_opt_str(
- cfg, "conn_config", cc_buf)) != 0)
- goto err;
+ snprintf(cc_buf, req_len, "%s,%s,%s,%s,%s",
+ wtperf->async_config ? wtperf->async_config : "",
+ wtperf->compress_ext ? wtperf->compress_ext : "",
+ opts->verbose > 1 ? debug_cconfig : "",
+ sess_cfg != NULL ? sess_cfg : "",
+ user_cconfig != NULL ? user_cconfig : "");
+ if (strlen(cc_buf) && (ret =
+ config_opt_name_value(wtperf, "conn_config", cc_buf)) != 0)
+ goto err;
}
- if (cfg->verbose > 1 || cfg->index ||
- user_tconfig != NULL || cfg->compress_table != NULL) {
- req_len = strlen(debug_tconfig) + 3;
+ if (opts->verbose > 1 || opts->index ||
+ user_tconfig != NULL || wtperf->compress_table != NULL) {
+ req_len = strlen(debug_tconfig) + 20;
if (user_tconfig != NULL)
req_len += strlen(user_tconfig);
- if (cfg->compress_table != NULL)
- req_len += strlen(cfg->compress_table);
- if (cfg->index)
+ if (wtperf->compress_table != NULL)
+ req_len += strlen(wtperf->compress_table);
+ if (opts->index)
req_len += strlen(INDEX_COL_NAMES);
tc_buf = dmalloc(req_len);
- /*
- * This is getting hard to parse.
- */
- snprintf(tc_buf, req_len, "%s%s%s%s%s%s",
- cfg->index ? INDEX_COL_NAMES : "",
- cfg->compress_table ? cfg->compress_table : "",
- cfg->verbose > 1 && strlen(debug_tconfig) ? ",": "",
- cfg->verbose > 1 &&
- strlen(debug_tconfig) ? debug_tconfig : "",
- user_tconfig ? ",": "",
+ snprintf(tc_buf, req_len, "%s,%s,%s,%s",
+ opts->index ? INDEX_COL_NAMES : "",
+ wtperf->compress_table != NULL ?
+ wtperf->compress_table : "",
+ opts->verbose > 1 ? debug_tconfig : "",
user_tconfig ? user_tconfig : "");
- if (strlen(tc_buf))
- if ((ret = config_opt_str(
- cfg, "table_config", tc_buf)) != 0)
- goto err;
+ if (strlen(tc_buf) && (ret =
+ config_opt_name_value(wtperf, "table_config", tc_buf)) != 0)
+ goto err;
}
- if (cfg->log_partial && cfg->table_count > 1) {
- req_len = strlen(cfg->table_config) +
+ if (opts->log_partial && opts->table_count > 1) {
+ req_len = strlen(opts->table_config) +
strlen(LOG_PARTIAL_CONFIG) + 1;
- cfg->partial_config = dmalloc(req_len);
- snprintf(cfg->partial_config, req_len, "%s%s",
- cfg->table_config, LOG_PARTIAL_CONFIG);
+ wtperf->partial_config = dmalloc(req_len);
+ snprintf(wtperf->partial_config, req_len, "%s%s",
+ opts->table_config, LOG_PARTIAL_CONFIG);
}
/*
* Set the config for reopen. If readonly add in that string.
* If not readonly then just copy the original conn_config.
*/
- if (cfg->readonly)
- req_len = strlen(cfg->conn_config) +
+ if (opts->readonly)
+ req_len = strlen(opts->conn_config) +
strlen(READONLY_CONFIG) + 1;
else
- req_len = strlen(cfg->conn_config) + 1;
- cfg->reopen_config = dmalloc(req_len);
- if (cfg->readonly)
- snprintf(cfg->reopen_config, req_len, "%s%s",
- cfg->conn_config, READONLY_CONFIG);
+ req_len = strlen(opts->conn_config) + 1;
+ wtperf->reopen_config = dmalloc(req_len);
+ if (opts->readonly)
+ snprintf(wtperf->reopen_config, req_len, "%s%s",
+ opts->conn_config, READONLY_CONFIG);
else
- snprintf(cfg->reopen_config, req_len, "%s",
- cfg->conn_config);
+ snprintf(wtperf->reopen_config,
+ req_len, "%s", opts->conn_config);
/* Sanity-check the configuration. */
- if ((ret = config_sanity(cfg)) != 0)
+ if ((ret = config_sanity(wtperf)) != 0)
goto err;
/* If creating, remove and re-create the home directory. */
- if (cfg->create != 0)
- recreate_dir(cfg->home);
+ if (opts->create != 0)
+ recreate_dir(wtperf->home);
/* Write a copy of the config. */
- config_to_file(cfg);
+ req_len = strlen(wtperf->home) + strlen("/CONFIG.wtperf") + 1;
+ path = dmalloc(req_len);
+ snprintf(path, req_len, "%s/CONFIG.wtperf", wtperf->home);
+ config_opt_log(opts, path);
+ free(path);
/* Display the configuration. */
- if (cfg->verbose > 1)
- config_print(cfg);
+ if (opts->verbose > 1)
+ config_opt_print(wtperf);
- if ((ret = start_all_runs(cfg)) != 0)
+ if ((ret = start_all_runs(wtperf)) != 0)
goto err;
if (0) {
einval: ret = EINVAL;
}
-err: config_free(cfg);
+err: wtperf_free(wtperf);
+ config_opt_cleanup(opts);
+
free(cc_buf);
free(sess_cfg);
free(tc_buf);
@@ -2485,26 +2611,26 @@ err: config_free(cfg);
}
static int
-start_threads(CONFIG *cfg,
- WORKLOAD *workp, CONFIG_THREAD *base, u_int num, void *(*func)(void *))
+start_threads(WTPERF *wtperf,
+ WORKLOAD *workp, WTPERF_THREAD *base, u_int num, void *(*func)(void *))
{
- CONFIG_THREAD *thread;
+ CONFIG_OPTS *opts;
+ WTPERF_THREAD *thread;
u_int i;
int ret;
+ opts = wtperf->opts;
+
/* Initialize the threads. */
for (i = 0, thread = base; i < num; ++i, ++thread) {
- thread->cfg = cfg;
+ thread->wtperf = wtperf;
thread->workload = workp;
/*
* We don't want the threads executing in lock-step, seed each
* one differently.
*/
- if ((ret = __wt_random_init_seed(NULL, &thread->rnd)) != 0) {
- lprintf(cfg, ret, 0, "Error initializing RNG");
- return (ret);
- }
+ __wt_random_init_seed(NULL, &thread->rnd);
/*
* Every thread gets a key/data buffer because we don't bother
@@ -2512,14 +2638,14 @@ start_threads(CONFIG *cfg,
* don't, it's not enough memory to bother. These buffers hold
* strings: trailing NUL is included in the size.
*/
- thread->key_buf = dcalloc(cfg->key_sz, 1);
- thread->value_buf = dcalloc(cfg->value_sz_max, 1);
+ thread->key_buf = dcalloc(opts->key_sz, 1);
+ thread->value_buf = dcalloc(opts->value_sz_max, 1);
/*
* Initialize and then toss in a bit of random values if needed.
*/
- memset(thread->value_buf, 'a', cfg->value_sz - 1);
- if (cfg->random_value)
+ memset(thread->value_buf, 'a', opts->value_sz - 1);
+ if (opts->random_value)
randomize_value(thread, thread->value_buf);
/*
@@ -2537,7 +2663,7 @@ start_threads(CONFIG *cfg,
for (i = 0, thread = base; i < num; ++i, ++thread)
if ((ret = pthread_create(
&thread->handle, NULL, func, thread)) != 0) {
- lprintf(cfg, ret, 0, "Error creating thread");
+ lprintf(wtperf, ret, 0, "Error creating thread");
return (ret);
}
@@ -2545,7 +2671,7 @@ start_threads(CONFIG *cfg,
}
static int
-stop_threads(CONFIG *cfg, u_int num, CONFIG_THREAD *threads)
+stop_threads(WTPERF *wtperf, u_int num, WTPERF_THREAD *threads)
{
u_int i;
int ret;
@@ -2555,7 +2681,7 @@ stop_threads(CONFIG *cfg, u_int num, CONFIG_THREAD *threads)
for (i = 0; i < num; ++i, ++threads) {
if ((ret = pthread_join(threads->handle, NULL)) != 0) {
- lprintf(cfg, ret, 0, "Error joining thread");
+ lprintf(wtperf, ret, 0, "Error joining thread");
return (ret);
}
@@ -2588,35 +2714,38 @@ recreate_dir(const char *name)
}
static int
-drop_all_tables(CONFIG *cfg)
+drop_all_tables(WTPERF *wtperf)
{
struct timespec start, stop;
+ CONFIG_OPTS *opts;
WT_SESSION *session;
size_t i;
uint64_t msecs;
int ret, t_ret;
+ opts = wtperf->opts;
+
/* Drop any tables. */
- if ((ret = cfg->conn->open_session(
- cfg->conn, NULL, cfg->sess_config, &session)) != 0) {
- lprintf(cfg, ret, 0,
- "Error opening a session on %s", cfg->home);
+ if ((ret = wtperf->conn->open_session(
+ wtperf->conn, NULL, opts->sess_config, &session)) != 0) {
+ lprintf(wtperf, ret, 0,
+ "Error opening a session on %s", wtperf->home);
return (ret);
}
- testutil_check(__wt_epoch(NULL, &start));
- for (i = 0; i < cfg->table_count; i++) {
- if ((ret = session->drop(
- session, cfg->uris[i], NULL)) != 0) {
- lprintf(cfg, ret, 0,
- "Error dropping table %s", cfg->uris[i]);
+ __wt_epoch(NULL, &start);
+ for (i = 0; i < opts->table_count; i++) {
+ if ((ret =
+ session->drop(session, wtperf->uris[i], NULL)) != 0) {
+ lprintf(wtperf, ret, 0,
+ "Error dropping table %s", wtperf->uris[i]);
goto err;
}
}
- testutil_check(__wt_epoch(NULL, &stop));
+ __wt_epoch(NULL, &stop);
msecs = WT_TIMEDIFF_MS(stop, start);
- lprintf(cfg, 0, 1,
+ lprintf(wtperf, 0, 1,
"Executed %" PRIu32 " drop operations average time %" PRIu64 "ms",
- cfg->table_count, msecs / cfg->table_count);
+ opts->table_count, msecs / opts->table_count);
err: if ((t_ret = session->close(session, NULL)) != 0 && ret == 0)
ret = t_ret;
@@ -2624,27 +2753,34 @@ err: if ((t_ret = session->close(session, NULL)) != 0 && ret == 0)
}
static uint64_t
-wtperf_value_range(CONFIG *cfg)
+wtperf_value_range(WTPERF *wtperf)
{
- if (cfg->random_range)
- return (cfg->icount + cfg->random_range);
+ CONFIG_OPTS *opts;
+
+ opts = wtperf->opts;
+
+ if (opts->random_range)
+ return (opts->icount + opts->random_range);
/*
* It is legal to configure a zero size populate phase, hide that
* from other code by pretending the range is 1 in that case.
*/
- if (cfg->icount + cfg->insert_key == 0)
+ if (opts->icount + wtperf->insert_key == 0)
return (1);
- return (cfg->icount + cfg->insert_key - (u_int)(cfg->workers_cnt + 1));
+ return (opts->icount +
+ wtperf->insert_key - (u_int)(wtperf->workers_cnt + 1));
}
static uint64_t
-wtperf_rand(CONFIG_THREAD *thread)
+wtperf_rand(WTPERF_THREAD *thread)
{
- CONFIG *cfg;
+ CONFIG_OPTS *opts;
+ WTPERF *wtperf;
double S1, S2, U;
uint64_t rval;
- cfg = thread->cfg;
+ wtperf = thread->wtperf;
+ opts = wtperf->opts;
/*
* Use WiredTiger's random number routine: it's lock-free and fairly
@@ -2653,11 +2789,11 @@ wtperf_rand(CONFIG_THREAD *thread)
rval = __wt_random(&thread->rnd);
/* Use Pareto distribution to give 80/20 hot/cold values. */
- if (cfg->pareto != 0) {
+ if (opts->pareto != 0) {
#define PARETO_SHAPE 1.5
S1 = (-1 / PARETO_SHAPE);
- S2 = wtperf_value_range(cfg) *
- (cfg->pareto / 100.0) * (PARETO_SHAPE - 1);
+ S2 = wtperf_value_range(wtperf) *
+ (opts->pareto / 100.0) * (PARETO_SHAPE - 1);
U = 1 - (double)rval / (double)UINT32_MAX;
rval = (uint64_t)((pow(U, S1) - 1) * S2);
/*
@@ -2665,13 +2801,13 @@ wtperf_rand(CONFIG_THREAD *thread)
* 2% of the time, from my testing. That will lead to the
* first item in the table being "hot".
*/
- if (rval > wtperf_value_range(cfg))
+ if (rval > wtperf_value_range(wtperf))
rval = 0;
}
/*
* Wrap the key to within the expected range and avoid zero: we never
* insert that key.
*/
- rval = (rval % wtperf_value_range(cfg)) + 1;
+ rval = (rval % wtperf_value_range(wtperf)) + 1;
return (rval);
}