summaryrefslogtreecommitdiff
path: root/src/third_party/wiredtiger/test/checkpoint/workers.c
diff options
context:
space:
mode:
Diffstat (limited to 'src/third_party/wiredtiger/test/checkpoint/workers.c')
-rw-r--r--src/third_party/wiredtiger/test/checkpoint/workers.c238
1 files changed, 238 insertions, 0 deletions
diff --git a/src/third_party/wiredtiger/test/checkpoint/workers.c b/src/third_party/wiredtiger/test/checkpoint/workers.c
new file mode 100644
index 00000000000..5cd2ef4e97b
--- /dev/null
+++ b/src/third_party/wiredtiger/test/checkpoint/workers.c
@@ -0,0 +1,238 @@
+/*-
+ * Public Domain 2014-2015 MongoDB, Inc.
+ * Public Domain 2008-2014 WiredTiger, Inc.
+ *
+ * This is free and unencumbered software released into the public domain.
+ *
+ * Anyone is free to copy, modify, publish, use, compile, sell, or
+ * distribute this software, either in source code form or as a compiled
+ * binary, for any purpose, commercial or non-commercial, and by any
+ * means.
+ *
+ * In jurisdictions that recognize copyright laws, the author or authors
+ * of this software dedicate any and all copyright interest in the
+ * software to the public domain. We make this dedication for the benefit
+ * of the public at large and to the detriment of our heirs and
+ * successors. We intend this dedication to be an overt act of
+ * relinquishment in perpetuity of all present and future rights to this
+ * software under copyright law.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
+ * EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
+ * MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT.
+ * IN NO EVENT SHALL THE AUTHORS BE LIABLE FOR ANY CLAIM, DAMAGES OR
+ * OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE,
+ * ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR
+ * OTHER DEALINGS IN THE SOFTWARE.
+ */
+
+#include "test_checkpoint.h"
+
+static int real_worker(void);
+static void *worker(void *);
+
+/*
+ * create_table --
+ * Create a WiredTiger table of the configured type for this cookie.
+ */
+static int
+create_table(WT_SESSION *session, COOKIE *cookie)
+{
+ int ret;
+ char *p, *end, config[128];
+
+ p = config;
+ end = config + sizeof(config);
+ p += snprintf(p, (size_t)(end - p),
+ "key_format=%s,value_format=S",
+ cookie->type == COL ? "r" : "q");
+ if (cookie->type == LSM)
+ (void)snprintf(p, (size_t)(end - p), ",type=lsm");
+
+ if ((ret = session->create(session, cookie->uri, config)) != 0)
+ if (ret != EEXIST)
+ return (log_print_err("session.create", ret, 1));
+ ++g.ntables_created;
+ return (0);
+}
+
+/*
+ * start_workers --
+ * Setup the configuration for the tables being populated, then start
+ * the worker thread(s) and wait for them to finish.
+ */
+int
+start_workers(table_type type)
+{
+ WT_SESSION *session;
+ struct timeval start, stop;
+ double seconds;
+ pthread_t *tids;
+ int i, ret;
+ void *thread_ret;
+
+ ret = 0;
+
+ /* Create statistics and thread structures. */
+ if ((tids = calloc((size_t)(g.nworkers), sizeof(*tids))) == NULL)
+ return (log_print_err("calloc", errno, 1));
+
+ if ((ret = g.conn->open_session(g.conn, NULL, NULL, &session)) != 0) {
+ (void)log_print_err("conn.open_session", ret, 1);
+ goto err;
+ }
+ /* Setup the cookies */
+ for (i = 0; i < g.ntables; ++i) {
+ g.cookies[i].id = i;
+ if (type == MIX)
+ g.cookies[i].type =
+ (table_type)((i % MAX_TABLE_TYPE) + 1);
+ else
+ g.cookies[i].type = type;
+ (void)snprintf(g.cookies[i].uri, 128,
+ "%s%04d", URI_BASE, g.cookies[i].id);
+
+ /* Should probably be atomic to avoid races. */
+ if ((ret = create_table(session, &g.cookies[i])) != 0)
+ goto err;
+ }
+
+ (void)gettimeofday(&start, NULL);
+
+ /* Create threads. */
+ for (i = 0; i < g.nworkers; ++i) {
+ if ((ret = pthread_create(
+ &tids[i], NULL, worker, &g.cookies[i])) != 0) {
+ (void)log_print_err("pthread_create", ret, 1);
+ goto err;
+ }
+ }
+
+ /* Wait for the threads. */
+ for (i = 0; i < g.nworkers; ++i)
+ (void)pthread_join(tids[i], &thread_ret);
+
+ (void)gettimeofday(&stop, NULL);
+ seconds = (stop.tv_sec - start.tv_sec) +
+ (stop.tv_usec - start.tv_usec) * 1e-6;
+ printf("Ran workers for: %f seconds\n", seconds);
+
+err: free(tids);
+
+ return (ret);
+}
+
+/*
+ * worker_op --
+ * Write operation.
+ */
+static inline int
+worker_op(WT_CURSOR *cursor, uint64_t keyno, u_int new_val)
+{
+ int ret;
+ char valuebuf[64];
+
+ cursor->set_key(cursor, keyno);
+ (void)snprintf(
+ valuebuf, sizeof(valuebuf), "%037u", new_val);
+ cursor->set_value(cursor, valuebuf);
+ if ((ret = cursor->insert(cursor)) != 0) {
+ if (ret == WT_ROLLBACK)
+ return (WT_ROLLBACK);
+ return (log_print_err("cursor.insert", ret, 1));
+ }
+ return (0);
+}
+
+/*
+ * worker --
+ * Worker thread start function.
+ */
+static void *
+worker(void *arg)
+{
+ char tid[128];
+
+ WT_UNUSED(arg);
+
+ __wt_thread_id(tid, sizeof(tid));
+ printf("worker thread starting: tid: %s\n", tid);
+
+ (void)real_worker();
+ return (NULL);
+}
+
+/*
+ * real_worker --
+ * A single worker thread that transactionally updates all tables with
+ * consistent values.
+ */
+static int
+real_worker(void)
+{
+ WT_CURSOR **cursors;
+ WT_SESSION *session;
+ WT_RAND_STATE rnd;
+ u_int i, keyno;
+ int j, ret, t_ret;
+
+ ret = t_ret = 0;
+
+ __wt_random_init(&rnd);
+
+ if ((cursors = calloc(
+ (size_t)(g.ntables), sizeof(WT_CURSOR *))) == NULL)
+ return (log_print_err("malloc", ENOMEM, 1));
+
+ if ((ret = g.conn->open_session(
+ g.conn, NULL, "isolation=snapshot", &session)) != 0) {
+ (void)log_print_err("conn.open_session", ret, 1);
+ goto err;
+ }
+
+ for (j = 0; j < g.ntables; j++)
+ if ((ret = session->open_cursor(session,
+ g.cookies[j].uri, NULL, NULL, &cursors[j])) != 0) {
+ (void)log_print_err("session.open_cursor", ret, 1);
+ goto err;
+ }
+
+ for (i = 0; i < g.nops && g.running; ++i, sched_yield()) {
+ if ((ret = session->begin_transaction(session, NULL)) != 0) {
+ (void)log_print_err(
+ "real_worker:begin_transaction", ret, 1);
+ goto err;
+ }
+ keyno = __wt_random(&rnd) % g.nkeys + 1;
+ for (j = 0; j < g.ntables; j++) {
+ if ((ret = worker_op(cursors[j], keyno, i)) != 0)
+ break;
+ }
+ if (ret == 0) {
+ if ((ret = session->commit_transaction(
+ session, NULL)) != 0) {
+ (void)log_print_err(
+ "real_worker:commit_transaction", ret, 1);
+ goto err;
+ }
+ } else if (ret == WT_ROLLBACK) {
+ if ((ret = session->rollback_transaction(
+ session, NULL)) != 0) {
+ (void)log_print_err(
+ "real_worker:rollback_transaction", ret, 1);
+ goto err;
+ }
+ } else {
+ (void)log_print_err("worker op failed", ret, 1);
+ goto err;
+ }
+ }
+
+err: if ((t_ret = session->close(session, NULL)) != 0 && ret == 0) {
+ ret = t_ret;
+ (void)log_print_err("session.close", ret, 1);
+ }
+ free(cursors);
+
+ return (ret);
+}