summaryrefslogtreecommitdiff
path: root/bench/wtperf
diff options
context:
space:
mode:
authorAlex Gorrod <alexg@wiredtiger.com>2014-02-11 13:34:43 +1100
committerAlex Gorrod <alexg@wiredtiger.com>2014-02-11 13:34:43 +1100
commit222631e05e46b83fbb62056988e1c6b7474731f0 (patch)
tree8cd62b8dc20d50eeea8e120ef4f9d67f30b168e0 /bench/wtperf
parent785bf198cb8a34bc89588816ec87588976c0dce8 (diff)
parentb5768ce125cf6fb174edfbd8a22a2b29beb14bc0 (diff)
downloadmongo-222631e05e46b83fbb62056988e1c6b7474731f0.tar.gz
Merge pull request #867 from wiredtiger/wtperf-multi-table
Add ability to create multiple tables to wtperf.
Diffstat (limited to 'bench/wtperf')
-rw-r--r--bench/wtperf/config.c9
-rw-r--r--bench/wtperf/wtperf.c220
-rw-r--r--bench/wtperf/wtperf.h1
-rw-r--r--bench/wtperf/wtperf_opt.i3
4 files changed, 183 insertions, 50 deletions
diff --git a/bench/wtperf/config.c b/bench/wtperf/config.c
index 2bb0201d639..c523ffc9f8b 100644
--- a/bench/wtperf/config.c
+++ b/bench/wtperf/config.c
@@ -96,6 +96,11 @@ config_free(CONFIG *cfg)
*pstr = NULL;
}
}
+ if (cfg->uris != NULL) {
+ for (i = 0; i < cfg->table_count; i++)
+ free(cfg->uris[i]);
+ free(cfg->uris);
+ }
free(cfg->popthreads);
free(cfg->uri);
@@ -474,6 +479,10 @@ config_sanity(CONFIG *cfg)
fprintf(stderr, "interval value longer than the run-time\n");
return (1);
}
+ if (cfg->table_count > 99) {
+ fprintf(stderr, "table count greater than 99\n");
+ return (1);
+ }
return (0);
}
diff --git a/bench/wtperf/wtperf.c b/bench/wtperf/wtperf.c
index 3d255f670dc..0fb269b522c 100644
--- a/bench/wtperf/wtperf.c
+++ b/bench/wtperf/wtperf.c
@@ -32,6 +32,7 @@ static const CONFIG default_cfg = {
"WT_TEST", /* home */
"WT_TEST", /* monitor dir */
NULL, /* uri */
+ NULL, /* uris */
NULL, /* conn */
NULL, /* logf */
NULL, NULL, /* populate, checkpoint threads */
@@ -72,6 +73,8 @@ static volatile uint32_t g_totalsec; /* total seconds running */
#endif
static void *checkpoint_worker(void *);
+static int create_tables(CONFIG *);
+static int create_uris(CONFIG *);
static int execute_populate(CONFIG *);
static int execute_workload(CONFIG *);
static int find_table_count(CONFIG *);
@@ -188,16 +191,18 @@ worker(void *arg)
CONFIG_THREAD *thread;
TRACK *trk;
WT_CONNECTION *conn;
- WT_CURSOR *cursor;
+ WT_CURSOR **cursors, *cursor;
WT_SESSION *session;
- uint64_t next_val, usecs;
+ char *value_buf, *key_buf, *value;
int measure_latency, ret;
+ size_t i;
+ uint64_t next_val, usecs;
uint8_t *op, *op_end;
- char *value_buf, *key_buf, *value;
thread = (CONFIG_THREAD *)arg;
cfg = thread->cfg;
conn = cfg->conn;
+ cursors = NULL;
session = NULL;
trk = NULL;
@@ -206,7 +211,24 @@ worker(void *arg)
lprintf(cfg, ret, 0, "worker: WT_CONNECTION.open_session");
goto err;
}
- if ((ret = session->open_cursor(
+ if (cfg->table_count > 1) {
+ cursors = (WT_CURSOR **)calloc(
+ cfg->table_count, sizeof(WT_CURSOR *));
+ if (cursors == NULL) {
+ lprintf(cfg, ENOMEM, 0,
+ "worker: couldn't allocate cursor array");
+ goto err;
+ }
+ for (i = 0; i < cfg->table_count; i++) {
+ if ((ret = session->open_cursor(session,
+ cfg->uris[i], NULL, NULL, &cursors[i])) != 0) {
+ lprintf(cfg, ret, 0,
+ "worker: WT_SESSION.open_cursor: %s",
+ cfg->uris[i]);
+ goto err;
+ }
+ }
+ } else if ((ret = session->open_cursor(
session, cfg->uri, NULL, NULL, &cursor)) != 0) {
lprintf(cfg,
ret, 0, "worker: WT_SESSION.open_cursor: %s", cfg->uri);
@@ -220,6 +242,11 @@ worker(void *arg)
op_end = op + sizeof(thread->workload->ops);
while (!g_stop) {
+ /* Pick a cursor if there are multiple tables. */
+ if (cfg->table_count > 1)
+ cursor = cursors[
+ __wt_random() % (cfg->table_count - 1)];
+
/*
* Generate the next key and setup operation specific
* statistics tracking objects.
@@ -356,6 +383,8 @@ op_err: lprintf(cfg, ret, 0,
if (0) {
err: g_error = g_stop = 1;
}
+ if (cursors != NULL)
+ free(cursors);
return (NULL);
}
@@ -471,17 +500,20 @@ populate_thread(void *arg)
CONFIG_THREAD *thread;
TRACK *trk;
WT_CONNECTION *conn;
- WT_CURSOR *cursor;
+ WT_CURSOR **cursors, *cursor;
WT_SESSION *session;
+ char *value_buf, *key_buf;
+ const char *cursor_config;
+ int intxn, measure_latency, ret;
+ size_t i;
uint32_t opcount;
uint64_t op, usecs;
- int intxn, measure_latency, ret;
- char *value_buf, *key_buf;
thread = (CONFIG_THREAD *)arg;
cfg = thread->cfg;
conn = cfg->conn;
session = NULL;
+ cursors = NULL;
ret = 0;
trk = &thread->insert;
@@ -494,15 +526,35 @@ populate_thread(void *arg)
goto err;
}
- /* Do a bulk load if populate is single-threaded. */
- if ((ret = session->open_cursor(session, cfg->uri, NULL,
- cfg->populate_threads == 1 ? "bulk" : NULL, &cursor)) != 0) {
+ /* Do bulk loads if populate is single-threaded. */
+ cursor_config = cfg->populate_threads == 1 ? "bulk" : NULL;
+ /* Create the cursor or cursors if there are multiple tables. */
+ if (cfg->table_count > 1) {
+ cursors = (WT_CURSOR **)calloc(
+ cfg->table_count, sizeof(WT_CURSOR *));
+ if (cursors == NULL) {
+ lprintf(cfg, ENOMEM, 0,
+ "worker: couldn't allocate cursor array");
+ goto err;
+ }
+ for (i = 0; i < cfg->table_count; i++) {
+ if ((ret = session->open_cursor(
+ session, cfg->uris[i], NULL,
+ cursor_config, &cursors[i])) != 0) {
+ lprintf(cfg, ret, 0,
+ "populate: WT_SESSION.open_cursor: %s",
+ cfg->uris[i]);
+ goto err;
+ }
+ }
+ } else if ((ret = session->open_cursor(
+ session, cfg->uri, NULL, cursor_config, &cursor)) != 0) {
lprintf(cfg,
ret, 0, "populate: WT_SESSION.open_cursor: %s", cfg->uri);
goto err;
}
- /* Populate the database. */
+ /* Populate the databases. */
for (intxn = 0, opcount = 0;;) {
op = get_next_incr();
if (op > cfg->icount)
@@ -525,26 +577,30 @@ populate_thread(void *arg)
lprintf(cfg, ret, 0, "Get time call failed");
goto err;
}
- cursor->set_key(cursor, key_buf);
- if (cfg->random_value)
- randomize_value(cfg, value_buf);
- cursor->set_value(cursor, value_buf);
- if ((ret = cursor->insert(cursor)) != 0) {
- lprintf(cfg, ret, 0, "Failed inserting");
- goto err;
- }
- /* Gather statistics */
- if (measure_latency) {
- if ((ret = __wt_epoch(NULL, &stop)) != 0) {
- lprintf(cfg, ret, 0,
- "Get time call failed");
+ for (i = 0; i < cfg->table_count; i++) {
+ if (cfg->table_count > 1)
+ cursor = cursors[i];
+ cursor->set_key(cursor, key_buf);
+ if (cfg->random_value)
+ randomize_value(cfg, value_buf);
+ cursor->set_value(cursor, value_buf);
+ if ((ret = cursor->insert(cursor)) != 0) {
+ lprintf(cfg, ret, 0, "Failed inserting");
goto err;
}
- ++trk->latency_ops;
- usecs = ns_to_us(WT_TIMEDIFF(stop, start));
- track_operation(trk, usecs);
+ /* Gather statistics */
+ if (measure_latency) {
+ if ((ret = __wt_epoch(NULL, &stop)) != 0) {
+ lprintf(cfg, ret, 0,
+ "Get time call failed");
+ goto err;
+ }
+ ++trk->latency_ops;
+ usecs = ns_to_us(WT_TIMEDIFF(stop, start));
+ track_operation(trk, usecs);
+ }
+ ++thread->insert.ops; /* Same as trk->ops */
}
- ++thread->insert.ops; /* Same as trk->ops */
if (cfg->populate_ops_per_txn != 0) {
if (++opcount < cfg->populate_ops_per_txn)
@@ -572,6 +628,8 @@ populate_thread(void *arg)
if (0) {
err: g_error = g_stop = 1;
}
+ if (cursors != NULL)
+ free(cursors);
return (NULL);
}
@@ -1053,11 +1111,90 @@ err: if ((t_ret = session->close(session, NULL)) != 0) {
return (ret);
}
+/*
+ * Populate the uri array if more than one table is being used.
+ */
+int
+create_uris(CONFIG *cfg)
+{
+ char *uri;
+ int ret;
+ size_t base_uri_len;
+ uint32_t i;
+
+ ret = 0;
+ if (cfg->table_count < 2) {
+ cfg->uris = NULL;
+ return (0);
+ }
+
+ base_uri_len = strlen(cfg->uri);
+ cfg->uris = (char **)malloc(cfg->table_count * sizeof(char *));
+ if (cfg->uris == NULL) {
+ ret = ENOMEM;
+ goto err;
+ }
+ for (i = 0; i < cfg->table_count; i++) {
+ uri = cfg->uris[i] = (char *)calloc(base_uri_len + 3, 1);
+ if (uri == NULL) {
+ ret = ENOMEM;
+ goto err;
+ }
+ memcpy(uri, cfg->uri, base_uri_len);
+ uri[base_uri_len] = uri[base_uri_len + 1] = '0';
+ uri[base_uri_len] = '0' + (i / 10);
+ uri[base_uri_len + 1] = '0' + (i % 10);
+ }
+err: if (ret != 0) {
+ for (i = 0; i < cfg->table_count; i++)
+ free(cfg->uris[i]);
+ free(cfg->uris);
+ cfg->uris = NULL;
+ }
+ return (ret);
+}
+
+int
+create_tables(CONFIG *cfg)
+{
+ WT_SESSION *session;
+ char *uri;
+ int ret;
+ size_t i;
+
+ session = NULL;
+ if (cfg->create == 0)
+ return (0);
+
+ uri = cfg->uri;
+ if ((ret = cfg->conn->open_session(
+ cfg->conn, NULL, cfg->sess_config, &session)) != 0) {
+ lprintf(cfg, ret, 0,
+ "Error opening a session on %s", cfg->home);
+ return (ret);
+ }
+ for (i = 0; i < cfg->table_count; i++) {
+ if (cfg->table_count > 1)
+ uri = cfg->uris[i];
+ if ((ret = session->create(
+ session, uri, cfg->table_config)) != 0) {
+ lprintf(cfg, ret, 0,
+ "Error creating table %s", cfg->uri);
+ return (ret);
+ }
+ }
+ if ((ret = session->close(session, NULL)) != 0) {
+ lprintf(cfg,
+ ret, 0, "Error closing session");
+ return (ret);
+ }
+ return (ret);
+}
+
int
main(int argc, char *argv[])
{
CONFIG *cfg, _cfg;
- WT_SESSION *session;
pthread_t monitor_thread;
size_t len;
uint64_t req_len, total_ops;
@@ -1068,7 +1205,6 @@ main(int argc, char *argv[])
const char *user_cconfig, *user_tconfig;
char *cmd, *cc_buf, *tc_buf, *tmphome;
- session = NULL;
monitor_created = monitor_set = ret = 0;
helium_mount = user_cconfig = user_tconfig = NULL;
cmd = cc_buf = tc_buf = tmphome = NULL;
@@ -1252,26 +1388,10 @@ main(int argc, char *argv[])
ret, 0, "Error loading Helium: %s", helium_buf);
}
- if (cfg->create != 0) { /* If creating, create the table. */
- if ((ret = cfg->conn->open_session(
- cfg->conn, NULL, cfg->sess_config, &session)) != 0) {
- lprintf(cfg, ret, 0,
- "Error opening a session on %s", cfg->home);
- goto err;
- }
- if ((ret = session->create(
- session, cfg->uri, cfg->table_config)) != 0) {
- lprintf(cfg,
- ret, 0, "Error creating table %s", cfg->uri);
- goto err;
- }
- if ((ret = session->close(session, NULL)) != 0) {
- lprintf(cfg,
- ret, 0, "Error closing session");
- goto err;
- }
- session = NULL;
- }
+ if ((ret = create_uris(cfg)) != 0)
+ goto err;
+ if ((ret = create_tables(cfg)) != 0)
+ goto err;
/* Start the monitor thread. */
if (cfg->sample_interval != 0) {
if ((ret = pthread_create(
diff --git a/bench/wtperf/wtperf.h b/bench/wtperf/wtperf.h
index 54c4334a2ac..00e6dd5fb95 100644
--- a/bench/wtperf/wtperf.h
+++ b/bench/wtperf/wtperf.h
@@ -65,6 +65,7 @@ struct __config { /* Configuration struction */
const char *home; /* WiredTiger home */
const char *monitor_dir; /* Monitor output dir */
char *uri; /* Object URI */
+ char **uris; /* URIs if multiple tables */
WT_CONNECTION *conn; /* Database connection */
diff --git a/bench/wtperf/wtperf_opt.i b/bench/wtperf/wtperf_opt.i
index 4e11799781f..28e682de364 100644
--- a/bench/wtperf/wtperf_opt.i
+++ b/bench/wtperf/wtperf_opt.i
@@ -114,6 +114,9 @@ DEF_OPT_AS_CONFIG_STRING(table_config,
"key_format=S,value_format=S,type=lsm,exclusive=true,"
"leaf_page_max=4kb,internal_page_max=64kb,allocation_size=4kb,",
"table configuration string")
+DEF_OPT_AS_UINT32(table_count, 1,
+ "number of tables to run operations over. Operations are spread evenly "
+ "over the tables amongst all threads. Default 1, maximum 99.")
DEF_OPT_AS_STRING(threads, "", "workload configuration: each 'count' "
"entry is the total number of threads, and the 'insert', 'read' and "
"'update' entries are the ratios of insert, read and update operations "