/*- * Public Domain 2014-2017 MongoDB, Inc. * Public Domain 2008-2014 WiredTiger, Inc. * * This is free and unencumbered software released into the public domain. * * Anyone is free to copy, modify, publish, use, compile, sell, or * distribute this software, either in source code form or as a compiled * binary, for any purpose, commercial or non-commercial, and by any * means. * * In jurisdictions that recognize copyright laws, the author or authors * of this software dedicate any and all copyright interest in the * software to the public domain. We make this dedication for the benefit * of the public at large and to the detriment of our heirs and * successors. We intend this dedication to be an overt act of * relinquishment in perpetuity of all present and future rights to this * software under copyright law. * * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, * EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF * MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. * IN NO EVENT SHALL THE AUTHORS BE LIABLE FOR ANY CLAIM, DAMAGES OR * OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, * ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR * OTHER DEALINGS IN THE SOFTWARE. */ #include "test_checkpoint.h" static int real_worker(void); static WT_THREAD_RET worker(void *); /* * create_table -- * Create a WiredTiger table of the configured type for this cookie. */ static int create_table(WT_SESSION *session, COOKIE *cookie) { int ret; char config[128]; testutil_check(__wt_snprintf(config, sizeof(config), "key_format=%s,value_format=S,%s", cookie->type == COL ? "r" : "q", cookie->type == LSM ? ",type=lsm" : "")); if ((ret = session->create(session, cookie->uri, config)) != 0) if (ret != EEXIST) return (log_print_err("session.create", ret, 1)); ++g.ntables_created; return (0); } /* * start_workers -- * Setup the configuration for the tables being populated, then start * the worker thread(s) and wait for them to finish. */ int start_workers(table_type type) { WT_SESSION *session; struct timeval start, stop; double seconds; wt_thread_t *tids; int i, ret; ret = 0; /* Create statistics and thread structures. */ if ((tids = calloc((size_t)(g.nworkers), sizeof(*tids))) == NULL) return (log_print_err("calloc", errno, 1)); if ((ret = g.conn->open_session(g.conn, NULL, NULL, &session)) != 0) { (void)log_print_err("conn.open_session", ret, 1); goto err; } /* Setup the cookies */ for (i = 0; i < g.ntables; ++i) { g.cookies[i].id = i; if (type == MIX) g.cookies[i].type = (table_type)((i % MAX_TABLE_TYPE) + 1); else g.cookies[i].type = type; testutil_check(__wt_snprintf( g.cookies[i].uri, sizeof(g.cookies[i].uri), "%s%04d", URI_BASE, g.cookies[i].id)); /* Should probably be atomic to avoid races. */ if ((ret = create_table(session, &g.cookies[i])) != 0) goto err; } (void)gettimeofday(&start, NULL); /* Create threads. */ for (i = 0; i < g.nworkers; ++i) testutil_check(__wt_thread_create( NULL, &tids[i], worker, &g.cookies[i])); /* Wait for the threads. */ for (i = 0; i < g.nworkers; ++i) testutil_check(__wt_thread_join(NULL, tids[i])); (void)gettimeofday(&stop, NULL); seconds = (stop.tv_sec - start.tv_sec) + (stop.tv_usec - start.tv_usec) * 1e-6; printf("Ran workers for: %f seconds\n", seconds); err: free(tids); return (ret); } /* * worker_op -- * Write operation. */ static inline int worker_op(WT_CURSOR *cursor, uint64_t keyno, u_int new_val) { int ret; char valuebuf[64]; cursor->set_key(cursor, keyno); testutil_check(__wt_snprintf( valuebuf, sizeof(valuebuf), "%037u", new_val)); cursor->set_value(cursor, valuebuf); if ((ret = cursor->insert(cursor)) != 0) { if (ret == WT_ROLLBACK) return (WT_ROLLBACK); return (log_print_err("cursor.insert", ret, 1)); } return (0); } /* * worker -- * Worker thread start function. */ static WT_THREAD_RET worker(void *arg) { char tid[128]; WT_UNUSED(arg); testutil_check(__wt_thread_id(tid, sizeof(tid))); printf("worker thread starting: tid: %s\n", tid); (void)real_worker(); return (WT_THREAD_RET_VALUE); } /* * real_worker -- * A single worker thread that transactionally updates all tables with * consistent values. */ static int real_worker(void) { WT_CURSOR **cursors; WT_SESSION *session; WT_RAND_STATE rnd; u_int i, keyno; int j, ret, t_ret; ret = t_ret = 0; __wt_random_init(&rnd); if ((cursors = calloc( (size_t)(g.ntables), sizeof(WT_CURSOR *))) == NULL) return (log_print_err("malloc", ENOMEM, 1)); if ((ret = g.conn->open_session( g.conn, NULL, "isolation=snapshot", &session)) != 0) { (void)log_print_err("conn.open_session", ret, 1); goto err; } for (j = 0; j < g.ntables; j++) if ((ret = session->open_cursor(session, g.cookies[j].uri, NULL, NULL, &cursors[j])) != 0) { (void)log_print_err("session.open_cursor", ret, 1); goto err; } for (i = 0; i < g.nops && g.running; ++i, __wt_yield()) { if ((ret = session->begin_transaction(session, NULL)) != 0) { (void)log_print_err( "real_worker:begin_transaction", ret, 1); goto err; } keyno = __wt_random(&rnd) % g.nkeys + 1; for (j = 0; j < g.ntables; j++) { if ((ret = worker_op(cursors[j], keyno, i)) != 0) break; } if (ret == 0) { if ((ret = session->commit_transaction( session, NULL)) != 0) { (void)log_print_err( "real_worker:commit_transaction", ret, 1); goto err; } } else if (ret == WT_ROLLBACK) { if ((ret = session->rollback_transaction( session, NULL)) != 0) { (void)log_print_err( "real_worker:rollback_transaction", ret, 1); goto err; } } else { (void)log_print_err("worker op failed", ret, 1); goto err; } } err: if ((t_ret = session->close(session, NULL)) != 0 && ret == 0) { ret = t_ret; (void)log_print_err("session.close", ret, 1); } free(cursors); return (ret); }