diff options
author | Alex Gorrod <alexg@wiredtiger.com> | 2012-12-20 16:43:20 +1100 |
---|---|---|
committer | Alex Gorrod <alexg@wiredtiger.com> | 2012-12-20 16:43:20 +1100 |
commit | 573de639a54101b0fc2749040441f835fff36399 (patch) | |
tree | 68640df1833242e0c1c823c396647ccdbf5cf264 /bench | |
parent | 3dcf958a3079c822d3ca198c4e7693191d223872 (diff) | |
download | mongo-573de639a54101b0fc2749040441f835fff36399.tar.gz |
Update wtperf to support insert threads in workload mode.
Diffstat (limited to 'bench')
-rw-r--r-- | bench/wtperf/wtperf.c | 151 |
1 files changed, 127 insertions, 24 deletions
diff --git a/bench/wtperf/wtperf.c b/bench/wtperf/wtperf.c index b3ba51df631..ad9f6edcabb 100644 --- a/bench/wtperf/wtperf.c +++ b/bench/wtperf/wtperf.c @@ -59,6 +59,7 @@ typedef struct { uint32_t elapsed_time; uint32_t populate_threads;/* Number of populate threads. */ uint32_t read_threads; /* Number of read threads. */ + uint32_t insert_threads;/* Number of insert threads. */ uint32_t update_threads;/* Number of update threads. */ uint32_t verbose; WT_CONNECTION *conn; @@ -80,6 +81,8 @@ int lprintf(CONFIG *cfg, int err, uint32_t level, const char *fmt, ...) #endif ; void *checkpoint_worker(void *); +int find_table_count(CONFIG *); +void *insert_thread(void *); void *populate_thread(void *); void print_config(CONFIG *); void *read_thread(void *); @@ -97,7 +100,8 @@ void worker(CONFIG *, uint32_t); /* Worker thread types. */ #define WORKER_READ 0x01 -#define WORKER_UPDATE 0x02 +#define WORKER_INSERT 0x02 +#define WORKER_UPDATE 0x03 /* Default values - these are tiny, we want the basic run to be fast. */ CONFIG default_cfg = { @@ -117,6 +121,7 @@ CONFIG default_cfg = { 0, /* elapsed_time */ 1, /* populate_threads */ 2, /* read_threads */ + 0, /* insert_threads */ 0, /* update_threads */ 0, /* verbose */ NULL, /* conn */ @@ -143,6 +148,7 @@ CONFIG small_cfg = { 0, /* elapsed_time */ 1, /* populate_threads */ 8, /* read_threads */ + 0, /* insert_threads */ 0, /* update_threads */ 0, /* verbose */ NULL, /* conn */ @@ -169,6 +175,7 @@ CONFIG med_cfg = { 0, /* elapsed_time */ 1, /* populate_threads */ 16, /* read_threads */ + 0, /* insert_threads */ 0, /* update_threads */ 0, /* verbose */ NULL, /* conn */ @@ -195,6 +202,7 @@ CONFIG large_cfg = { 0, /* elapsed_time */ 1, /* populate_threads */ 16, /* read_threads */ + 0, /* insert_threads */ 0, /* update_threads */ 0, /* verbose */ NULL, /* conn */ @@ -207,12 +215,20 @@ const char *debug_cconfig = "verbose=[lsm]"; const char *debug_tconfig = ""; /* Global values shared by threads. */ -uint64_t g_nops; +/* + * g_nins_ops is used to track both insert count and assign keys, so use this + * to track insert failures. + */ +uint64_t g_nfailedins_ops; +uint64_t g_nins_ops; +uint64_t g_npop_ops; uint64_t g_nread_ops; uint64_t g_nupdate_ops; +uint64_t g_nworker_ops; int g_running; int g_util_running; uint32_t g_threads_quit; /* For tracking threads that exit early. */ +/* End global values shared by threads. */ void * read_thread(void *arg) @@ -222,6 +238,13 @@ read_thread(void *arg) } void * +insert_thread(void *arg) +{ + worker((CONFIG *)arg, WORKER_INSERT); + return (NULL); +} + +void * update_thread(void *arg) { worker((CONFIG *)arg, WORKER_UPDATE); @@ -235,11 +258,12 @@ worker(CONFIG *cfg, uint32_t worker_type) WT_SESSION *session; WT_CURSOR *cursor; const char *op_name = "search"; - char *key_buf, *value; + char *data_buf, *key_buf, *value; int ret, op_ret; + uint64_t next_val; session = NULL; - key_buf = NULL; + data_buf = key_buf = NULL; op_ret = 0; conn = cfg->conn; @@ -248,6 +272,14 @@ worker(CONFIG *cfg, uint32_t worker_type) ret = ENOMEM; goto err; } + if (worker_type == WORKER_INSERT) { + data_buf = calloc(cfg->data_sz, 1); + if (data_buf == NULL) { + lprintf(cfg, ENOMEM, 0, "Populate data buffer"); + goto err; + } + memset(data_buf, 'a', cfg->data_sz - 1); + } if ((ret = conn->open_session(conn, NULL, NULL, &session)) != 0) { lprintf(cfg, ret, 0, @@ -263,16 +295,28 @@ worker(CONFIG *cfg, uint32_t worker_type) while (g_running) { /* Get a value in range, avoid zero. */ - sprintf(key_buf, "%d", - (uint32_t)rand() % (cfg->icount - 1) + 1); +#define VALUE_RANGE (cfg->icount + g_nins_ops - (cfg->insert_threads + 1)) + next_val = (worker_type == WORKER_INSERT ? + (cfg->icount + ATOMIC_ADD(g_nins_ops, 1)) : + (rand() % VALUE_RANGE) + 1); + sprintf(key_buf, "%020" PRIu64, next_val); cursor->set_key(cursor, key_buf); switch(worker_type) { case WORKER_READ: + op_name = "read"; op_ret = cursor->search(cursor); if (op_ret == 0) ++g_nread_ops; break; + case WORKER_INSERT: + op_name = "insert"; + cursor->set_value(cursor, data_buf); + op_ret = cursor->insert(cursor); + if (op_ret != 0) + ++g_nfailedins_ops; + break; case WORKER_UPDATE: + op_name = "update"; op_ret = cursor->search(cursor); if (op_ret == 0) { cursor->get_value(cursor, &value); @@ -281,7 +325,6 @@ worker(CONFIG *cfg, uint32_t worker_type) else value[0] = 'a'; op_ret = cursor->update(cursor); - op_name = "update"; } if (op_ret == 0) ++g_nupdate_ops; @@ -296,13 +339,15 @@ worker(CONFIG *cfg, uint32_t worker_type) lprintf(cfg, op_ret, 0, "%s failed for: %s", op_name, key_buf); else - ++g_nops; + ++g_nworker_ops; } err: if (ret != 0) ++g_threads_quit; if (session != NULL) session->close(session, NULL); + if (data_buf != NULL) + free(data_buf); if (key_buf != NULL) free(key_buf); } @@ -310,7 +355,7 @@ err: if (ret != 0) /* Retrieve an ID for the next insert operation. */ int get_next_op(uint64_t *op) { - *op = ATOMIC_ADD(g_nops, 1); + *op = ATOMIC_ADD(g_npop_ops, 1); return (0); } @@ -363,7 +408,7 @@ populate_thread(void *arg) get_next_op(&op); if (op > cfg->icount) break; - sprintf(key_buf, "%"PRIu64, op); + sprintf(key_buf, "%020"PRIu64, op); cursor->set_key(cursor, key_buf); if ((ret = cursor->insert(cursor)) != 0) { lprintf(cfg, ret, 0, "Failed inserting"); @@ -435,12 +480,12 @@ stat_worker(void *arg) if (cfg->phase == WT_PERF_POP) lprintf(cfg, 0, cfg->verbose, "inserts: %" PRIu64 ", elapsed time: %.2f", - g_nops, secs); + g_npop_ops, secs); else lprintf(cfg, 0, cfg->verbose, - "reads: %" PRIu64 " updates: %" PRIu64 - ", elapsed time: %.2f", - g_nread_ops, g_nupdate_ops, secs); + "reads: %" PRIu64 " inserts: %" PRIu64 + " updates: %" PRIu64 ", elapsed time: %.2f", + g_nread_ops, g_nins_ops, g_nupdate_ops, secs); /* Report data source stats. */ if ((ret = session->open_cursor(session, stat_uri, @@ -560,7 +605,8 @@ int execute_populate(CONFIG *cfg) gettimeofday(&cfg->phase_start_time, NULL); for (cfg->elapsed_time = 0, elapsed = last_ops = 0; - g_nops < cfg->icount && g_threads_quit < cfg->populate_threads;) { + g_npop_ops < cfg->icount && + g_threads_quit < cfg->populate_threads;) { /* * Sleep for 100th of a second, report_interval is in second * granularity, so adjust accordingly. @@ -570,8 +616,8 @@ int execute_populate(CONFIG *cfg) if (elapsed % 100 == 0 && (elapsed / 100) % cfg->report_interval == 0) { lprintf(cfg, 0, 1, "%" PRIu64 " ops in %d secs", - g_nops - last_ops, cfg->report_interval); - last_ops = g_nops; + g_npop_ops - last_ops, cfg->report_interval); + last_ops = g_npop_ops; } } if (g_threads_quit == cfg->populate_threads) { @@ -600,7 +646,7 @@ int execute_populate(CONFIG *cfg) int execute_workload(CONFIG *cfg) { - pthread_t *rthreads, *uthreads; + pthread_t *ithreads, *rthreads, *uthreads; int ret; uint64_t last_reads, last_updates; @@ -611,6 +657,10 @@ int execute_workload(CONFIG *cfg) cfg, cfg->read_threads, &rthreads, read_thread)) != 0) return (ret); + if (cfg->insert_threads != 0 && (ret = start_threads( + cfg, cfg->insert_threads, &ithreads, insert_thread)) != 0) + return (ret); + if (cfg->update_threads != 0 && (ret = start_threads( cfg, cfg->update_threads, &uthreads, update_thread)) != 0) return (ret); @@ -641,6 +691,10 @@ int execute_workload(CONFIG *cfg) (ret = stop_threads(cfg, cfg->read_threads, rthreads)) != 0) return (ret); + if (cfg->insert_threads != 0 && + (ret = stop_threads(cfg, cfg->insert_threads, ithreads)) != 0) + return (ret); + if (cfg->update_threads != 0 && (ret = stop_threads(cfg, cfg->update_threads, uthreads)) != 0) return (ret); @@ -648,12 +702,49 @@ int execute_workload(CONFIG *cfg) return (0); } +/* + * Ensure that icount matches the number of records in the + * existing table. + */ +int find_table_count(CONFIG *cfg) +{ + WT_CONNECTION *conn; + WT_CURSOR *cursor; + WT_SESSION *session; + char *key; + int ret; + + conn = cfg->conn; + + if ((ret = conn->open_session(conn, NULL, NULL, &session)) != 0) { + lprintf(cfg, ret, 0, + "open_session failed finding existing table count"); + goto err; + } + if ((ret = session->open_cursor(session, cfg->uri, + NULL, NULL, &cursor)) != 0) { + lprintf(cfg, ret, 0, + "open_cursor failed finding existing table count"); + goto err; + } + if ((ret = cursor->prev(cursor)) != 0) { + lprintf(cfg, ret, 0, + "cursor prev failed finding existing table count"); + goto err; + } + cursor->get_key(cursor, &key); + cfg->icount = atoi(key); + +err: session->close(session, NULL); + return (ret); +} + int main(int argc, char **argv) { CONFIG cfg; WT_CONNECTION *conn; const char *user_cconfig, *user_tconfig; - const char *opts = "C:P:R:U:T:c:d:eh:i:k:l:r:s:t:u:v:SML"; + const char *opts = "C:I:P:R:U:T:c:d:eh:i:k:l:r:s:t:u:v:SML"; char *cc_buf, *tc_buf; int ch, checkpoint_created, ret, stat_created; pthread_t checkpoint, stat; @@ -729,6 +820,9 @@ int main(int argc, char **argv) case 'C': user_cconfig = optarg; break; + case 'I': + cfg.insert_threads = (uint32_t)atoi(optarg); + break; case 'P': cfg.populate_threads = (uint32_t)atoi(optarg); break; @@ -827,20 +921,27 @@ int main(int argc, char **argv) } if (cfg.create != 0 && execute_populate(&cfg) != 0) goto err; + /* If we aren't populating, set the insert count. */ + if (cfg.create == 0 && find_table_count(&cfg) != 0) + goto err; if (cfg.run_time != 0 && - (cfg.read_threads != 0 || cfg.update_threads != 0) && + cfg.read_threads + cfg.insert_threads + cfg.update_threads != 0 && (ret = execute_workload(&cfg)) != 0) goto err; lprintf(&cfg, 0, 1, - "Ran performance test example with %d read threads" - " and %d update threads for %d seconds.", - cfg.read_threads, cfg.update_threads, cfg.run_time); + "Ran performance test example with %d read threads, %d insert" + " threads and %d update threads for %d seconds.", + cfg.read_threads, cfg.insert_threads, + cfg.update_threads, cfg.run_time); if (cfg.read_threads != 0) lprintf(&cfg, 0, 1, "Executed %" PRIu64 " read operations", g_nread_ops); + if (cfg.insert_threads != 0) + lprintf(&cfg, 0, 1, + "Executed %" PRIu64 " insert operations", g_nins_ops); if (cfg.update_threads != 0) lprintf(&cfg, 0, 1, "Executed %" PRIu64 " update operations", g_nupdate_ops); @@ -879,7 +980,7 @@ start_threads( int ret; g_running = 1; - g_nops = g_nread_ops = g_nupdate_ops = 0; + g_npop_ops = g_nread_ops = g_nupdate_ops = 0; g_threads_quit = 0; threads = calloc(num, sizeof(pthread_t *)); if (threads == NULL) @@ -1006,6 +1107,7 @@ void print_config(CONFIG *cfg) printf("\t Reporting interval: %d\n", cfg->report_interval); printf("\t Workload period: %d\n", cfg->run_time); printf("\t Number read threads: %d\n", cfg->read_threads); + printf("\t Number insert threads: %d\n", cfg->insert_threads); printf("\t Number update threads: %d\n", cfg->update_threads); printf("\t Verbosity: %d\n", cfg->verbose); } @@ -1017,6 +1119,7 @@ void usage(void) printf("\t-M Use a medium default configuration\n"); printf("\t-L Use a large default configuration\n"); printf("\t-C <string> additional connection configuration\n"); + printf("\t-I <int> number of insert worker threads\n"); printf("\t-P <int> number of populate threads\n"); printf("\t-R <int> number of read threads\n"); printf("\t-U <int> number of update threads\n"); |