summaryrefslogtreecommitdiff
path: root/bench/wtperf/wtperf.c
diff options
context:
space:
mode:
Diffstat (limited to 'bench/wtperf/wtperf.c')
-rw-r--r--bench/wtperf/wtperf.c745
1 files changed, 291 insertions, 454 deletions
diff --git a/bench/wtperf/wtperf.c b/bench/wtperf/wtperf.c
index a628f847476..c689fca80f7 100644
--- a/bench/wtperf/wtperf.c
+++ b/bench/wtperf/wtperf.c
@@ -31,6 +31,44 @@
/* Default values. */
#define DEFAULT_HOME "WT_TEST"
#define DEFAULT_MONITOR_DIR "WT_TEST"
+static const CONFIG default_cfg = {
+ NULL, /* home */
+ NULL, /* monitor dir */
+ NULL, /* partial logging */
+ NULL, /* reopen config */
+ NULL, /* base_uri */
+ NULL, /* uris */
+ NULL, /* conn */
+ NULL, /* logf */
+ NULL, /* async */
+ NULL, NULL, /* compressor ext, blk */
+ NULL, NULL, /* populate, checkpoint threads */
+
+ NULL, /* worker threads */
+ 0, /* worker thread count */
+ NULL, /* workloads */
+ 0, /* workload count */
+ 0, /* use_asyncops */
+ 0, /* checkpoint operations */
+ 0, /* insert operations */
+ 0, /* read operations */
+ 0, /* truncate operations */
+ 0, /* update operations */
+ 0, /* insert key */
+ 0, /* checkpoint in progress */
+ 0, /* thread error */
+ 0, /* notify threads to stop */
+ 0, /* in warmup phase */
+ false, /* Signal for idle cycle thread */
+ 0, /* total seconds running */
+ 0, /* flags */
+ {NULL, NULL}, /* the truncate queue */
+ {NULL, NULL}, /* the config queue */
+
+#define OPT_DEFINE_DEFAULT
+#include "wtperf_opt.i"
+#undef OPT_DEFINE_DEFAULT
+};
static const char * const debug_cconfig = "";
static const char * const debug_tconfig = "";
@@ -72,12 +110,9 @@ get_next_incr(CONFIG *cfg)
static void
randomize_value(CONFIG_THREAD *thread, char *value_buf)
{
- CONFIG_OPTS *opts;
uint8_t *vb;
uint32_t i, max_range, rand_val;
- opts = thread->cfg->opts;
-
/*
* Limit how much of the buffer we validate for length, this means
* that only threads that do growing updates will ever make changes to
@@ -86,11 +121,11 @@ randomize_value(CONFIG_THREAD *thread, char *value_buf)
* in this performance sensitive function.
*/
if (thread->workload == NULL || thread->workload->update_delta == 0)
- max_range = opts->value_sz;
+ max_range = thread->cfg->value_sz;
else if (thread->workload->update_delta > 0)
- max_range = opts->value_sz_max;
+ max_range = thread->cfg->value_sz_max;
else
- max_range = opts->value_sz_min;
+ max_range = thread->cfg->value_sz_min;
/*
* Generate a single random value and re-use it. We generally only
@@ -122,19 +157,15 @@ randomize_value(CONFIG_THREAD *thread, char *value_buf)
static uint32_t
map_key_to_table(CONFIG *cfg, uint64_t k)
{
- CONFIG_OPTS *opts;
-
- opts = cfg->opts;
-
- if (opts->range_partition) {
+ if (cfg->range_partition) {
/* Take care to return a result in [0..table_count-1]. */
- if (k > opts->icount + opts->random_range)
+ if (k > cfg->icount + cfg->random_range)
return (0);
return ((uint32_t)((k - 1) /
- ((opts->icount + opts->random_range +
- opts->table_count - 1) / opts->table_count)));
+ ((cfg->icount + cfg->random_range + cfg->table_count - 1) /
+ cfg->table_count)));
} else
- return ((uint32_t)(k % opts->table_count));
+ return ((uint32_t)(k % cfg->table_count));
}
/*
@@ -146,25 +177,23 @@ static inline void
update_value_delta(CONFIG_THREAD *thread)
{
CONFIG *cfg;
- CONFIG_OPTS *opts;
char * value;
int64_t delta, len, new_len;
cfg = thread->cfg;
- opts = cfg->opts;
value = thread->value_buf;
delta = thread->workload->update_delta;
len = (int64_t)strlen(value);
if (delta == INT64_MAX)
delta = __wt_random(&thread->rnd) %
- (opts->value_sz_max - opts->value_sz);
+ (cfg->value_sz_max - cfg->value_sz);
/* Ensure we aren't changing across boundaries */
- if (delta > 0 && len + delta > opts->value_sz_max)
- delta = opts->value_sz_max - len;
- else if (delta < 0 && len + delta < opts->value_sz_min)
- delta = opts->value_sz_min - len;
+ if (delta > 0 && len + delta > cfg->value_sz_max)
+ delta = cfg->value_sz_max - len;
+ else if (delta < 0 && len + delta < cfg->value_sz_min)
+ delta = cfg->value_sz_min - len;
/* Bail if there isn't anything to do */
if (delta == 0)
@@ -175,7 +204,7 @@ update_value_delta(CONFIG_THREAD *thread)
else {
/* Extend the value by the configured amount. */
for (new_len = len;
- new_len < opts->value_sz_max && new_len - len < delta;
+ new_len < cfg->value_sz_max && new_len - len < delta;
new_len++)
value[new_len] = 'a';
}
@@ -250,7 +279,7 @@ err:
/* Panic if error */
lprintf(cfg, ret, 0, "Error in op %" PRIu64,
op->get_id(op));
- cfg->error = cfg->stop = true;
+ cfg->error = cfg->stop = 1;
return (1);
}
@@ -323,7 +352,6 @@ static void *
worker_async(void *arg)
{
CONFIG *cfg;
- CONFIG_OPTS *opts;
CONFIG_THREAD *thread;
WT_ASYNC_OP *asyncop;
WT_CONNECTION *conn;
@@ -334,7 +362,6 @@ worker_async(void *arg)
thread = (CONFIG_THREAD *)arg;
cfg = thread->cfg;
- opts = cfg->opts;
conn = cfg->conn;
key_buf = thread->key_buf;
@@ -351,10 +378,10 @@ worker_async(void *arg)
switch (*op) {
case WORKER_INSERT:
case WORKER_INSERT_RMW:
- if (opts->random_range)
+ if (cfg->random_range)
next_val = wtperf_rand(thread);
else
- next_val = opts->icount + get_next_incr(cfg);
+ next_val = cfg->icount + get_next_incr(cfg);
break;
case WORKER_READ:
case WORKER_UPDATE:
@@ -395,14 +422,14 @@ worker_async(void *arg)
break;
goto op_err;
case WORKER_INSERT:
- if (opts->random_value)
+ if (cfg->random_value)
randomize_value(thread, value_buf);
asyncop->set_value(asyncop, value_buf);
if ((ret = asyncop->insert(asyncop)) == 0)
break;
goto op_err;
case WORKER_UPDATE:
- if (opts->random_value)
+ if (cfg->random_value)
randomize_value(thread, value_buf);
asyncop->set_value(asyncop, value_buf);
if ((ret = asyncop->update(asyncop)) == 0)
@@ -425,7 +452,7 @@ op_err: lprintf(cfg, ret, 0,
/* Notify our caller we failed and shut the system down. */
if (0) {
-err: cfg->error = cfg->stop = true;
+err: cfg->error = cfg->stop = 1;
}
return (NULL);
}
@@ -438,17 +465,15 @@ err: cfg->error = cfg->stop = true;
static int
do_range_reads(CONFIG *cfg, WT_CURSOR *cursor)
{
- CONFIG_OPTS *opts;
size_t range;
uint64_t next_val, prev_val;
char *range_key_buf;
char buf[512];
int ret;
- opts = cfg->opts;
ret = 0;
- if (opts->read_range == 0)
+ if (cfg->read_range == 0)
return (0);
memset(&buf[0], 0, 512 * sizeof(char));
@@ -458,7 +483,7 @@ do_range_reads(CONFIG *cfg, WT_CURSOR *cursor)
testutil_check(cursor->get_key(cursor, &range_key_buf));
extract_key(range_key_buf, &next_val);
- for (range = 0; range < opts->read_range; ++range) {
+ for (range = 0; range < cfg->read_range; ++range) {
prev_val = next_val;
ret = cursor->next(cursor);
/* We are done if we reach the end. */
@@ -484,7 +509,6 @@ worker(void *arg)
{
struct timespec start, stop;
CONFIG *cfg;
- CONFIG_OPTS *opts;
CONFIG_THREAD *thread;
TRACK *trk;
WT_CONNECTION *conn;
@@ -500,7 +524,6 @@ worker(void *arg)
thread = (CONFIG_THREAD *)arg;
cfg = thread->cfg;
- opts = cfg->opts;
conn = cfg->conn;
cursors = NULL;
ops = 0;
@@ -509,12 +532,12 @@ worker(void *arg)
trk = NULL;
if ((ret = conn->open_session(
- conn, NULL, opts->sess_config, &session)) != 0) {
+ conn, NULL, cfg->sess_config, &session)) != 0) {
lprintf(cfg, ret, 0, "worker: WT_CONNECTION.open_session");
goto err;
}
- cursors = dcalloc(opts->table_count, sizeof(WT_CURSOR *));
- for (i = 0; i < opts->table_count_idle; i++) {
+ cursors = dcalloc(cfg->table_count, sizeof(WT_CURSOR *));
+ for (i = 0; i < cfg->table_count_idle; i++) {
snprintf(buf, 512, "%s_idle%05d", cfg->uris[0], (int)i);
if ((ret = session->open_cursor(
session, buf, NULL, NULL, &tmp_cursor)) != 0) {
@@ -528,7 +551,7 @@ worker(void *arg)
goto err;
}
}
- for (i = 0; i < opts->table_count; i++) {
+ for (i = 0; i < cfg->table_count; i++) {
if ((ret = session->open_cursor(session,
cfg->uris[i], NULL, NULL, &cursors[i])) != 0) {
lprintf(cfg, ret, 0,
@@ -567,10 +590,10 @@ worker(void *arg)
case WORKER_INSERT:
case WORKER_INSERT_RMW:
trk = &thread->insert;
- if (opts->random_range)
+ if (cfg->random_range)
next_val = wtperf_rand(thread);
else
- next_val = opts->icount + get_next_incr(cfg);
+ next_val = cfg->icount + get_next_incr(cfg);
break;
case WORKER_READ:
trk = &thread->read;
@@ -608,8 +631,8 @@ worker(void *arg)
* is 0, to avoid first time latency spikes.
*/
measure_latency =
- opts->sample_interval != 0 && trk != NULL &&
- trk->ops != 0 && (trk->ops % opts->sample_rate == 0);
+ cfg->sample_interval != 0 && trk != NULL &&
+ trk->ops != 0 && (trk->ops % cfg->sample_rate == 0);
if (measure_latency && (ret = __wt_epoch(NULL, &start)) != 0) {
lprintf(cfg, ret, 0, "Get time call failed");
goto err;
@@ -654,7 +677,7 @@ worker(void *arg)
/* FALLTHROUGH */
case WORKER_INSERT:
- if (opts->random_value)
+ if (cfg->random_value)
randomize_value(thread, value_buf);
cursor->set_value(cursor, value_buf);
if ((ret = cursor->insert(cursor)) == 0)
@@ -685,14 +708,14 @@ worker(void *arg)
* safe, and be sure to NUL-terminate.
*/
strncpy(value_buf,
- value, opts->value_sz_max - 1);
+ value, cfg->value_sz_max - 1);
if (thread->workload->update_delta != 0)
update_value_delta(thread);
if (value_buf[0] == 'a')
value_buf[0] = 'b';
else
value_buf[0] = 'a';
- if (opts->random_value)
+ if (cfg->random_value)
randomize_value(thread, value_buf);
cursor->set_value(cursor, value_buf);
if ((ret = cursor->update(cursor)) == 0)
@@ -746,7 +769,7 @@ op_err: if (ret == WT_ROLLBACK && ops_per_txn != 0) {
}
/* Release the cursor, if we have multiple tables. */
- if (opts->table_count > 1 && ret == 0 &&
+ if (cfg->table_count > 1 && ret == 0 &&
*op != WORKER_INSERT && *op != WORKER_INSERT_RMW) {
if ((ret = cursor->reset(cursor)) != 0) {
lprintf(cfg, ret, 0, "Cursor reset failed");
@@ -805,7 +828,7 @@ op_err: if (ret == WT_ROLLBACK && ops_per_txn != 0) {
/* Notify our caller we failed and shut the system down. */
if (0) {
-err: cfg->error = cfg->stop = true;
+err: cfg->error = cfg->stop = 1;
}
free(cursors);
@@ -860,11 +883,8 @@ run_mix_schedule_op(WORKLOAD *workp, int op, int64_t op_cnt)
static int
run_mix_schedule(CONFIG *cfg, WORKLOAD *workp)
{
- CONFIG_OPTS *opts;
int64_t pct;
- opts = cfg->opts;
-
/* Confirm reads, inserts, truncates and updates cannot all be zero. */
if (workp->insert == 0 && workp->read == 0 &&
workp->truncate == 0 && workp->update == 0) {
@@ -895,7 +915,7 @@ run_mix_schedule(CONFIG *cfg, WORKLOAD *workp)
*/
if (workp->insert != 0 && workp->read == 0 && workp->update == 0) {
memset(workp->ops,
- opts->insert_rmw ? WORKER_INSERT_RMW : WORKER_INSERT,
+ cfg->insert_rmw ? WORKER_INSERT_RMW : WORKER_INSERT,
sizeof(workp->ops));
return (0);
}
@@ -927,7 +947,7 @@ run_mix_schedule(CONFIG *cfg, WORKLOAD *workp)
(workp->insert + workp->read + workp->update);
if (pct != 0)
run_mix_schedule_op(workp,
- opts->insert_rmw ? WORKER_INSERT_RMW : WORKER_INSERT, pct);
+ cfg->insert_rmw ? WORKER_INSERT_RMW : WORKER_INSERT, pct);
pct = (workp->update * 100) /
(workp->insert + workp->read + workp->update);
if (pct != 0)
@@ -940,7 +960,6 @@ populate_thread(void *arg)
{
struct timespec start, stop;
CONFIG *cfg;
- CONFIG_OPTS *opts;
CONFIG_THREAD *thread;
TRACK *trk;
WT_CONNECTION *conn;
@@ -955,7 +974,6 @@ populate_thread(void *arg)
thread = (CONFIG_THREAD *)arg;
cfg = thread->cfg;
- opts = cfg->opts;
conn = cfg->conn;
session = NULL;
cursors = NULL;
@@ -966,17 +984,17 @@ populate_thread(void *arg)
value_buf = thread->value_buf;
if ((ret = conn->open_session(
- conn, NULL, opts->sess_config, &session)) != 0) {
+ conn, NULL, cfg->sess_config, &session)) != 0) {
lprintf(cfg, ret, 0, "populate: WT_CONNECTION.open_session");
goto err;
}
/* Do bulk loads if populate is single-threaded. */
cursor_config =
- (opts->populate_threads == 1 && !opts->index) ? "bulk" : NULL;
+ (cfg->populate_threads == 1 && !cfg->index) ? "bulk" : NULL;
/* Create the cursors. */
- cursors = dcalloc(opts->table_count, sizeof(WT_CURSOR *));
- for (i = 0; i < opts->table_count; i++) {
+ cursors = dcalloc(cfg->table_count, sizeof(WT_CURSOR *));
+ for (i = 0; i < cfg->table_count; i++) {
if ((ret = session->open_cursor(
session, cfg->uris[i], NULL,
cursor_config, &cursors[i])) != 0) {
@@ -990,12 +1008,12 @@ populate_thread(void *arg)
/* Populate the databases. */
for (intxn = 0, opcount = 0;;) {
op = get_next_incr(cfg);
- if (op > opts->icount)
+ if (op > cfg->icount)
break;
- if (opts->populate_ops_per_txn != 0 && !intxn) {
+ if (cfg->populate_ops_per_txn != 0 && !intxn) {
if ((ret = session->begin_transaction(
- session, opts->transaction_config)) != 0) {
+ session, cfg->transaction_config)) != 0) {
lprintf(cfg, ret, 0,
"Failed starting transaction.");
goto err;
@@ -1008,14 +1026,14 @@ populate_thread(void *arg)
cursor = cursors[map_key_to_table(cfg, op)];
generate_key(cfg, key_buf, op);
measure_latency =
- opts->sample_interval != 0 &&
- trk->ops != 0 && (trk->ops % opts->sample_rate == 0);
+ cfg->sample_interval != 0 &&
+ trk->ops != 0 && (trk->ops % cfg->sample_rate == 0);
if (measure_latency && (ret = __wt_epoch(NULL, &start)) != 0) {
lprintf(cfg, ret, 0, "Get time call failed");
goto err;
}
cursor->set_key(cursor, key_buf);
- if (opts->random_value)
+ if (cfg->random_value)
randomize_value(thread, value_buf);
cursor->set_value(cursor, value_buf);
if ((ret = cursor->insert(cursor)) == WT_ROLLBACK) {
@@ -1049,12 +1067,12 @@ populate_thread(void *arg)
}
++thread->insert.ops; /* Same as trk->ops */
- if (opts->checkpoint_stress_rate != 0 &&
- (op % opts->checkpoint_stress_rate) == 0)
+ if (cfg->checkpoint_stress_rate != 0 &&
+ (op % cfg->checkpoint_stress_rate) == 0)
stress_checkpoint_due = 1;
- if (opts->populate_ops_per_txn != 0) {
- if (++opcount < opts->populate_ops_per_txn)
+ if (cfg->populate_ops_per_txn != 0) {
+ if (++opcount < cfg->populate_ops_per_txn)
continue;
opcount = 0;
@@ -1085,7 +1103,7 @@ populate_thread(void *arg)
/* Notify our caller we failed and shut the system down. */
if (0) {
-err: cfg->error = cfg->stop = true;
+err: cfg->error = cfg->stop = 1;
}
free(cursors);
@@ -1097,7 +1115,6 @@ populate_async(void *arg)
{
struct timespec start, stop;
CONFIG *cfg;
- CONFIG_OPTS *opts;
CONFIG_THREAD *thread;
TRACK *trk;
WT_ASYNC_OP *asyncop;
@@ -1109,7 +1126,6 @@ populate_async(void *arg)
thread = (CONFIG_THREAD *)arg;
cfg = thread->cfg;
- opts = cfg->opts;
conn = cfg->conn;
session = NULL;
ret = 0;
@@ -1119,7 +1135,7 @@ populate_async(void *arg)
value_buf = thread->value_buf;
if ((ret = conn->open_session(
- conn, NULL, opts->sess_config, &session)) != 0) {
+ conn, NULL, cfg->sess_config, &session)) != 0) {
lprintf(cfg, ret, 0, "populate: WT_CONNECTION.open_session");
goto err;
}
@@ -1130,8 +1146,8 @@ populate_async(void *arg)
* the time to process by workers.
*/
measure_latency =
- opts->sample_interval != 0 &&
- trk->ops != 0 && (trk->ops % opts->sample_rate == 0);
+ cfg->sample_interval != 0 &&
+ trk->ops != 0 && (trk->ops % cfg->sample_rate == 0);
if (measure_latency && (ret = __wt_epoch(NULL, &start)) != 0) {
lprintf(cfg, ret, 0, "Get time call failed");
goto err;
@@ -1139,7 +1155,7 @@ populate_async(void *arg)
/* Populate the databases. */
for (;;) {
op = get_next_incr(cfg);
- if (op > opts->icount)
+ if (op > cfg->icount)
break;
/*
* Allocate an async op for whichever table.
@@ -1154,7 +1170,7 @@ populate_async(void *arg)
asyncop->app_private = thread;
generate_key(cfg, key_buf, op);
asyncop->set_key(asyncop, key_buf);
- if (opts->random_value)
+ if (cfg->random_value)
randomize_value(thread, value_buf);
asyncop->set_value(asyncop, value_buf);
if ((ret = asyncop->insert(asyncop)) != 0) {
@@ -1188,7 +1204,7 @@ populate_async(void *arg)
/* Notify our caller we failed and shut the system down. */
if (0) {
-err: cfg->error = cfg->stop = true;
+err: cfg->error = cfg->stop = 1;
}
return (NULL);
}
@@ -1199,7 +1215,6 @@ monitor(void *arg)
struct timespec t;
struct tm *tm, _tm;
CONFIG *cfg;
- CONFIG_OPTS *opts;
FILE *fp;
size_t len;
uint64_t min_thr, reads, inserts, updates;
@@ -1215,14 +1230,12 @@ monitor(void *arg)
char buf[64], *path;
cfg = (CONFIG *)arg;
- opts = cfg->opts;
- assert(opts->sample_interval != 0);
-
+ assert(cfg->sample_interval != 0);
fp = NULL;
path = NULL;
- min_thr = (uint64_t)opts->min_throughput;
- latency_max = (uint32_t)ms_to_us(opts->max_latency);
+ min_thr = (uint64_t)cfg->min_throughput;
+ latency_max = (uint32_t)ms_to_us(cfg->max_latency);
/* Open the logging file. */
len = strlen(cfg->monitor_dir) + 100;
@@ -1253,7 +1266,7 @@ monitor(void *arg)
"\n");
last_reads = last_inserts = last_updates = 0;
while (!cfg->stop) {
- for (i = 0; i < opts->sample_interval; i++) {
+ for (i = 0; i < cfg->sample_interval; i++) {
sleep(1);
if (cfg->stop)
break;
@@ -1278,8 +1291,8 @@ monitor(void *arg)
latency_insert(cfg, &insert_avg, &insert_min, &insert_max);
latency_update(cfg, &update_avg, &update_min, &update_max);
- cur_reads = (reads - last_reads) / opts->sample_interval;
- cur_updates = (updates - last_updates) / opts->sample_interval;
+ cur_reads = (reads - last_reads) / cfg->sample_interval;
+ cur_updates = (updates - last_updates) / cfg->sample_interval;
/*
* For now the only item we need to worry about changing is
* inserts when we transition from the populate phase to
@@ -1289,7 +1302,7 @@ monitor(void *arg)
cur_inserts = 0;
else
cur_inserts =
- (inserts - last_inserts) / opts->sample_interval;
+ (inserts - last_inserts) / cfg->sample_interval;
(void)fprintf(fp,
"%s,%" PRIu32
@@ -1309,7 +1322,7 @@ monitor(void *arg)
if (latency_max != 0 &&
(read_max > latency_max || insert_max > latency_max ||
update_max > latency_max)) {
- if (opts->max_latency_fatal) {
+ if (cfg->max_latency_fatal) {
level = 1;
msg_err = WT_PANIC;
str = "ERROR";
@@ -1328,7 +1341,7 @@ monitor(void *arg)
((cur_reads != 0 && cur_reads < min_thr) ||
(cur_inserts != 0 && cur_inserts < min_thr) ||
(cur_updates != 0 && cur_updates < min_thr))) {
- if (opts->min_throughput_fatal) {
+ if (cfg->min_throughput_fatal) {
level = 1;
msg_err = WT_PANIC;
str = "ERROR";
@@ -1350,7 +1363,7 @@ monitor(void *arg)
/* Notify our caller we failed and shut the system down. */
if (0) {
-err: cfg->error = cfg->stop = true;
+err: cfg->error = cfg->stop = 1;
}
if (fp != NULL)
@@ -1364,7 +1377,6 @@ static void *
checkpoint_worker(void *arg)
{
CONFIG *cfg;
- CONFIG_OPTS *opts;
CONFIG_THREAD *thread;
WT_CONNECTION *conn;
WT_SESSION *session;
@@ -1374,12 +1386,11 @@ checkpoint_worker(void *arg)
thread = (CONFIG_THREAD *)arg;
cfg = thread->cfg;
- opts = cfg->opts;
conn = cfg->conn;
session = NULL;
if ((ret = conn->open_session(
- conn, NULL, opts->sess_config, &session)) != 0) {
+ conn, NULL, cfg->sess_config, &session)) != 0) {
lprintf(cfg, ret, 0,
"open_session failed in checkpoint thread.");
goto err;
@@ -1387,7 +1398,7 @@ checkpoint_worker(void *arg)
while (!cfg->stop) {
/* Break the sleep up, so we notice interrupts faster. */
- for (i = 0; i < opts->checkpoint_interval; i++) {
+ for (i = 0; i < cfg->checkpoint_interval; i++) {
sleep(1);
if (cfg->stop)
break;
@@ -1400,12 +1411,12 @@ checkpoint_worker(void *arg)
lprintf(cfg, ret, 0, "Get time failed in checkpoint.");
goto err;
}
- cfg->ckpt = true;
+ cfg->ckpt = 1;
if ((ret = session->checkpoint(session, NULL)) != 0) {
lprintf(cfg, ret, 0, "Checkpoint failed.");
goto err;
}
- cfg->ckpt = false;
+ cfg->ckpt = 0;
++thread->ckpt.ops;
if ((ret = __wt_epoch(NULL, &e)) != 0) {
@@ -1423,7 +1434,7 @@ checkpoint_worker(void *arg)
/* Notify our caller we failed and shut the system down. */
if (0) {
-err: cfg->error = cfg->stop = true;
+err: cfg->error = cfg->stop = 1;
}
return (NULL);
@@ -1433,7 +1444,6 @@ static int
execute_populate(CONFIG *cfg)
{
struct timespec start, stop;
- CONFIG_OPTS *opts;
CONFIG_THREAD *popth;
WT_ASYNC_OP *asyncop;
pthread_t idle_table_cycle_thread;
@@ -1444,12 +1454,10 @@ execute_populate(CONFIG *cfg)
int elapsed, ret;
void *(*pfunc)(void *);
- opts = cfg->opts;
-
lprintf(cfg, 0, 1,
"Starting %" PRIu32
" populate thread(s) for %" PRIu32 " items",
- opts->populate_threads, opts->icount);
+ cfg->populate_threads, cfg->icount);
/* Start cycling idle tables if configured. */
if ((ret = start_idle_table_cycle(cfg, &idle_table_cycle_thread)) != 0)
@@ -1457,16 +1465,15 @@ execute_populate(CONFIG *cfg)
cfg->insert_key = 0;
- cfg->popthreads =
- dcalloc(opts->populate_threads, sizeof(CONFIG_THREAD));
- if (cfg->use_asyncops) {
+ cfg->popthreads = dcalloc(cfg->populate_threads, sizeof(CONFIG_THREAD));
+ if (cfg->use_asyncops > 0) {
lprintf(cfg, 0, 1, "Starting %" PRIu32 " async thread(s)",
- opts->async_threads);
+ cfg->async_threads);
pfunc = populate_async;
} else
pfunc = populate_thread;
if ((ret = start_threads(cfg, NULL,
- cfg->popthreads, opts->populate_threads, pfunc)) != 0)
+ cfg->popthreads, cfg->populate_threads, pfunc)) != 0)
return (ret);
if ((ret = __wt_epoch(NULL, &start)) != 0) {
@@ -1474,26 +1481,26 @@ execute_populate(CONFIG *cfg)
return (ret);
}
for (elapsed = 0, interval = 0, last_ops = 0;
- cfg->insert_key < opts->icount && !cfg->error;) {
+ cfg->insert_key < cfg->icount && cfg->error == 0;) {
/*
* Sleep for 100th of a second, report_interval is in second
* granularity, each 100th increment of elapsed is a single
* increment of interval.
*/
(void)usleep(10000);
- if (opts->report_interval == 0 || ++elapsed < 100)
+ if (cfg->report_interval == 0 || ++elapsed < 100)
continue;
elapsed = 0;
- if (++interval < opts->report_interval)
+ if (++interval < cfg->report_interval)
continue;
interval = 0;
- cfg->totalsec += opts->report_interval;
+ cfg->totalsec += cfg->report_interval;
cfg->insert_ops = sum_pop_ops(cfg);
lprintf(cfg, 0, 1,
"%" PRIu64 " populate inserts (%" PRIu64 " of %"
PRIu32 ") in %" PRIu32 " secs (%" PRIu32 " total secs)",
cfg->insert_ops - last_ops, cfg->insert_ops,
- opts->icount, opts->report_interval, cfg->totalsec);
+ cfg->icount, cfg->report_interval, cfg->totalsec);
last_ops = cfg->insert_ops;
}
if ((ret = __wt_epoch(NULL, &stop)) != 0) {
@@ -1509,19 +1516,19 @@ execute_populate(CONFIG *cfg)
*/
popth = cfg->popthreads;
cfg->popthreads = NULL;
- ret = stop_threads(cfg, opts->populate_threads, popth);
+ ret = stop_threads(cfg, cfg->populate_threads, popth);
free(popth);
if (ret != 0)
return (ret);
/* Report if any worker threads didn't finish. */
- if (cfg->error) {
+ if (cfg->error != 0) {
lprintf(cfg, WT_ERROR, 0,
"Populate thread(s) exited without finishing.");
return (WT_ERROR);
}
- lprintf(cfg, 0, 1, "Finished load of %" PRIu32 " items", opts->icount);
+ lprintf(cfg, 0, 1, "Finished load of %" PRIu32 " items", cfg->icount);
msecs = WT_TIMEDIFF_MS(stop, start);
/*
@@ -1533,7 +1540,7 @@ execute_populate(CONFIG *cfg)
print_ops_sec = 0;
} else {
print_secs = (double)msecs / (double)MSEC_PER_SEC;
- print_ops_sec = (uint64_t)(opts->icount / print_secs);
+ print_ops_sec = (uint64_t)(cfg->icount / print_secs);
}
lprintf(cfg, 0, 1,
"Load time: %.2f\n" "load ops/sec: %" PRIu64,
@@ -1544,15 +1551,15 @@ execute_populate(CONFIG *cfg)
* set an unlimited timeout because if we close the connection
* then any in-progress compact/merge is aborted.
*/
- if (opts->compact) {
- assert(opts->async_threads > 0);
+ if (cfg->compact) {
+ assert(cfg->async_threads > 0);
lprintf(cfg, 0, 1, "Compact after populate");
if ((ret = __wt_epoch(NULL, &start)) != 0) {
lprintf(cfg, ret, 0, "Get time failed in populate.");
return (ret);
}
- tables = opts->table_count;
- for (i = 0; i < opts->table_count; i++) {
+ tables = cfg->table_count;
+ for (i = 0; i < cfg->table_count; i++) {
/*
* If no ops are available, retry. Any other error,
* return.
@@ -1593,12 +1600,9 @@ execute_populate(CONFIG *cfg)
static int
close_reopen(CONFIG *cfg)
{
- CONFIG_OPTS *opts;
int ret;
- opts = cfg->opts;
-
- if (!opts->readonly && !opts->reopen_connection)
+ if (!cfg->readonly && !cfg->reopen_connection)
return (0);
/*
* Reopen the connection. We do this so that the workload phase always
@@ -1624,7 +1628,7 @@ close_reopen(CONFIG *cfg)
* threads looking for work that will never arrive don't affect
* performance.
*/
- if (opts->compact && !cfg->use_asyncops) {
+ if (cfg->compact && cfg->use_asyncops == 0) {
if ((ret = cfg->conn->reconfigure(
cfg->conn, "async=(enabled=false)")) != 0) {
lprintf(cfg, ret, 0, "Reconfigure async off failed");
@@ -1637,7 +1641,6 @@ close_reopen(CONFIG *cfg)
static int
execute_workload(CONFIG *cfg)
{
- CONFIG_OPTS *opts;
CONFIG_THREAD *threads;
WORKLOAD *workp;
WT_CONNECTION *conn;
@@ -1650,8 +1653,6 @@ execute_workload(CONFIG *cfg)
int ret, t_ret;
void *(*pfunc)(void *);
- opts = cfg->opts;
-
cfg->insert_key = 0;
cfg->insert_ops = cfg->read_ops = cfg->truncate_ops = 0;
cfg->update_ops = 0;
@@ -1666,26 +1667,26 @@ execute_workload(CONFIG *cfg)
if ((ret = start_idle_table_cycle(cfg, &idle_table_cycle_thread)) != 0)
return (ret);
- if (opts->warmup != 0)
- cfg->in_warmup = true;
+ if (cfg->warmup != 0)
+ cfg->in_warmup = 1;
/* Allocate memory for the worker threads. */
cfg->workers = dcalloc((size_t)cfg->workers_cnt, sizeof(CONFIG_THREAD));
- if (cfg->use_asyncops) {
+ if (cfg->use_asyncops > 0) {
lprintf(cfg, 0, 1, "Starting %" PRIu32 " async thread(s)",
- opts->async_threads);
+ cfg->async_threads);
pfunc = worker_async;
} else
pfunc = worker;
- if (opts->session_count_idle != 0) {
- sessions = dcalloc((size_t)opts->session_count_idle,
+ if (cfg->session_count_idle != 0) {
+ sessions = dcalloc((size_t)cfg->session_count_idle,
sizeof(WT_SESSION *));
conn = cfg->conn;
- for (i = 0; i < opts->session_count_idle; ++i)
- if ((ret = conn->open_session(conn,
- NULL, opts->sess_config, &sessions[i])) != 0) {
+ for (i = 0; i < cfg->session_count_idle; ++i)
+ if ((ret = conn->open_session(
+ conn, NULL, cfg->sess_config, &sessions[i])) != 0) {
lprintf(cfg, ret, 0,
"execute_workload: idle open_session");
goto err;
@@ -1713,15 +1714,15 @@ execute_workload(CONFIG *cfg)
threads += workp->threads;
}
- if (opts->warmup != 0) {
+ if (cfg->warmup != 0) {
lprintf(cfg, 0, 1,
- "Waiting for warmup duration of %" PRIu32, opts->warmup);
- sleep(opts->warmup);
- cfg->in_warmup = false;
+ "Waiting for warmup duration of %" PRIu32, cfg->warmup);
+ sleep(cfg->warmup);
+ cfg->in_warmup = 0;
}
- for (interval = opts->report_interval,
- run_time = opts->run_time, run_ops = opts->run_ops; !cfg->error;) {
+ for (interval = cfg->report_interval, run_time = cfg->run_time,
+ run_ops = cfg->run_ops; cfg->error == 0;) {
/*
* Sleep for one second at a time.
* If we are tracking run time, check to see if we're done, and
@@ -1750,8 +1751,8 @@ execute_workload(CONFIG *cfg)
/* If writing out throughput information, see if it's time. */
if (interval == 0 || --interval > 0)
continue;
- interval = opts->report_interval;
- cfg->totalsec += opts->report_interval;
+ interval = cfg->report_interval;
+ cfg->totalsec += cfg->report_interval;
lprintf(cfg, 0, 1,
"%" PRIu64 " reads, %" PRIu64 " inserts, %" PRIu64
@@ -1762,7 +1763,7 @@ execute_workload(CONFIG *cfg)
cfg->update_ops - last_updates,
cfg->truncate_ops - last_truncates,
cfg->ckpt_ops - last_ckpts,
- opts->report_interval, cfg->totalsec);
+ cfg->report_interval, cfg->totalsec);
last_reads = cfg->read_ops;
last_inserts = cfg->insert_ops;
last_updates = cfg->update_ops;
@@ -1771,7 +1772,7 @@ execute_workload(CONFIG *cfg)
}
/* Notify the worker threads they are done. */
-err: cfg->stop = true;
+err: cfg->stop = 1;
/* Stop cycling idle tables. */
if ((ret = stop_idle_table_cycle(cfg, idle_table_cycle_thread)) != 0)
@@ -1782,12 +1783,12 @@ err: cfg->stop = true;
ret = t_ret;
/* Drop tables if configured to and this isn't an error path */
- if (ret == 0 && opts->drop_tables && (ret = drop_all_tables(cfg)) != 0)
+ if (ret == 0 && cfg->drop_tables && (ret = drop_all_tables(cfg)) != 0)
lprintf(cfg, ret, 0, "Drop tables failed.");
free(sessions);
/* Report if any worker threads didn't finish. */
- if (cfg->error) {
+ if (cfg->error != 0) {
lprintf(cfg, WT_ERROR, 0,
"Worker thread(s) exited without finishing.");
if (ret == 0)
@@ -1803,7 +1804,6 @@ err: cfg->stop = true;
static int
find_table_count(CONFIG *cfg)
{
- CONFIG_OPTS *opts;
WT_CONNECTION *conn;
WT_CURSOR *cursor;
WT_SESSION *session;
@@ -1811,17 +1811,16 @@ find_table_count(CONFIG *cfg)
int ret, t_ret;
char *key;
- opts = cfg->opts;
conn = cfg->conn;
max_icount = 0;
if ((ret = conn->open_session(
- conn, NULL, opts->sess_config, &session)) != 0) {
+ conn, NULL, cfg->sess_config, &session)) != 0) {
lprintf(cfg, ret, 0,
"find_table_count: open_session failed");
goto out;
}
- for (i = 0; i < opts->table_count; i++) {
+ for (i = 0; i < cfg->table_count; i++) {
if ((ret = session->open_cursor(session, cfg->uris[i],
NULL, NULL, &cursor)) != 0) {
lprintf(cfg, ret, 0,
@@ -1854,65 +1853,61 @@ err: if ((t_ret = session->close(session, NULL)) != 0) {
lprintf(cfg, ret, 0,
"find_table_count: session close failed");
}
- opts->icount = max_icount;
+ cfg->icount = max_icount;
out: return (ret);
}
/*
- * Populate the uri array.
+ * Populate the uri array if more than one table is being used.
*/
static void
create_uris(CONFIG *cfg)
{
- CONFIG_OPTS *opts;
- size_t len;
+ size_t base_uri_len;
uint32_t i;
+ char *uri;
- opts = cfg->opts;
-
- cfg->uris = dcalloc(opts->table_count, sizeof(char *));
- len = strlen("table:") + strlen(opts->table_name) + 20;
- for (i = 0; i < opts->table_count; i++) {
- /* If there is only one table, just use the base name. */
- cfg->uris[i] = dmalloc(len);
- if (opts->table_count == 1)
- sprintf(cfg->uris[i], "table:%s", opts->table_name);
+ base_uri_len = strlen(cfg->base_uri);
+ cfg->uris = dcalloc(cfg->table_count, sizeof(char *));
+ for (i = 0; i < cfg->table_count; i++) {
+ uri = cfg->uris[i] = dcalloc(base_uri_len + 6, 1);
+ /*
+ * If there is only one table, just use base name.
+ */
+ if (cfg->table_count == 1)
+ memcpy(uri, cfg->base_uri, base_uri_len);
else
- sprintf(
- cfg->uris[i], "table:%s%05d", opts->table_name, i);
+ sprintf(uri, "%s%05d", cfg->base_uri, i);
}
}
static int
create_tables(CONFIG *cfg)
{
- CONFIG_OPTS *opts;
WT_SESSION *session;
size_t i;
int ret;
char buf[512];
- opts = cfg->opts;
-
if ((ret = cfg->conn->open_session(
- cfg->conn, NULL, opts->sess_config, &session)) != 0) {
+ cfg->conn, NULL, cfg->sess_config, &session)) != 0) {
lprintf(cfg, ret, 0,
"Error opening a session on %s", cfg->home);
return (ret);
}
- for (i = 0; i < opts->table_count_idle; i++) {
+ for (i = 0; i < cfg->table_count_idle; i++) {
snprintf(buf, 512, "%s_idle%05d", cfg->uris[0], (int)i);
if ((ret = session->create(
- session, buf, opts->table_config)) != 0) {
+ session, buf, cfg->table_config)) != 0) {
lprintf(cfg, ret, 0,
"Error creating idle table %s", buf);
return (ret);
}
}
- for (i = 0; i < opts->table_count; i++) {
- if (opts->log_partial && i > 0) {
+ for (i = 0; i < cfg->table_count; i++) {
+ if (cfg->log_partial && i > 0) {
if (((ret = session->create(session,
cfg->uris[i], cfg->partial_config)) != 0)) {
lprintf(cfg, ret, 0,
@@ -1920,12 +1915,12 @@ create_tables(CONFIG *cfg)
return (ret);
}
} else if ((ret = session->create(
- session, cfg->uris[i], opts->table_config)) != 0) {
+ session, cfg->uris[i], cfg->table_config)) != 0) {
lprintf(cfg, ret, 0,
"Error creating table %s", cfg->uris[i]);
return (ret);
}
- if (opts->index) {
+ if (cfg->index) {
snprintf(buf, 512, "index:%s:val_idx",
cfg->uris[i] + strlen("table:"));
if ((ret = session->create(
@@ -1945,172 +1940,48 @@ create_tables(CONFIG *cfg)
return (0);
}
-/*
- * config_copy --
- * Create a new CONFIG structure as a duplicate of a previous one.
- */
-static int
-config_copy(const CONFIG *src, CONFIG **retp)
-{
- CONFIG *dest;
- CONFIG_OPTS *opts;
- size_t i;
-
- opts = src->opts;
-
- dest = dcalloc(1, sizeof(CONFIG));
-
- if (src->home != NULL)
- dest->home = dstrdup(src->home);
- if (src->monitor_dir != NULL)
- dest->monitor_dir = dstrdup(src->monitor_dir);
- if (src->partial_config != NULL)
- dest->partial_config = dstrdup(src->partial_config);
- if (src->reopen_config != NULL)
- dest->reopen_config = dstrdup(src->reopen_config);
-
- if (src->uris != NULL) {
- dest->uris = dcalloc(opts->table_count, sizeof(char *));
- for (i = 0; i < opts->table_count; i++)
- dest->uris[i] = dstrdup(src->uris[i]);
- }
-
- if (src->async_config != NULL)
- dest->async_config = dstrdup(src->async_config);
-
- dest->ckptthreads = NULL;
- dest->popthreads = NULL;
- dest->workers = NULL;
-
- if (src->workload != NULL) {
- dest->workload = dcalloc(WORKLOAD_MAX, sizeof(WORKLOAD));
- memcpy(dest->workload,
- src->workload, WORKLOAD_MAX * sizeof(WORKLOAD));
- }
-
- TAILQ_INIT(&dest->stone_head);
-
- dest->opts = src->opts;
-
- *retp = dest;
- return (0);
-}
-
-/*
- * config_free --
- * Free any storage allocated in the CONFIG structure.
- */
-static void
-config_free(CONFIG *cfg)
-{
- CONFIG_OPTS *opts;
- size_t i;
-
- opts = cfg->opts;
-
- free(cfg->home);
- free(cfg->monitor_dir);
- free(cfg->partial_config);
- free(cfg->reopen_config);
-
- if (cfg->uris != NULL) {
- for (i = 0; i < opts->table_count; i++)
- free(cfg->uris[i]);
- free(cfg->uris);
- }
-
- free(cfg->async_config);
-
- free(cfg->ckptthreads);
- free(cfg->popthreads);
-
- free(cfg->workers);
- free(cfg->workload);
-
- cleanup_truncate_config(cfg);
-}
-
-/*
- * config_compress --
- * Parse the compression configuration.
- */
-static int
-config_compress(CONFIG *cfg)
-{
- CONFIG_OPTS *opts;
- int ret;
- const char *s;
-
- opts = cfg->opts;
- ret = 0;
-
- s = opts->compression;
- if (strcmp(s, "none") == 0) {
- cfg->compress_ext = NULL;
- cfg->compress_table = NULL;
- } else if (strcmp(s, "lz4") == 0) {
-#ifndef HAVE_BUILTIN_EXTENSION_LZ4
- cfg->compress_ext = LZ4_EXT;
-#endif
- cfg->compress_table = LZ4_BLK;
- } else if (strcmp(s, "snappy") == 0) {
-#ifndef HAVE_BUILTIN_EXTENSION_SNAPPY
- cfg->compress_ext = SNAPPY_EXT;
-#endif
- cfg->compress_table = SNAPPY_BLK;
- } else if (strcmp(s, "zlib") == 0) {
-#ifndef HAVE_BUILTIN_EXTENSION_ZLIB
- cfg->compress_ext = ZLIB_EXT;
-#endif
- cfg->compress_table = ZLIB_BLK;
- } else {
- fprintf(stderr,
- "invalid compression configuration: %s\n", s);
- ret = EINVAL;
- }
- return (ret);
-
-}
-
static int
start_all_runs(CONFIG *cfg)
{
CONFIG *next_cfg, **configs;
- CONFIG_OPTS *opts;
pthread_t *threads;
- size_t i, len;
+ size_t home_len, i;
int ret, t_ret;
+ char *new_home;
- opts = cfg->opts;
- configs = NULL;
ret = 0;
+ configs = NULL;
- if (opts->database_count == 1)
+ if (cfg->database_count == 1)
return (start_run(cfg));
/* Allocate an array to hold our config struct copies. */
- configs = dcalloc(opts->database_count, sizeof(CONFIG *));
+ configs = dcalloc(cfg->database_count, sizeof(CONFIG *));
/* Allocate an array to hold our thread IDs. */
- threads = dcalloc(opts->database_count, sizeof(pthread_t));
+ threads = dcalloc(cfg->database_count, sizeof(pthread_t));
- len = strlen(cfg->home);
- for (i = 0; i < opts->database_count; i++) {
- if ((ret = config_copy(cfg, &next_cfg)) != 0)
- goto err;
+ home_len = strlen(cfg->home);
+ for (i = 0; i < cfg->database_count; i++) {
+ next_cfg = dcalloc(1, sizeof(CONFIG));
configs[i] = next_cfg;
+ if ((ret = config_copy(next_cfg, cfg)) != 0)
+ goto err;
/* Setup a unique home directory for each database. */
- next_cfg->home = dmalloc(len + 5);
- snprintf(next_cfg->home,
- len + 5, "%s/D%02d", cfg->home, (int)i);
+ new_home = dmalloc(home_len + 5);
+ snprintf(new_home, home_len + 5, "%s/D%02d", cfg->home, (int)i);
+ free(next_cfg->home);
+ next_cfg->home = new_home;
/* If the monitor dir is default, update it too. */
- if (strcmp(cfg->monitor_dir, cfg->home) == 0)
- next_cfg->monitor_dir = dstrdup(next_cfg->home);
+ if (strcmp(cfg->monitor_dir, cfg->home) == 0) {
+ free(next_cfg->monitor_dir);
+ next_cfg->monitor_dir = dstrdup(new_home);
+ }
/* If creating the sub-database, recreate its home */
- if (opts->create != 0)
+ if (cfg->create != 0)
recreate_dir(next_cfg->home);
if ((ret = pthread_create(
@@ -2121,14 +1992,14 @@ start_all_runs(CONFIG *cfg)
}
/* Wait for threads to finish. */
- for (i = 0; i < opts->database_count; i++)
+ for (i = 0; i < cfg->database_count; i++)
if ((t_ret = pthread_join(threads[i], NULL)) != 0) {
lprintf(cfg, ret, 0, "Error joining thread");
if (ret == 0)
ret = t_ret;
}
-err: for (i = 0; i < opts->database_count && configs[i] != NULL; i++) {
+err: for (i = 0; i < cfg->database_count && configs[i] != NULL; i++) {
config_free(configs[i]);
free(configs[i]);
}
@@ -2154,13 +2025,11 @@ thread_run_wtperf(void *arg)
static int
start_run(CONFIG *cfg)
{
- CONFIG_OPTS *opts;
pthread_t monitor_thread;
uint64_t total_ops;
uint32_t run_time;
int monitor_created, ret, t_ret;
- opts = cfg->opts;
monitor_created = ret = 0;
/* [-Wconditional-uninitialized] */
memset(&monitor_thread, 0, sizeof(monitor_thread));
@@ -2169,7 +2038,7 @@ start_run(CONFIG *cfg)
goto err;
if ((ret = wiredtiger_open( /* Open the real connection. */
- cfg->home, NULL, opts->conn_config, &cfg->conn)) != 0) {
+ cfg->home, NULL, cfg->conn_config, &cfg->conn)) != 0) {
lprintf(cfg, ret, 0, "Error connecting to %s", cfg->home);
goto err;
}
@@ -2177,26 +2046,27 @@ start_run(CONFIG *cfg)
create_uris(cfg);
/* If creating, create the tables. */
- if (opts->create != 0 && (ret = create_tables(cfg)) != 0)
+ if (cfg->create != 0 && (ret = create_tables(cfg)) != 0)
goto err;
/* Start the monitor thread. */
- if (opts->sample_interval != 0) {
+ if (cfg->sample_interval != 0) {
if ((ret = pthread_create(
&monitor_thread, NULL, monitor, cfg)) != 0) {
- lprintf(cfg, ret, 0, "Error creating monitor thread.");
+ lprintf(
+ cfg, ret, 0, "Error creating monitor thread.");
goto err;
}
monitor_created = 1;
}
/* If creating, populate the table. */
- if (opts->create != 0 && execute_populate(cfg) != 0)
+ if (cfg->create != 0 && execute_populate(cfg) != 0)
goto err;
/* Optional workload. */
if (cfg->workers_cnt != 0 &&
- (opts->run_time != 0 || opts->run_ops != 0)) {
+ (cfg->run_time != 0 || cfg->run_ops != 0)) {
/*
* If we have a workload, close and reopen the connection so
* that LSM can detect read-only workloads.
@@ -2205,18 +2075,18 @@ start_run(CONFIG *cfg)
goto err;
/* Didn't create, set insert count. */
- if (opts->create == 0 &&
- opts->random_range == 0 && find_table_count(cfg) != 0)
+ if (cfg->create == 0 && cfg->random_range == 0 &&
+ find_table_count(cfg) != 0)
goto err;
/* Start the checkpoint thread. */
- if (opts->checkpoint_threads != 0) {
+ if (cfg->checkpoint_threads != 0) {
lprintf(cfg, 0, 1,
"Starting %" PRIu32 " checkpoint thread(s)",
- opts->checkpoint_threads);
+ cfg->checkpoint_threads);
cfg->ckptthreads = dcalloc(
- opts->checkpoint_threads, sizeof(CONFIG_THREAD));
+ cfg->checkpoint_threads, sizeof(CONFIG_THREAD));
if (start_threads(cfg, NULL, cfg->ckptthreads,
- opts->checkpoint_threads, checkpoint_worker) != 0)
+ cfg->checkpoint_threads, checkpoint_worker) != 0)
goto err;
}
/* Execute the workload. */
@@ -2231,7 +2101,7 @@ start_run(CONFIG *cfg)
cfg->ckpt_ops = sum_ckpt_ops(cfg);
total_ops = cfg->read_ops + cfg->insert_ops + cfg->update_ops;
- run_time = opts->run_time == 0 ? 1 : opts->run_time;
+ run_time = cfg->run_time == 0 ? 1 : cfg->run_time;
lprintf(cfg, 0, 1,
"Executed %" PRIu64 " read operations (%" PRIu64
"%%) %" PRIu64 " ops/sec",
@@ -2265,7 +2135,7 @@ err: if (ret == 0)
}
/* Notify the worker threads they are done. */
- cfg->stop = true;
+ cfg->stop = 1;
if ((t_ret = stop_threads(cfg, 1, cfg->ckptthreads)) != 0)
if (ret == 0)
@@ -2287,13 +2157,12 @@ err: if (ret == 0)
}
if (ret == 0) {
- if (opts->run_time == 0 && opts->run_ops == 0)
+ if (cfg->run_time == 0 && cfg->run_ops == 0)
lprintf(cfg, 0, 1, "Run completed");
else
lprintf(cfg, 0, 1, "Run completed: %" PRIu32 " %s",
- opts->run_time == 0 ?
- opts->run_ops : opts->run_time,
- opts->run_time == 0 ? "operations" : "seconds");
+ cfg->run_time == 0 ? cfg->run_ops : cfg->run_time,
+ cfg->run_time == 0 ? "operations" : "seconds");
}
if (cfg->logf != NULL) {
@@ -2309,55 +2178,32 @@ extern int __wt_optind, __wt_optreset;
extern char *__wt_optarg;
void (*custom_die)(void) = NULL;
-/*
- * usage --
- * wtperf usage print, no error.
- */
-static void
-usage(void)
-{
- printf("wtperf [-C config] "
- "[-H mount] [-h home] [-O file] [-o option] [-T config]\n");
- printf("\t-C <string> additional connection configuration\n");
- printf("\t (added to option conn_config)\n");
- printf("\t-H <mount> configure Helium volume mount point\n");
- printf("\t-h <string> Wired Tiger home must exist, default WT_TEST\n");
- printf("\t-O <file> file contains options as listed below\n");
- printf("\t-o option=val[,option=val,...] set options listed below\n");
- printf("\t-T <string> additional table configuration\n");
- printf("\t (added to option table_config)\n");
- printf("\n");
- config_opt_usage();
-}
-
int
main(int argc, char *argv[])
{
CONFIG *cfg, _cfg;
- CONFIG_OPTS *opts;
size_t req_len, sreq_len;
bool monitor_set;
int ch, ret;
- const char *cmdflags = "C:h:m:O:o:T:";
+ const char *opts = "C:h:m:O:o:T:";
const char *config_opts;
- char *cc_buf, *path, *sess_cfg, *tc_buf, *user_cconfig, *user_tconfig;
-
- /* The first CONFIG structure (from which all others are derived). */
- cfg = &_cfg;
- memset(cfg, 0, sizeof(*cfg));
- cfg->home = dstrdup(DEFAULT_HOME);
- cfg->monitor_dir = dstrdup(DEFAULT_MONITOR_DIR);
- TAILQ_INIT(&cfg->stone_head);
- config_opt_init(&cfg->opts);
+ char *cc_buf, *sess_cfg, *tc_buf, *user_cconfig, *user_tconfig;
- opts = cfg->opts;
monitor_set = false;
ret = 0;
config_opts = NULL;
cc_buf = sess_cfg = tc_buf = user_cconfig = user_tconfig = NULL;
+ /* Setup the default configuration values. */
+ cfg = &_cfg;
+ memset(cfg, 0, sizeof(*cfg));
+ if (config_copy(cfg, &default_cfg))
+ goto err;
+ cfg->home = dstrdup(DEFAULT_HOME);
+ cfg->monitor_dir = dstrdup(DEFAULT_MONITOR_DIR);
+
/* Do a basic validation of options, and home is needed before open. */
- while ((ch = __wt_getopt("wtperf", argc, argv, cmdflags)) != EOF)
+ while ((ch = __wt_getopt("wtperf", argc, argv, opts)) != EOF)
switch (ch) {
case 'C':
if (user_cconfig == NULL)
@@ -2413,16 +2259,16 @@ main(int argc, char *argv[])
/* Parse options that override values set via a configuration file. */
__wt_optreset = __wt_optind = 1;
- while ((ch = __wt_getopt("wtperf", argc, argv, cmdflags)) != EOF)
+ while ((ch = __wt_getopt("wtperf", argc, argv, opts)) != EOF)
switch (ch) {
case 'o':
/* Allow -o key=value */
- if (config_opt_str(cfg, __wt_optarg) != 0)
+ if (config_opt_line(cfg, __wt_optarg) != 0)
goto einval;
break;
}
- if (opts->populate_threads == 0 && opts->icount != 0) {
+ if (cfg->populate_threads == 0 && cfg->icount != 0) {
lprintf(cfg, 1, 0,
"Cannot have 0 populate threads when icount is set\n");
goto err;
@@ -2434,16 +2280,16 @@ main(int argc, char *argv[])
* If the user wants compaction, then we also enable async for
* the compact operation, but not for the workloads.
*/
- if (opts->async_threads > 0) {
+ if (cfg->async_threads > 0) {
if (F_ISSET(cfg, CFG_TRUNCATE)) {
lprintf(cfg, 1, 0, "Cannot run truncate and async\n");
goto err;
}
- cfg->use_asyncops = true;
+ cfg->use_asyncops = 1;
}
- if (opts->compact && opts->async_threads == 0)
- opts->async_threads = 2;
- if (opts->async_threads > 0) {
+ if (cfg->compact && cfg->async_threads == 0)
+ cfg->async_threads = 2;
+ if (cfg->async_threads > 0) {
/*
* The maximum number of async threads is two digits, so just
* use that to compute the space we need. Assume the default
@@ -2454,29 +2300,34 @@ main(int argc, char *argv[])
cfg->async_config = dmalloc(req_len);
snprintf(cfg->async_config, req_len,
",async=(enabled=true,threads=%" PRIu32 ")",
- opts->async_threads);
+ cfg->async_threads);
}
if ((ret = config_compress(cfg)) != 0)
goto err;
/* You can't have truncate on a random collection. */
- if (F_ISSET(cfg, CFG_TRUNCATE) && opts->random_range) {
+ if (F_ISSET(cfg, CFG_TRUNCATE) && cfg->random_range) {
lprintf(cfg, 1, 0, "Cannot run truncate and random_range\n");
goto err;
}
/* We can't run truncate with more than one table. */
- if (F_ISSET(cfg, CFG_TRUNCATE) && opts->table_count > 1) {
+ if (F_ISSET(cfg, CFG_TRUNCATE) && cfg->table_count > 1) {
lprintf(cfg, 1, 0, "Cannot truncate more than 1 table\n");
goto err;
}
+ /* Build the URI from the table name. */
+ req_len = strlen("table:") + strlen(cfg->table_name) + 2;
+ cfg->base_uri = dmalloc(req_len);
+ snprintf(cfg->base_uri, req_len, "table:%s", cfg->table_name);
+
/* Make stdout line buffered, so verbose output appears quickly. */
__wt_stream_set_line_buffer(stdout);
/* Concatenate non-default configuration strings. */
- if (opts->verbose > 1 || user_cconfig != NULL ||
- opts->session_count_idle > 0 || cfg->compress_ext != NULL ||
+ if (cfg->verbose > 1 || user_cconfig != NULL ||
+ cfg->session_count_idle > 0 || cfg->compress_ext != NULL ||
cfg->async_config != NULL) {
req_len = strlen(debug_cconfig) + 3;
if (user_cconfig != NULL)
@@ -2485,14 +2336,14 @@ main(int argc, char *argv[])
req_len += strlen(cfg->async_config);
if (cfg->compress_ext != NULL)
req_len += strlen(cfg->compress_ext);
- if (opts->session_count_idle > 0) {
+ if (cfg->session_count_idle > 0) {
sreq_len = strlen(",session_max=") + 6;
req_len += sreq_len;
sess_cfg = dmalloc(sreq_len);
snprintf(sess_cfg, sreq_len,
",session_max=%" PRIu32,
- opts->session_count_idle +
- cfg->workers_cnt + opts->populate_threads + 10);
+ cfg->session_count_idle + cfg->workers_cnt +
+ cfg->populate_threads + 10);
}
cc_buf = dmalloc(req_len);
/*
@@ -2501,82 +2352,81 @@ main(int argc, char *argv[])
snprintf(cc_buf, req_len, "%s%s%s%s%s%s%s",
cfg->async_config ? cfg->async_config : "",
cfg->compress_ext ? cfg->compress_ext : "",
- opts->verbose > 1 && strlen(debug_cconfig) ? ",": "",
- opts->verbose > 1 &&
+ cfg->verbose > 1 && strlen(debug_cconfig) ? ",": "",
+ cfg->verbose > 1 &&
strlen(debug_cconfig) ? debug_cconfig : "",
sess_cfg ? sess_cfg : "",
user_cconfig ? ",": "",
user_cconfig ? user_cconfig : "");
- if (strlen(cc_buf) && (ret =
- config_opt_name_value(cfg, "conn_config", cc_buf)) != 0)
- goto err;
+ if (strlen(cc_buf))
+ if ((ret = config_opt_str(
+ cfg, "conn_config", cc_buf)) != 0)
+ goto err;
}
- if (opts->verbose > 1 || opts->index ||
+ if (cfg->verbose > 1 || cfg->index ||
user_tconfig != NULL || cfg->compress_table != NULL) {
req_len = strlen(debug_tconfig) + 3;
if (user_tconfig != NULL)
req_len += strlen(user_tconfig);
if (cfg->compress_table != NULL)
req_len += strlen(cfg->compress_table);
- if (opts->index)
+ if (cfg->index)
req_len += strlen(INDEX_COL_NAMES);
tc_buf = dmalloc(req_len);
/*
* This is getting hard to parse.
*/
snprintf(tc_buf, req_len, "%s%s%s%s%s%s",
- opts->index ? INDEX_COL_NAMES : "",
+ cfg->index ? INDEX_COL_NAMES : "",
cfg->compress_table ? cfg->compress_table : "",
- opts->verbose > 1 && strlen(debug_tconfig) ? ",": "",
- opts->verbose > 1 &&
+ cfg->verbose > 1 && strlen(debug_tconfig) ? ",": "",
+ cfg->verbose > 1 &&
strlen(debug_tconfig) ? debug_tconfig : "",
user_tconfig ? ",": "",
user_tconfig ? user_tconfig : "");
- if (strlen(tc_buf) && (ret =
- config_opt_name_value(cfg, "table_config", tc_buf)) != 0)
- goto err;
+ if (strlen(tc_buf))
+ if ((ret = config_opt_str(
+ cfg, "table_config", tc_buf)) != 0)
+ goto err;
}
- if (opts->log_partial && opts->table_count > 1) {
- req_len = strlen(opts->table_config) +
+ if (cfg->log_partial && cfg->table_count > 1) {
+ req_len = strlen(cfg->table_config) +
strlen(LOG_PARTIAL_CONFIG) + 1;
cfg->partial_config = dmalloc(req_len);
snprintf(cfg->partial_config, req_len, "%s%s",
- opts->table_config, LOG_PARTIAL_CONFIG);
+ cfg->table_config, LOG_PARTIAL_CONFIG);
}
/*
* Set the config for reopen. If readonly add in that string.
* If not readonly then just copy the original conn_config.
*/
- if (opts->readonly)
- req_len = strlen(opts->conn_config) +
+ if (cfg->readonly)
+ req_len = strlen(cfg->conn_config) +
strlen(READONLY_CONFIG) + 1;
else
- req_len = strlen(opts->conn_config) + 1;
+ req_len = strlen(cfg->conn_config) + 1;
cfg->reopen_config = dmalloc(req_len);
- if (opts->readonly)
+ if (cfg->readonly)
snprintf(cfg->reopen_config, req_len, "%s%s",
- opts->conn_config, READONLY_CONFIG);
+ cfg->conn_config, READONLY_CONFIG);
else
- snprintf(cfg->reopen_config, req_len, "%s", opts->conn_config);
+ snprintf(cfg->reopen_config, req_len, "%s",
+ cfg->conn_config);
/* Sanity-check the configuration. */
if ((ret = config_sanity(cfg)) != 0)
goto err;
/* If creating, remove and re-create the home directory. */
- if (opts->create != 0)
+ if (cfg->create != 0)
recreate_dir(cfg->home);
/* Write a copy of the config. */
- req_len = strlen(cfg->home) + strlen("/CONFIG.wtperf") + 1;
- path = dmalloc(req_len);
- snprintf(path, req_len, "%s/CONFIG.wtperf", cfg->home);
- config_opt_log(opts, path);
- free(path);
+ config_to_file(cfg);
/* Display the configuration. */
- if (opts->verbose > 1)
- config_opt_print(cfg);
+ if (cfg->verbose > 1)
+ config_print(cfg);
if ((ret = start_all_runs(cfg)) != 0)
goto err;
@@ -2586,8 +2436,6 @@ einval: ret = EINVAL;
}
err: config_free(cfg);
- config_opt_cleanup(opts);
-
free(cc_buf);
free(sess_cfg);
free(tc_buf);
@@ -2601,13 +2449,10 @@ static int
start_threads(CONFIG *cfg,
WORKLOAD *workp, CONFIG_THREAD *base, u_int num, void *(*func)(void *))
{
- CONFIG_OPTS *opts;
CONFIG_THREAD *thread;
u_int i;
int ret;
- opts = cfg->opts;
-
/* Initialize the threads. */
for (i = 0, thread = base; i < num; ++i, ++thread) {
thread->cfg = cfg;
@@ -2628,14 +2473,14 @@ start_threads(CONFIG *cfg,
* don't, it's not enough memory to bother. These buffers hold
* strings: trailing NUL is included in the size.
*/
- thread->key_buf = dcalloc(opts->key_sz, 1);
- thread->value_buf = dcalloc(opts->value_sz_max, 1);
+ thread->key_buf = dcalloc(cfg->key_sz, 1);
+ thread->value_buf = dcalloc(cfg->value_sz_max, 1);
/*
* Initialize and then toss in a bit of random values if needed.
*/
- memset(thread->value_buf, 'a', opts->value_sz - 1);
- if (opts->random_value)
+ memset(thread->value_buf, 'a', cfg->value_sz - 1);
+ if (cfg->random_value)
randomize_value(thread, thread->value_buf);
/*
@@ -2707,24 +2552,22 @@ static int
drop_all_tables(CONFIG *cfg)
{
struct timespec start, stop;
- CONFIG_OPTS *opts;
WT_SESSION *session;
size_t i;
uint64_t msecs;
int ret, t_ret;
- opts = cfg->opts;
-
/* Drop any tables. */
if ((ret = cfg->conn->open_session(
- cfg->conn, NULL, opts->sess_config, &session)) != 0) {
+ cfg->conn, NULL, cfg->sess_config, &session)) != 0) {
lprintf(cfg, ret, 0,
"Error opening a session on %s", cfg->home);
return (ret);
}
testutil_check(__wt_epoch(NULL, &start));
- for (i = 0; i < opts->table_count; i++) {
- if ((ret = session->drop(session, cfg->uris[i], NULL)) != 0) {
+ for (i = 0; i < cfg->table_count; i++) {
+ if ((ret = session->drop(
+ session, cfg->uris[i], NULL)) != 0) {
lprintf(cfg, ret, 0,
"Error dropping table %s", cfg->uris[i]);
goto err;
@@ -2734,7 +2577,7 @@ drop_all_tables(CONFIG *cfg)
msecs = WT_TIMEDIFF_MS(stop, start);
lprintf(cfg, 0, 1,
"Executed %" PRIu32 " drop operations average time %" PRIu64 "ms",
- opts->table_count, msecs / opts->table_count);
+ cfg->table_count, msecs / cfg->table_count);
err: if ((t_ret = session->close(session, NULL)) != 0 && ret == 0)
ret = t_ret;
@@ -2744,31 +2587,25 @@ err: if ((t_ret = session->close(session, NULL)) != 0 && ret == 0)
static uint64_t
wtperf_value_range(CONFIG *cfg)
{
- CONFIG_OPTS *opts;
-
- opts = cfg->opts;
-
- if (opts->random_range)
- return (opts->icount + opts->random_range);
+ if (cfg->random_range)
+ return (cfg->icount + cfg->random_range);
/*
* It is legal to configure a zero size populate phase, hide that
* from other code by pretending the range is 1 in that case.
*/
- if (opts->icount + cfg->insert_key == 0)
+ if (cfg->icount + cfg->insert_key == 0)
return (1);
- return (opts->icount + cfg->insert_key - (u_int)(cfg->workers_cnt + 1));
+ return (cfg->icount + cfg->insert_key - (u_int)(cfg->workers_cnt + 1));
}
static uint64_t
wtperf_rand(CONFIG_THREAD *thread)
{
CONFIG *cfg;
- CONFIG_OPTS *opts;
double S1, S2, U;
uint64_t rval;
cfg = thread->cfg;
- opts = cfg->opts;
/*
* Use WiredTiger's random number routine: it's lock-free and fairly
@@ -2777,11 +2614,11 @@ wtperf_rand(CONFIG_THREAD *thread)
rval = __wt_random(&thread->rnd);
/* Use Pareto distribution to give 80/20 hot/cold values. */
- if (opts->pareto != 0) {
+ if (cfg->pareto != 0) {
#define PARETO_SHAPE 1.5
S1 = (-1 / PARETO_SHAPE);
S2 = wtperf_value_range(cfg) *
- (opts->pareto / 100.0) * (PARETO_SHAPE - 1);
+ (cfg->pareto / 100.0) * (PARETO_SHAPE - 1);
U = 1 - (double)rval / (double)UINT32_MAX;
rval = (uint64_t)((pow(U, S1) - 1) * S2);
/*