summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--src/cursor/cur_join.c3
-rw-r--r--test/csuite/Makefile.am3
-rw-r--r--test/csuite/wt2323_join_visibility/main.c404
-rw-r--r--test/suite/test_join06.py19
4 files changed, 420 insertions, 9 deletions
diff --git a/src/cursor/cur_join.c b/src/cursor/cur_join.c
index f6cd5d90f62..601bb593240 100644
--- a/src/cursor/cur_join.c
+++ b/src/cursor/cur_join.c
@@ -916,6 +916,9 @@ __curjoin_init_next(WT_SESSION_IMPL *session, WT_CURSOR_JOIN *cjoin,
"join cursor has not yet been joined with any other "
"cursors");
+ /* Get a consistent view of our subordinate cursors if appropriate. */
+ WT_RET(__wt_txn_cursor_op(session));
+
if (F_ISSET((WT_CURSOR *)cjoin, WT_CURSTD_RAW))
config = &raw_cfg[0];
else
diff --git a/test/csuite/Makefile.am b/test/csuite/Makefile.am
index 097468f0e85..b8a62b69612 100644
--- a/test/csuite/Makefile.am
+++ b/test/csuite/Makefile.am
@@ -11,6 +11,9 @@ noinst_PROGRAMS = test_wt1965_col_efficiency
test_wt2246_col_append_SOURCES = wt2246_col_append/main.c
noinst_PROGRAMS += test_wt2246_col_append
+test_wt2323_join_visibility_SOURCES = wt2323_join_visibility/main.c
+noinst_PROGRAMS += test_wt2323_join_visibility
+
test_wt2535_insert_race_SOURCES = wt2535_insert_race/main.c
noinst_PROGRAMS += test_wt2535_insert_race
diff --git a/test/csuite/wt2323_join_visibility/main.c b/test/csuite/wt2323_join_visibility/main.c
new file mode 100644
index 00000000000..fa499ee3396
--- /dev/null
+++ b/test/csuite/wt2323_join_visibility/main.c
@@ -0,0 +1,404 @@
+/*-
+ * Public Domain 2014-2016 MongoDB, Inc.
+ * Public Domain 2008-2014 WiredTiger, Inc.
+ *
+ * This is free and unencumbered software released into the public domain.
+ *
+ * Anyone is free to copy, modify, publish, use, compile, sell, or
+ * distribute this software, either in source code form or as a compiled
+ * binary, for any purpose, commercial or non-commercial, and by any
+ * means.
+ *
+ * In jurisdictions that recognize copyright laws, the author or authors
+ * of this software dedicate any and all copyright interest in the
+ * software to the public domain. We make this dedication for the benefit
+ * of the public at large and to the detriment of our heirs and
+ * successors. We intend this dedication to be an overt act of
+ * relinquishment in perpetuity of all present and future rights to this
+ * software under copyright law.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
+ * EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
+ * MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT.
+ * IN NO EVENT SHALL THE AUTHORS BE LIABLE FOR ANY CLAIM, DAMAGES OR
+ * OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE,
+ * ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR
+ * OTHER DEALINGS IN THE SOFTWARE.
+ */
+#include "test_util.h"
+
+/*
+ * JIRA ticket reference: WT-2323
+ *
+ * Test case description: We create two kinds of threads that race: One kind
+ * is populating/updating records in a table with a few indices, the other
+ * is reading from a join of that table. The hope in constructing this test
+ * was to have the updates interleaved between reads of multiple indices by
+ * the join, yielding an inconsistent view of the data. In the main table,
+ * we insert account records, with a positive or negative balance. The
+ * negative balance accounts always have a flag set to non-zero, positive
+ * balances have the flag set to zero. The join we do is:
+ *
+ * select (*) from account where account.postal_code = '54321' and
+ * account.balance < 0 and account.flags == 0
+ *
+ * which should always yield no results.
+ *
+ * Failure mode: This test never actually failed with any combination of
+ * parameters, with N_INSERT up to 50000000. It seems that a snapshot is
+ * implicitly allocated in the session used by a join by the set_key calls
+ * that occur before the first 'next' of the join cursor is done. Despite
+ * that, the test seems interesting enough to keep around, with the number
+ * of inserts set low as a default.
+ */
+
+void (*custom_die)(void) = NULL;
+
+#define N_RECORDS 10000
+#define N_INSERT 500000
+#define N_INSERT_THREAD 2
+#define N_JOIN_THREAD 2
+#define S64 "ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz0123456789::"
+#define S1024 (S64 S64 S64 S64 S64 S64 S64 S64 S64 S64 S64 S64 S64 S64 S64 S64)
+
+typedef struct {
+ char posturi[256];
+ char baluri[256];
+ char flaguri[256];
+ char joinuri[256];
+ bool bloom;
+ bool remove;
+} SHARED_OPTS;
+
+typedef struct {
+ TEST_OPTS *testopts;
+ SHARED_OPTS *sharedopts;
+ int threadnum;
+ int nthread;
+ int done;
+ int joins;
+ int removes;
+ int inserts;
+ int notfounds;
+ int rollbacks;
+} THREAD_ARGS;
+
+static void *thread_insert(void *);
+static void *thread_join(void *);
+static int test_join(TEST_OPTS *, SHARED_OPTS *, bool, bool);
+
+int
+main(int argc, char *argv[])
+{
+ SHARED_OPTS *sharedopts, _sharedopts;
+ TEST_OPTS *opts, _opts;
+ const char *tablename;
+
+ opts = &_opts;
+ sharedopts = &_sharedopts;
+ memset(opts, 0, sizeof(*opts));
+ memset(sharedopts, 0, sizeof(*sharedopts));
+
+ testutil_check(testutil_parse_opts(argc, argv, opts));
+ testutil_make_work_dir(opts->home);
+
+ tablename = strchr(opts->uri, ':');
+ testutil_assert(tablename != NULL);
+ tablename++;
+ snprintf(sharedopts->posturi, sizeof(sharedopts->posturi),
+ "index:%s:post", tablename);
+ snprintf(sharedopts->baluri, sizeof(sharedopts->baluri),
+ "index:%s:bal", tablename);
+ snprintf(sharedopts->flaguri, sizeof(sharedopts->flaguri),
+ "index:%s:flag", tablename);
+ snprintf(sharedopts->joinuri, sizeof(sharedopts->joinuri),
+ "join:%s", opts->uri);
+
+ testutil_check(wiredtiger_open(opts->home, NULL,
+ "create,cache_size=1G", &opts->conn));
+
+ testutil_check(test_join(opts, sharedopts, true, true));
+ testutil_check(test_join(opts, sharedopts, true, false));
+ testutil_check(test_join(opts, sharedopts, false, true));
+ testutil_check(test_join(opts, sharedopts, false, false));
+
+ testutil_cleanup(opts);
+
+ return (0);
+}
+
+static int
+test_join(TEST_OPTS *opts, SHARED_OPTS *sharedopts, bool bloom,
+ bool sometimes_remove)
+{
+ THREAD_ARGS insert_args[N_INSERT_THREAD], join_args[N_JOIN_THREAD];
+ WT_CURSOR *maincur;
+ WT_SESSION *session;
+ pthread_t insert_tid[N_INSERT_THREAD], join_tid[N_JOIN_THREAD];
+ int i;
+
+ memset(insert_args, 0, sizeof(insert_args));
+ memset(join_args, 0, sizeof(join_args));
+
+ sharedopts->bloom = bloom;
+ sharedopts->remove = sometimes_remove;
+
+ fprintf(stderr, "Running with bloom=%d, remove=%d\n",
+ (int)bloom, (int)sometimes_remove);
+
+ testutil_check(
+ opts->conn->open_session(opts->conn, NULL, NULL, &session));
+
+ /*
+ * Note: id is repeated as id2. This makes it easier to
+ * identify the primary key in dumps of the index files.
+ */
+ testutil_check(session->create(session, opts->uri,
+ "key_format=i,value_format=iiSii,"
+ "columns=(id,post,bal,extra,flag,id2)"));
+
+ testutil_check(session->create(session, sharedopts->posturi,
+ "columns=(post)"));
+ testutil_check(session->create(session, sharedopts->baluri,
+ "columns=(bal)"));
+ testutil_check(session->create(session, sharedopts->flaguri,
+ "columns=(flag)"));
+
+ /*
+ * Insert a single record with all items we need to
+ * call search() on, this makes our join logic easier.
+ */
+ testutil_check(session->open_cursor(session, opts->uri, NULL, NULL,
+ &maincur));
+ maincur->set_key(maincur, N_RECORDS);
+ maincur->set_value(maincur, 54321, 0, "", 0, N_RECORDS);
+ testutil_check(maincur->insert(maincur));
+ testutil_check(maincur->close(maincur));
+
+ for (i = 0; i < N_INSERT_THREAD; ++i) {
+ insert_args[i].threadnum = i;
+ insert_args[i].nthread = N_INSERT_THREAD;
+ insert_args[i].testopts = opts;
+ insert_args[i].sharedopts = sharedopts;
+ testutil_check(pthread_create(&insert_tid[i], NULL,
+ thread_insert, (void *)&insert_args[i]));
+ }
+
+ for (i = 0; i < N_JOIN_THREAD; ++i) {
+ join_args[i].threadnum = i;
+ join_args[i].nthread = N_JOIN_THREAD;
+ join_args[i].testopts = opts;
+ join_args[i].sharedopts = sharedopts;
+ testutil_check(pthread_create(&join_tid[i], NULL,
+ thread_join, (void *)&join_args[i]));
+ }
+
+ /*
+ * Wait for insert threads to finish. When they
+ * are done, signal join threads to complete.
+ */
+ for (i = 0; i < N_INSERT_THREAD; ++i)
+ testutil_check(pthread_join(insert_tid[i], NULL));
+
+ for (i = 0; i < N_JOIN_THREAD; ++i)
+ join_args[i].done = 1;
+
+ for (i = 0; i < N_JOIN_THREAD; ++i)
+ testutil_check(pthread_join(join_tid[i], NULL));
+
+ fprintf(stderr, "\n");
+ for (i = 0; i < N_JOIN_THREAD; ++i) {
+ fprintf(stderr, " join thread %d did %d joins\n",
+ i, join_args[i].joins);
+ }
+ for (i = 0; i < N_INSERT_THREAD; ++i)
+ fprintf(stderr,
+ " insert thread %d did "
+ "%d inserts, %d removes, %d notfound, %d rollbacks\n",
+ i, insert_args[i].inserts, insert_args[i].removes,
+ insert_args[i].notfounds, insert_args[i].rollbacks);
+
+ testutil_check(session->drop(session, sharedopts->posturi, NULL));
+ testutil_check(session->drop(session, sharedopts->baluri, NULL));
+ testutil_check(session->drop(session, sharedopts->flaguri, NULL));
+ testutil_check(session->drop(session, opts->uri, NULL));
+ testutil_check(session->close(session, NULL));
+
+ return (0);
+}
+
+static void *thread_insert(void *arg)
+{
+ SHARED_OPTS *sharedopts;
+ TEST_OPTS *opts;
+ THREAD_ARGS *threadargs;
+ WT_CURSOR *maincur;
+ WT_RAND_STATE rnd;
+ WT_SESSION *session;
+ int bal, i, flag, key, post, ret;
+ const char *extra = S1024;
+
+ threadargs = (THREAD_ARGS *)arg;
+ opts = threadargs->testopts;
+ sharedopts = threadargs->sharedopts;
+ testutil_check(__wt_random_init_seed(NULL, &rnd));
+
+ testutil_check(opts->conn->open_session(
+ opts->conn, NULL, NULL, &session));
+
+ testutil_check(session->open_cursor(session, opts->uri, NULL, NULL,
+ &maincur));
+
+ for (i = 0; i < N_INSERT; i++) {
+ /*
+ * Insert threads may stomp on each other's records;
+ * that's okay.
+ */
+ key = (int)(__wt_random(&rnd) % N_RECORDS);
+ maincur->set_key(maincur, key);
+ if (sharedopts->remove)
+ testutil_check(session->begin_transaction(session,
+ "isolation=snapshot"));
+ if (sharedopts->remove && __wt_random(&rnd) % 5 == 0 &&
+ maincur->search(maincur) == 0) {
+ /*
+ * Another thread can be removing at the
+ * same time.
+ */
+ ret = maincur->remove(maincur);
+ testutil_assert(ret == 0 ||
+ (N_INSERT_THREAD > 1 &&
+ (ret == WT_NOTFOUND || ret == WT_ROLLBACK)));
+ if (ret == 0)
+ threadargs->removes++;
+ else if (ret == WT_NOTFOUND)
+ threadargs->notfounds++;
+ else if (ret == WT_ROLLBACK)
+ threadargs->rollbacks++;
+ } else {
+ if (__wt_random(&rnd) % 2 == 0)
+ post = 54321;
+ else
+ post = i % 100000;
+ if (__wt_random(&rnd) % 2 == 0) {
+ bal = -100;
+ flag = 1;
+ } else {
+ bal = 1 + (i % 1000) * 100;
+ flag = 0;
+ }
+ maincur->set_value(maincur, post, bal, extra, flag,
+ key);
+ ret = maincur->insert(maincur);
+ testutil_assert(ret == 0 ||
+ (N_INSERT_THREAD > 1 && ret == WT_ROLLBACK));
+ testutil_check(maincur->reset(maincur));
+ if (ret == 0)
+ threadargs->inserts++;
+ else if (ret == WT_ROLLBACK)
+ threadargs->rollbacks++;
+ }
+ if (sharedopts->remove)
+ testutil_check(session->commit_transaction(session,
+ NULL));
+ if (i % 1000 == 0 && i != 0) {
+ if (i % 10000 == 0)
+ fprintf(stderr, "*");
+ else
+ fprintf(stderr, ".");
+ }
+ }
+ testutil_check(maincur->close(maincur));
+ testutil_check(session->close(session, NULL));
+ return (NULL);
+}
+
+static void *thread_join(void *arg)
+{
+ SHARED_OPTS *sharedopts;
+ TEST_OPTS *opts;
+ THREAD_ARGS *threadargs;
+ WT_CURSOR *postcur, *balcur, *flagcur, *joincur;
+ WT_SESSION *session;
+ int bal, flag, key, key2, post, ret;
+ char cfg[128];
+ char *extra;
+
+ threadargs = (THREAD_ARGS *)arg;
+ opts = threadargs->testopts;
+ sharedopts = threadargs->sharedopts;
+
+ testutil_check(opts->conn->open_session(
+ opts->conn, NULL, NULL, &session));
+
+ testutil_check(session->open_cursor(
+ session, sharedopts->posturi, NULL, NULL, &postcur));
+ testutil_check(session->open_cursor(
+ session, sharedopts->baluri, NULL, NULL, &balcur));
+ testutil_check(session->open_cursor(
+ session, sharedopts->flaguri, NULL, NULL, &flagcur));
+
+ for (threadargs->joins = 0; threadargs->done == 0;
+ threadargs->joins++) {
+ testutil_check(session->open_cursor(
+ session, sharedopts->joinuri, NULL, NULL, &joincur));
+ postcur->set_key(postcur, 54321);
+ testutil_check(postcur->search(postcur));
+ testutil_check(session->join(session, joincur, postcur,
+ "compare=eq"));
+
+ balcur->set_key(balcur, 0);
+ testutil_check(balcur->search(balcur));
+ if (sharedopts->bloom)
+ sprintf(cfg, "compare=lt,strategy=bloom,count=%d",
+ N_RECORDS);
+ else
+ sprintf(cfg, "compare=lt");
+ testutil_check(session->join(session, joincur, balcur, cfg));
+
+ flagcur->set_key(flagcur, 0);
+ testutil_check(flagcur->search(flagcur));
+ if (sharedopts->bloom)
+ sprintf(cfg, "compare=eq,strategy=bloom,count=%d",
+ N_RECORDS);
+ else
+ sprintf(cfg, "compare=eq");
+ testutil_check(session->join(session, joincur, flagcur, cfg));
+
+ /* Expect no values returned */
+ ret = joincur->next(joincur);
+ if (ret == 0) {
+ /*
+ * The values may already have been changed, but
+ * print them for informational purposes.
+ */
+ testutil_check(joincur->get_key(joincur, &key));
+ testutil_check(joincur->get_value(joincur, &post,
+ &bal, &extra, &flag, &key2));
+ fprintf(stderr, "FAIL: iteration %d: "
+ "key=%d/%d, postal_code=%d, balance=%d, flag=%d\n",
+ threadargs->joins, key, key2, post, bal, flag);
+ /* Save the results. */
+ testutil_check(opts->conn->close(opts->conn, NULL));
+ opts->conn = NULL;
+ return (NULL);
+ }
+ testutil_assert(ret == WT_NOTFOUND);
+ testutil_check(joincur->close(joincur));
+
+ /*
+ * Reset the cursors, potentially allowing the insert
+ * threads to proceed.
+ */
+ testutil_check(postcur->reset(postcur));
+ testutil_check(balcur->reset(balcur));
+ testutil_check(flagcur->reset(flagcur));
+ if (threadargs->joins % 100 == 0)
+ fprintf(stderr, "J");
+ }
+ testutil_check(postcur->close(postcur));
+ testutil_check(balcur->close(balcur));
+ testutil_check(flagcur->close(flagcur));
+ testutil_check(session->close(session, NULL));
+ return (NULL);
+}
diff --git a/test/suite/test_join06.py b/test/suite/test_join06.py
index 5fedd365712..a6681cdccd0 100644
--- a/test/suite/test_join06.py
+++ b/test/suite/test_join06.py
@@ -37,8 +37,10 @@ class test_join06(wttest.WiredTigerTestCase):
nentries = 1000
isoscen = [
- ('isolation_read_uncommitted', dict(uncommitted=True)),
- ('isolation_default', dict(uncommitted=False))
+ ('isolation_read_uncommitted', dict(isolation='read-uncommitted')),
+ ('isolation_read_committed', dict(isolation='read-committed')),
+ ('isolation_default', dict(isolation='')),
+ ('isolation_snapshot', dict(isolation='snapshot'))
]
bloomscen = [
@@ -79,8 +81,8 @@ class test_join06(wttest.WiredTigerTestCase):
# TODO: needed?
#self.reopen_conn()
- if self.uncommitted:
- self.session.begin_transaction('isolation=read-uncommitted')
+ if self.isolation != '':
+ self.session.begin_transaction('isolation=' + self.isolation)
jc = self.session.open_cursor('join:table:join06', None, None)
c0 = self.session.open_cursor('index:join06:index0', None, None)
@@ -96,7 +98,7 @@ class test_join06(wttest.WiredTigerTestCase):
self.assertEquals(0, c1.search())
self.session.join(jc, c1, joinconfig)
- if self.uncommitted and self.bloom:
+ if self.isolation == 'read-uncommitted' and self.bloom:
# Make sure that read-uncommitted with Bloom is not allowed.
# This is detected on the first next() operation.
msg = '/cannot be used with read-uncommitted/'
@@ -106,7 +108,7 @@ class test_join06(wttest.WiredTigerTestCase):
# Changes made in another session may or may not be visible to us,
# depending on the isolation level.
- if self.uncommitted:
+ if self.isolation == 'read-uncommitted':
# isolation level is read-uncommitted, so we will see
# additions deletions made in our other session.
mbr = set(range(525,1000,10)) | set(range(55,100,10)) | set([520])
@@ -116,12 +118,11 @@ class test_join06(wttest.WiredTigerTestCase):
mbr = set(range(520,600)) | set(range(53,60))
altered = False
-
while jc.next() == 0:
[k] = jc.get_keys()
[v0,v1] = jc.get_values()
#self.tty('GOT: ' + str(k) + ': ' + str(jc.get_values()))
- if altered and self.uncommitted:
+ if altered and self.isolation == 'read-uncommitted':
self.assertEquals(self.gen_values2(k), [v0, v1])
else:
self.assertEquals(self.gen_values(k), [v0, v1])
@@ -150,7 +151,7 @@ class test_join06(wttest.WiredTigerTestCase):
jc.close()
c1.close()
c0.close()
- if self.uncommitted:
+ if self.isolation != '':
self.session.commit_transaction()
self.session.drop('table:join06')