summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorSusan LoVerso <sue@wiredtiger.com>2014-02-03 10:26:13 -0500
committerSusan LoVerso <sue@wiredtiger.com>2014-02-03 10:26:13 -0500
commit885a6c9b42f1ac2e73246f9bcb17de5dcfcdaba5 (patch)
tree6a97247ac9350f8cf46080c499d6995dda8a9abb
parent7fdf93aceca4e6bbd3d2ca52f0ffc8693f8b6b92 (diff)
downloadmongo-885a6c9b42f1ac2e73246f9bcb17de5dcfcdaba5.tar.gz
Add populate latency support. Change test3 workload definition.
-rw-r--r--bench/wtperf/runners/test3-500m-lsm.wtperf3
-rw-r--r--bench/wtperf/track.c24
-rw-r--r--bench/wtperf/wtperf.c75
3 files changed, 87 insertions, 15 deletions
diff --git a/bench/wtperf/runners/test3-500m-lsm.wtperf b/bench/wtperf/runners/test3-500m-lsm.wtperf
index 41379eedbce..2da836d577c 100644
--- a/bench/wtperf/runners/test3-500m-lsm.wtperf
+++ b/bench/wtperf/runners/test3-500m-lsm.wtperf
@@ -14,4 +14,5 @@ pareto=true
report_interval=10
run_time=14400
sample_interval=10
-threads=((count=20,reads=1,updates=1))
+#threads=((count=20,reads=1,updates=1))
+threads=((count=10,reads=1),(count=10,updates=1))
diff --git a/bench/wtperf/track.c b/bench/wtperf/track.c
index cbcc362c95d..c6c86beefd6 100644
--- a/bench/wtperf/track.c
+++ b/bench/wtperf/track.c
@@ -71,11 +71,17 @@ sum_ops(CONFIG *cfg, size_t field_offset)
{
CONFIG_THREAD *thread;
uint64_t total;
- int64_t i;
+ int64_t i, th_cnt;
total = 0;
- for (i = 0, thread = cfg->workers;
- thread != NULL && i < cfg->workers_cnt; ++i, ++thread)
+ if (cfg->popthreads == NULL) {
+ thread = cfg->workers;
+ th_cnt = cfg->workers_cnt;
+ } else {
+ thread = cfg->popthreads;
+ th_cnt = cfg->populate_threads;
+ }
+ for (i = 0; thread != NULL && i < th_cnt; ++i, ++thread)
total += ((TRACK *)((uint8_t *)thread + field_offset))->ops;
return (total);
@@ -108,15 +114,21 @@ latency_op(CONFIG *cfg,
CONFIG_THREAD *thread;
TRACK *track;
uint64_t ops, latency, tmp;
- int64_t i;
+ int64_t i, th_cnt;
uint32_t max, min;
ops = latency = 0;
max = 0;
min = UINT32_MAX;
- for (i = 0, thread = cfg->workers;
- thread != NULL && i < cfg->workers_cnt; ++i, ++thread) {
+ if (cfg->popthreads == NULL) {
+ thread = cfg->workers;
+ th_cnt = cfg->workers_cnt;
+ } else {
+ thread = cfg->popthreads;
+ th_cnt = cfg->populate_threads;
+ }
+ for (i = 0; thread != NULL && i < th_cnt; ++i, ++thread) {
track = (TRACK *)((uint8_t *)thread + field_offset);
tmp = track->latency_ops;
ops += tmp - track->last_latency_ops;
diff --git a/bench/wtperf/wtperf.c b/bench/wtperf/wtperf.c
index 733626e7c6c..b4b9e08c9da 100644
--- a/bench/wtperf/wtperf.c
+++ b/bench/wtperf/wtperf.c
@@ -496,14 +496,16 @@ run_mix_schedule(CONFIG *cfg, WORKLOAD *workp)
static void *
populate_thread(void *arg)
{
+ struct timespec start, stop;
CONFIG *cfg;
CONFIG_THREAD *thread;
+ TRACK *trk;
WT_CONNECTION *conn;
WT_CURSOR *cursor;
WT_SESSION *session;
uint32_t opcount;
- uint64_t op;
- int intxn, ret;
+ uint64_t op, usecs;
+ int intxn, measure_latency, ret;
char *value_buf, *key_buf;
thread = (CONFIG_THREAD *)arg;
@@ -511,6 +513,7 @@ populate_thread(void *arg)
conn = cfg->conn;
session = NULL;
ret = 0;
+ trk = &thread->insert;
key_buf = thread->key_buf;
value_buf = thread->value_buf;
@@ -537,6 +540,13 @@ populate_thread(void *arg)
break;
sprintf(key_buf, "%0*" PRIu64, cfg->key_sz, op);
+ measure_latency = cfg->sample_interval != 0 && (
+ trk->ops % cfg->sample_rate == 0);
+ if (measure_latency &&
+ (ret = __wt_epoch(NULL, &start)) != 0) {
+ lprintf(cfg, ret, 0, "Get time call failed");
+ goto err;
+ }
cursor->set_key(cursor, key_buf);
if (cfg->random_value)
randomize_value(cfg, value_buf);
@@ -545,7 +555,18 @@ populate_thread(void *arg)
lprintf(cfg, ret, 0, "Failed inserting");
goto err;
}
- ++thread->insert.ops;
+ /* Gather statistics */
+ if (measure_latency) {
+ if ((ret = __wt_epoch(NULL, &stop)) != 0) {
+ lprintf(cfg, ret, 0,
+ "Get time call failed");
+ goto err;
+ }
+ ++trk->latency_ops;
+ usecs = ns_to_us(WT_TIMEDIFF(stop, start));
+ track_operation(trk, usecs);
+ }
+ ++thread->insert.ops; /* Same as trk->ops */
}
else {
for (intxn = 0, opcount = 0;;) {
@@ -563,6 +584,13 @@ populate_thread(void *arg)
intxn = 1;
}
sprintf(key_buf, "%0*" PRIu64, cfg->key_sz, op);
+ measure_latency = cfg->sample_interval != 0 && (
+ trk->ops % cfg->sample_rate == 0);
+ if (measure_latency &&
+ (ret = __wt_epoch(NULL, &start)) != 0) {
+ lprintf(cfg, ret, 0, "Get time call failed");
+ goto err;
+ }
cursor->set_key(cursor, key_buf);
if (cfg->random_value)
randomize_value(cfg, value_buf);
@@ -571,7 +599,18 @@ populate_thread(void *arg)
lprintf(cfg, ret, 0, "Failed inserting");
goto err;
}
- ++thread->insert.ops;
+ /* Gather statistics */
+ if (measure_latency) {
+ if ((ret = __wt_epoch(NULL, &stop)) != 0) {
+ lprintf(cfg, ret, 0,
+ "Get time call failed");
+ goto err;
+ }
+ ++trk->latency_ops;
+ usecs = ns_to_us(WT_TIMEDIFF(stop, start));
+ track_operation(trk, usecs);
+ }
+ ++thread->insert.ops; /* Same as trk->ops */
if (++opcount < cfg->populate_ops_per_txn)
continue;
@@ -612,6 +651,7 @@ monitor(void *arg)
char buf[64], *path;
int ret;
uint64_t reads, inserts, updates;
+ uint64_t cur_reads, cur_inserts, cur_updates;
uint64_t last_reads, last_inserts, last_updates;
uint32_t read_avg, read_min, read_max;
uint32_t insert_avg, insert_min, insert_max;
@@ -675,6 +715,18 @@ monitor(void *arg)
latency_insert(cfg, &insert_avg, &insert_min, &insert_max);
latency_update(cfg, &update_avg, &update_min, &update_max);
+ cur_reads = reads - last_reads;
+ cur_updates = updates - last_updates;
+ /*
+ * For now the only item we need to worry about changing is
+ * inserts when we transition from the populate phase to
+ * workload phase.
+ */
+ if (inserts < last_inserts)
+ cur_inserts = 0;
+ else
+ cur_inserts = inserts - last_inserts;
+
(void)fprintf(fp,
"%s"
",%" PRIu64 ",%" PRIu64 ",%" PRIu64
@@ -684,9 +736,9 @@ monitor(void *arg)
",%" PRIu32 ",%" PRIu32 ",%" PRIu32
"\n",
buf,
- (reads - last_reads) / cfg->sample_interval,
- (inserts - last_inserts) / cfg->sample_interval,
- (updates - last_updates) / cfg->sample_interval,
+ cur_reads / cfg->sample_interval,
+ cur_inserts / cfg->sample_interval,
+ cur_updates / cfg->sample_interval,
g_ckpt ? 'Y' : 'N',
read_avg, read_min, read_max,
insert_avg, insert_min, insert_max,
@@ -779,6 +831,7 @@ err: g_error = g_stop = 1;
static int
execute_populate(CONFIG *cfg)
{
+ CONFIG_THREAD *popth;
WT_SESSION *session;
struct timespec start, stop;
double secs;
@@ -831,8 +884,14 @@ execute_populate(CONFIG *cfg)
return (ret);
}
+ /*
+ * Move popthreads aside to narrow possible race with the monitor
+ * thread.
+ */
+ popth = cfg->popthreads;
+ cfg->popthreads = NULL;
if ((ret =
- stop_threads(cfg, cfg->populate_threads, cfg->popthreads)) != 0)
+ stop_threads(cfg, cfg->populate_threads, popth)) != 0)
return (ret);
/* Report if any worker threads didn't finish. */