diff options
author | sueloverso <sue@mongodb.com> | 2016-06-14 00:39:20 -0400 |
---|---|---|
committer | Michael Cahill <michael.cahill@mongodb.com> | 2016-06-23 17:30:02 +1000 |
commit | 30e49acc9036721491e78af9d0c13d5816aea7ca (patch) | |
tree | 885c3c0fb1b59eceae7a5b63122c1c74a07913aa | |
parent | 063dbdfe456455dea1159f4d3a6d82a4c275958b (diff) | |
download | mongo-30e49acc9036721491e78af9d0c13d5816aea7ca.tar.gz |
WT-2696 Wait if we find an unbuffered flag without the size set yet. (#2794)
* Modify recovery test to use multiple threads to reproduce this issue.
(cherry picked from commit 0d4c83daf7174788d2a83321c33a215a9bdfa89f)
-rw-r--r-- | src/log/log_slot.c | 11 | ||||
-rw-r--r-- | test/recovery/random-abort.c | 185 |
2 files changed, 133 insertions, 63 deletions
diff --git a/src/log/log_slot.c b/src/log/log_slot.c index b7efb1d9018..47071211450 100644 --- a/src/log/log_slot.c +++ b/src/log/log_slot.c @@ -94,6 +94,17 @@ retry: if (WT_LOG_SLOT_DONE(new_state)) *releasep = 1; slot->slot_end_lsn = slot->slot_start_lsn; + /* + * A thread setting the unbuffered flag sets the unbuffered size after + * setting the flag. There could be a delay between a thread setting + * the flag, a thread closing the slot, and the original thread setting + * that value. If the state is unbuffered, wait for the unbuffered + * size to be set. + */ + while (WT_LOG_SLOT_UNBUFFERED_ISSET(old_state) && + slot->slot_unbuffered == 0) + __wt_yield(); + end_offset = WT_LOG_SLOT_JOINED_BUFFERED(old_state) + slot->slot_unbuffered; slot->slot_end_lsn.l.offset += (uint32_t)end_offset; diff --git a/test/recovery/random-abort.c b/test/recovery/random-abort.c index cd7d1b08708..2454aa6056b 100644 --- a/test/recovery/random-abort.c +++ b/test/recovery/random-abort.c @@ -36,7 +36,7 @@ #include <unistd.h> #endif -#include <wiredtiger.h> +#include <wt_internal.h> #include "test_util.i" @@ -44,7 +44,8 @@ static char home[512]; /* Program working dir */ static const char *progname; /* Program name */ static const char * const uri = "table:main"; -#define RECORDS_FILE "records" +#define NTHREADS 5 +#define RECORDS_FILE "records-%u" #define ENV_CONFIG \ "create,log=(file_max=10M,archive=false,enabled)," \ @@ -55,71 +56,66 @@ static const char * const uri = "table:main"; static void usage(void) { - fprintf(stderr, "usage: %s [-h dir]\n", progname); + fprintf(stderr, "usage: %s [-h dir] [-T threads]\n", progname); exit(EXIT_FAILURE); } +typedef struct { + WT_CONNECTION *conn; + uint64_t start; + uint32_t id; +} WT_THREAD_DATA; + /* * Child process creates the database and table, and then writes data into * the table until it is killed by the parent. */ -static void -fill_db(void) +static void * +thread_run(void *arg) { FILE *fp; - WT_CONNECTION *conn; WT_CURSOR *cursor; WT_ITEM data; WT_RAND_STATE rnd; WT_SESSION *session; + WT_THREAD_DATA *td; uint64_t i; int ret; - uint8_t buf[MAX_VAL]; + char buf[MAX_VAL], kname[64]; __wt_random_init(&rnd); memset(buf, 0, sizeof(buf)); - /* - * Initialize the first 25% to random values. Leave a bunch of data - * space at the end to emphasize zero data. - */ - for (i = 0; i < MAX_VAL/4; i++) - buf[i] = (uint8_t)__wt_random(&rnd); + memset(kname, 0, sizeof(kname)); + td = (WT_THREAD_DATA *)arg; /* - * Run in the home directory so that the records file is in there too. + * The value is the name of the record file with our id appended. */ - if (chdir(home) != 0) - testutil_die(errno, "chdir: %s", home); - if ((ret = wiredtiger_open(NULL, NULL, ENV_CONFIG, &conn)) != 0) - testutil_die(ret, "wiredtiger_open"); - if ((ret = conn->open_session(conn, NULL, NULL, &session)) != 0) - testutil_die(ret, "WT_CONNECTION:open_session"); - if ((ret = session->create(session, - uri, "key_format=Q,value_format=u")) != 0) - testutil_die(ret, "WT_SESSION.create: %s", uri); - if ((ret = - session->open_cursor(session, uri, NULL, NULL, &cursor)) != 0) - testutil_die(ret, "WT_SESSION.open_cursor: %s", uri); - + snprintf(buf, sizeof(buf), RECORDS_FILE, td->id); /* * Keep a separate file with the records we wrote for checking. */ - (void)unlink(RECORDS_FILE); - if ((fp = fopen(RECORDS_FILE, "w")) == NULL) + (void)unlink(buf); + if ((fp = fopen(buf, "w")) == NULL) testutil_die(errno, "fopen"); /* * Set to no buffering. */ __wt_stream_set_no_buffer(fp); - + if ((ret = td->conn->open_session(td->conn, NULL, NULL, &session)) != 0) + testutil_die(ret, "WT_CONNECTION:open_session"); + if ((ret = + session->open_cursor(session, uri, NULL, NULL, &cursor)) != 0) + testutil_die(ret, "WT_SESSION.open_cursor: %s", uri); + data.data = buf; + data.size = sizeof(buf); /* - * Write data into the table until we are killed by the parent. - * The data in the buffer is already set to random content. + * Write our portion of the key space until we're killed. */ - data.data = buf; - for (i = 0;; ++i) { + for (i = td->start; ; ++i) { + snprintf(kname, sizeof(kname), "%" PRIu64, i); data.size = __wt_random(&rnd) % MAX_VAL; - cursor->set_key(cursor, i); + cursor->set_key(cursor, kname); cursor->set_value(cursor, &data); if ((ret = cursor->insert(cursor)) != 0) testutil_die(ret, "WT_CURSOR.insert"); @@ -128,9 +124,62 @@ fill_db(void) */ if (fprintf(fp, "%" PRIu64 "\n", i) == -1) testutil_die(errno, "fprintf"); - if (i % 5000) - __wt_yield(); } + return (NULL); +} + +/* + * Child process creates the database and table, and then creates worker + * threads to add data until it is killed by the parent. + */ +static void fill_db(uint32_t) + WT_GCC_FUNC_DECL_ATTRIBUTE((noreturn)); +static void +fill_db(uint32_t nth) +{ + pthread_t *thr; + WT_CONNECTION *conn; + WT_SESSION *session; + WT_THREAD_DATA *td; + uint32_t i; + int ret; + + thr = calloc(nth, sizeof(pthread_t)); + td = calloc(nth, sizeof(WT_THREAD_DATA)); + if (chdir(home) != 0) + testutil_die(errno, "Child chdir: %s", home); + if ((ret = wiredtiger_open(NULL, NULL, ENV_CONFIG, &conn)) != 0) + testutil_die(ret, "wiredtiger_open"); + if ((ret = conn->open_session(conn, NULL, NULL, &session)) != 0) + testutil_die(ret, "WT_CONNECTION:open_session"); + if ((ret = session->create(session, + uri, "key_format=S,value_format=u")) != 0) + testutil_die(ret, "WT_SESSION.create: %s", uri); + if ((ret = session->close(session, NULL)) != 0) + testutil_die(ret, "WT_SESSION:close"); + + for (i = 0; i < nth; ++i) { + td[i].conn = conn; + td[i].start = (UINT64_MAX / nth) * i; + td[i].id = i; + if ((ret = pthread_create( + &thr[i], NULL, thread_run, &td[i])) != 0) + testutil_die(ret, "pthread_create"); + } + printf("Spawned %" PRIu32 " writer threads\n", nth); + fflush(stdout); + /* + * The threads never exit, so the child will just wait here until + * it is killed. + */ + for (i = 0; i < nth; ++i) + pthread_join(thr[i], NULL); + /* + * NOTREACHED + */ + free(thr); + free(td); + exit(EXIT_SUCCESS); } extern int __wt_optind; @@ -147,23 +196,28 @@ main(int argc, char *argv[]) WT_SESSION *session; WT_RAND_STATE rnd; uint64_t key; - uint32_t absent, count, timeout; + uint32_t absent, count, i, nth, timeout; int ch, status, ret; pid_t pid; const char *working_dir; + char fname[64], kname[64]; if ((progname = strrchr(argv[0], DIR_DELIM)) == NULL) progname = argv[0]; else ++progname; - working_dir = "WT_TEST.random-abort"; + working_dir = "WT_TEST.random-abort-many"; timeout = 10; - while ((ch = __wt_getopt(progname, argc, argv, "h:t:")) != EOF) + nth = NTHREADS; + while ((ch = __wt_getopt(progname, argc, argv, "h:T:t:")) != EOF) switch (ch) { case 'h': working_dir = __wt_optarg; break; + case 'T': + nth = (uint32_t)atoi(__wt_optarg); + break; case 't': timeout = (uint32_t)atoi(__wt_optarg); break; @@ -187,7 +241,7 @@ main(int argc, char *argv[]) testutil_die(errno, "fork"); if (pid == 0) { /* child */ - fill_db(); + fill_db(nth); return (EXIT_SUCCESS); } @@ -212,7 +266,7 @@ main(int argc, char *argv[]) * this is the place to do it. */ if (chdir(home) != 0) - testutil_die(errno, "chdir: %s", home); + testutil_die(errno, "parent chdir: %s", home); printf("Open database, run recovery and verify content\n"); if ((ret = wiredtiger_open(NULL, NULL, ENV_CONFIG_REC, &conn)) != 0) testutil_die(ret, "wiredtiger_open"); @@ -222,30 +276,35 @@ main(int argc, char *argv[]) session->open_cursor(session, uri, NULL, NULL, &cursor)) != 0) testutil_die(ret, "WT_SESSION.open_cursor: %s", uri); - if ((fp = fopen(RECORDS_FILE, "r")) == NULL) - testutil_die(errno, "fopen"); + absent = count = 0; + for (i = 0; i < nth; ++i) { + snprintf(fname, sizeof(fname), RECORDS_FILE, i); + if ((fp = fopen(fname, "r")) == NULL) + testutil_die(errno, "fopen"); - /* - * For every key in the saved file, verify that the key exists - * in the table after recovery. Since we did write-no-sync, we - * expect every key to have been recovered. - */ - for (absent = count = 0;; ++count) { - ret = fscanf(fp, "%" SCNu64 "\n", &key); - if (ret != EOF && ret != 1) - testutil_die(errno, "fscanf"); - if (ret == EOF) - break; - cursor->set_key(cursor, key); - if ((ret = cursor->search(cursor)) != 0) { - if (ret != WT_NOTFOUND) - testutil_die(ret, "search"); - printf("no record with key %" PRIu64 "\n", key); - ++absent; + /* + * For every key in the saved file, verify that the key exists + * in the table after recovery. Since we did write-no-sync, we + * expect every key to have been recovered. + */ + for (count = 0;; ++count) { + ret = fscanf(fp, "%" SCNu64 "\n", &key); + if (ret != EOF && ret != 1) + testutil_die(errno, "fscanf"); + if (ret == EOF) + break; + snprintf(kname, sizeof(kname), "%" PRIu64, key); + cursor->set_key(cursor, kname); + if ((ret = cursor->search(cursor)) != 0) { + if (ret != WT_NOTFOUND) + testutil_die(ret, "search"); + printf("no record with key %" PRIu64 "\n", key); + ++absent; + } } + if (fclose(fp) != 0) + testutil_die(errno, "fclose"); } - if (fclose(fp) != 0) - testutil_die(errno, "fclose"); if ((ret = conn->close(conn, NULL)) != 0) testutil_die(ret, "WT_CONNECTION:close"); if (absent) { |