diff options
author | Alex Gorrod <alexander.gorrod@mongodb.com> | 2017-02-03 03:28:50 +1100 |
---|---|---|
committer | sueloverso <sue@mongodb.com> | 2017-02-02 11:28:50 -0500 |
commit | 3e68fb2d7da35eeb122308971f02203c58caa538 (patch) | |
tree | ef563addbea357f1738eb7042a931f57758756fd /bench | |
parent | 0562f92104f0b2d8ef218d9fe465ef718bc2d9cd (diff) | |
download | mongo-3e68fb2d7da35eeb122308971f02203c58caa538.tar.gz |
WT-3139 Enhance wtperf to support periodic table scans (#3268)
* Enhance wtperf to support periodic table scans
* Implement scans as read_range.
* Use a random cursor so that keys are chosen from the targeted table.
* Don't allow an insert workload together with a table specifier.
* Reset the random cursor after each use so it isn't left positioned.
* Make wtperf pre_load_data an option.
Diffstat (limited to 'bench')
-rw-r--r-- | bench/wtperf/config.c | 42 | ||||
-rw-r--r-- | bench/wtperf/idle_table_cycle.c | 2 | ||||
-rw-r--r-- | bench/wtperf/stress/btree-split-stress.wtperf | 3 | ||||
-rw-r--r-- | bench/wtperf/wtperf.c | 163 | ||||
-rw-r--r-- | bench/wtperf/wtperf.h | 5 | ||||
-rw-r--r-- | bench/wtperf/wtperf_opt.i | 10 |
6 files changed, 180 insertions, 45 deletions
diff --git a/bench/wtperf/config.c b/bench/wtperf/config.c index a15a3485dde..9eea99eeec4 100644 --- a/bench/wtperf/config.c +++ b/bench/wtperf/config.c @@ -215,6 +215,7 @@ config_threads(WTPERF *wtperf, const char *config, size_t len) return (EINVAL); } workp = &wtperf->workload[wtperf->workload_cnt++]; + workp->table_index = INT32_MAX; while ((ret = scan->next(scan, &k, &v)) == 0) { if (STRING_MATCH("count", k.str, k.len)) { @@ -233,12 +234,28 @@ config_threads(WTPERF *wtperf, const char *config, size_t len) goto err; continue; } + if (STRING_MATCH("pause", k.str, k.len)) { + if ((workp->pause = v.val) < 0) + goto err; + continue; + } if (STRING_MATCH("read", k.str, k.len) || STRING_MATCH("reads", k.str, k.len)) { if ((workp->read = v.val) < 0) goto err; continue; } + if (STRING_MATCH("read_range", k.str, k.len)) { + if ((workp->read_range = v.val) < 0) + goto err; + continue; + } + if (STRING_MATCH("table", k.str, k.len)) { + if (v.val <= 0) + goto err; + workp->table_index = (int32_t)v.val - 1; + continue; + } if (STRING_MATCH("throttle", k.str, k.len)) { workp->throttle = (uint64_t)v.val; continue; @@ -760,16 +777,33 @@ config_sanity(WTPERF *wtperf) opts->value_sz_min = opts->value_sz; } - if (opts->readonly && wtperf->workload != NULL) + if (wtperf->workload != NULL) for (i = 0, workp = wtperf->workload; - i < wtperf->workload_cnt; ++i, ++workp) - if (workp->insert != 0 || workp->update != 0 || - workp->truncate != 0) { + i < wtperf->workload_cnt; ++i, ++workp) { + if (opts->readonly && + (workp->insert != 0 || workp->update != 0 || + workp->truncate != 0)) { fprintf(stderr, "Invalid workload: insert, update or " "truncate specified with readonly\n"); return (EINVAL); } + if (workp->insert != 0 && + workp->table_index != INT32_MAX) { + fprintf(stderr, + "Invalid workload: Cannot insert into " + "specific table only\n"); + return (EINVAL); + } + if (workp->table_index != INT32_MAX && + workp->table_index >= (int32_t)opts->table_count) { + fprintf(stderr, + 
"Workload table index %" PRId32 + " is larger than table count %" PRId32, + workp->table_index, opts->table_count); + return (EINVAL); + } + } return (0); } diff --git a/bench/wtperf/idle_table_cycle.c b/bench/wtperf/idle_table_cycle.c index 13fa55e86f5..bb44cfbde59 100644 --- a/bench/wtperf/idle_table_cycle.c +++ b/bench/wtperf/idle_table_cycle.c @@ -120,6 +120,7 @@ cycle_idle_tables(void *arg) return (NULL); start = stop; +#if 1 /* * Drop the table. Keep retrying on EBUSY failure - it is an * expected return when checkpoints are happening. @@ -136,6 +137,7 @@ cycle_idle_tables(void *arg) } if (check_timing(wtperf, "drop", start, &stop) != 0) return (NULL); +#endif } return (NULL); diff --git a/bench/wtperf/stress/btree-split-stress.wtperf b/bench/wtperf/stress/btree-split-stress.wtperf index 86bb288fc6d..eb6ca1cfddc 100644 --- a/bench/wtperf/stress/btree-split-stress.wtperf +++ b/bench/wtperf/stress/btree-split-stress.wtperf @@ -6,5 +6,4 @@ run_time=300 reopen_connection=false populate_threads=2 value_sz=256 -read_range=100 -threads=((count=4,inserts=1,throttle=100000),(count=8,reads=1)) +threads=((count=4,inserts=1,throttle=100000),(count=8,reads=1,read_range=100)) diff --git a/bench/wtperf/wtperf.c b/bench/wtperf/wtperf.c index baa259f8817..044fd38dc06 100644 --- a/bench/wtperf/wtperf.c +++ b/bench/wtperf/wtperf.c @@ -432,19 +432,17 @@ err: wtperf->error = wtperf->stop = true; * search do them. Ensuring the keys we see are always in order. 
*/ static int -do_range_reads(WTPERF *wtperf, WT_CURSOR *cursor) +do_range_reads(WTPERF *wtperf, WT_CURSOR *cursor, int64_t read_range) { - CONFIG_OPTS *opts; - size_t range; uint64_t next_val, prev_val; + int64_t range; char *range_key_buf; char buf[512]; int ret; - opts = wtperf->opts; ret = 0; - if (opts->read_range == 0) + if (read_range == 0) return (0); memset(&buf[0], 0, 512 * sizeof(char)); @@ -454,7 +452,7 @@ do_range_reads(WTPERF *wtperf, WT_CURSOR *cursor) testutil_check(cursor->get_key(cursor, &range_key_buf)); extract_key(range_key_buf, &next_val); - for (range = 0; range < opts->read_range; ++range) { + for (range = 0; range < read_range; ++range) { prev_val = next_val; ret = cursor->next(cursor); /* We are done if we reach the end. */ @@ -475,12 +473,56 @@ do_range_reads(WTPERF *wtperf, WT_CURSOR *cursor) return (0); } +/* pre_load_data -- + * Pull everything into cache before starting the workload phase. + */ +static int +pre_load_data(WTPERF *wtperf) +{ + CONFIG_OPTS *opts; + WT_CONNECTION *conn; + WT_CURSOR *cursor; + WT_SESSION *session; + char *key; + int ret; + size_t i; + + opts = wtperf->opts; + conn = wtperf->conn; + + if ((ret = conn->open_session( + conn, NULL, opts->sess_config, &session)) != 0) { + lprintf(wtperf, ret, 0, "worker: WT_CONNECTION.open_session"); + goto err; + } + for (i = 0; i < opts->table_count; i++) { + if ((ret = session->open_cursor(session, + wtperf->uris[i], NULL, NULL, &cursor)) != 0) { + lprintf(wtperf, ret, 0, + "worker: WT_SESSION.open_cursor: %s", + wtperf->uris[i]); + goto err; + } + while (cursor->next(cursor) == 0) + if ((ret = cursor->get_key(cursor, &key)) != 0) + goto err; + if ((ret = cursor->close(cursor)) != 0) + goto err; + } + if ((ret = session->close(session, NULL)) != 0) + goto err; + if (ret != 0) +err: lprintf(wtperf, ret, 0, "Pre-workload traverse error"); + return (ret); +} + static void * worker(void *arg) { struct timespec start, stop; CONFIG_OPTS *opts; TRACK *trk; + WORKLOAD *workload; 
WTPERF *wtperf; WTPERF_THREAD *thread; WT_CONNECTION *conn; @@ -495,13 +537,14 @@ worker(void *arg) char buf[512]; thread = (WTPERF_THREAD *)arg; + workload = thread->workload; wtperf = thread->wtperf; opts = wtperf->opts; conn = wtperf->conn; cursors = NULL; - log_table_cursor = NULL; /* -Wconditional-initialized */ + cursor = log_table_cursor = NULL; /* -Wconditional-initialized */ ops = 0; - ops_per_txn = thread->workload->ops_per_txn; + ops_per_txn = workload->ops_per_txn; session = NULL; trk = NULL; @@ -510,7 +553,6 @@ worker(void *arg) lprintf(wtperf, ret, 0, "worker: WT_CONNECTION.open_session"); goto err; } - cursors = dcalloc(opts->table_count, sizeof(WT_CURSOR *)); for (i = 0; i < opts->table_count_idle; i++) { snprintf(buf, 512, "%s_idle%05d", wtperf->uris[0], (int)i); if ((ret = session->open_cursor( @@ -525,14 +567,34 @@ worker(void *arg) goto err; } } - for (i = 0; i < opts->table_count; i++) { + if (workload->table_index != INT32_MAX) { if ((ret = session->open_cursor(session, - wtperf->uris[i], NULL, NULL, &cursors[i])) != 0) { + wtperf->uris[workload->table_index], + NULL, NULL, &cursor)) != 0) { lprintf(wtperf, ret, 0, "worker: WT_SESSION.open_cursor: %s", - wtperf->uris[i]); + wtperf->uris[workload->table_index]); + goto err; + } + if ((ret = session->open_cursor(session, + wtperf->uris[workload->table_index], + NULL, "next_random=true", &thread->rand_cursor)) != 0) { + lprintf(wtperf, ret, 0, + "worker: WT_SESSION.open_cursor: random %s", + wtperf->uris[workload->table_index]); goto err; } + } else { + cursors = dcalloc(opts->table_count, sizeof(WT_CURSOR *)); + for (i = 0; i < opts->table_count; i++) { + if ((ret = session->open_cursor(session, + wtperf->uris[i], NULL, NULL, &cursors[i])) != 0) { + lprintf(wtperf, ret, 0, + "worker: WT_SESSION.open_cursor: %s", + wtperf->uris[i]); + goto err; + } + } } if (opts->log_like_table && (ret = session->open_cursor(session, wtperf->log_table_uri, NULL, NULL, &log_table_cursor)) != 0) { @@ -543,19 
+605,19 @@ worker(void *arg) } /* Setup the timer for throttling. */ - if (thread->workload->throttle != 0) + if (workload->throttle != 0) setup_throttle(thread); /* Setup for truncate */ - if (thread->workload->truncate != 0) + if (workload->truncate != 0) if ((ret = setup_truncate(wtperf, thread, session)) != 0) goto err; key_buf = thread->key_buf; value_buf = thread->value_buf; - op = thread->workload->ops; - op_end = op + sizeof(thread->workload->ops); + op = workload->ops; + op_end = op + sizeof(workload->ops); if ((ops_per_txn != 0 || opts->log_like_table) && (ret = session->begin_transaction(session, NULL)) != 0) { @@ -564,6 +626,8 @@ worker(void *arg) } while (!wtperf->stop) { + if (workload->pause != 0) + (void)sleep((unsigned int)workload->pause); /* * Generate the next key and setup operation specific * statistics tracking objects. @@ -603,10 +667,12 @@ worker(void *arg) generate_key(opts, key_buf, next_val); - /* - * Spread the data out around the multiple databases. - */ - cursor = cursors[map_key_to_table(wtperf->opts, next_val)]; + if (workload->table_index == INT32_MAX) + /* + * Spread the data out around the multiple databases. + */ + cursor = cursors[ + map_key_to_table(wtperf->opts, next_val)]; /* * Skip the first time we do an operation, when trk->ops @@ -642,7 +708,8 @@ worker(void *arg) * for several operations, confirming that the * next key is in the correct order. 
*/ - ret = do_range_reads(wtperf, cursor); + ret = do_range_reads(wtperf, + cursor, workload->read_range); } if (ret == 0 || ret == WT_NOTFOUND) @@ -689,7 +756,7 @@ worker(void *arg) */ strncpy(value_buf, value, opts->value_sz_max - 1); - if (thread->workload->update_delta != 0) + if (workload->update_delta != 0) update_value_delta(thread); if (value_buf[0] == 'a') value_buf[0] = 'b'; @@ -806,7 +873,7 @@ op_err: if (ret == WT_ROLLBACK && ops_per_txn != 0) { /* Schedule the next operation */ if (++op == op_end) - op = thread->workload->ops; + op = workload->ops; /* * Decrement throttle ops and check if we should sleep @@ -843,7 +910,7 @@ run_mix_schedule_op(WORKLOAD *workp, int op, int64_t op_cnt) uint8_t *p, *end; /* Jump around the array to roughly spread out the operations. */ - jump = 100 / op_cnt; + jump = (int)(100 / op_cnt); /* * Find a read operation and replace it with another operation. This @@ -884,17 +951,6 @@ run_mix_schedule(WTPERF *wtperf, WORKLOAD *workp) opts = wtperf->opts; - /* Confirm reads, inserts, truncates and updates cannot all be zero. */ - if (workp->insert == 0 && workp->read == 0 && - workp->truncate == 0 && workp->update == 0) { - lprintf(wtperf, EINVAL, 0, "no operations scheduled"); - return (EINVAL); - } - - /* - * Handle truncate first - it's a special case that can't be used in - * a mixed workload. - */ if (workp->truncate != 0) { if (workp->insert != 0 || workp->read != 0 || workp->update != 0) { @@ -906,6 +962,12 @@ run_mix_schedule(WTPERF *wtperf, WORKLOAD *workp) return (0); } + /* Confirm reads, inserts and updates cannot all be zero. 
*/ + if (workp->insert == 0 && workp->read == 0 && workp->update == 0) { + lprintf(wtperf, EINVAL, 0, "no operations scheduled"); + return (EINVAL); + } + /* * Check for a simple case where the thread is only doing insert or * update operations (because the default operation for a @@ -2244,6 +2306,8 @@ start_run(WTPERF *wtperf) opts->checkpoint_threads, checkpoint_worker) != 0) goto err; } + if (opts->pre_load_data && (ret = pre_load_data(wtperf)) != 0) + goto err; /* Execute the workload. */ if ((ret = execute_workload(wtperf)) != 0) goto err; @@ -2827,14 +2891,43 @@ static uint64_t wtperf_rand(WTPERF_THREAD *thread) { CONFIG_OPTS *opts; + WT_CURSOR *rnd_cursor; WTPERF *wtperf; double S1, S2, U; uint64_t rval; + int ret; + char *key_buf; wtperf = thread->wtperf; opts = wtperf->opts; /* + * If we have a random cursor set up then use it. + */ + if ((rnd_cursor = thread->rand_cursor) != NULL) { + if ((ret = rnd_cursor->next(rnd_cursor))) { + lprintf(wtperf, ret, 0, "worker: rand next failed"); + /* 0 is outside the expected range. */ + return (0); + } + if ((ret = rnd_cursor->get_key(rnd_cursor, &key_buf)) != 0) { + lprintf(wtperf, ret, 0, + "worker: rand next key retrieval"); + return (0); + } + /* + * Resetting the cursor is not fatal. We still return the + * value we retrieved above. We do it so that we don't + * leave a cursor positioned. + */ + if ((ret = rnd_cursor->reset(rnd_cursor)) != 0) + lprintf(wtperf, ret, 0, + "worker: rand cursor reset failed"); + extract_key(key_buf, &rval); + return (rval); + } + + /* * Use WiredTiger's random number routine: it's lock-free and fairly * good. */ diff --git a/bench/wtperf/wtperf.h b/bench/wtperf/wtperf.h index 81d74e134f6..db88d0b0271 100644 --- a/bench/wtperf/wtperf.h +++ b/bench/wtperf/wtperf.h @@ -66,6 +66,9 @@ typedef struct { uint64_t throttle; /* Maximum operations/second */ /* Number of operations per transaction. 
Zero for autocommit */ int64_t ops_per_txn; + int64_t pause; /* Time between scans */ + int64_t read_range; /* Range of reads */ + int32_t table_index; /* Table to focus ops on */ int64_t truncate; /* Truncate ratio */ uint64_t truncate_pct; /* Truncate Percent */ uint64_t truncate_count; /* Truncate Count */ @@ -225,6 +228,7 @@ typedef struct { struct __wtperf_thread { /* Per-thread structure */ WTPERF *wtperf; /* Enclosing configuration */ + WT_CURSOR *rand_cursor; /* Random key cursor */ WT_RAND_STATE rnd; /* Random number generation state */ @@ -241,6 +245,7 @@ struct __wtperf_thread { /* Per-thread structure */ TRACK ckpt; /* Checkpoint operations */ TRACK insert; /* Insert operations */ TRACK read; /* Read operations */ + TRACK scan; /* Scan operations */ TRACK update; /* Update operations */ TRACK truncate; /* Truncate operations */ TRACK truncate_sleep; /* Truncate sleep operations */ diff --git a/bench/wtperf/wtperf_opt.i b/bench/wtperf/wtperf_opt.i index 680eb53a90e..63cef4c28fb 100644 --- a/bench/wtperf/wtperf_opt.i +++ b/bench/wtperf/wtperf_opt.i @@ -145,12 +145,13 @@ DEF_OPT_AS_UINT32(populate_ops_per_txn, 0, "phase, zero for auto-commit") DEF_OPT_AS_UINT32(populate_threads, 1, "number of populate threads, 1 for bulk load") +DEF_OPT_AS_BOOL(pre_load_data, 0, + "Scan all data prior to starting the workload phase to warm the cache") DEF_OPT_AS_UINT32(random_range, 0, "if non zero choose a value from within this range as the key for " "insert operations") DEF_OPT_AS_BOOL(random_value, 0, "generate random content for the value") DEF_OPT_AS_BOOL(range_partition, 0, "partition data by range (vs hash)") -DEF_OPT_AS_UINT32(read_range, 0, "scan a range of keys after each search") DEF_OPT_AS_BOOL(readonly, 0, "reopen the connection between populate and workload phases in readonly " "mode. Requires reopen_connection turned on (default). 
Requires that " @@ -192,9 +193,10 @@ DEF_OPT_AS_STRING(threads, "", "workload configuration: each 'count' " "'threads=((count=2,reads=1)(count=8,reads=1,inserts=2,updates=1))' " "which would create 2 threads doing nothing but reads and 8 threads " "each doing 50% inserts and 25% reads and updates. Allowed configuration " - "values are 'count', 'throttle', 'update_delta', 'reads', 'inserts', " - "'updates', 'truncate', 'truncate_pct' and 'truncate_count'. There are " - "also behavior modifiers, supported modifiers are 'ops_per_txn'") + "values are 'count', 'throttle', 'update_delta', 'reads', 'read_range', " + "'inserts', 'updates', 'truncate', 'truncate_pct' and 'truncate_count'. " + "There are also behavior modifiers, supported modifiers are " + "'ops_per_txn'") DEF_OPT_AS_CONFIG_STRING(transaction_config, "", "WT_SESSION.begin_transaction configuration string, applied during the " "populate phase when populate_ops_per_txn is nonzero") |