diff options
author | Alex Gorrod <alexg@wiredtiger.com> | 2012-11-08 14:24:27 +1100 |
---|---|---|
committer | Alex Gorrod <alexg@wiredtiger.com> | 2012-11-08 14:24:27 +1100 |
commit | 5d8c1e28bcb2d0b9f6592b0785b0643973a88c36 (patch) | |
tree | 64f2c48e24dad3a7852edca198619cab9f1dbeaf /examples | |
parent | 3844f602ff5dab413d70b6d381eee203f245e40f (diff) | |
download | mongo-5d8c1e28bcb2d0b9f6592b0785b0643973a88c36.tar.gz |
Clean up error handling in ex_test_perf and add option to populate using
multiple threads.
Diffstat (limited to 'examples')
-rw-r--r-- | examples/c/ex_test_perf.c | 540 |
1 files changed, 352 insertions, 188 deletions
diff --git a/examples/c/ex_test_perf.c b/examples/c/ex_test_perf.c index 6c8dc776b91..32b820b5fa4 100644 --- a/examples/c/ex_test_perf.c +++ b/examples/c/ex_test_perf.c @@ -39,6 +39,9 @@ #include <wiredtiger.h> +#define ATOMIC_ADD(v, val) \ + __sync_add_and_fetch(&(v), val) + typedef struct { const char *home; const char *uri; @@ -52,6 +55,7 @@ typedef struct { uint32_t report_interval; uint32_t read_time; uint32_t elapsed_time; + uint32_t populate_threads;/* Number of populate threads. */ uint32_t read_threads; /* Number of read threads. */ uint32_t verbose; uint32_t stat_thread; /* Whether to create a stat thread. */ @@ -65,12 +69,21 @@ typedef struct { } CONFIG; /* Forward function definitions. */ +int execute_populate(CONFIG *); int execute_reads(CONFIG *); -int populate(CONFIG *); +int get_next_op(uint64_t *); +int lprintf(CONFIG *cfg, int err, uint32_t level, const char *fmt, ...) +#ifdef __GNUC__ + __attribute__((format (printf, 4, 5))) +#endif +; +void *populate_thread(void *); void print_config(CONFIG *); void *read_thread(void *); int setup_log_file(CONFIG *); +int start_threads(CONFIG *, int, pthread_t **, void *(*func)(void *)); void *stat_worker(void *); +int stop_threads(CONFIG *, int, pthread_t *); void usage(void); /* Default values - these are tiny, we want the basic run to be fast. */ @@ -78,7 +91,7 @@ CONFIG default_cfg = { "WT_TEST", /* home */ "lsm:test", /* uri */ "create,cache_size=200MB", /* conn_config */ - "key_format=S,value_format=S", /* table_config */ + "key_format=S,value_format=S,exclusive=true", /* table_config */ 1, /* create */ 14023954, /* rand_seed */ 5000, /* icount */ @@ -87,6 +100,7 @@ CONFIG default_cfg = { 2, /* report_interval */ 2, /* read_time */ 0, /* elapsed_time */ + 1, /* populate_threads */ 2, /* read_threads */ 0, /* verbose */ 0, /* stat_thread */ @@ -100,7 +114,7 @@ CONFIG small_cfg = { "WT_TEST", /* home */ "lsm:test", /* uri */ "create,cache_size=500MB", /* conn_config */ - "key_format=S,value_format=S,lsm_chunk_size=5MB," + "key_format=S,value_format=S,exclusive=true,lsm_chunk_size=5MB," "leaf_page_max=16k,internal_page_max=16kb", /* table_config */ 1, /* create */ 14023954, /* rand_seed */ @@ -110,6 +124,7 @@ CONFIG small_cfg = { 10, /* report_interval */ 20, /* read_time */ 0, /* elapsed_time */ + 1, /* populate_threads */ 8, /* read_threads */ 0, /* verbose */ 0, /* stat_thread */ @@ -123,7 +138,7 @@ CONFIG med_cfg = { "WT_TEST", /* home */ "lsm:test", /* uri */ "create,cache_size=1GB", /* conn_config */ - "key_format=S,value_format=S,lsm_chunk_size=20MB," + "key_format=S,value_format=S,exclusive=true,lsm_chunk_size=20MB," "leaf_page_max=16k,internal_page_max=16kb", /* table_config */ 1, /* create */ 14023954, /* rand_seed */ @@ -133,6 +148,7 @@ CONFIG med_cfg = { 20, /* report_interval */ 100, /* read_time */ 0, /* elapsed_time */ + 1, /* populate_threads */ 16, /* read_threads */ 0, /* verbose */ 0, /* stat_thread */ @@ -146,7 +162,7 @@ CONFIG large_cfg = { "WT_TEST", /* home */ "lsm:test", /* uri */ "create,cache_size=2GB", /* conn_config */ - "key_format=S,value_format=S,lsm_chunk_size=50MB," + "key_format=S,value_format=S,exclusive=true,lsm_chunk_size=50MB," "leaf_page_max=16k,internal_page_max=16kb", /* table_config */ 1, /* create */ 14023954, /* rand_seed */ @@ -156,6 +172,7 @@ CONFIG large_cfg = { 20, /* report_interval */ 600, /* read_time */ 0, /* elapsed_time */ + 1, /* populate_threads */ 16, /* read_threads */ 0, /* verbose */ 0, /* stat_thread */ @@ -169,9 +186,10 @@ const char *debug_cconfig = "verbose=[lsm]"; const char *debug_tconfig = ""; /* Global values shared by threads. */ -uint64_t nops; -int running; -int stat_running; +uint64_t g_nops; +int g_running; +int g_stat_running; +uint32_t g_threads_quit; /* For tracking threads that exit early. */ void * read_thread(void *arg) @@ -181,34 +199,125 @@ read_thread(void *arg) WT_SESSION *session; WT_CURSOR *cursor; char *key_buf; - int ret; + int ret, search_ret; + + session = NULL; + key_buf = NULL; cfg = (CONFIG *)arg; conn = cfg->conn; key_buf = calloc(cfg->key_sz, 1); - if (key_buf == NULL) - return (arg); + if (key_buf == NULL) { + ret = ENOMEM; + goto err; + } if ((ret = conn->open_session(conn, NULL, NULL, &session)) != 0) { - fprintf(stderr, - "open_session failed in read thread: %d\n", ret); - return (NULL); + lprintf(cfg, ret, 0, + "open_session failed in read thread"); + goto err; } if ((ret = session->open_cursor(session, cfg->uri, NULL, NULL, &cursor)) != 0) { - fprintf(stderr, "open_cursor failed in read thread: %d\n", ret); - return (NULL); + lprintf(cfg, ret, 0, + "open_cursor failed in read thread"); + goto err; } - while (running) { - ++nops; - sprintf(key_buf, "%d", rand() % cfg->icount); + while (g_running) { + ++g_nops; + /* Get a value in range, avoid zero. */ + sprintf(key_buf, "%d", (rand() % (cfg->icount - 1)) + 1); cursor->set_key(cursor, key_buf); - cursor->search(cursor); + /* Report errors and continue. */ + if ((search_ret = cursor->search(cursor)) != 0) + lprintf(cfg, search_ret, 0, + "Search failed for: %s", key_buf); } - session->close(session, NULL); - free(key_buf); +err: if (ret != 0) + ++g_threads_quit; + if (session != NULL) + session->close(session, NULL); + if (key_buf != NULL) + free(key_buf); + return (arg); +} + +/* Retrieve an ID for the next insert operation. */ +int get_next_op(uint64_t *op) +{ + *op = ATOMIC_ADD(g_nops, 1); + return (0); +} + +void * +populate_thread(void *arg) +{ + CONFIG *cfg; + WT_CONNECTION *conn; + WT_CURSOR *cursor; + WT_SESSION *session; + char *data_buf, *key_buf; + int ret; + uint64_t op; + + cfg = (CONFIG *)arg; + conn = cfg->conn; + session = NULL; + data_buf = key_buf = NULL; + + cfg->phase = LSM_TEST_PERF_POP; + + data_buf = calloc(cfg->data_sz, 1); + if (data_buf == NULL) { + lprintf(cfg, ENOMEM, 0, "Populate data buffer"); + goto err; + } + key_buf = calloc(cfg->key_sz, 1); + if (key_buf == NULL) { + lprintf(cfg, ENOMEM, 0, "Populate key buffer"); + goto err; + } + + /* Open a session for the current thread's work. */ + if ((ret = conn->open_session(conn, NULL, NULL, &session)) != 0) { + lprintf(cfg, ret, 0, + "Error opening a session on %s", cfg->home); + goto err; + } + + /* Can only use bulk load single threaded. */ + if ((ret = session->open_cursor( + session, cfg->uri, NULL, + cfg->populate_threads == 1 ? "bulk" : "", &cursor)) != 0) { + lprintf(cfg, ret, 0, "Error opening cursor %s", cfg->uri); + goto err; + } + + memset(data_buf, 'a', cfg->data_sz - 1); + cursor->set_value(cursor, data_buf); + /* Populate the database. */ + while (1) { + get_next_op(&op); + if (op > cfg->icount) + break; + sprintf(key_buf, "%"PRIu64, op); + cursor->set_key(cursor, key_buf); + if ((ret = cursor->insert(cursor)) != 0) { + lprintf(cfg, ret, 0, "Failed inserting"); + goto err; + } + } + /* To ensure managing thread knows if we exited early. */ +err: if (ret != 0) + ++g_threads_quit; + if (session != NULL) + session->close(session, NULL); + if (data_buf) + free(data_buf); + if (key_buf) + free(key_buf); return (arg); } @@ -226,193 +335,192 @@ stat_worker(void *arg) struct timeval e; uint64_t value; + session = NULL; cfg = (CONFIG *)arg; conn = cfg->conn; lsm_uri = NULL; if ((ret = conn->open_session(conn, NULL, NULL, &session)) != 0) { - fprintf(stderr, - "open_session failed in read thread: %d\n", ret); - return (NULL); + lprintf(cfg, ret, 0, + "open_session failed in statistics thread."); + goto err; } if (strncmp(cfg->uri, "lsm:", strlen("lsm:")) == 0) { lsm_uri = calloc( strlen(cfg->uri) + strlen("statistics:") + 1, 1); if (lsm_uri == NULL) { - fprintf(stderr, "No memory in stat thread.\n"); + lprintf(cfg, ENOMEM, 0, "Statistics thread uri create."); goto err; } sprintf(lsm_uri, "statistics:%s", cfg->uri); } - while (stat_running) { + while (g_stat_running) { sleep(cfg->report_interval); /* Generic header. */ - fprintf(cfg->logf, "=======================================\n"); + lprintf(cfg, 0, cfg->verbose, + "======================================="); gettimeofday(&e, NULL); secs = e.tv_sec + e.tv_usec / 1000000.0; secs -= (cfg->phase_start_time.tv_sec + cfg->phase_start_time.tv_usec / 1000000.0); if (secs == 0) ++secs; - fprintf(cfg->logf, - "%s completed: %" PRIu64", elapsed time: %.2f\n", + lprintf(cfg, 0, cfg->verbose, + "%s completed: %" PRIu64", elapsed time: %.2f", cfg->phase == LSM_TEST_PERF_READ ? "reads" : "inserts", - nops, secs); + g_nops, secs); /* Report LSM tree stats, if using LSM. */ if (lsm_uri != NULL) { if ((ret = session->open_cursor(session, lsm_uri, NULL, NULL, &cursor)) != 0) { - fprintf(stderr, - "open_cursor LSM statistics: %d\n", ret); + lprintf(cfg, ret, 0, + "open_cursor failed in LSM statistics"); goto err; } while ( (ret = cursor->next(cursor)) == 0 && (ret = cursor->get_value( cursor, &desc, &pvalue, &value)) == 0) - fprintf(cfg->logf, - "stat:lsm: %s=%s\n", desc, pvalue); + lprintf(cfg, 0, cfg->verbose, + "stat:lsm: %s=%s", desc, pvalue); cursor->close(cursor); - fprintf(cfg->logf, "\n"); + lprintf(cfg, 0, cfg->verbose, "-----------------"); } /* Dump the connection statistics since last time. */ if ((ret = session->open_cursor(session, "statistics:", NULL, "statistics_clear", &cursor)) != 0) { - fprintf(stderr, - "open_cursor statistics: %d\n", ret); + lprintf(cfg, ret, 0, + "open_cursor failed in statistics"); goto err; } while ( (ret = cursor->next(cursor)) == 0 && (ret = cursor->get_value( cursor, &desc, &pvalue, &value)) == 0) - fprintf(cfg->logf, - "stat:conn: %s=%s\n", desc, pvalue); + lprintf(cfg, 0, cfg->verbose, + "stat:conn: %s=%s", desc, pvalue); cursor->close(cursor); } -err: session->close(session, NULL); +err: if (session != NULL) + session->close(session, NULL); if (lsm_uri != NULL) free(lsm_uri); return (arg); } -int populate(CONFIG *cfg) +int execute_populate(CONFIG *cfg) { WT_CONNECTION *conn; - WT_CURSOR *cursor; WT_SESSION *session; - char *data_buf, *key_buf; + pthread_t *threads; double secs; int ret; + uint64_t elapsed, last_ops; struct timeval e; conn = cfg->conn; - cfg->phase = LSM_TEST_PERF_POP; - if (cfg->verbose > 0) - fprintf(cfg->logf, "Starting bulk load\n"); - - data_buf = calloc(cfg->data_sz, 1); - if (data_buf == NULL) - return (ENOMEM); - key_buf = calloc(cfg->key_sz, 1); - if (key_buf == NULL) - return (ENOMEM); + lprintf(cfg, 0, 1, "Starting populate threads"); - /* Open a session for the current thread's work. */ + /* First create the table. */ if ((ret = conn->open_session(conn, NULL, NULL, &session)) != 0) { - fprintf(stderr, "Error opening a session on %s: %s\n", - cfg->home, wiredtiger_strerror(ret)); + lprintf(cfg, ret, 0, + "Error opening a session on %s", cfg->home); return (ret); } if ((ret = session->create( session, cfg->uri, cfg->table_config)) != 0) { - fprintf(stderr, "Error creating table %s: %s\n", - cfg->uri, wiredtiger_strerror(ret)); + lprintf(cfg, ret, 0, "Error creating table %s", cfg->uri); + session->close(session, NULL); return (ret); } + session->close(session, NULL); - if ((ret = session->open_cursor( - session, cfg->uri, NULL, "bulk", &cursor)) != 0) { - fprintf(stderr, "Error opening cursor %s: %s\n", - cfg->uri, wiredtiger_strerror(ret)); + if ((ret = start_threads( + cfg, cfg->populate_threads, &threads, populate_thread)) != 0) return (ret); - } - memset(data_buf, 'a', cfg->data_sz - 1); - cursor->set_value(cursor, data_buf); - /* Populate the database. */ gettimeofday(&cfg->phase_start_time, NULL); - for (nops = 0; nops < cfg->icount; nops++) { - if (cfg->verbose > 0) { - if (nops % 1000000 == 0) - printf("."); - if (nops % 50000000 == 0) - printf("\n"); - } - sprintf(key_buf, "%"PRIu64, nops); - cursor->set_key(cursor, key_buf); - if ((ret = cursor->insert(cursor)) != 0) { - fprintf(stderr, "Failed inserting with: %d\n", ret); - return (ret); + for (cfg->elapsed_time = 0, last_ops = 0; + g_nops < cfg->icount && g_threads_quit < cfg->populate_threads;) { + /* + * Sleep for 100th of a second, report_interval is in second + * granularity, so adjust accordingly. + */ + usleep(10000); + elapsed += 1; + if (elapsed % 100 == 0 && + (elapsed / 100) % cfg->report_interval == 0) { + lprintf(cfg, 0, 1, "%" PRIu64 " ops in %d secs", + g_nops - last_ops, cfg->report_interval); + last_ops = g_nops; } } - gettimeofday(&e, NULL); - cursor->close(cursor); - session->close(session, NULL); - if (cfg->verbose > 0) { - fprintf(cfg->logf, - "Finished bulk load of %d items\n", cfg->icount); - secs = e.tv_sec + e.tv_usec / 1000000.0; - secs -= (cfg->phase_start_time.tv_sec + - cfg->phase_start_time.tv_usec / 1000000.0); - if (secs == 0) - ++secs; - fprintf(cfg->logf, - "Load time: %.2f\n" "load ops/sec: %.2f\n", - secs, cfg->icount / secs); + if (g_threads_quit == cfg->populate_threads) { + lprintf(cfg, WT_ERROR, 0, + "Populate threads exited without finishing."); + return (WT_ERROR); } + gettimeofday(&e, NULL); - free(data_buf); - free(key_buf); - return (ret); + if ((ret = stop_threads(cfg, cfg->populate_threads, threads)) != 0) + return (ret); + + lprintf(cfg, 0, 1, + "Finished bulk load of %d items", cfg->icount); + secs = e.tv_sec + e.tv_usec / 1000000.0; + secs -= (cfg->phase_start_time.tv_sec + + cfg->phase_start_time.tv_usec / 1000000.0); + if (secs == 0) + ++secs; + lprintf(cfg, 0, 1, + "Load time: %.2f\n" "load ops/sec: %.2f", + secs, cfg->icount / secs); + + return (0); } -/* Setup the logging output mechanism. */ -int setup_log_file(CONFIG *cfg) +int execute_reads(CONFIG *cfg) { - char *fname; - int offset; + pthread_t *threads; + int ret; + uint64_t last_ops; - if (cfg->verbose < 1 && cfg->stat_thread == 0) - return (0); + cfg->phase = LSM_TEST_PERF_READ; + lprintf(cfg, 0, 1, "Starting read threads"); - if ((fname = calloc(strlen(cfg->home) + - strlen(cfg->uri) + strlen(".stat") + 1, 1)) == NULL) { - fprintf(stderr, "No memory in stat thread\n"); - return (ENOMEM); + if ((ret = start_threads( + cfg, cfg->read_threads, &threads, read_thread)) != 0) + return (ret); + + /* Sanity check reporting interval. */ + if (cfg->report_interval > cfg->read_time) + cfg->report_interval = cfg->read_time; + + gettimeofday(&cfg->phase_start_time, NULL); + for (cfg->elapsed_time = 0, last_ops = 0; + cfg->elapsed_time < cfg->read_time && + g_threads_quit < cfg->read_threads; + cfg->elapsed_time += cfg->report_interval) { + sleep(cfg->report_interval); + lprintf(cfg, 0, 1, "%" PRIu64 " ops in %d secs", + g_nops - last_ops, cfg->report_interval); + last_ops = g_nops; } - for (offset = 0; - cfg->uri[offset] != 0 && cfg->uri[offset] != ':'; - offset++) {} - if (cfg->uri[offset] == 0) - offset = 0; - else - ++offset; - sprintf(fname, "%s/%s.stat", cfg->home, cfg->uri + offset); - if ((cfg->logf = fopen(fname, "w")) == NULL) { - fprintf(stderr, "Statistics failed to open log file.\n"); - return (EINVAL); + if (g_threads_quit == cfg->populate_threads) { + lprintf(cfg, WT_ERROR, 0, + "Populate threads exited without finishing."); + return (WT_ERROR); } - /* Use line buffering for the log file. */ - (void)setvbuf(cfg->logf, NULL, _IOLBF, 0); - if (fname != NULL) - free(fname); + + if ((ret = stop_threads(cfg, cfg->read_threads, threads)) != 0) + return (ret); + return (0); } @@ -421,7 +529,7 @@ int main(int argc, char **argv) CONFIG cfg; WT_CONNECTION *conn; const char *user_cconfig, *user_tconfig; - const char *opts = "C:R:T:d:eh:i:k:lr:s:u:v:SML"; + const char *opts = "C:P:R:T:d:eh:i:k:lr:s:u:v:SML"; char *cc_buf, *tc_buf; int ch, ret, stat_created; pthread_t stat; @@ -468,7 +576,7 @@ int main(int argc, char **argv) cfg.home = optarg; break; case 'i': - cfg.icount = atoi(optarg) * 1000; + cfg.icount = atoi(optarg); break; case 'k': cfg.key_sz = atoi(optarg); @@ -491,6 +599,9 @@ int main(int argc, char **argv) case 'C': user_cconfig = optarg; break; + case 'P': + cfg.populate_threads = atoi(optarg); + break; case 'R': cfg.read_threads = atoi(optarg); break; @@ -556,46 +667,43 @@ int main(int argc, char **argv) /* Open a connection to the database, creating it if necessary. */ if ((ret = wiredtiger_open( cfg.home, NULL, cfg.conn_config, &conn)) != 0) { - fprintf(stderr, "Error connecting to %s: %s\n", - cfg.home, wiredtiger_strerror(ret)); + lprintf(&cfg, ret, 0, "Error connecting to %s", cfg.home); goto err; } cfg.conn = conn; if (cfg.stat_thread) { - stat_running = 1; + g_stat_running = 1; if ((ret = pthread_create( &stat, NULL, stat_worker, &cfg)) != 0) { - fprintf(stderr, "Error creating statistics thread.\n"); + lprintf(&cfg, ret, 0, + "Error creating statistics thread."); goto err; } stat_created = 1; } - if (cfg.create != 0 && (ret = populate(&cfg)) != 0) + if (cfg.create != 0 && execute_populate(&cfg) != 0) goto err; - if (cfg.read_time != 0 && cfg.read_threads != 0) - if ((ret = execute_reads(&cfg)) != 0) + if (cfg.read_time != 0 && cfg.read_threads != 0 && + (ret = execute_reads(&cfg)) != 0) goto err; - if (cfg.verbose > 0) { - fprintf(cfg.logf, - "Ran performance test example with %d threads for %d seconds.\n", - cfg.read_threads, cfg.read_time); - fprintf(cfg.logf, - "Executed %" PRIu64 " read operations\n", nops); - } + lprintf(&cfg, 0, 1, + "Ran performance test example with %d threads for %d seconds.", + cfg.read_threads, cfg.read_time); + lprintf(&cfg, 0, 1, + "Executed %" PRIu64 " read operations", g_nops); - if (cfg.stat_thread) - stat_running = 0; +err: if (cfg.stat_thread) + g_stat_running = 0; - /* Cleanup. */ -err: if (stat_created != 0 && (ret = pthread_join(stat, NULL)) != 0) - fprintf(stderr, "Error joining stat thread: %d.\n", ret); + if (stat_created != 0 && (ret = pthread_join(stat, NULL)) != 0) + lprintf(&cfg, ret, 0, "Error joining stat thread."); if (conn != NULL && (ret = conn->close(conn, NULL)) != 0) - fprintf(stderr, "Error connecting to %s: %s\n", - cfg.home, wiredtiger_strerror(ret)); + lprintf(&cfg, ret, 0, + "Error closing connection to %s", cfg.home); if (cc_buf != NULL) free(cc_buf); if (tc_buf != NULL) @@ -608,60 +716,112 @@ err: if (stat_created != 0 && (ret = pthread_join(stat, NULL)) != 0) return (ret); } -int execute_reads(CONFIG *cfg) +/* + * Following are utility functions. + */ +int +start_threads(CONFIG *cfg, int num, pthread_t **threadsp, void *(*func)(void *)) { - pthread_t *read_threads; - int ret; - uint32_t i; - uint64_t last_ops; - - cfg->phase = LSM_TEST_PERF_READ; - if (cfg->verbose > 0) - fprintf(cfg->logf, "Starting read threads\n"); - - running = 1; - nops = 0; - - read_threads = calloc(cfg->read_threads, sizeof(pthread_t *)); - if (read_threads == NULL) + pthread_t *threads; + int i, ret; + + g_running = 1; + g_nops = 0; + g_threads_quit = 0; + threads = calloc(num, sizeof(pthread_t *)); + if (threads == NULL) return (ENOMEM); - for (i = 0; i < cfg->read_threads; i++) { + for (i = 0; i < num; i++) { if ((ret = pthread_create( - &read_threads[i], NULL, read_thread, cfg)) != 0) { - fprintf(stderr, "Error creating thread: %d\n", i); + &threads[i], NULL, func, cfg)) != 0) { + g_running = 0; + lprintf(cfg, ret, 0, "Error creating thread: %d", i); return (ret); } } + *threadsp = threads; + return (0); +} - /* Sanity check reporting interval. */ - if (cfg->report_interval > cfg->read_time) - cfg->report_interval = cfg->read_time; +int +stop_threads(CONFIG *cfg, int num, pthread_t *threads) +{ + int i, ret; + g_running = 0; - gettimeofday(&cfg->phase_start_time, NULL); - for (cfg->elapsed_time = 0, last_ops = 0; - cfg->elapsed_time < cfg->read_time; - cfg->elapsed_time += cfg->report_interval) { - sleep(cfg->report_interval); - if (cfg->verbose > 0) { - fprintf(cfg->logf, "%" PRIu64 " ops in %d secs\n", - nops - last_ops, cfg->report_interval); - printf("%" PRIu64 " ops in %d secs\n", - nops - last_ops, cfg->report_interval); + for (i = 0; i < num; i++) { + if ((ret = pthread_join(threads[i], NULL)) != 0) { + lprintf(cfg, ret, 0, "Error joining thread %d", i); + return (ret); } - last_ops = nops; } - running = 0; - for (i = 0; i < cfg->read_threads; i++) { - if ((ret = pthread_join(read_threads[i], NULL)) != 0) { - fprintf(stderr, "Error joining thread %d\n", i); - return (ret); - } + free(threads); + return (0); +} + +/* + * Log printf - output a log message. + */ +int lprintf(CONFIG *cfg, int err, uint32_t level, const char *fmt, ...) +{ + va_list ap; + + if (err == 0 && level <= cfg->verbose) { + va_start(ap, fmt); + vfprintf(cfg->logf, fmt, ap); + va_end(ap); + fprintf(cfg->logf, "\n"); } + if (err == 0) + return (0); - if (read_threads != NULL) - free(read_threads); - return (ret); + /* We are dealing with an error. */ + va_start(ap, fmt); + vfprintf(cfg->logf, fmt, ap); + va_end(ap); + fprintf(cfg->logf, " Error: %s\n", wiredtiger_strerror(err)); + if (cfg->logf != stderr) { + va_start(ap, fmt); + vfprintf(stderr, fmt, ap); + va_end(ap); + fprintf(stderr, " Error: %s\n", wiredtiger_strerror(err)); + } + + return (0); +} + +/* Setup the logging output mechanism. */ +int setup_log_file(CONFIG *cfg) +{ + char *fname; + int offset; + + if (cfg->verbose < 1 && cfg->stat_thread == 0) + return (0); + + if ((fname = calloc(strlen(cfg->home) + + strlen(cfg->uri) + strlen(".stat") + 1, 1)) == NULL) { + fprintf(stderr, "No memory in stat thread\n"); + return (ENOMEM); + } + for (offset = 0; + cfg->uri[offset] != 0 && cfg->uri[offset] != ':'; + offset++) {} + if (cfg->uri[offset] == 0) + offset = 0; + else + ++offset; + sprintf(fname, "%s/%s.stat", cfg->home, cfg->uri + offset); + if ((cfg->logf = fopen(fname, "w")) == NULL) { + fprintf(stderr, "Statistics failed to open log file.\n"); + return (EINVAL); + } + /* Use line buffering for the log file. */ + (void)setvbuf(cfg->logf, NULL, _IOLBF, 0); + if (fname != NULL) + free(fname); + return (0); } void print_config(CONFIG *cfg) @@ -673,8 +833,11 @@ void print_config(CONFIG *cfg) printf("\t Table configuration: %s\n", cfg->table_config); printf("\t %s\n", cfg->create ? "Creating" : "Using existing"); printf("\t Random seed: %d\n", cfg->rand_seed); - if (cfg->create) + if (cfg->create) { printf("\t Insert count: %d\n", cfg->icount); + printf("\t Number populate threads: %d\n", + cfg->populate_threads); + } printf("\t key size: %d data size: %d\n", cfg->key_sz, cfg->data_sz); printf("\t Reporting interval: %d\n", cfg->report_interval); printf("\t Read workload period: %d\n", cfg->read_time); @@ -684,17 +847,18 @@ void print_config(CONFIG *cfg) void usage(void) { - printf("ex_perf_test [-CDLMRSTdehikrsuv]\n"); + printf("ex_perf_test [-CDLMPRSTdehikrsuv]\n"); printf("\t-S Use a small default configuration\n"); printf("\t-M Use a medium default configuration\n"); printf("\t-L Use a large default configuration\n"); printf("\t-C <string> additional connection configuration\n"); + printf("\t-P <int> number of populate threads\n"); printf("\t-R <int> number of read threads\n"); printf("\t-T <string> additional table configuration\n"); printf("\t-d <int> data item size\n"); printf("\t-e use existing database (skip population phase)\n"); printf("\t-h <string> Wired Tiger home must exist, default WT_TEST \n"); - printf("\t-i <int> number of records to insert in thousands\n"); + printf("\t-i <int> number of records to insert\n"); printf("\t-k <int> key item size\n"); printf("\t-r <int> number of seconds to run read phase\n"); printf("\t-s <int> seed for random number generator\n"); |