summaryrefslogtreecommitdiff
path: root/examples
diff options
context:
space:
mode:
authorAlex Gorrod <alexg@wiredtiger.com>2012-11-08 14:24:27 +1100
committerAlex Gorrod <alexg@wiredtiger.com>2012-11-08 14:24:27 +1100
commit5d8c1e28bcb2d0b9f6592b0785b0643973a88c36 (patch)
tree64f2c48e24dad3a7852edca198619cab9f1dbeaf /examples
parent3844f602ff5dab413d70b6d381eee203f245e40f (diff)
downloadmongo-5d8c1e28bcb2d0b9f6592b0785b0643973a88c36.tar.gz
Clean up error handling in ex_test_perf and add option to populate using
multiple threads.
Diffstat (limited to 'examples')
-rw-r--r--examples/c/ex_test_perf.c540
1 files changed, 352 insertions, 188 deletions
diff --git a/examples/c/ex_test_perf.c b/examples/c/ex_test_perf.c
index 6c8dc776b91..32b820b5fa4 100644
--- a/examples/c/ex_test_perf.c
+++ b/examples/c/ex_test_perf.c
@@ -39,6 +39,9 @@
#include <wiredtiger.h>
+#define ATOMIC_ADD(v, val) \
+ __sync_add_and_fetch(&(v), val)
+
typedef struct {
const char *home;
const char *uri;
@@ -52,6 +55,7 @@ typedef struct {
uint32_t report_interval;
uint32_t read_time;
uint32_t elapsed_time;
+ uint32_t populate_threads;/* Number of populate threads. */
uint32_t read_threads; /* Number of read threads. */
uint32_t verbose;
uint32_t stat_thread; /* Whether to create a stat thread. */
@@ -65,12 +69,21 @@ typedef struct {
} CONFIG;
/* Forward function definitions. */
+int execute_populate(CONFIG *);
int execute_reads(CONFIG *);
-int populate(CONFIG *);
+int get_next_op(uint64_t *);
+int lprintf(CONFIG *cfg, int err, uint32_t level, const char *fmt, ...)
+#ifdef __GNUC__
+ __attribute__((format (printf, 4, 5)))
+#endif
+;
+void *populate_thread(void *);
void print_config(CONFIG *);
void *read_thread(void *);
int setup_log_file(CONFIG *);
+int start_threads(CONFIG *, int, pthread_t **, void *(*func)(void *));
void *stat_worker(void *);
+int stop_threads(CONFIG *, int, pthread_t *);
void usage(void);
/* Default values - these are tiny, we want the basic run to be fast. */
@@ -78,7 +91,7 @@ CONFIG default_cfg = {
"WT_TEST", /* home */
"lsm:test", /* uri */
"create,cache_size=200MB", /* conn_config */
- "key_format=S,value_format=S", /* table_config */
+ "key_format=S,value_format=S,exclusive=true", /* table_config */
1, /* create */
14023954, /* rand_seed */
5000, /* icount */
@@ -87,6 +100,7 @@ CONFIG default_cfg = {
2, /* report_interval */
2, /* read_time */
0, /* elapsed_time */
+ 1, /* populate_threads */
2, /* read_threads */
0, /* verbose */
0, /* stat_thread */
@@ -100,7 +114,7 @@ CONFIG small_cfg = {
"WT_TEST", /* home */
"lsm:test", /* uri */
"create,cache_size=500MB", /* conn_config */
- "key_format=S,value_format=S,lsm_chunk_size=5MB,"
+ "key_format=S,value_format=S,exclusive=true,lsm_chunk_size=5MB,"
"leaf_page_max=16k,internal_page_max=16kb", /* table_config */
1, /* create */
14023954, /* rand_seed */
@@ -110,6 +124,7 @@ CONFIG small_cfg = {
10, /* report_interval */
20, /* read_time */
0, /* elapsed_time */
+ 1, /* populate_threads */
8, /* read_threads */
0, /* verbose */
0, /* stat_thread */
@@ -123,7 +138,7 @@ CONFIG med_cfg = {
"WT_TEST", /* home */
"lsm:test", /* uri */
"create,cache_size=1GB", /* conn_config */
- "key_format=S,value_format=S,lsm_chunk_size=20MB,"
+ "key_format=S,value_format=S,exclusive=true,lsm_chunk_size=20MB,"
"leaf_page_max=16k,internal_page_max=16kb", /* table_config */
1, /* create */
14023954, /* rand_seed */
@@ -133,6 +148,7 @@ CONFIG med_cfg = {
20, /* report_interval */
100, /* read_time */
0, /* elapsed_time */
+ 1, /* populate_threads */
16, /* read_threads */
0, /* verbose */
0, /* stat_thread */
@@ -146,7 +162,7 @@ CONFIG large_cfg = {
"WT_TEST", /* home */
"lsm:test", /* uri */
"create,cache_size=2GB", /* conn_config */
- "key_format=S,value_format=S,lsm_chunk_size=50MB,"
+ "key_format=S,value_format=S,exclusive=true,lsm_chunk_size=50MB,"
"leaf_page_max=16k,internal_page_max=16kb", /* table_config */
1, /* create */
14023954, /* rand_seed */
@@ -156,6 +172,7 @@ CONFIG large_cfg = {
20, /* report_interval */
600, /* read_time */
0, /* elapsed_time */
+ 1, /* populate_threads */
16, /* read_threads */
0, /* verbose */
0, /* stat_thread */
@@ -169,9 +186,10 @@ const char *debug_cconfig = "verbose=[lsm]";
const char *debug_tconfig = "";
/* Global values shared by threads. */
-uint64_t nops;
-int running;
-int stat_running;
+uint64_t g_nops;
+int g_running;
+int g_stat_running;
+uint32_t g_threads_quit; /* For tracking threads that exit early. */
void *
read_thread(void *arg)
@@ -181,34 +199,125 @@ read_thread(void *arg)
WT_SESSION *session;
WT_CURSOR *cursor;
char *key_buf;
- int ret;
+ int ret, search_ret;
+
+ session = NULL;
+ key_buf = NULL;
cfg = (CONFIG *)arg;
conn = cfg->conn;
key_buf = calloc(cfg->key_sz, 1);
- if (key_buf == NULL)
- return (arg);
+ if (key_buf == NULL) {
+ ret = ENOMEM;
+ goto err;
+ }
if ((ret = conn->open_session(conn, NULL, NULL, &session)) != 0) {
- fprintf(stderr,
- "open_session failed in read thread: %d\n", ret);
- return (NULL);
+ lprintf(cfg, ret, 0,
+ "open_session failed in read thread");
+ goto err;
}
if ((ret = session->open_cursor(session, cfg->uri,
NULL, NULL, &cursor)) != 0) {
- fprintf(stderr, "open_cursor failed in read thread: %d\n", ret);
- return (NULL);
+ lprintf(cfg, ret, 0,
+ "open_cursor failed in read thread");
+ goto err;
}
- while (running) {
- ++nops;
- sprintf(key_buf, "%d", rand() % cfg->icount);
+ while (g_running) {
+ ++g_nops;
+ /* Get a value in range, avoid zero. */
+ sprintf(key_buf, "%d", (rand() % (cfg->icount - 1)) + 1);
cursor->set_key(cursor, key_buf);
- cursor->search(cursor);
+ /* Report errors and continue. */
+ if ((search_ret = cursor->search(cursor)) != 0)
+ lprintf(cfg, search_ret, 0,
+ "Search failed for: %s", key_buf);
}
- session->close(session, NULL);
- free(key_buf);
+err: if (ret != 0)
+ ++g_threads_quit;
+ if (session != NULL)
+ session->close(session, NULL);
+ if (key_buf != NULL)
+ free(key_buf);
+ return (arg);
+}
+
+/* Retrieve an ID for the next insert operation. */
+int get_next_op(uint64_t *op)
+{
+ *op = ATOMIC_ADD(g_nops, 1);
+ return (0);
+}
+
+void *
+populate_thread(void *arg)
+{
+ CONFIG *cfg;
+ WT_CONNECTION *conn;
+ WT_CURSOR *cursor;
+ WT_SESSION *session;
+ char *data_buf, *key_buf;
+ int ret;
+ uint64_t op;
+
+ cfg = (CONFIG *)arg;
+ conn = cfg->conn;
+ session = NULL;
+ data_buf = key_buf = NULL;
+
+ cfg->phase = LSM_TEST_PERF_POP;
+
+ data_buf = calloc(cfg->data_sz, 1);
+ if (data_buf == NULL) {
+ lprintf(cfg, ENOMEM, 0, "Populate data buffer");
+ goto err;
+ }
+ key_buf = calloc(cfg->key_sz, 1);
+ if (key_buf == NULL) {
+ lprintf(cfg, ENOMEM, 0, "Populate key buffer");
+ goto err;
+ }
+
+ /* Open a session for the current thread's work. */
+ if ((ret = conn->open_session(conn, NULL, NULL, &session)) != 0) {
+ lprintf(cfg, ret, 0,
+ "Error opening a session on %s", cfg->home);
+ goto err;
+ }
+
+ /* Can only use bulk load single threaded. */
+ if ((ret = session->open_cursor(
+ session, cfg->uri, NULL,
+ cfg->populate_threads == 1 ? "bulk" : "", &cursor)) != 0) {
+ lprintf(cfg, ret, 0, "Error opening cursor %s", cfg->uri);
+ goto err;
+ }
+
+ memset(data_buf, 'a', cfg->data_sz - 1);
+ cursor->set_value(cursor, data_buf);
+ /* Populate the database. */
+ while (1) {
+ get_next_op(&op);
+ if (op > cfg->icount)
+ break;
+ sprintf(key_buf, "%"PRIu64, op);
+ cursor->set_key(cursor, key_buf);
+ if ((ret = cursor->insert(cursor)) != 0) {
+ lprintf(cfg, ret, 0, "Failed inserting");
+ goto err;
+ }
+ }
+ /* To ensure managing thread knows if we exited early. */
+err: if (ret != 0)
+ ++g_threads_quit;
+ if (session != NULL)
+ session->close(session, NULL);
+ if (data_buf)
+ free(data_buf);
+ if (key_buf)
+ free(key_buf);
return (arg);
}
@@ -226,193 +335,192 @@ stat_worker(void *arg)
struct timeval e;
uint64_t value;
+ session = NULL;
cfg = (CONFIG *)arg;
conn = cfg->conn;
lsm_uri = NULL;
if ((ret = conn->open_session(conn, NULL, NULL, &session)) != 0) {
- fprintf(stderr,
- "open_session failed in read thread: %d\n", ret);
- return (NULL);
+ lprintf(cfg, ret, 0,
+ "open_session failed in statistics thread.");
+ goto err;
}
if (strncmp(cfg->uri, "lsm:", strlen("lsm:")) == 0) {
lsm_uri = calloc(
strlen(cfg->uri) + strlen("statistics:") + 1, 1);
if (lsm_uri == NULL) {
- fprintf(stderr, "No memory in stat thread.\n");
+ lprintf(cfg, ENOMEM, 0, "Statistics thread uri create.");
goto err;
}
sprintf(lsm_uri, "statistics:%s", cfg->uri);
}
- while (stat_running) {
+ while (g_stat_running) {
sleep(cfg->report_interval);
/* Generic header. */
- fprintf(cfg->logf, "=======================================\n");
+ lprintf(cfg, 0, cfg->verbose,
+ "=======================================");
gettimeofday(&e, NULL);
secs = e.tv_sec + e.tv_usec / 1000000.0;
secs -= (cfg->phase_start_time.tv_sec +
cfg->phase_start_time.tv_usec / 1000000.0);
if (secs == 0)
++secs;
- fprintf(cfg->logf,
- "%s completed: %" PRIu64", elapsed time: %.2f\n",
+ lprintf(cfg, 0, cfg->verbose,
+ "%s completed: %" PRIu64", elapsed time: %.2f",
cfg->phase == LSM_TEST_PERF_READ ? "reads" : "inserts",
- nops, secs);
+ g_nops, secs);
/* Report LSM tree stats, if using LSM. */
if (lsm_uri != NULL) {
if ((ret = session->open_cursor(session, lsm_uri,
NULL, NULL, &cursor)) != 0) {
- fprintf(stderr,
- "open_cursor LSM statistics: %d\n", ret);
+ lprintf(cfg, ret, 0,
+ "open_cursor failed in LSM statistics");
goto err;
}
while (
(ret = cursor->next(cursor)) == 0 &&
(ret = cursor->get_value(
cursor, &desc, &pvalue, &value)) == 0)
- fprintf(cfg->logf,
- "stat:lsm: %s=%s\n", desc, pvalue);
+ lprintf(cfg, 0, cfg->verbose,
+ "stat:lsm: %s=%s", desc, pvalue);
cursor->close(cursor);
- fprintf(cfg->logf, "\n");
+ lprintf(cfg, 0, cfg->verbose, "-----------------");
}
/* Dump the connection statistics since last time. */
if ((ret = session->open_cursor(session, "statistics:",
NULL, "statistics_clear", &cursor)) != 0) {
- fprintf(stderr,
- "open_cursor statistics: %d\n", ret);
+ lprintf(cfg, ret, 0,
+ "open_cursor failed in statistics");
goto err;
}
while (
(ret = cursor->next(cursor)) == 0 &&
(ret = cursor->get_value(
cursor, &desc, &pvalue, &value)) == 0)
- fprintf(cfg->logf,
- "stat:conn: %s=%s\n", desc, pvalue);
+ lprintf(cfg, 0, cfg->verbose,
+ "stat:conn: %s=%s", desc, pvalue);
cursor->close(cursor);
}
-err: session->close(session, NULL);
+err: if (session != NULL)
+ session->close(session, NULL);
if (lsm_uri != NULL)
free(lsm_uri);
return (arg);
}
-int populate(CONFIG *cfg)
+int execute_populate(CONFIG *cfg)
{
WT_CONNECTION *conn;
- WT_CURSOR *cursor;
WT_SESSION *session;
- char *data_buf, *key_buf;
+ pthread_t *threads;
double secs;
int ret;
+ uint64_t elapsed, last_ops;
struct timeval e;
conn = cfg->conn;
-
cfg->phase = LSM_TEST_PERF_POP;
- if (cfg->verbose > 0)
- fprintf(cfg->logf, "Starting bulk load\n");
-
- data_buf = calloc(cfg->data_sz, 1);
- if (data_buf == NULL)
- return (ENOMEM);
- key_buf = calloc(cfg->key_sz, 1);
- if (key_buf == NULL)
- return (ENOMEM);
+ lprintf(cfg, 0, 1, "Starting populate threads");
- /* Open a session for the current thread's work. */
+ /* First create the table. */
if ((ret = conn->open_session(conn, NULL, NULL, &session)) != 0) {
- fprintf(stderr, "Error opening a session on %s: %s\n",
- cfg->home, wiredtiger_strerror(ret));
+ lprintf(cfg, ret, 0,
+ "Error opening a session on %s", cfg->home);
return (ret);
}
if ((ret = session->create(
session, cfg->uri, cfg->table_config)) != 0) {
- fprintf(stderr, "Error creating table %s: %s\n",
- cfg->uri, wiredtiger_strerror(ret));
+ lprintf(cfg, ret, 0, "Error creating table %s", cfg->uri);
+ session->close(session, NULL);
return (ret);
}
+ session->close(session, NULL);
- if ((ret = session->open_cursor(
- session, cfg->uri, NULL, "bulk", &cursor)) != 0) {
- fprintf(stderr, "Error opening cursor %s: %s\n",
- cfg->uri, wiredtiger_strerror(ret));
+ if ((ret = start_threads(
+ cfg, cfg->populate_threads, &threads, populate_thread)) != 0)
return (ret);
- }
- memset(data_buf, 'a', cfg->data_sz - 1);
- cursor->set_value(cursor, data_buf);
- /* Populate the database. */
gettimeofday(&cfg->phase_start_time, NULL);
- for (nops = 0; nops < cfg->icount; nops++) {
- if (cfg->verbose > 0) {
- if (nops % 1000000 == 0)
- printf(".");
- if (nops % 50000000 == 0)
- printf("\n");
- }
- sprintf(key_buf, "%"PRIu64, nops);
- cursor->set_key(cursor, key_buf);
- if ((ret = cursor->insert(cursor)) != 0) {
- fprintf(stderr, "Failed inserting with: %d\n", ret);
- return (ret);
+ for (cfg->elapsed_time = 0, last_ops = 0;
+ g_nops < cfg->icount && g_threads_quit < cfg->populate_threads;) {
+ /*
+ * Sleep for 100th of a second, report_interval is in second
+ * granularity, so adjust accordingly.
+ */
+ usleep(10000);
+ elapsed += 1;
+ if (elapsed % 100 == 0 &&
+ (elapsed / 100) % cfg->report_interval == 0) {
+ lprintf(cfg, 0, 1, "%" PRIu64 " ops in %d secs",
+ g_nops - last_ops, cfg->report_interval);
+ last_ops = g_nops;
}
}
- gettimeofday(&e, NULL);
- cursor->close(cursor);
- session->close(session, NULL);
- if (cfg->verbose > 0) {
- fprintf(cfg->logf,
- "Finished bulk load of %d items\n", cfg->icount);
- secs = e.tv_sec + e.tv_usec / 1000000.0;
- secs -= (cfg->phase_start_time.tv_sec +
- cfg->phase_start_time.tv_usec / 1000000.0);
- if (secs == 0)
- ++secs;
- fprintf(cfg->logf,
- "Load time: %.2f\n" "load ops/sec: %.2f\n",
- secs, cfg->icount / secs);
+ if (g_threads_quit == cfg->populate_threads) {
+ lprintf(cfg, WT_ERROR, 0,
+ "Populate threads exited without finishing.");
+ return (WT_ERROR);
}
+ gettimeofday(&e, NULL);
- free(data_buf);
- free(key_buf);
- return (ret);
+ if ((ret = stop_threads(cfg, cfg->populate_threads, threads)) != 0)
+ return (ret);
+
+ lprintf(cfg, 0, 1,
+ "Finished bulk load of %d items", cfg->icount);
+ secs = e.tv_sec + e.tv_usec / 1000000.0;
+ secs -= (cfg->phase_start_time.tv_sec +
+ cfg->phase_start_time.tv_usec / 1000000.0);
+ if (secs == 0)
+ ++secs;
+ lprintf(cfg, 0, 1,
+ "Load time: %.2f\n" "load ops/sec: %.2f",
+ secs, cfg->icount / secs);
+
+ return (0);
}
-/* Setup the logging output mechanism. */
-int setup_log_file(CONFIG *cfg)
+int execute_reads(CONFIG *cfg)
{
- char *fname;
- int offset;
+ pthread_t *threads;
+ int ret;
+ uint64_t last_ops;
- if (cfg->verbose < 1 && cfg->stat_thread == 0)
- return (0);
+ cfg->phase = LSM_TEST_PERF_READ;
+ lprintf(cfg, 0, 1, "Starting read threads");
- if ((fname = calloc(strlen(cfg->home) +
- strlen(cfg->uri) + strlen(".stat") + 1, 1)) == NULL) {
- fprintf(stderr, "No memory in stat thread\n");
- return (ENOMEM);
+ if ((ret = start_threads(
+ cfg, cfg->read_threads, &threads, read_thread)) != 0)
+ return (ret);
+
+ /* Sanity check reporting interval. */
+ if (cfg->report_interval > cfg->read_time)
+ cfg->report_interval = cfg->read_time;
+
+ gettimeofday(&cfg->phase_start_time, NULL);
+ for (cfg->elapsed_time = 0, last_ops = 0;
+ cfg->elapsed_time < cfg->read_time &&
+ g_threads_quit < cfg->read_threads;
+ cfg->elapsed_time += cfg->report_interval) {
+ sleep(cfg->report_interval);
+ lprintf(cfg, 0, 1, "%" PRIu64 " ops in %d secs",
+ g_nops - last_ops, cfg->report_interval);
+ last_ops = g_nops;
}
- for (offset = 0;
- cfg->uri[offset] != 0 && cfg->uri[offset] != ':';
- offset++) {}
- if (cfg->uri[offset] == 0)
- offset = 0;
- else
- ++offset;
- sprintf(fname, "%s/%s.stat", cfg->home, cfg->uri + offset);
- if ((cfg->logf = fopen(fname, "w")) == NULL) {
- fprintf(stderr, "Statistics failed to open log file.\n");
- return (EINVAL);
+ if (g_threads_quit == cfg->populate_threads) {
+ lprintf(cfg, WT_ERROR, 0,
+ "Populate threads exited without finishing.");
+ return (WT_ERROR);
}
- /* Use line buffering for the log file. */
- (void)setvbuf(cfg->logf, NULL, _IOLBF, 0);
- if (fname != NULL)
- free(fname);
+
+ if ((ret = stop_threads(cfg, cfg->read_threads, threads)) != 0)
+ return (ret);
+
return (0);
}
@@ -421,7 +529,7 @@ int main(int argc, char **argv)
CONFIG cfg;
WT_CONNECTION *conn;
const char *user_cconfig, *user_tconfig;
- const char *opts = "C:R:T:d:eh:i:k:lr:s:u:v:SML";
+ const char *opts = "C:P:R:T:d:eh:i:k:lr:s:u:v:SML";
char *cc_buf, *tc_buf;
int ch, ret, stat_created;
pthread_t stat;
@@ -468,7 +576,7 @@ int main(int argc, char **argv)
cfg.home = optarg;
break;
case 'i':
- cfg.icount = atoi(optarg) * 1000;
+ cfg.icount = atoi(optarg);
break;
case 'k':
cfg.key_sz = atoi(optarg);
@@ -491,6 +599,9 @@ int main(int argc, char **argv)
case 'C':
user_cconfig = optarg;
break;
+ case 'P':
+ cfg.populate_threads = atoi(optarg);
+ break;
case 'R':
cfg.read_threads = atoi(optarg);
break;
@@ -556,46 +667,43 @@ int main(int argc, char **argv)
/* Open a connection to the database, creating it if necessary. */
if ((ret = wiredtiger_open(
cfg.home, NULL, cfg.conn_config, &conn)) != 0) {
- fprintf(stderr, "Error connecting to %s: %s\n",
- cfg.home, wiredtiger_strerror(ret));
+ lprintf(&cfg, ret, 0, "Error connecting to %s", cfg.home);
goto err;
}
cfg.conn = conn;
if (cfg.stat_thread) {
- stat_running = 1;
+ g_stat_running = 1;
if ((ret = pthread_create(
&stat, NULL, stat_worker, &cfg)) != 0) {
- fprintf(stderr, "Error creating statistics thread.\n");
+ lprintf(&cfg, ret, 0,
+ "Error creating statistics thread.");
goto err;
}
stat_created = 1;
}
- if (cfg.create != 0 && (ret = populate(&cfg)) != 0)
+ if (cfg.create != 0 && execute_populate(&cfg) != 0)
goto err;
- if (cfg.read_time != 0 && cfg.read_threads != 0)
- if ((ret = execute_reads(&cfg)) != 0)
+ if (cfg.read_time != 0 && cfg.read_threads != 0 &&
+ (ret = execute_reads(&cfg)) != 0)
goto err;
- if (cfg.verbose > 0) {
- fprintf(cfg.logf,
- "Ran performance test example with %d threads for %d seconds.\n",
- cfg.read_threads, cfg.read_time);
- fprintf(cfg.logf,
- "Executed %" PRIu64 " read operations\n", nops);
- }
+ lprintf(&cfg, 0, 1,
+ "Ran performance test example with %d threads for %d seconds.",
+ cfg.read_threads, cfg.read_time);
+ lprintf(&cfg, 0, 1,
+ "Executed %" PRIu64 " read operations", g_nops);
- if (cfg.stat_thread)
- stat_running = 0;
+err: if (cfg.stat_thread)
+ g_stat_running = 0;
- /* Cleanup. */
-err: if (stat_created != 0 && (ret = pthread_join(stat, NULL)) != 0)
- fprintf(stderr, "Error joining stat thread: %d.\n", ret);
+ if (stat_created != 0 && (ret = pthread_join(stat, NULL)) != 0)
+ lprintf(&cfg, ret, 0, "Error joining stat thread.");
if (conn != NULL && (ret = conn->close(conn, NULL)) != 0)
- fprintf(stderr, "Error connecting to %s: %s\n",
- cfg.home, wiredtiger_strerror(ret));
+ lprintf(&cfg, ret, 0,
+ "Error closing connection to %s", cfg.home);
if (cc_buf != NULL)
free(cc_buf);
if (tc_buf != NULL)
@@ -608,60 +716,112 @@ err: if (stat_created != 0 && (ret = pthread_join(stat, NULL)) != 0)
return (ret);
}
-int execute_reads(CONFIG *cfg)
+/*
+ * Following are utility functions.
+ */
+int
+start_threads(CONFIG *cfg, int num, pthread_t **threadsp, void *(*func)(void *))
{
- pthread_t *read_threads;
- int ret;
- uint32_t i;
- uint64_t last_ops;
-
- cfg->phase = LSM_TEST_PERF_READ;
- if (cfg->verbose > 0)
- fprintf(cfg->logf, "Starting read threads\n");
-
- running = 1;
- nops = 0;
-
- read_threads = calloc(cfg->read_threads, sizeof(pthread_t *));
- if (read_threads == NULL)
+ pthread_t *threads;
+ int i, ret;
+
+ g_running = 1;
+ g_nops = 0;
+ g_threads_quit = 0;
+ threads = calloc(num, sizeof(pthread_t *));
+ if (threads == NULL)
return (ENOMEM);
- for (i = 0; i < cfg->read_threads; i++) {
+ for (i = 0; i < num; i++) {
if ((ret = pthread_create(
- &read_threads[i], NULL, read_thread, cfg)) != 0) {
- fprintf(stderr, "Error creating thread: %d\n", i);
+ &threads[i], NULL, func, cfg)) != 0) {
+ g_running = 0;
+ lprintf(cfg, ret, 0, "Error creating thread: %d", i);
return (ret);
}
}
+ *threadsp = threads;
+ return (0);
+}
- /* Sanity check reporting interval. */
- if (cfg->report_interval > cfg->read_time)
- cfg->report_interval = cfg->read_time;
+int
+stop_threads(CONFIG *cfg, int num, pthread_t *threads)
+{
+ int i, ret;
+ g_running = 0;
- gettimeofday(&cfg->phase_start_time, NULL);
- for (cfg->elapsed_time = 0, last_ops = 0;
- cfg->elapsed_time < cfg->read_time;
- cfg->elapsed_time += cfg->report_interval) {
- sleep(cfg->report_interval);
- if (cfg->verbose > 0) {
- fprintf(cfg->logf, "%" PRIu64 " ops in %d secs\n",
- nops - last_ops, cfg->report_interval);
- printf("%" PRIu64 " ops in %d secs\n",
- nops - last_ops, cfg->report_interval);
+ for (i = 0; i < num; i++) {
+ if ((ret = pthread_join(threads[i], NULL)) != 0) {
+ lprintf(cfg, ret, 0, "Error joining thread %d", i);
+ return (ret);
}
- last_ops = nops;
}
- running = 0;
- for (i = 0; i < cfg->read_threads; i++) {
- if ((ret = pthread_join(read_threads[i], NULL)) != 0) {
- fprintf(stderr, "Error joining thread %d\n", i);
- return (ret);
- }
+ free(threads);
+ return (0);
+}
+
+/*
+ * Log printf - output a log message.
+ */
+int lprintf(CONFIG *cfg, int err, uint32_t level, const char *fmt, ...)
+{
+ va_list ap;
+
+ if (err == 0 && level <= cfg->verbose) {
+ va_start(ap, fmt);
+ vfprintf(cfg->logf, fmt, ap);
+ va_end(ap);
+ fprintf(cfg->logf, "\n");
}
+ if (err == 0)
+ return (0);
- if (read_threads != NULL)
- free(read_threads);
- return (ret);
+ /* We are dealing with an error. */
+ va_start(ap, fmt);
+ vfprintf(cfg->logf, fmt, ap);
+ va_end(ap);
+ fprintf(cfg->logf, " Error: %s\n", wiredtiger_strerror(err));
+ if (cfg->logf != stderr) {
+ va_start(ap, fmt);
+ vfprintf(stderr, fmt, ap);
+ va_end(ap);
+ fprintf(stderr, " Error: %s\n", wiredtiger_strerror(err));
+ }
+
+ return (0);
+}
+
+/* Setup the logging output mechanism. */
+int setup_log_file(CONFIG *cfg)
+{
+ char *fname;
+ int offset;
+
+ if (cfg->verbose < 1 && cfg->stat_thread == 0)
+ return (0);
+
+ if ((fname = calloc(strlen(cfg->home) +
+ strlen(cfg->uri) + strlen(".stat") + 1, 1)) == NULL) {
+ fprintf(stderr, "No memory in stat thread\n");
+ return (ENOMEM);
+ }
+ for (offset = 0;
+ cfg->uri[offset] != 0 && cfg->uri[offset] != ':';
+ offset++) {}
+ if (cfg->uri[offset] == 0)
+ offset = 0;
+ else
+ ++offset;
+ sprintf(fname, "%s/%s.stat", cfg->home, cfg->uri + offset);
+ if ((cfg->logf = fopen(fname, "w")) == NULL) {
+ fprintf(stderr, "Statistics failed to open log file.\n");
+ return (EINVAL);
+ }
+ /* Use line buffering for the log file. */
+ (void)setvbuf(cfg->logf, NULL, _IOLBF, 0);
+ if (fname != NULL)
+ free(fname);
+ return (0);
}
void print_config(CONFIG *cfg)
@@ -673,8 +833,11 @@ void print_config(CONFIG *cfg)
printf("\t Table configuration: %s\n", cfg->table_config);
printf("\t %s\n", cfg->create ? "Creating" : "Using existing");
printf("\t Random seed: %d\n", cfg->rand_seed);
- if (cfg->create)
+ if (cfg->create) {
printf("\t Insert count: %d\n", cfg->icount);
+ printf("\t Number populate threads: %d\n",
+ cfg->populate_threads);
+ }
printf("\t key size: %d data size: %d\n", cfg->key_sz, cfg->data_sz);
printf("\t Reporting interval: %d\n", cfg->report_interval);
printf("\t Read workload period: %d\n", cfg->read_time);
@@ -684,17 +847,18 @@ void print_config(CONFIG *cfg)
void usage(void)
{
- printf("ex_perf_test [-CDLMRSTdehikrsuv]\n");
+ printf("ex_perf_test [-CDLMPRSTdehikrsuv]\n");
printf("\t-S Use a small default configuration\n");
printf("\t-M Use a medium default configuration\n");
printf("\t-L Use a large default configuration\n");
printf("\t-C <string> additional connection configuration\n");
+ printf("\t-P <int> number of populate threads\n");
printf("\t-R <int> number of read threads\n");
printf("\t-T <string> additional table configuration\n");
printf("\t-d <int> data item size\n");
printf("\t-e use existing database (skip population phase)\n");
printf("\t-h <string> Wired Tiger home must exist, default WT_TEST \n");
- printf("\t-i <int> number of records to insert in thousands\n");
+ printf("\t-i <int> number of records to insert\n");
printf("\t-k <int> key item size\n");
printf("\t-r <int> number of seconds to run read phase\n");
printf("\t-s <int> seed for random number generator\n");