diff options
author | Alex Gorrod <alexg@wiredtiger.com> | 2014-02-11 13:34:43 +1100 |
---|---|---|
committer | Alex Gorrod <alexg@wiredtiger.com> | 2014-02-11 13:34:43 +1100 |
commit | 222631e05e46b83fbb62056988e1c6b7474731f0 (patch) | |
tree | 8cd62b8dc20d50eeea8e120ef4f9d67f30b168e0 /bench/wtperf | |
parent | 785bf198cb8a34bc89588816ec87588976c0dce8 (diff) | |
parent | b5768ce125cf6fb174edfbd8a22a2b29beb14bc0 (diff) | |
download | mongo-222631e05e46b83fbb62056988e1c6b7474731f0.tar.gz |
Merge pull request #867 from wiredtiger/wtperf-multi-table
Add ability to create multiple tables to wtperf.
Diffstat (limited to 'bench/wtperf')
-rw-r--r-- | bench/wtperf/config.c | 9 | ||||
-rw-r--r-- | bench/wtperf/wtperf.c | 220 | ||||
-rw-r--r-- | bench/wtperf/wtperf.h | 1 | ||||
-rw-r--r-- | bench/wtperf/wtperf_opt.i | 3 |
4 files changed, 183 insertions, 50 deletions
diff --git a/bench/wtperf/config.c b/bench/wtperf/config.c index 2bb0201d639..c523ffc9f8b 100644 --- a/bench/wtperf/config.c +++ b/bench/wtperf/config.c @@ -96,6 +96,11 @@ config_free(CONFIG *cfg) *pstr = NULL; } } + if (cfg->uris != NULL) { + for (i = 0; i < cfg->table_count; i++) + free(cfg->uris[i]); + free(cfg->uris); + } free(cfg->popthreads); free(cfg->uri); @@ -474,6 +479,10 @@ config_sanity(CONFIG *cfg) fprintf(stderr, "interval value longer than the run-time\n"); return (1); } + if (cfg->table_count > 99) { + fprintf(stderr, "table count greater than 99\n"); + return (1); + } return (0); } diff --git a/bench/wtperf/wtperf.c b/bench/wtperf/wtperf.c index 3d255f670dc..0fb269b522c 100644 --- a/bench/wtperf/wtperf.c +++ b/bench/wtperf/wtperf.c @@ -32,6 +32,7 @@ static const CONFIG default_cfg = { "WT_TEST", /* home */ "WT_TEST", /* monitor dir */ NULL, /* uri */ + NULL, /* uris */ NULL, /* conn */ NULL, /* logf */ NULL, NULL, /* populate, checkpoint threads */ @@ -72,6 +73,8 @@ static volatile uint32_t g_totalsec; /* total seconds running */ #endif static void *checkpoint_worker(void *); +static int create_tables(CONFIG *); +static int create_uris(CONFIG *); static int execute_populate(CONFIG *); static int execute_workload(CONFIG *); static int find_table_count(CONFIG *); @@ -188,16 +191,18 @@ worker(void *arg) CONFIG_THREAD *thread; TRACK *trk; WT_CONNECTION *conn; - WT_CURSOR *cursor; + WT_CURSOR **cursors, *cursor; WT_SESSION *session; - uint64_t next_val, usecs; + char *value_buf, *key_buf, *value; int measure_latency, ret; + size_t i; + uint64_t next_val, usecs; uint8_t *op, *op_end; - char *value_buf, *key_buf, *value; thread = (CONFIG_THREAD *)arg; cfg = thread->cfg; conn = cfg->conn; + cursors = NULL; session = NULL; trk = NULL; @@ -206,7 +211,24 @@ worker(void *arg) lprintf(cfg, ret, 0, "worker: WT_CONNECTION.open_session"); goto err; } - if ((ret = session->open_cursor( + if (cfg->table_count > 1) { + cursors = (WT_CURSOR **)calloc( + cfg->table_count, sizeof(WT_CURSOR *)); + if (cursors == NULL) { + lprintf(cfg, ENOMEM, 0, + "worker: couldn't allocate cursor array"); + goto err; + } + for (i = 0; i < cfg->table_count; i++) { + if ((ret = session->open_cursor(session, + cfg->uris[i], NULL, NULL, &cursors[i])) != 0) { + lprintf(cfg, ret, 0, + "worker: WT_SESSION.open_cursor: %s", + cfg->uris[i]); + goto err; + } + } + } else if ((ret = session->open_cursor( session, cfg->uri, NULL, NULL, &cursor)) != 0) { lprintf(cfg, ret, 0, "worker: WT_SESSION.open_cursor: %s", cfg->uri); @@ -220,6 +242,11 @@ worker(void *arg) op_end = op + sizeof(thread->workload->ops); while (!g_stop) { + /* Pick a cursor if there are multiple tables. */ + if (cfg->table_count > 1) + cursor = cursors[ + __wt_random() % (cfg->table_count - 1)]; + /* * Generate the next key and setup operation specific * statistics tracking objects. @@ -356,6 +383,8 @@ op_err: lprintf(cfg, ret, 0, if (0) { err: g_error = g_stop = 1; } + if (cursors != NULL) + free(cursors); return (NULL); } @@ -471,17 +500,20 @@ populate_thread(void *arg) CONFIG_THREAD *thread; TRACK *trk; WT_CONNECTION *conn; - WT_CURSOR *cursor; + WT_CURSOR **cursors, *cursor; WT_SESSION *session; + char *value_buf, *key_buf; + const char *cursor_config; + int intxn, measure_latency, ret; + size_t i; uint32_t opcount; uint64_t op, usecs; - int intxn, measure_latency, ret; - char *value_buf, *key_buf; thread = (CONFIG_THREAD *)arg; cfg = thread->cfg; conn = cfg->conn; session = NULL; + cursors = NULL; ret = 0; trk = &thread->insert; @@ -494,15 +526,35 @@ populate_thread(void *arg) goto err; } - /* Do a bulk load if populate is single-threaded. */ - if ((ret = session->open_cursor(session, cfg->uri, NULL, - cfg->populate_threads == 1 ? "bulk" : NULL, &cursor)) != 0) { + /* Do bulk loads if populate is single-threaded. */ + cursor_config = cfg->populate_threads == 1 ? "bulk" : NULL; + /* Create the cursor or cursors if there are multiple tables. */ + if (cfg->table_count > 1) { + cursors = (WT_CURSOR **)calloc( + cfg->table_count, sizeof(WT_CURSOR *)); + if (cursors == NULL) { + lprintf(cfg, ENOMEM, 0, + "worker: couldn't allocate cursor array"); + goto err; + } + for (i = 0; i < cfg->table_count; i++) { + if ((ret = session->open_cursor( + session, cfg->uris[i], NULL, + cursor_config, &cursors[i])) != 0) { + lprintf(cfg, ret, 0, + "populate: WT_SESSION.open_cursor: %s", + cfg->uris[i]); + goto err; + } + } + } else if ((ret = session->open_cursor( + session, cfg->uri, NULL, cursor_config, &cursor)) != 0) { lprintf(cfg, ret, 0, "populate: WT_SESSION.open_cursor: %s", cfg->uri); goto err; } - /* Populate the database. */ + /* Populate the databases. */ for (intxn = 0, opcount = 0;;) { op = get_next_incr(); if (op > cfg->icount) @@ -525,26 +577,30 @@ populate_thread(void *arg) lprintf(cfg, ret, 0, "Get time call failed"); goto err; } - cursor->set_key(cursor, key_buf); - if (cfg->random_value) - randomize_value(cfg, value_buf); - cursor->set_value(cursor, value_buf); - if ((ret = cursor->insert(cursor)) != 0) { - lprintf(cfg, ret, 0, "Failed inserting"); - goto err; - } - /* Gather statistics */ - if (measure_latency) { - if ((ret = __wt_epoch(NULL, &stop)) != 0) { - lprintf(cfg, ret, 0, - "Get time call failed"); + for (i = 0; i < cfg->table_count; i++) { + if (cfg->table_count > 1) + cursor = cursors[i]; + cursor->set_key(cursor, key_buf); + if (cfg->random_value) + randomize_value(cfg, value_buf); + cursor->set_value(cursor, value_buf); + if ((ret = cursor->insert(cursor)) != 0) { + lprintf(cfg, ret, 0, "Failed inserting"); goto err; } - ++trk->latency_ops; - usecs = ns_to_us(WT_TIMEDIFF(stop, start)); - track_operation(trk, usecs); + /* Gather statistics */ + if (measure_latency) { + if ((ret = __wt_epoch(NULL, &stop)) != 0) { + lprintf(cfg, ret, 0, + "Get time call failed"); + goto err; + } + ++trk->latency_ops; + usecs = ns_to_us(WT_TIMEDIFF(stop, start)); + track_operation(trk, usecs); + } + ++thread->insert.ops; /* Same as trk->ops */ } - ++thread->insert.ops; /* Same as trk->ops */ if (cfg->populate_ops_per_txn != 0) { if (++opcount < cfg->populate_ops_per_txn) @@ -572,6 +628,8 @@ populate_thread(void *arg) if (0) { err: g_error = g_stop = 1; } + if (cursors != NULL) + free(cursors); return (NULL); } @@ -1053,11 +1111,90 @@ err: if ((t_ret = session->close(session, NULL)) != 0) { return (ret); } +/* + * Populate the uri array if more than one table is being used. + */ +int +create_uris(CONFIG *cfg) +{ + char *uri; + int ret; + size_t base_uri_len; + uint32_t i; + + ret = 0; + if (cfg->table_count < 2) { + cfg->uris = NULL; + return (0); + } + + base_uri_len = strlen(cfg->uri); + cfg->uris = (char **)malloc(cfg->table_count * sizeof(char *)); + if (cfg->uris == NULL) { + ret = ENOMEM; + goto err; + } + for (i = 0; i < cfg->table_count; i++) { + uri = cfg->uris[i] = (char *)calloc(base_uri_len + 3, 1); + if (uri == NULL) { + ret = ENOMEM; + goto err; + } + memcpy(uri, cfg->uri, base_uri_len); + uri[base_uri_len] = uri[base_uri_len + 1] = '0'; + uri[base_uri_len] = '0' + (i / 10); + uri[base_uri_len + 1] = '0' + (i % 10); + } +err: if (ret != 0) { + for (i = 0; i < cfg->table_count; i++) + free(cfg->uris[i]); + free(cfg->uris); + cfg->uris = NULL; + } + return (ret); +} + +int +create_tables(CONFIG *cfg) +{ + WT_SESSION *session; + char *uri; + int ret; + size_t i; + + session = NULL; + if (cfg->create == 0) + return (0); + + uri = cfg->uri; + if ((ret = cfg->conn->open_session( + cfg->conn, NULL, cfg->sess_config, &session)) != 0) { + lprintf(cfg, ret, 0, + "Error opening a session on %s", cfg->home); + return (ret); + } + for (i = 0; i < cfg->table_count; i++) { + if (cfg->table_count > 1) + uri = cfg->uris[i]; + if ((ret = session->create( + session, uri, cfg->table_config)) != 0) { + lprintf(cfg, ret, 0, + "Error creating table %s", cfg->uri); + return (ret); + } + } + if ((ret = session->close(session, NULL)) != 0) { + lprintf(cfg, + ret, 0, "Error closing session"); + return (ret); + } + return (ret); +} + int main(int argc, char *argv[]) { CONFIG *cfg, _cfg; - WT_SESSION *session; pthread_t monitor_thread; size_t len; uint64_t req_len, total_ops; @@ -1068,7 +1205,6 @@ main(int argc, char *argv[]) const char *user_cconfig, *user_tconfig; char *cmd, *cc_buf, *tc_buf, *tmphome; - session = NULL; monitor_created = monitor_set = ret = 0; helium_mount = user_cconfig = user_tconfig = NULL; cmd = cc_buf = tc_buf = tmphome = NULL; @@ -1252,26 +1388,10 @@ main(int argc, char *argv[]) ret, 0, "Error loading Helium: %s", helium_buf); } - if (cfg->create != 0) { /* If creating, create the table. */ - if ((ret = cfg->conn->open_session( - cfg->conn, NULL, cfg->sess_config, &session)) != 0) { - lprintf(cfg, ret, 0, - "Error opening a session on %s", cfg->home); - goto err; - } - if ((ret = session->create( - session, cfg->uri, cfg->table_config)) != 0) { - lprintf(cfg, - ret, 0, "Error creating table %s", cfg->uri); - goto err; - } - if ((ret = session->close(session, NULL)) != 0) { - lprintf(cfg, - ret, 0, "Error closing session"); - goto err; - } - session = NULL; - } + if ((ret = create_uris(cfg)) != 0) + goto err; + if ((ret = create_tables(cfg)) != 0) + goto err; /* Start the monitor thread. */ if (cfg->sample_interval != 0) { if ((ret = pthread_create( diff --git a/bench/wtperf/wtperf.h b/bench/wtperf/wtperf.h index 54c4334a2ac..00e6dd5fb95 100644 --- a/bench/wtperf/wtperf.h +++ b/bench/wtperf/wtperf.h @@ -65,6 +65,7 @@ struct __config { /* Configuration struction */ const char *home; /* WiredTiger home */ const char *monitor_dir; /* Monitor output dir */ char *uri; /* Object URI */ + char **uris; /* URIs if multiple tables */ WT_CONNECTION *conn; /* Database connection */ diff --git a/bench/wtperf/wtperf_opt.i b/bench/wtperf/wtperf_opt.i index 4e11799781f..28e682de364 100644 --- a/bench/wtperf/wtperf_opt.i +++ b/bench/wtperf/wtperf_opt.i @@ -114,6 +114,9 @@ DEF_OPT_AS_CONFIG_STRING(table_config, "key_format=S,value_format=S,type=lsm,exclusive=true," "leaf_page_max=4kb,internal_page_max=64kb,allocation_size=4kb,", "table configuration string") +DEF_OPT_AS_UINT32(table_count, 1, + "number of tables to run operations over. Operations are spread evenly " + "over the tables amongst all threads. Default 1, maximum 99.") DEF_OPT_AS_STRING(threads, "", "workload configuration: each 'count' " "entry is the total number of threads, and the 'insert', 'read' and " "'update' entries are the ratios of insert, read and update operations " |