diff options
author | Susan LoVerso <sue@wiredtiger.com> | 2014-02-03 10:26:13 -0500 |
---|---|---|
committer | Susan LoVerso <sue@wiredtiger.com> | 2014-02-03 10:26:13 -0500 |
commit | 885a6c9b42f1ac2e73246f9bcb17de5dcfcdaba5 (patch) | |
tree | 6a97247ac9350f8cf46080c499d6995dda8a9abb | |
parent | 7fdf93aceca4e6bbd3d2ca52f0ffc8693f8b6b92 (diff) | |
download | mongo-885a6c9b42f1ac2e73246f9bcb17de5dcfcdaba5.tar.gz |
Add populate latency support. Change test3 workload definition.
-rw-r--r-- | bench/wtperf/runners/test3-500m-lsm.wtperf | 3 | ||||
-rw-r--r-- | bench/wtperf/track.c | 24 | ||||
-rw-r--r-- | bench/wtperf/wtperf.c | 75 |
3 files changed, 87 insertions, 15 deletions
diff --git a/bench/wtperf/runners/test3-500m-lsm.wtperf b/bench/wtperf/runners/test3-500m-lsm.wtperf index 41379eedbce..2da836d577c 100644 --- a/bench/wtperf/runners/test3-500m-lsm.wtperf +++ b/bench/wtperf/runners/test3-500m-lsm.wtperf @@ -14,4 +14,5 @@ pareto=true report_interval=10 run_time=14400 sample_interval=10 -threads=((count=20,reads=1,updates=1)) +#threads=((count=20,reads=1,updates=1)) +threads=((count=10,reads=1),(count=10,updates=1)) diff --git a/bench/wtperf/track.c b/bench/wtperf/track.c index cbcc362c95d..c6c86beefd6 100644 --- a/bench/wtperf/track.c +++ b/bench/wtperf/track.c @@ -71,11 +71,17 @@ sum_ops(CONFIG *cfg, size_t field_offset) { CONFIG_THREAD *thread; uint64_t total; - int64_t i; + int64_t i, th_cnt; total = 0; - for (i = 0, thread = cfg->workers; - thread != NULL && i < cfg->workers_cnt; ++i, ++thread) + if (cfg->popthreads == NULL) { + thread = cfg->workers; + th_cnt = cfg->workers_cnt; + } else { + thread = cfg->popthreads; + th_cnt = cfg->populate_threads; + } + for (i = 0; thread != NULL && i < th_cnt; ++i, ++thread) total += ((TRACK *)((uint8_t *)thread + field_offset))->ops; return (total); @@ -108,15 +114,21 @@ latency_op(CONFIG *cfg, CONFIG_THREAD *thread; TRACK *track; uint64_t ops, latency, tmp; - int64_t i; + int64_t i, th_cnt; uint32_t max, min; ops = latency = 0; max = 0; min = UINT32_MAX; - for (i = 0, thread = cfg->workers; - thread != NULL && i < cfg->workers_cnt; ++i, ++thread) { + if (cfg->popthreads == NULL) { + thread = cfg->workers; + th_cnt = cfg->workers_cnt; + } else { + thread = cfg->popthreads; + th_cnt = cfg->populate_threads; + } + for (i = 0; thread != NULL && i < th_cnt; ++i, ++thread) { track = (TRACK *)((uint8_t *)thread + field_offset); tmp = track->latency_ops; ops += tmp - track->last_latency_ops; diff --git a/bench/wtperf/wtperf.c b/bench/wtperf/wtperf.c index 733626e7c6c..b4b9e08c9da 100644 --- a/bench/wtperf/wtperf.c +++ b/bench/wtperf/wtperf.c @@ -496,14 +496,16 @@ run_mix_schedule(CONFIG *cfg, WORKLOAD *workp) static void * populate_thread(void *arg) { + struct timespec start, stop; CONFIG *cfg; CONFIG_THREAD *thread; + TRACK *trk; WT_CONNECTION *conn; WT_CURSOR *cursor; WT_SESSION *session; uint32_t opcount; - uint64_t op; - int intxn, ret; + uint64_t op, usecs; + int intxn, measure_latency, ret; char *value_buf, *key_buf; thread = (CONFIG_THREAD *)arg; @@ -511,6 +513,7 @@ populate_thread(void *arg) conn = cfg->conn; session = NULL; ret = 0; + trk = &thread->insert; key_buf = thread->key_buf; value_buf = thread->value_buf; @@ -537,6 +540,13 @@ populate_thread(void *arg) break; sprintf(key_buf, "%0*" PRIu64, cfg->key_sz, op); + measure_latency = cfg->sample_interval != 0 && ( + trk->ops % cfg->sample_rate == 0); + if (measure_latency && + (ret = __wt_epoch(NULL, &start)) != 0) { + lprintf(cfg, ret, 0, "Get time call failed"); + goto err; + } cursor->set_key(cursor, key_buf); if (cfg->random_value) randomize_value(cfg, value_buf); @@ -545,7 +555,18 @@ populate_thread(void *arg) lprintf(cfg, ret, 0, "Failed inserting"); goto err; } - ++thread->insert.ops; + /* Gather statistics */ + if (measure_latency) { + if ((ret = __wt_epoch(NULL, &stop)) != 0) { + lprintf(cfg, ret, 0, + "Get time call failed"); + goto err; + } + ++trk->latency_ops; + usecs = ns_to_us(WT_TIMEDIFF(stop, start)); + track_operation(trk, usecs); + } + ++thread->insert.ops; /* Same as trk->ops */ } else { for (intxn = 0, opcount = 0;;) { @@ -563,6 +584,13 @@ populate_thread(void *arg) intxn = 1; } sprintf(key_buf, "%0*" PRIu64, cfg->key_sz, op); + measure_latency = cfg->sample_interval != 0 && ( + trk->ops % cfg->sample_rate == 0); + if (measure_latency && + (ret = __wt_epoch(NULL, &start)) != 0) { + lprintf(cfg, ret, 0, "Get time call failed"); + goto err; + } cursor->set_key(cursor, key_buf); if (cfg->random_value) randomize_value(cfg, value_buf); @@ -571,7 +599,18 @@ populate_thread(void *arg) lprintf(cfg, ret, 0, "Failed inserting"); goto err; } - ++thread->insert.ops; + /* Gather statistics */ + if (measure_latency) { + if ((ret = __wt_epoch(NULL, &stop)) != 0) { + lprintf(cfg, ret, 0, + "Get time call failed"); + goto err; + } + ++trk->latency_ops; + usecs = ns_to_us(WT_TIMEDIFF(stop, start)); + track_operation(trk, usecs); + } + ++thread->insert.ops; /* Same as trk->ops */ if (++opcount < cfg->populate_ops_per_txn) continue; @@ -612,6 +651,7 @@ monitor(void *arg) char buf[64], *path; int ret; uint64_t reads, inserts, updates; + uint64_t cur_reads, cur_inserts, cur_updates; uint64_t last_reads, last_inserts, last_updates; uint32_t read_avg, read_min, read_max; uint32_t insert_avg, insert_min, insert_max; @@ -675,6 +715,18 @@ monitor(void *arg) latency_insert(cfg, &insert_avg, &insert_min, &insert_max); latency_update(cfg, &update_avg, &update_min, &update_max); + cur_reads = reads - last_reads; + cur_updates = updates - last_updates; + /* + * For now the only item we need to worry about changing is + * inserts when we transition from the populate phase to + * workload phase. + */ + if (inserts < last_inserts) + cur_inserts = 0; + else + cur_inserts = inserts - last_inserts; + (void)fprintf(fp, "%s" ",%" PRIu64 ",%" PRIu64 ",%" PRIu64 @@ -684,9 +736,9 @@ monitor(void *arg) ",%" PRIu32 ",%" PRIu32 ",%" PRIu32 "\n", buf, - (reads - last_reads) / cfg->sample_interval, - (inserts - last_inserts) / cfg->sample_interval, - (updates - last_updates) / cfg->sample_interval, + cur_reads / cfg->sample_interval, + cur_inserts / cfg->sample_interval, + cur_updates / cfg->sample_interval, g_ckpt ? 'Y' : 'N', read_avg, read_min, read_max, insert_avg, insert_min, insert_max, @@ -779,6 +831,7 @@ err: g_error = g_stop = 1; static int execute_populate(CONFIG *cfg) { + CONFIG_THREAD *popth; WT_SESSION *session; struct timespec start, stop; double secs; @@ -831,8 +884,14 @@ execute_populate(CONFIG *cfg) return (ret); } + /* + * Move popthreads aside to narrow possible race with the monitor + * thread. + */ + popth = cfg->popthreads; + cfg->popthreads = NULL; if ((ret = - stop_threads(cfg, cfg->populate_threads, cfg->popthreads)) != 0) + stop_threads(cfg, cfg->populate_threads, popth)) != 0) return (ret); /* Report if any worker threads didn't finish. */ |