summaryrefslogtreecommitdiff
path: root/bench
diff options
context:
space:
mode:
authorAlex Gorrod <alexander.gorrod@mongodb.com>2017-02-03 03:28:50 +1100
committersueloverso <sue@mongodb.com>2017-02-02 11:28:50 -0500
commit3e68fb2d7da35eeb122308971f02203c58caa538 (patch)
treeef563addbea357f1738eb7042a931f57758756fd /bench
parent0562f92104f0b2d8ef218d9fe465ef718bc2d9cd (diff)
downloadmongo-3e68fb2d7da35eeb122308971f02203c58caa538.tar.gz
WT-3139 Enhance wtperf to support periodic table scans (#3268)
* Enhance wtperf to support periodic table scans * Implement scans as read_range. * Use a random cursor to set key in table properly. * Don't allow insert workload with table specifier. * Reset the rand cursor so it isn't positioned. * Make wtperf pre_load_data an option.
Diffstat (limited to 'bench')
-rw-r--r--bench/wtperf/config.c42
-rw-r--r--bench/wtperf/idle_table_cycle.c2
-rw-r--r--bench/wtperf/stress/btree-split-stress.wtperf3
-rw-r--r--bench/wtperf/wtperf.c163
-rw-r--r--bench/wtperf/wtperf.h5
-rw-r--r--bench/wtperf/wtperf_opt.i10
6 files changed, 180 insertions, 45 deletions
diff --git a/bench/wtperf/config.c b/bench/wtperf/config.c
index a15a3485dde..9eea99eeec4 100644
--- a/bench/wtperf/config.c
+++ b/bench/wtperf/config.c
@@ -215,6 +215,7 @@ config_threads(WTPERF *wtperf, const char *config, size_t len)
return (EINVAL);
}
workp = &wtperf->workload[wtperf->workload_cnt++];
+ workp->table_index = INT32_MAX;
while ((ret = scan->next(scan, &k, &v)) == 0) {
if (STRING_MATCH("count", k.str, k.len)) {
@@ -233,12 +234,28 @@ config_threads(WTPERF *wtperf, const char *config, size_t len)
goto err;
continue;
}
+ if (STRING_MATCH("pause", k.str, k.len)) {
+ if ((workp->pause = v.val) < 0)
+ goto err;
+ continue;
+ }
if (STRING_MATCH("read", k.str, k.len) ||
STRING_MATCH("reads", k.str, k.len)) {
if ((workp->read = v.val) < 0)
goto err;
continue;
}
+ if (STRING_MATCH("read_range", k.str, k.len)) {
+ if ((workp->read_range = v.val) < 0)
+ goto err;
+ continue;
+ }
+ if (STRING_MATCH("table", k.str, k.len)) {
+ if (v.val <= 0)
+ goto err;
+ workp->table_index = (int32_t)v.val - 1;
+ continue;
+ }
if (STRING_MATCH("throttle", k.str, k.len)) {
workp->throttle = (uint64_t)v.val;
continue;
@@ -760,16 +777,33 @@ config_sanity(WTPERF *wtperf)
opts->value_sz_min = opts->value_sz;
}
- if (opts->readonly && wtperf->workload != NULL)
+ if (wtperf->workload != NULL)
for (i = 0, workp = wtperf->workload;
- i < wtperf->workload_cnt; ++i, ++workp)
- if (workp->insert != 0 || workp->update != 0 ||
- workp->truncate != 0) {
+ i < wtperf->workload_cnt; ++i, ++workp) {
+ if (opts->readonly &&
+ (workp->insert != 0 || workp->update != 0 ||
+ workp->truncate != 0)) {
fprintf(stderr,
"Invalid workload: insert, update or "
"truncate specified with readonly\n");
return (EINVAL);
}
+ if (workp->insert != 0 &&
+ workp->table_index != INT32_MAX) {
+ fprintf(stderr,
+ "Invalid workload: Cannot insert into "
+ "specific table only\n");
+ return (EINVAL);
+ }
+ if (workp->table_index != INT32_MAX &&
+ workp->table_index >= (int32_t)opts->table_count) {
+ fprintf(stderr,
+ "Workload table index %" PRId32
+ " is larger than table count %" PRId32,
+ workp->table_index, opts->table_count);
+ return (EINVAL);
+ }
+ }
return (0);
}
diff --git a/bench/wtperf/idle_table_cycle.c b/bench/wtperf/idle_table_cycle.c
index 13fa55e86f5..bb44cfbde59 100644
--- a/bench/wtperf/idle_table_cycle.c
+++ b/bench/wtperf/idle_table_cycle.c
@@ -120,6 +120,7 @@ cycle_idle_tables(void *arg)
return (NULL);
start = stop;
+#if 1
/*
* Drop the table. Keep retrying on EBUSY failure - it is an
* expected return when checkpoints are happening.
@@ -136,6 +137,7 @@ cycle_idle_tables(void *arg)
}
if (check_timing(wtperf, "drop", start, &stop) != 0)
return (NULL);
+#endif
}
return (NULL);
diff --git a/bench/wtperf/stress/btree-split-stress.wtperf b/bench/wtperf/stress/btree-split-stress.wtperf
index 86bb288fc6d..eb6ca1cfddc 100644
--- a/bench/wtperf/stress/btree-split-stress.wtperf
+++ b/bench/wtperf/stress/btree-split-stress.wtperf
@@ -6,5 +6,4 @@ run_time=300
reopen_connection=false
populate_threads=2
value_sz=256
-read_range=100
-threads=((count=4,inserts=1,throttle=100000),(count=8,reads=1))
+threads=((count=4,inserts=1,throttle=100000),(count=8,reads=1,read_range=100))
diff --git a/bench/wtperf/wtperf.c b/bench/wtperf/wtperf.c
index baa259f8817..044fd38dc06 100644
--- a/bench/wtperf/wtperf.c
+++ b/bench/wtperf/wtperf.c
@@ -432,19 +432,17 @@ err: wtperf->error = wtperf->stop = true;
* search do them. Ensuring the keys we see are always in order.
*/
static int
-do_range_reads(WTPERF *wtperf, WT_CURSOR *cursor)
+do_range_reads(WTPERF *wtperf, WT_CURSOR *cursor, int64_t read_range)
{
- CONFIG_OPTS *opts;
- size_t range;
uint64_t next_val, prev_val;
+ int64_t range;
char *range_key_buf;
char buf[512];
int ret;
- opts = wtperf->opts;
ret = 0;
- if (opts->read_range == 0)
+ if (read_range == 0)
return (0);
memset(&buf[0], 0, 512 * sizeof(char));
@@ -454,7 +452,7 @@ do_range_reads(WTPERF *wtperf, WT_CURSOR *cursor)
testutil_check(cursor->get_key(cursor, &range_key_buf));
extract_key(range_key_buf, &next_val);
- for (range = 0; range < opts->read_range; ++range) {
+ for (range = 0; range < read_range; ++range) {
prev_val = next_val;
ret = cursor->next(cursor);
/* We are done if we reach the end. */
@@ -475,12 +473,56 @@ do_range_reads(WTPERF *wtperf, WT_CURSOR *cursor)
return (0);
}
+/* pre_load_data --
+ * Pull everything into cache before starting the workload phase.
+ */
+static int
+pre_load_data(WTPERF *wtperf)
+{
+ CONFIG_OPTS *opts;
+ WT_CONNECTION *conn;
+ WT_CURSOR *cursor;
+ WT_SESSION *session;
+ char *key;
+ int ret;
+ size_t i;
+
+ opts = wtperf->opts;
+ conn = wtperf->conn;
+
+ if ((ret = conn->open_session(
+ conn, NULL, opts->sess_config, &session)) != 0) {
+ lprintf(wtperf, ret, 0, "worker: WT_CONNECTION.open_session");
+ goto err;
+ }
+ for (i = 0; i < opts->table_count; i++) {
+ if ((ret = session->open_cursor(session,
+ wtperf->uris[i], NULL, NULL, &cursor)) != 0) {
+ lprintf(wtperf, ret, 0,
+ "worker: WT_SESSION.open_cursor: %s",
+ wtperf->uris[i]);
+ goto err;
+ }
+ while (cursor->next(cursor) == 0)
+ if ((ret = cursor->get_key(cursor, &key)) != 0)
+ goto err;
+ if ((ret = cursor->close(cursor)) != 0)
+ goto err;
+ }
+ if ((ret = session->close(session, NULL)) != 0)
+ goto err;
+ if (ret != 0)
+err: lprintf(wtperf, ret, 0, "Pre-workload traverse error");
+ return (ret);
+}
+
static void *
worker(void *arg)
{
struct timespec start, stop;
CONFIG_OPTS *opts;
TRACK *trk;
+ WORKLOAD *workload;
WTPERF *wtperf;
WTPERF_THREAD *thread;
WT_CONNECTION *conn;
@@ -495,13 +537,14 @@ worker(void *arg)
char buf[512];
thread = (WTPERF_THREAD *)arg;
+ workload = thread->workload;
wtperf = thread->wtperf;
opts = wtperf->opts;
conn = wtperf->conn;
cursors = NULL;
- log_table_cursor = NULL; /* -Wconditional-initialized */
+ cursor = log_table_cursor = NULL; /* -Wconditional-initialized */
ops = 0;
- ops_per_txn = thread->workload->ops_per_txn;
+ ops_per_txn = workload->ops_per_txn;
session = NULL;
trk = NULL;
@@ -510,7 +553,6 @@ worker(void *arg)
lprintf(wtperf, ret, 0, "worker: WT_CONNECTION.open_session");
goto err;
}
- cursors = dcalloc(opts->table_count, sizeof(WT_CURSOR *));
for (i = 0; i < opts->table_count_idle; i++) {
snprintf(buf, 512, "%s_idle%05d", wtperf->uris[0], (int)i);
if ((ret = session->open_cursor(
@@ -525,14 +567,34 @@ worker(void *arg)
goto err;
}
}
- for (i = 0; i < opts->table_count; i++) {
+ if (workload->table_index != INT32_MAX) {
if ((ret = session->open_cursor(session,
- wtperf->uris[i], NULL, NULL, &cursors[i])) != 0) {
+ wtperf->uris[workload->table_index],
+ NULL, NULL, &cursor)) != 0) {
lprintf(wtperf, ret, 0,
"worker: WT_SESSION.open_cursor: %s",
- wtperf->uris[i]);
+ wtperf->uris[workload->table_index]);
+ goto err;
+ }
+ if ((ret = session->open_cursor(session,
+ wtperf->uris[workload->table_index],
+ NULL, "next_random=true", &thread->rand_cursor)) != 0) {
+ lprintf(wtperf, ret, 0,
+ "worker: WT_SESSION.open_cursor: random %s",
+ wtperf->uris[workload->table_index]);
goto err;
}
+ } else {
+ cursors = dcalloc(opts->table_count, sizeof(WT_CURSOR *));
+ for (i = 0; i < opts->table_count; i++) {
+ if ((ret = session->open_cursor(session,
+ wtperf->uris[i], NULL, NULL, &cursors[i])) != 0) {
+ lprintf(wtperf, ret, 0,
+ "worker: WT_SESSION.open_cursor: %s",
+ wtperf->uris[i]);
+ goto err;
+ }
+ }
}
if (opts->log_like_table && (ret = session->open_cursor(session,
wtperf->log_table_uri, NULL, NULL, &log_table_cursor)) != 0) {
@@ -543,19 +605,19 @@ worker(void *arg)
}
/* Setup the timer for throttling. */
- if (thread->workload->throttle != 0)
+ if (workload->throttle != 0)
setup_throttle(thread);
/* Setup for truncate */
- if (thread->workload->truncate != 0)
+ if (workload->truncate != 0)
if ((ret = setup_truncate(wtperf, thread, session)) != 0)
goto err;
key_buf = thread->key_buf;
value_buf = thread->value_buf;
- op = thread->workload->ops;
- op_end = op + sizeof(thread->workload->ops);
+ op = workload->ops;
+ op_end = op + sizeof(workload->ops);
if ((ops_per_txn != 0 || opts->log_like_table) &&
(ret = session->begin_transaction(session, NULL)) != 0) {
@@ -564,6 +626,8 @@ worker(void *arg)
}
while (!wtperf->stop) {
+ if (workload->pause != 0)
+ (void)sleep((unsigned int)workload->pause);
/*
* Generate the next key and setup operation specific
* statistics tracking objects.
@@ -603,10 +667,12 @@ worker(void *arg)
generate_key(opts, key_buf, next_val);
- /*
- * Spread the data out around the multiple databases.
- */
- cursor = cursors[map_key_to_table(wtperf->opts, next_val)];
+ if (workload->table_index == INT32_MAX)
+ /*
+ * Spread the data out around the multiple databases.
+ */
+ cursor = cursors[
+ map_key_to_table(wtperf->opts, next_val)];
/*
* Skip the first time we do an operation, when trk->ops
@@ -642,7 +708,8 @@ worker(void *arg)
* for several operations, confirming that the
* next key is in the correct order.
*/
- ret = do_range_reads(wtperf, cursor);
+ ret = do_range_reads(wtperf,
+ cursor, workload->read_range);
}
if (ret == 0 || ret == WT_NOTFOUND)
@@ -689,7 +756,7 @@ worker(void *arg)
*/
strncpy(value_buf,
value, opts->value_sz_max - 1);
- if (thread->workload->update_delta != 0)
+ if (workload->update_delta != 0)
update_value_delta(thread);
if (value_buf[0] == 'a')
value_buf[0] = 'b';
@@ -806,7 +873,7 @@ op_err: if (ret == WT_ROLLBACK && ops_per_txn != 0) {
/* Schedule the next operation */
if (++op == op_end)
- op = thread->workload->ops;
+ op = workload->ops;
/*
* Decrement throttle ops and check if we should sleep
@@ -843,7 +910,7 @@ run_mix_schedule_op(WORKLOAD *workp, int op, int64_t op_cnt)
uint8_t *p, *end;
/* Jump around the array to roughly spread out the operations. */
- jump = 100 / op_cnt;
+ jump = (int)(100 / op_cnt);
/*
* Find a read operation and replace it with another operation. This
@@ -884,17 +951,6 @@ run_mix_schedule(WTPERF *wtperf, WORKLOAD *workp)
opts = wtperf->opts;
- /* Confirm reads, inserts, truncates and updates cannot all be zero. */
- if (workp->insert == 0 && workp->read == 0 &&
- workp->truncate == 0 && workp->update == 0) {
- lprintf(wtperf, EINVAL, 0, "no operations scheduled");
- return (EINVAL);
- }
-
- /*
- * Handle truncate first - it's a special case that can't be used in
- * a mixed workload.
- */
if (workp->truncate != 0) {
if (workp->insert != 0 ||
workp->read != 0 || workp->update != 0) {
@@ -906,6 +962,12 @@ run_mix_schedule(WTPERF *wtperf, WORKLOAD *workp)
return (0);
}
+ /* Confirm reads, inserts and updates cannot all be zero. */
+ if (workp->insert == 0 && workp->read == 0 && workp->update == 0) {
+ lprintf(wtperf, EINVAL, 0, "no operations scheduled");
+ return (EINVAL);
+ }
+
/*
* Check for a simple case where the thread is only doing insert or
* update operations (because the default operation for a
@@ -2244,6 +2306,8 @@ start_run(WTPERF *wtperf)
opts->checkpoint_threads, checkpoint_worker) != 0)
goto err;
}
+ if (opts->pre_load_data && (ret = pre_load_data(wtperf)) != 0)
+ goto err;
/* Execute the workload. */
if ((ret = execute_workload(wtperf)) != 0)
goto err;
@@ -2827,14 +2891,43 @@ static uint64_t
wtperf_rand(WTPERF_THREAD *thread)
{
CONFIG_OPTS *opts;
+ WT_CURSOR *rnd_cursor;
WTPERF *wtperf;
double S1, S2, U;
uint64_t rval;
+ int ret;
+ char *key_buf;
wtperf = thread->wtperf;
opts = wtperf->opts;
/*
+ * If we have a random cursor set up then use it.
+ */
+ if ((rnd_cursor = thread->rand_cursor) != NULL) {
+ if ((ret = rnd_cursor->next(rnd_cursor))) {
+ lprintf(wtperf, ret, 0, "worker: rand next failed");
+ /* 0 is outside the expected range. */
+ return (0);
+ }
+ if ((ret = rnd_cursor->get_key(rnd_cursor, &key_buf)) != 0) {
+ lprintf(wtperf, ret, 0,
+ "worker: rand next key retrieval");
+ return (0);
+ }
+ /*
+ * Resetting the cursor is not fatal. We still return the
+ * value we retrieved above. We do it so that we don't
+ * leave a cursor positioned.
+ */
+ if ((ret = rnd_cursor->reset(rnd_cursor)) != 0)
+ lprintf(wtperf, ret, 0,
+ "worker: rand cursor reset failed");
+ extract_key(key_buf, &rval);
+ return (rval);
+ }
+
+ /*
* Use WiredTiger's random number routine: it's lock-free and fairly
* good.
*/
diff --git a/bench/wtperf/wtperf.h b/bench/wtperf/wtperf.h
index 81d74e134f6..db88d0b0271 100644
--- a/bench/wtperf/wtperf.h
+++ b/bench/wtperf/wtperf.h
@@ -66,6 +66,9 @@ typedef struct {
uint64_t throttle; /* Maximum operations/second */
/* Number of operations per transaction. Zero for autocommit */
int64_t ops_per_txn;
+ int64_t pause; /* Time between scans */
+ int64_t read_range; /* Range of reads */
+ int32_t table_index; /* Table to focus ops on */
int64_t truncate; /* Truncate ratio */
uint64_t truncate_pct; /* Truncate Percent */
uint64_t truncate_count; /* Truncate Count */
@@ -225,6 +228,7 @@ typedef struct {
struct __wtperf_thread { /* Per-thread structure */
WTPERF *wtperf; /* Enclosing configuration */
+ WT_CURSOR *rand_cursor; /* Random key cursor */
WT_RAND_STATE rnd; /* Random number generation state */
@@ -241,6 +245,7 @@ struct __wtperf_thread { /* Per-thread structure */
TRACK ckpt; /* Checkpoint operations */
TRACK insert; /* Insert operations */
TRACK read; /* Read operations */
+ TRACK scan; /* Scan operations */
TRACK update; /* Update operations */
TRACK truncate; /* Truncate operations */
TRACK truncate_sleep; /* Truncate sleep operations */
diff --git a/bench/wtperf/wtperf_opt.i b/bench/wtperf/wtperf_opt.i
index 680eb53a90e..63cef4c28fb 100644
--- a/bench/wtperf/wtperf_opt.i
+++ b/bench/wtperf/wtperf_opt.i
@@ -145,12 +145,13 @@ DEF_OPT_AS_UINT32(populate_ops_per_txn, 0,
"phase, zero for auto-commit")
DEF_OPT_AS_UINT32(populate_threads, 1,
"number of populate threads, 1 for bulk load")
+DEF_OPT_AS_BOOL(pre_load_data, 0,
+ "Scan all data prior to starting the workload phase to warm the cache")
DEF_OPT_AS_UINT32(random_range, 0,
"if non zero choose a value from within this range as the key for "
"insert operations")
DEF_OPT_AS_BOOL(random_value, 0, "generate random content for the value")
DEF_OPT_AS_BOOL(range_partition, 0, "partition data by range (vs hash)")
-DEF_OPT_AS_UINT32(read_range, 0, "scan a range of keys after each search")
DEF_OPT_AS_BOOL(readonly, 0,
"reopen the connection between populate and workload phases in readonly "
"mode. Requires reopen_connection turned on (default). Requires that "
@@ -192,9 +193,10 @@ DEF_OPT_AS_STRING(threads, "", "workload configuration: each 'count' "
"'threads=((count=2,reads=1)(count=8,reads=1,inserts=2,updates=1))' "
"which would create 2 threads doing nothing but reads and 8 threads "
"each doing 50% inserts and 25% reads and updates. Allowed configuration "
- "values are 'count', 'throttle', 'update_delta', 'reads', 'inserts', "
- "'updates', 'truncate', 'truncate_pct' and 'truncate_count'. There are "
- "also behavior modifiers, supported modifiers are 'ops_per_txn'")
+ "values are 'count', 'throttle', 'update_delta', 'reads', 'read_range', "
+ "'inserts', 'updates', 'truncate', 'truncate_pct' and 'truncate_count'. "
+ "There are also behavior modifiers, supported modifiers are "
+ "'ops_per_txn'")
DEF_OPT_AS_CONFIG_STRING(transaction_config, "",
"WT_SESSION.begin_transaction configuration string, applied during the "
"populate phase when populate_ops_per_txn is nonzero")