summaryrefslogtreecommitdiff
path: root/bench
diff options
context:
space:
mode:
authorAlex Gorrod <alexg@wiredtiger.com>2012-12-20 16:43:20 +1100
committerAlex Gorrod <alexg@wiredtiger.com>2012-12-20 16:43:20 +1100
commit573de639a54101b0fc2749040441f835fff36399 (patch)
tree68640df1833242e0c1c823c396647ccdbf5cf264 /bench
parent3dcf958a3079c822d3ca198c4e7693191d223872 (diff)
downloadmongo-573de639a54101b0fc2749040441f835fff36399.tar.gz
Update wtperf to support insert threads in workload mode.
Diffstat (limited to 'bench')
-rw-r--r--bench/wtperf/wtperf.c151
1 files changed, 127 insertions, 24 deletions
diff --git a/bench/wtperf/wtperf.c b/bench/wtperf/wtperf.c
index b3ba51df631..ad9f6edcabb 100644
--- a/bench/wtperf/wtperf.c
+++ b/bench/wtperf/wtperf.c
@@ -59,6 +59,7 @@ typedef struct {
uint32_t elapsed_time;
uint32_t populate_threads;/* Number of populate threads. */
uint32_t read_threads; /* Number of read threads. */
+ uint32_t insert_threads;/* Number of insert threads. */
uint32_t update_threads;/* Number of update threads. */
uint32_t verbose;
WT_CONNECTION *conn;
@@ -80,6 +81,8 @@ int lprintf(CONFIG *cfg, int err, uint32_t level, const char *fmt, ...)
#endif
;
void *checkpoint_worker(void *);
+int find_table_count(CONFIG *);
+void *insert_thread(void *);
void *populate_thread(void *);
void print_config(CONFIG *);
void *read_thread(void *);
@@ -97,7 +100,8 @@ void worker(CONFIG *, uint32_t);
/* Worker thread types. */
#define WORKER_READ 0x01
-#define WORKER_UPDATE 0x02
+#define WORKER_INSERT 0x02
+#define WORKER_UPDATE 0x03
/* Default values - these are tiny, we want the basic run to be fast. */
CONFIG default_cfg = {
@@ -117,6 +121,7 @@ CONFIG default_cfg = {
0, /* elapsed_time */
1, /* populate_threads */
2, /* read_threads */
+ 0, /* insert_threads */
0, /* update_threads */
0, /* verbose */
NULL, /* conn */
@@ -143,6 +148,7 @@ CONFIG small_cfg = {
0, /* elapsed_time */
1, /* populate_threads */
8, /* read_threads */
+ 0, /* insert_threads */
0, /* update_threads */
0, /* verbose */
NULL, /* conn */
@@ -169,6 +175,7 @@ CONFIG med_cfg = {
0, /* elapsed_time */
1, /* populate_threads */
16, /* read_threads */
+ 0, /* insert_threads */
0, /* update_threads */
0, /* verbose */
NULL, /* conn */
@@ -195,6 +202,7 @@ CONFIG large_cfg = {
0, /* elapsed_time */
1, /* populate_threads */
16, /* read_threads */
+ 0, /* insert_threads */
0, /* update_threads */
0, /* verbose */
NULL, /* conn */
@@ -207,12 +215,20 @@ const char *debug_cconfig = "verbose=[lsm]";
const char *debug_tconfig = "";
/* Global values shared by threads. */
-uint64_t g_nops;
+/*
+ * g_nins_ops is used to track both insert count and assign keys, so use this
+ * to track insert failures.
+ */
+uint64_t g_nfailedins_ops;
+uint64_t g_nins_ops;
+uint64_t g_npop_ops;
uint64_t g_nread_ops;
uint64_t g_nupdate_ops;
+uint64_t g_nworker_ops;
int g_running;
int g_util_running;
uint32_t g_threads_quit; /* For tracking threads that exit early. */
+/* End global values shared by threads. */
void *
read_thread(void *arg)
@@ -222,6 +238,13 @@ read_thread(void *arg)
}
void *
+insert_thread(void *arg)
+{
+ worker((CONFIG *)arg, WORKER_INSERT);
+ return (NULL);
+}
+
+void *
update_thread(void *arg)
{
worker((CONFIG *)arg, WORKER_UPDATE);
@@ -235,11 +258,12 @@ worker(CONFIG *cfg, uint32_t worker_type)
WT_SESSION *session;
WT_CURSOR *cursor;
const char *op_name = "search";
- char *key_buf, *value;
+ char *data_buf, *key_buf, *value;
int ret, op_ret;
+ uint64_t next_val;
session = NULL;
- key_buf = NULL;
+ data_buf = key_buf = NULL;
op_ret = 0;
conn = cfg->conn;
@@ -248,6 +272,14 @@ worker(CONFIG *cfg, uint32_t worker_type)
ret = ENOMEM;
goto err;
}
+ if (worker_type == WORKER_INSERT) {
+ data_buf = calloc(cfg->data_sz, 1);
+ if (data_buf == NULL) {
+ lprintf(cfg, ENOMEM, 0, "Populate data buffer");
+ goto err;
+ }
+ memset(data_buf, 'a', cfg->data_sz - 1);
+ }
if ((ret = conn->open_session(conn, NULL, NULL, &session)) != 0) {
lprintf(cfg, ret, 0,
@@ -263,16 +295,28 @@ worker(CONFIG *cfg, uint32_t worker_type)
while (g_running) {
/* Get a value in range, avoid zero. */
- sprintf(key_buf, "%d",
- (uint32_t)rand() % (cfg->icount - 1) + 1);
+#define VALUE_RANGE (cfg->icount + g_nins_ops - (cfg->insert_threads + 1))
+ next_val = (worker_type == WORKER_INSERT ?
+ (cfg->icount + ATOMIC_ADD(g_nins_ops, 1)) :
+ (rand() % VALUE_RANGE) + 1);
+ sprintf(key_buf, "%020" PRIu64, next_val);
cursor->set_key(cursor, key_buf);
switch(worker_type) {
case WORKER_READ:
+ op_name = "read";
op_ret = cursor->search(cursor);
if (op_ret == 0)
++g_nread_ops;
break;
+ case WORKER_INSERT:
+ op_name = "insert";
+ cursor->set_value(cursor, data_buf);
+ op_ret = cursor->insert(cursor);
+ if (op_ret != 0)
+ ++g_nfailedins_ops;
+ break;
case WORKER_UPDATE:
+ op_name = "update";
op_ret = cursor->search(cursor);
if (op_ret == 0) {
cursor->get_value(cursor, &value);
@@ -281,7 +325,6 @@ worker(CONFIG *cfg, uint32_t worker_type)
else
value[0] = 'a';
op_ret = cursor->update(cursor);
- op_name = "update";
}
if (op_ret == 0)
++g_nupdate_ops;
@@ -296,13 +339,15 @@ worker(CONFIG *cfg, uint32_t worker_type)
lprintf(cfg, op_ret, 0,
"%s failed for: %s", op_name, key_buf);
else
- ++g_nops;
+ ++g_nworker_ops;
}
err: if (ret != 0)
++g_threads_quit;
if (session != NULL)
session->close(session, NULL);
+ if (data_buf != NULL)
+ free(data_buf);
if (key_buf != NULL)
free(key_buf);
}
@@ -310,7 +355,7 @@ err: if (ret != 0)
/* Retrieve an ID for the next insert operation. */
int get_next_op(uint64_t *op)
{
- *op = ATOMIC_ADD(g_nops, 1);
+ *op = ATOMIC_ADD(g_npop_ops, 1);
return (0);
}
@@ -363,7 +408,7 @@ populate_thread(void *arg)
get_next_op(&op);
if (op > cfg->icount)
break;
- sprintf(key_buf, "%"PRIu64, op);
+ sprintf(key_buf, "%020"PRIu64, op);
cursor->set_key(cursor, key_buf);
if ((ret = cursor->insert(cursor)) != 0) {
lprintf(cfg, ret, 0, "Failed inserting");
@@ -435,12 +480,12 @@ stat_worker(void *arg)
if (cfg->phase == WT_PERF_POP)
lprintf(cfg, 0, cfg->verbose,
"inserts: %" PRIu64 ", elapsed time: %.2f",
- g_nops, secs);
+ g_npop_ops, secs);
else
lprintf(cfg, 0, cfg->verbose,
- "reads: %" PRIu64 " updates: %" PRIu64
- ", elapsed time: %.2f",
- g_nread_ops, g_nupdate_ops, secs);
+ "reads: %" PRIu64 " inserts: %" PRIu64
+ " updates: %" PRIu64 ", elapsed time: %.2f",
+ g_nread_ops, g_nins_ops, g_nupdate_ops, secs);
/* Report data source stats. */
if ((ret = session->open_cursor(session, stat_uri,
@@ -560,7 +605,8 @@ int execute_populate(CONFIG *cfg)
gettimeofday(&cfg->phase_start_time, NULL);
for (cfg->elapsed_time = 0, elapsed = last_ops = 0;
- g_nops < cfg->icount && g_threads_quit < cfg->populate_threads;) {
+ g_npop_ops < cfg->icount &&
+ g_threads_quit < cfg->populate_threads;) {
/*
* Sleep for 100th of a second, report_interval is in second
* granularity, so adjust accordingly.
@@ -570,8 +616,8 @@ int execute_populate(CONFIG *cfg)
if (elapsed % 100 == 0 &&
(elapsed / 100) % cfg->report_interval == 0) {
lprintf(cfg, 0, 1, "%" PRIu64 " ops in %d secs",
- g_nops - last_ops, cfg->report_interval);
- last_ops = g_nops;
+ g_npop_ops - last_ops, cfg->report_interval);
+ last_ops = g_npop_ops;
}
}
if (g_threads_quit == cfg->populate_threads) {
@@ -600,7 +646,7 @@ int execute_populate(CONFIG *cfg)
int execute_workload(CONFIG *cfg)
{
- pthread_t *rthreads, *uthreads;
+ pthread_t *ithreads, *rthreads, *uthreads;
int ret;
uint64_t last_reads, last_updates;
@@ -611,6 +657,10 @@ int execute_workload(CONFIG *cfg)
cfg, cfg->read_threads, &rthreads, read_thread)) != 0)
return (ret);
+ if (cfg->insert_threads != 0 && (ret = start_threads(
+ cfg, cfg->insert_threads, &ithreads, insert_thread)) != 0)
+ return (ret);
+
if (cfg->update_threads != 0 && (ret = start_threads(
cfg, cfg->update_threads, &uthreads, update_thread)) != 0)
return (ret);
@@ -641,6 +691,10 @@ int execute_workload(CONFIG *cfg)
(ret = stop_threads(cfg, cfg->read_threads, rthreads)) != 0)
return (ret);
+ if (cfg->insert_threads != 0 &&
+ (ret = stop_threads(cfg, cfg->insert_threads, ithreads)) != 0)
+ return (ret);
+
if (cfg->update_threads != 0 &&
(ret = stop_threads(cfg, cfg->update_threads, uthreads)) != 0)
return (ret);
@@ -648,12 +702,49 @@ int execute_workload(CONFIG *cfg)
return (0);
}
+/*
+ * Ensure that icount matches the number of records in the
+ * existing table.
+ */
+int find_table_count(CONFIG *cfg)
+{
+ WT_CONNECTION *conn;
+ WT_CURSOR *cursor;
+ WT_SESSION *session;
+ char *key;
+ int ret;
+
+ conn = cfg->conn;
+
+ if ((ret = conn->open_session(conn, NULL, NULL, &session)) != 0) {
+ lprintf(cfg, ret, 0,
+ "open_session failed finding existing table count");
+ goto err;
+ }
+ if ((ret = session->open_cursor(session, cfg->uri,
+ NULL, NULL, &cursor)) != 0) {
+ lprintf(cfg, ret, 0,
+ "open_cursor failed finding existing table count");
+ goto err;
+ }
+ if ((ret = cursor->prev(cursor)) != 0) {
+ lprintf(cfg, ret, 0,
+ "cursor prev failed finding existing table count");
+ goto err;
+ }
+ cursor->get_key(cursor, &key);
+ cfg->icount = atoi(key);
+
+err: session->close(session, NULL);
+ return (ret);
+}
+
int main(int argc, char **argv)
{
CONFIG cfg;
WT_CONNECTION *conn;
const char *user_cconfig, *user_tconfig;
- const char *opts = "C:P:R:U:T:c:d:eh:i:k:l:r:s:t:u:v:SML";
+ const char *opts = "C:I:P:R:U:T:c:d:eh:i:k:l:r:s:t:u:v:SML";
char *cc_buf, *tc_buf;
int ch, checkpoint_created, ret, stat_created;
pthread_t checkpoint, stat;
@@ -729,6 +820,9 @@ int main(int argc, char **argv)
case 'C':
user_cconfig = optarg;
break;
+ case 'I':
+ cfg.insert_threads = (uint32_t)atoi(optarg);
+ break;
case 'P':
cfg.populate_threads = (uint32_t)atoi(optarg);
break;
@@ -827,20 +921,27 @@ int main(int argc, char **argv)
}
if (cfg.create != 0 && execute_populate(&cfg) != 0)
goto err;
+ /* If we aren't populating, set the insert count. */
+ if (cfg.create == 0 && find_table_count(&cfg) != 0)
+ goto err;
if (cfg.run_time != 0 &&
- (cfg.read_threads != 0 || cfg.update_threads != 0) &&
+ cfg.read_threads + cfg.insert_threads + cfg.update_threads != 0 &&
(ret = execute_workload(&cfg)) != 0)
goto err;
lprintf(&cfg, 0, 1,
- "Ran performance test example with %d read threads"
- " and %d update threads for %d seconds.",
- cfg.read_threads, cfg.update_threads, cfg.run_time);
+ "Ran performance test example with %d read threads, %d insert"
+ " threads and %d update threads for %d seconds.",
+ cfg.read_threads, cfg.insert_threads,
+ cfg.update_threads, cfg.run_time);
if (cfg.read_threads != 0)
lprintf(&cfg, 0, 1,
"Executed %" PRIu64 " read operations", g_nread_ops);
+ if (cfg.insert_threads != 0)
+ lprintf(&cfg, 0, 1,
+ "Executed %" PRIu64 " insert operations", g_nins_ops);
if (cfg.update_threads != 0)
lprintf(&cfg, 0, 1,
"Executed %" PRIu64 " update operations", g_nupdate_ops);
@@ -879,7 +980,7 @@ start_threads(
int ret;
g_running = 1;
- g_nops = g_nread_ops = g_nupdate_ops = 0;
+ g_npop_ops = g_nread_ops = g_nupdate_ops = 0;
g_threads_quit = 0;
threads = calloc(num, sizeof(pthread_t *));
if (threads == NULL)
@@ -1006,6 +1107,7 @@ void print_config(CONFIG *cfg)
printf("\t Reporting interval: %d\n", cfg->report_interval);
printf("\t Workload period: %d\n", cfg->run_time);
printf("\t Number read threads: %d\n", cfg->read_threads);
+ printf("\t Number insert threads: %d\n", cfg->insert_threads);
printf("\t Number update threads: %d\n", cfg->update_threads);
printf("\t Verbosity: %d\n", cfg->verbose);
}
@@ -1017,6 +1119,7 @@ void usage(void)
printf("\t-M Use a medium default configuration\n");
printf("\t-L Use a large default configuration\n");
printf("\t-C <string> additional connection configuration\n");
+ printf("\t-I <int> number of insert worker threads\n");
printf("\t-P <int> number of populate threads\n");
printf("\t-R <int> number of read threads\n");
printf("\t-U <int> number of update threads\n");