diff options
36 files changed, 889 insertions, 235 deletions
diff --git a/bench/wtperf/Makefile.am b/bench/wtperf/Makefile.am index 0630a27f640..15f151d84b2 100644 --- a/bench/wtperf/Makefile.am +++ b/bench/wtperf/Makefile.am @@ -5,7 +5,7 @@ LDADD = $(top_builddir)/libwiredtiger.la -lm noinst_PROGRAMS = wtperf wtperf_LDFLAGS = -static wtperf_SOURCES =\ - config.c misc.c track.c wtperf.c wtperf.h wtperf_opt.i + config.c misc.c track.c wtperf.c wtperf_truncate.c wtperf.h wtperf_opt.i TESTS = smoke.sh AM_TESTS_ENVIRONMENT = rm -rf WT_TEST ; mkdir WT_TEST ; diff --git a/bench/wtperf/config.c b/bench/wtperf/config.c index 47e052d6055..721b41432cb 100644 --- a/bench/wtperf/config.c +++ b/bench/wtperf/config.c @@ -95,6 +95,8 @@ config_assign(CONFIG *dest, const CONFIG *src) *pstr = newstr; } } + + STAILQ_INIT(&dest->stone_head); return (0); } @@ -122,6 +124,7 @@ config_free(CONFIG *cfg) free(cfg->uris); } + cleanup_truncate_config(cfg); free(cfg->ckptthreads); free(cfg->popthreads); free(cfg->base_uri); @@ -243,6 +246,28 @@ config_threads(CONFIG *cfg, const char *config, size_t len) goto err; continue; } + if (STRING_MATCH("truncate", k.str, k.len)) { + if ((workp->truncate = v.val) != 1) + goto err; + /* There can only be one Truncate thread. */ + if (cfg->has_truncate != 0) { + goto err; + } + cfg->has_truncate = 1; + continue; + } + if (STRING_MATCH("truncate_pct", k.str, k.len)) { + if (v.val <= 0) + goto err; + workp->truncate_pct = (uint64_t)v.val; + continue; + } + if (STRING_MATCH("truncate_count", k.str, k.len)) { + if (v.val <= 0) + goto err; + workp->truncate_count = (uint64_t)v.val; + continue; + } goto err; } if (ret == WT_NOTFOUND) @@ -253,9 +278,21 @@ config_threads(CONFIG *cfg, const char *config, size_t len) scan = NULL; if (ret != 0) goto err; - - if (workp->insert == 0 && - workp->read == 0 && workp->update == 0) + if (workp->insert == 0 && workp->read == 0 && + workp->update == 0 && workp->truncate == 0) + goto err; + /* Why run with truncate if we don't want any truncation. */ + if (workp->truncate != 0 && + workp->truncate_pct == 0 && workp->truncate_count == 0) + goto err; + if (workp->truncate != 0 && + (workp->truncate_pct < 1 || workp->truncate_pct > 99)) + goto err; + /* Truncate should have its own exclusive thread. */ + if (workp->truncate != 0 && workp->threads > 1) + goto err; + if (workp->truncate != 0 && + (workp->insert > 0 || workp->read > 0 || workp->update > 0)) goto err; cfg->workers_cnt += (u_int)workp->threads; } @@ -640,9 +677,11 @@ config_print(CONFIG *cfg) for (i = 0, workp = cfg->workload; i < cfg->workload_cnt; ++i, ++workp) printf("\t\t%" PRId64 " threads (inserts=%" PRId64 - ", reads=%" PRId64 ", updates=%" PRId64 ")\n", + ", reads=%" PRId64 ", updates=%" PRId64 + ", truncates=% " PRId64 ")\n", workp->threads, - workp->insert, workp->read, workp->update); + workp->insert, workp->read, + workp->update, workp->truncate); } printf("\t" "Checkpoint threads, interval: %" PRIu32 ", %" PRIu32 "\n", diff --git a/bench/wtperf/runners/mongodb-oplog.wtperf b/bench/wtperf/runners/mongodb-oplog.wtperf new file mode 100644 index 00000000000..34235f04518 --- /dev/null +++ b/bench/wtperf/runners/mongodb-oplog.wtperf @@ -0,0 +1,11 @@ +# wtperf options file to simulate populating a MongoDB oplog +conn_config="cache_size=2GB,checkpoint=(wait=60)" +table_config="type=file" +# Start with a small set of inserts in the populate phase. +icount=50000 +report_interval=5 +run_time=500 +populate_threads=1 +# Setup three threads to insert into the oplog +# Setup one thread to be doing truncates from the oplog +threads=((count=3,inserts=1,throttle=2000),(count=1,truncate=1,truncate_pct=10,truncate_count=50000)) diff --git a/bench/wtperf/runners/truncate-btree-populate.wtperf b/bench/wtperf/runners/truncate-btree-populate.wtperf new file mode 100644 index 00000000000..4e4ae7500f0 --- /dev/null +++ b/bench/wtperf/runners/truncate-btree-populate.wtperf @@ -0,0 +1,7 @@ +# Truncate workload population phase +conn_config="cache_size=2GB,checkpoint=(wait=60)" +table_config="type=file" +# Start with a small set of inserts in the populate phase. +icount=50000 +report_interval=5 +populate_threads=1 diff --git a/bench/wtperf/runners/truncate-btree-workload.wtperf b/bench/wtperf/runners/truncate-btree-workload.wtperf new file mode 100644 index 00000000000..55e01dcd0dc --- /dev/null +++ b/bench/wtperf/runners/truncate-btree-workload.wtperf @@ -0,0 +1,9 @@ +# truncate workload. work phase +conn_config="cache_size=2GB,checkpoint=(wait=60)" +table_config="type=file" +create=false +report_interval=5 +run_time=500 +# Setup three threads to insert into the oplog +# Setup one thread to be doing truncates from the oplog +threads=((count=3,inserts=1,throttle=2000),(count=1,truncate=1,truncate_pct=10,truncate_count=50000)) diff --git a/bench/wtperf/track.c b/bench/wtperf/track.c index 8ea4201246a..75f5a012a94 100644 --- a/bench/wtperf/track.c +++ b/bench/wtperf/track.c @@ -98,6 +98,11 @@ sum_read_ops(CONFIG *cfg) return (sum_ops(cfg, offsetof(CONFIG_THREAD, read))); } uint64_t +sum_truncate_ops(CONFIG *cfg) +{ + return (sum_ops(cfg, offsetof(CONFIG_THREAD, truncate))); +} +uint64_t sum_update_ops(CONFIG *cfg) { return (sum_ops(cfg, offsetof(CONFIG_THREAD, update))); diff --git a/bench/wtperf/wtperf.c b/bench/wtperf/wtperf.c index 543a125d4d4..f079d6272d7 100644 --- a/bench/wtperf/wtperf.c +++ b/bench/wtperf/wtperf.c @@ -50,6 +50,7 @@ static const CONFIG default_cfg = { 0, /* checkpoint operations */ 0, /* insert operations */ 0, /* read operations */ + 0, /* truncate operations */ 0, /* update operations */ 0, /* insert key */ 0, /* checkpoint in progress */ @@ -57,6 +58,8 @@ static const CONFIG default_cfg = { 0, /* notify threads to stop */ 0, /* in warmup phase */ 0, /* total seconds running */ + 0, /* has truncate */ + {NULL, NULL}, /* the truncate queue */ #define OPT_DEFINE_DEFAULT #include "wtperf_opt.i" @@ -100,15 +103,6 @@ get_next_incr(CONFIG *cfg) return (WT_ATOMIC_ADD8(cfg->insert_key, 1)); } -static inline void -generate_key(CONFIG *cfg, char *key_buf, uint64_t keyno) -{ - /* - * Don't change to snprintf, sprintf is faster in some tests. - */ - sprintf(key_buf, "%0*" PRIu64, cfg->key_sz - 1, keyno); -} - static void randomize_value(CONFIG_THREAD *thread, char *value_buf) { @@ -258,6 +252,8 @@ op_name(uint8_t *op) return ("insert_rmw"); case WORKER_READ: return ("read"); + case WORKER_TRUNCATE: + return ("truncate"); case WORKER_UPDATE: return ("update"); default: @@ -389,7 +385,7 @@ worker(void *arg) size_t i; uint64_t next_val, usecs; uint8_t *op, *op_end; - int measure_latency, ret; + int measure_latency, ret, truncated; char *value_buf, *key_buf, *value; char buf[512]; @@ -444,6 +440,11 @@ worker(void *arg) goto err; } + /* Setup for truncate */ + if (thread->workload->truncate != 0) + if ((ret = setup_truncate(cfg, thread, session)) != 0) + goto err; + key_buf = thread->key_buf; value_buf = thread->value_buf; @@ -486,6 +487,10 @@ worker(void *arg) if (wtperf_value_range(cfg) < next_val) continue; break; + case WORKER_TRUNCATE: + /* Required but not used. */ + next_val = wtperf_rand(thread); + break; default: goto err; /* can't happen */ } @@ -502,10 +507,9 @@ worker(void *arg) * is 0, to avoid first time latency spikes. */ measure_latency = - cfg->sample_interval != 0 && trk->ops != 0 && ( - trk->ops % cfg->sample_rate == 0); - if (measure_latency && - (ret = __wt_epoch(NULL, &start)) != 0) { + cfg->sample_interval != 0 && trk != NULL && + trk->ops != 0 && (trk->ops % cfg->sample_rate == 0); + if (measure_latency && (ret = __wt_epoch(NULL, &start)) != 0) { lprintf(cfg, ret, 0, "Get time call failed"); goto err; } @@ -548,6 +552,18 @@ worker(void *arg) if ((ret = cursor->insert(cursor)) == 0) break; goto op_err; + case WORKER_TRUNCATE: + if ((ret = run_truncate( + cfg, thread, cursor, session, &truncated)) == 0) { + if (truncated) + trk = &thread->truncate; + else + trk = &thread->truncate_sleep; + /* Pause between truncate attempts */ + (void)usleep(1000); + break; + } + goto op_err; case WORKER_UPDATE: if ((ret = cursor->search(cursor)) == 0) { if ((ret = cursor->get_value( @@ -711,16 +727,33 @@ run_mix_schedule(CONFIG *cfg, WORKLOAD *workp) { int64_t pct; - /* Confirm reads, inserts and updates cannot all be zero. */ - if (workp->insert == 0 && workp->read == 0 && workp->update == 0) { + /* Confirm reads, inserts, truncates and updates cannot all be zero. */ + if (workp->insert == 0 && workp->read == 0 && + workp->truncate == 0 && workp->update == 0) { lprintf(cfg, EINVAL, 0, "no operations scheduled"); return (EINVAL); } /* + * Handle truncate first - it's a special case that can't be used in + * a mixed workload. + */ + if (workp->truncate != 0) { + if (workp->insert != 0 || + workp->read != 0 || workp->update != 0) { + lprintf(cfg, EINVAL, 0, + "Can't configure truncate in a mixed workload"); + return (EINVAL); + } + memset(workp->ops, WORKER_TRUNCATE, sizeof(workp->ops)); + return (0); + } + + /* * Check for a simple case where the thread is only doing insert or - * update operations (because the default operation for a job-mix is - * read, the subsequent code works fine if only reads are specified). + * update operations (because the default operation for a + * job-mix is read, the subsequent code works fine if only reads are + * specified). */ if (workp->insert != 0 && workp->read == 0 && workp->update == 0) { memset(workp->ops, @@ -840,10 +873,9 @@ populate_thread(void *arg) cursor = cursors[op % cfg->table_count]; generate_key(cfg, key_buf, op); measure_latency = - cfg->sample_interval != 0 && trk->ops != 0 && ( - trk->ops % cfg->sample_rate == 0); - if (measure_latency && - (ret = __wt_epoch(NULL, &start)) != 0) { + cfg->sample_interval != 0 && + trk->ops != 0 && (trk->ops % cfg->sample_rate == 0); + if (measure_latency && (ret = __wt_epoch(NULL, &start)) != 0) { lprintf(cfg, ret, 0, "Get time call failed"); goto err; } @@ -961,10 +993,9 @@ populate_async(void *arg) * the time to process by workers. */ measure_latency = - cfg->sample_interval != 0 && trk->ops != 0 && ( - trk->ops % cfg->sample_rate == 0); - if (measure_latency && - (ret = __wt_epoch(NULL, &start)) != 0) { + cfg->sample_interval != 0 && + trk->ops != 0 && (trk->ops % cfg->sample_rate == 0); + if (measure_latency && (ret = __wt_epoch(NULL, &start)) != 0) { lprintf(cfg, ret, 0, "Get time call failed"); goto err; } @@ -1006,8 +1037,7 @@ populate_async(void *arg) goto err; if (measure_latency) { if ((ret = __wt_epoch(NULL, &stop)) != 0) { - lprintf(cfg, ret, 0, - "Get time call failed"); + lprintf(cfg, ret, 0, "Get time call failed"); goto err; } ++trk->latency_ops; @@ -1433,16 +1463,19 @@ execute_workload(CONFIG *cfg) { CONFIG_THREAD *threads; WORKLOAD *workp; - uint64_t last_ckpts, last_inserts, last_reads, last_updates; + uint64_t last_ckpts, last_inserts, last_reads, last_truncates; + uint64_t last_updates; uint32_t interval, run_ops, run_time; u_int i; int ret, t_ret; void *(*pfunc)(void *); cfg->insert_key = 0; - cfg->insert_ops = cfg->read_ops = cfg->update_ops = 0; + cfg->insert_ops = cfg->read_ops = cfg->truncate_ops = 0; + cfg->update_ops = 0; - last_ckpts = last_inserts = last_reads = last_updates = 0; + last_ckpts = last_inserts = last_reads = last_truncates = 0; + last_updates = 0; ret = 0; if (cfg->warmup != 0) @@ -1467,9 +1500,9 @@ execute_workload(CONFIG *cfg) workp = cfg->workload; i < cfg->workload_cnt; ++i, ++workp) { lprintf(cfg, 0, 1, "Starting workload #%d: %" PRId64 " threads, inserts=%" - PRId64 ", reads=%" PRId64 ", updates=%" PRId64, - i + 1, - workp->threads, workp->insert, workp->read, workp->update); + PRId64 ", reads=%" PRId64 ", updates=%" PRId64 + ", truncate=%" PRId64, i + 1, workp->threads, workp->insert, + workp->read, workp->update, workp->truncate); /* Figure out the workload's schedule. */ if ((ret = run_mix_schedule(cfg, workp)) != 0) @@ -1509,6 +1542,7 @@ execute_workload(CONFIG *cfg) cfg->insert_ops = sum_insert_ops(cfg); cfg->read_ops = sum_read_ops(cfg); cfg->update_ops = sum_update_ops(cfg); + cfg->truncate_ops = sum_truncate_ops(cfg); /* If we're checking total operations, see if we're done. */ if (run_ops != 0 && run_ops <= @@ -1523,16 +1557,18 @@ execute_workload(CONFIG *cfg) lprintf(cfg, 0, 1, "%" PRIu64 " reads, %" PRIu64 " inserts, %" PRIu64 - " updates, %" PRIu64 " checkpoints in %" PRIu32 - " secs (%" PRIu32 " total secs)", + " updates, %" PRIu64 " truncates, %" PRIu64 + " checkpoints in %" PRIu32 " secs (%" PRIu32 " total secs)", cfg->read_ops - last_reads, cfg->insert_ops - last_inserts, cfg->update_ops - last_updates, + cfg->truncate_ops - last_truncates, cfg->ckpt_ops - last_ckpts, cfg->report_interval, cfg->totalsec); last_reads = cfg->read_ops; last_inserts = cfg->insert_ops; last_updates = cfg->update_ops; + last_truncates = cfg->truncate_ops; last_ckpts = cfg->ckpt_ops; } @@ -1915,6 +1951,7 @@ start_run(CONFIG *cfg) /* One final summation of the operations we've completed. */ cfg->read_ops = sum_read_ops(cfg); cfg->insert_ops = sum_insert_ops(cfg); + cfg->truncate_ops = sum_truncate_ops(cfg); cfg->update_ops = sum_update_ops(cfg); cfg->ckpt_ops = sum_ckpt_ops(cfg); total_ops = cfg->read_ops + cfg->insert_ops + cfg->update_ops; @@ -1930,6 +1967,11 @@ start_run(CONFIG *cfg) cfg->insert_ops, (cfg->insert_ops * 100) / total_ops, cfg->insert_ops / cfg->run_time); lprintf(cfg, 0, 1, + "Executed %" PRIu64 " truncate operations (%" PRIu64 + "%%) %" PRIu64 " ops/sec", + cfg->truncate_ops, (cfg->truncate_ops * 100) / total_ops, + cfg->truncate_ops / cfg->run_time); + lprintf(cfg, 0, 1, "Executed %" PRIu64 " update operations (%" PRIu64 "%%) %" PRIu64 " ops/sec", cfg->update_ops, (cfg->update_ops * 100) / total_ops, @@ -2087,8 +2129,13 @@ main(int argc, char *argv[]) * If the user wants compaction, then we also enable async for * the compact operation, but not for the workloads. */ - if (cfg->async_threads > 0) + if (cfg->async_threads > 0) { + if (cfg->has_truncate > 0) { + lprintf(cfg, 1, 0, "Cannot run truncate and async\n"); + goto err; + } cfg->use_asyncops = 1; + } if (cfg->compact && cfg->async_threads == 0) cfg->async_threads = 2; if (cfg->async_threads > 0) { @@ -2110,6 +2157,18 @@ main(int argc, char *argv[]) if ((ret = config_compress(cfg)) != 0) goto err; + /* You can't have truncate on a random collection. */ + if (cfg->has_truncate && cfg->random_range) { + lprintf(cfg, 1, 0, "Cannot run truncate and random_range\n"); + goto err; + } + + /* We can't run truncate with more than one table. */ + if (cfg->has_truncate && cfg->table_count > 1) { + lprintf(cfg, 1, 0, "Cannot truncate more than 1 table\n"); + goto err; + } + /* Build the URI from the table name. */ req_len = strlen("table:") + strlen(HELIUM_NAME) + strlen(cfg->table_name) + 2; diff --git a/bench/wtperf/wtperf.h b/bench/wtperf/wtperf.h index 874cdc499b1..58dc65388ae 100644 --- a/bench/wtperf/wtperf.h +++ b/bench/wtperf/wtperf.h @@ -26,6 +26,9 @@ * OTHER DEALINGS IN THE SOFTWARE. */ +#ifndef HAVE_WTPERF_H +#define HAVE_WTPERF_H + #ifndef _WIN32 #include <sys/time.h> #endif @@ -90,14 +93,39 @@ typedef struct { int64_t throttle; /* Maximum operations/second */ /* Number of operations per transaction. Zero for autocommit */ int64_t ops_per_txn; + int64_t truncate; /* Truncate ratio */ + uint64_t truncate_pct; /* Truncate Percent */ + uint64_t truncate_count; /* Truncate Count */ #define WORKER_INSERT 1 /* Insert */ #define WORKER_INSERT_RMW 2 /* Insert with read-modify-write */ #define WORKER_READ 3 /* Read */ -#define WORKER_UPDATE 4 /* Update */ +#define WORKER_TRUNCATE 4 /* Truncate */ +#define WORKER_UPDATE 5 /* Update */ uint8_t ops[100]; /* Operation schedule */ } WORKLOAD; +/* Steering items for the truncate workload */ +typedef struct __truncate_struct TRUNCATE_CONFIG; +struct __truncate_struct { + uint64_t stone_gap; + uint64_t needed_stones; + uint64_t final_stone_gap; + uint64_t expected_total; + uint64_t total_inserts; + uint64_t last_total_inserts; + uint64_t num_stones; + uint64_t last_key; +}; + +/* Queue entry for use with the Truncate Logic */ +struct __truncate_queue_entry { + char *key; /* Truncation point */ + uint64_t diff; /* Number of items to be truncated*/ + STAILQ_ENTRY(__truncate_queue_entry) q; +}; +typedef struct __truncate_queue_entry TRUNCATE_QUEUE_ENTRY; + #define LOG_PARTIAL_CONFIG ",log=(enabled=false)" /* * NOTE: If you add any fields to this structure here, you must also add @@ -135,6 +163,7 @@ struct __config { /* Configuration structure */ uint64_t ckpt_ops; /* checkpoint operations */ uint64_t insert_ops; /* insert operations */ uint64_t read_ops; /* read operations */ + uint64_t truncate_ops; /* truncate operations */ uint64_t update_ops; /* update operations */ uint64_t insert_key; /* insert key */ @@ -146,6 +175,11 @@ struct __config { /* Configuration structure */ volatile uint32_t totalsec; /* total seconds running */ + u_int has_truncate; /* if there is a truncate workload */ + + /* Queue head for use with the Truncate Logic */ + STAILQ_HEAD(__truncate_qh, __truncate_queue_entry) stone_head; + /* Fields changeable on command line are listed in wtperf_opt.i */ #define OPT_DECLARE_STRUCT #include "wtperf_opt.i" @@ -211,7 +245,7 @@ typedef struct { struct __config_thread { /* Per-thread structure */ CONFIG *cfg; /* Enclosing configuration */ - uint64_t rnd; /* Random number generation state */ + WT_RAND_STATE rnd; /* Random number generation state */ pthread_t handle; /* Handle */ @@ -223,8 +257,13 @@ struct __config_thread { /* Per-thread structure */ TRACK insert; /* Insert operations */ TRACK read; /* Read operations */ TRACK update; /* Update operations */ + TRACK truncate; /* Truncate operations */ + TRACK truncate_sleep; /* Truncate sleep operations */ + TRUNCATE_CONFIG trunc_cfg; /* Truncate configuration */ + }; +void cleanup_truncate_config(CONFIG *); int config_assign(CONFIG *, const CONFIG *); int config_compress(CONFIG *); void config_free(CONFIG *); @@ -238,11 +277,15 @@ void latency_read(CONFIG *, uint32_t *, uint32_t *, uint32_t *); void latency_update(CONFIG *, uint32_t *, uint32_t *, uint32_t *); void latency_print(CONFIG *); int enomem(const CONFIG *); +int run_truncate( + CONFIG *, CONFIG_THREAD *, WT_CURSOR *, WT_SESSION *, int *); int setup_log_file(CONFIG *); +int setup_truncate(CONFIG *, CONFIG_THREAD *, WT_SESSION *); uint64_t sum_ckpt_ops(CONFIG *); uint64_t sum_insert_ops(CONFIG *); uint64_t sum_pop_ops(CONFIG *); uint64_t sum_read_ops(CONFIG *); +uint64_t sum_truncate_ops(CONFIG *); uint64_t sum_update_ops(CONFIG *); void usage(void); @@ -251,3 +294,14 @@ void lprintf(const CONFIG *, int err, uint32_t, const char *, ...) __attribute__((format (printf, 4, 5))) #endif ; + +static inline void +generate_key(CONFIG *cfg, char *key_buf, uint64_t keyno) +{ + /* + * Don't change to snprintf, sprintf is faster in some tests. + */ + sprintf(key_buf, "%0*" PRIu64, cfg->key_sz - 1, keyno); +} + +#endif diff --git a/bench/wtperf/wtperf_opt.i b/bench/wtperf/wtperf_opt.i index 6cb39ac3cc4..7e29aa0f3c2 100644 --- a/bench/wtperf/wtperf_opt.i +++ b/bench/wtperf/wtperf_opt.i @@ -167,7 +167,8 @@ DEF_OPT_AS_STRING(threads, "", "workload configuration: each 'count' " "'threads=((count=2,reads=1)(count=8,reads=1,inserts=2,updates=1))' " "which would create 2 threads doing nothing but reads and 8 threads " "each doing 50% inserts and 25% reads and updates. Allowed configuration " - "values are 'count', 'throttle', 'reads', 'inserts', 'updates'. There are " + "values are 'count', 'throttle', 'reads', 'inserts', 'updates', 'truncate'," + " 'truncate_pct' and 'truncate_count'. There are " "also behavior modifiers, supported modifiers are 'ops_per_txn'") DEF_OPT_AS_CONFIG_STRING(transaction_config, "", "transaction configuration string, relevant when populate_opts_per_txn " diff --git a/bench/wtperf/wtperf_truncate.c b/bench/wtperf/wtperf_truncate.c new file mode 100644 index 00000000000..0d5d1045e1e --- /dev/null +++ b/bench/wtperf/wtperf_truncate.c @@ -0,0 +1,216 @@ +/*- + * Public Domain 2014-2015 MongoDB, Inc. + * Public Domain 2008-2014 WiredTiger, Inc. + * + * This is free and unencumbered software released into the public domain. + * + * Anyone is free to copy, modify, publish, use, compile, sell, or + * distribute this software, either in source code form or as a compiled + * binary, for any purpose, commercial or non-commercial, and by any + * means. + * + * In jurisdictions that recognize copyright laws, the author or authors + * of this software dedicate any and all copyright interest in the + * software to the public domain. We make this dedication for the benefit + * of the public at large and to the detriment of our heirs and + * successors. We intend this dedication to be an overt act of + * relinquishment in perpetuity of all present and future rights to this + * software under copyright law. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, + * EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF + * MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. + * IN NO EVENT SHALL THE AUTHORS BE LIABLE FOR ANY CLAIM, DAMAGES OR + * OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, + * ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR + * OTHER DEALINGS IN THE SOFTWARE. + */ + +#include "wtperf.h" + +static inline uint64_t +decode_key(char *key_buf) +{ + return (strtoull(key_buf, NULL, 10)); +} + +int +setup_truncate(CONFIG *cfg, CONFIG_THREAD *thread, WT_SESSION *session) { + + TRUNCATE_CONFIG *trunc_cfg; + TRUNCATE_QUEUE_ENTRY *truncate_item; + WORKLOAD *workload; + WT_CURSOR *cursor; + char *key, *truncate_key; + int ret; + uint64_t end_point, final_stone_gap, i, start_point; + + end_point = final_stone_gap = start_point = 0; + trunc_cfg = &thread->trunc_cfg; + workload = thread->workload; + + /* We are limited to only one table when running truncate. */ + if ((ret = session->open_cursor( + session, cfg->uris[0], NULL, NULL, &cursor)) != 0) + goto err; + + /* How many entries between each stone. */ + trunc_cfg->stone_gap = + (workload->truncate_count * workload->truncate_pct) / 100; + /* How many stones we need. */ + trunc_cfg->needed_stones = + workload->truncate_count / trunc_cfg->stone_gap; + + final_stone_gap = trunc_cfg->stone_gap; + + /* Reset this value for use again. */ + trunc_cfg->stone_gap = 0; + + /* + * Here we check if there is data in the collection. If there is + * data available, then we need to setup some initial truncation + * stones. + */ + if ((ret = cursor->next(cursor)) != 0 || + (ret = cursor->get_key(cursor, &key)) != 0) { + lprintf(cfg, ret, 0, "truncate setup start: failed"); + goto err; + } + + start_point = decode_key(key); + if ((cursor->reset(cursor)) != 0 || (ret = cursor->prev(cursor)) != 0 || + (ret = cursor->get_key(cursor, &key)) != 0) { + lprintf(cfg, ret, 0, "truncate setup end: failed"); + goto err; + } + end_point = decode_key(key); + + /* Assign stones if there are enough documents. */ + if (start_point + trunc_cfg->needed_stones > end_point) + trunc_cfg->stone_gap = 0; + else + trunc_cfg->stone_gap = + (end_point - start_point) / trunc_cfg->needed_stones; + + /* If we have enough data allocate some stones. */ + if (trunc_cfg->stone_gap != 0) { + trunc_cfg->expected_total = (end_point - start_point); + for (i = 1; i <= trunc_cfg->needed_stones; i++) { + truncate_key = calloc(cfg->key_sz, 1); + if (truncate_key == NULL) { + ret = enomem(cfg); + goto err; + } + truncate_item = calloc(sizeof(TRUNCATE_QUEUE_ENTRY), 1); + if (truncate_item == NULL) { + free(truncate_key); + ret = enomem(cfg); + goto err; + } + generate_key( + cfg, truncate_key, trunc_cfg->stone_gap * i); + truncate_item->key = truncate_key; + truncate_item->diff = + (trunc_cfg->stone_gap * i) - trunc_cfg->last_key; + STAILQ_INSERT_TAIL( &cfg->stone_head, truncate_item, q); + trunc_cfg->last_key = trunc_cfg->stone_gap * i; + trunc_cfg->num_stones++; + } + } + trunc_cfg->stone_gap = final_stone_gap; + +err: if ((ret = cursor->close(cursor)) != 0) { + lprintf(cfg, ret, 0, "truncate setup: cursor close failed"); + } + return (ret); +} + +int +run_truncate(CONFIG *cfg, CONFIG_THREAD *thread, + WT_CURSOR *cursor, WT_SESSION *session, int *truncatedp) { + + TRUNCATE_CONFIG *trunc_cfg; + TRUNCATE_QUEUE_ENTRY *truncate_item; + char *truncate_key; + int ret, t_ret; + + ret = 0; + trunc_cfg = &thread->trunc_cfg; + + *truncatedp = 0; + /* Update the total inserts */ + trunc_cfg->total_inserts = sum_insert_ops(cfg); + trunc_cfg->expected_total += + (trunc_cfg->total_inserts - trunc_cfg->last_total_inserts); + trunc_cfg->last_total_inserts = trunc_cfg->total_inserts; + + /* We are done if there isn't enough data to trigger a new milestone. */ + if (trunc_cfg->expected_total <= trunc_cfg->needed_stones) + return (0); + + while (trunc_cfg->num_stones < trunc_cfg->needed_stones) { + trunc_cfg->last_key += trunc_cfg->stone_gap; + truncate_key = calloc(cfg->key_sz, 1); + if (truncate_key == NULL) { + lprintf(cfg, ENOMEM, 0, + "truncate: couldn't allocate key array"); + return (ENOMEM); + } + truncate_item = calloc(sizeof(TRUNCATE_QUEUE_ENTRY), 1); + if (truncate_item == NULL) { + free(truncate_key); + lprintf(cfg, ENOMEM, 0, + "truncate: couldn't allocate item"); + return (ENOMEM); + } + generate_key(cfg, truncate_key, trunc_cfg->last_key); + truncate_item->key = truncate_key; + truncate_item->diff = trunc_cfg->stone_gap; + STAILQ_INSERT_TAIL(&cfg->stone_head, truncate_item, q); + trunc_cfg->num_stones++; + } + + /* We are done if there isn't enough data to trigger a truncate. */ + if (trunc_cfg->num_stones == 0 || + trunc_cfg->expected_total <= thread->workload->truncate_count) + return (0); + + truncate_item = STAILQ_FIRST(&cfg->stone_head); + trunc_cfg->num_stones--; + STAILQ_REMOVE_HEAD(&cfg->stone_head, q); + cursor->set_key(cursor,truncate_item->key); + if ((ret = cursor->search(cursor)) != 0) { + lprintf(cfg, ret, 0, "Truncate search: failed"); + goto err; + } + + if ((ret = session->truncate(session, NULL, NULL, cursor, NULL)) != 0) { + lprintf(cfg, ret, 0, "Truncate: failed"); + goto err; + } + + + *truncatedp = 1; + trunc_cfg->expected_total -= truncate_item->diff; + +err: free(truncate_item->key); + free(truncate_item); + t_ret = cursor->reset(cursor); + if (t_ret != 0) + lprintf(cfg, t_ret, 0, "Cursor reset failed"); + if (ret == 0 && t_ret != 0) + ret = t_ret; + return (ret); +} + +void +cleanup_truncate_config(CONFIG *cfg) { + TRUNCATE_QUEUE_ENTRY *truncate_item; + + while (!STAILQ_EMPTY(&cfg->stone_head)) { + truncate_item = STAILQ_FIRST(&cfg->stone_head); + STAILQ_REMOVE_HEAD(&cfg->stone_head, q); + free(truncate_item->key); + free(truncate_item); + } +} diff --git a/dist/s_define.list b/dist/s_define.list index 623a34447a8..a9ae2a10006 100644 --- a/dist/s_define.list +++ b/dist/s_define.list @@ -21,8 +21,8 @@ WT_ATOMIC_ADD2 WT_ATOMIC_CAS1 WT_ATOMIC_CAS2 WT_ATOMIC_FETCH_ADD1 -WT_ATOMIC_FETCH_ADD2 WT_ATOMIC_FETCH_ADD4 +WT_ATOMIC_FETCH_ADD8 WT_ATOMIC_STORE1 WT_ATOMIC_STORE2 WT_ATOMIC_SUB1 diff --git a/dist/s_string.ok b/dist/s_string.ok index 91407361bcb..0853ce971b8 100644 --- a/dist/s_string.ok +++ b/dist/s_string.ok @@ -746,6 +746,7 @@ nop noraw notfound notsup +notused nset nsnap nul diff --git a/examples/c/ex_cursor.c b/examples/c/ex_cursor.c index 112e38f9a2e..35597ca7b8e 100644 --- a/examples/c/ex_cursor.c +++ b/examples/c/ex_cursor.c @@ -93,7 +93,7 @@ cursor_search(WT_CURSOR *cursor) cursor->set_key(cursor, "foo"); - if ((ret = cursor->search(cursor)) != 0) + if ((ret = cursor->search(cursor)) == 0) ret = cursor->get_value(cursor, &value); return (ret); diff --git a/src/btree/bt_walk.c b/src/btree/bt_walk.c index f257a955801..2705f371fb5 100644 --- a/src/btree/bt_walk.c +++ b/src/btree/bt_walk.c @@ -81,10 +81,11 @@ __wt_tree_walk(WT_SESSION_IMPL *session, WT_PAGE *page; WT_PAGE_INDEX *pindex; WT_REF *couple, *couple_orig, *ref; - int prev, skip; + int empty_internal, prev, skip; uint32_t slot; btree = S2BT(session); + empty_internal = 0; /* * Tree walks are special: they look inside page structures that splits @@ -171,6 +172,15 @@ ascend: /* (!prev && slot == pindex->entries - 1)) { ref = ref->home->pg_intl_parent_ref; + /* + * If we got all the way through an internal page and + * all of the child pages were deleted, evict it. + */ + if (empty_internal) { + __wt_page_evict_soon(ref->page); + empty_internal = 0; + } + /* Optionally skip internal pages. */ if (LF_ISSET(WT_READ_SKIP_INTL)) goto ascend; @@ -226,6 +236,13 @@ ascend: /* if (ref->pindex_hint != slot) ref->pindex_hint = slot; + /* + * If we see any child states other than deleted, the + * page isn't empty. + */ + if (ref->state != WT_REF_DELETED) + empty_internal = 0; + if (LF_ISSET(WT_READ_CACHE)) { /* * Only look at unlocked pages in memory: @@ -338,10 +355,10 @@ ascend: /* */ descend: couple = ref; page = ref->page; - if (page->type == WT_PAGE_ROW_INT || - page->type == WT_PAGE_COL_INT) { + if (WT_PAGE_IS_INTERNAL(page)) { WT_INTL_INDEX_GET(session, page, pindex); slot = prev ? pindex->entries - 1 : 0; + empty_internal = 1; } else { *refp = ref; goto done; diff --git a/src/btree/row_modify.c b/src/btree/row_modify.c index f0a10cdf528..62177b7e4c7 100644 --- a/src/btree/row_modify.c +++ b/src/btree/row_modify.c @@ -263,7 +263,6 @@ int __wt_update_alloc( WT_SESSION_IMPL *session, WT_ITEM *value, WT_UPDATE **updp, size_t *sizep) { - WT_UPDATE *upd; size_t size; /* @@ -271,16 +270,15 @@ __wt_update_alloc( * the value into place. */ size = value == NULL ? 0 : value->size; - WT_RET(__wt_calloc(session, 1, sizeof(WT_UPDATE) + size, &upd)); + WT_RET(__wt_calloc(session, 1, sizeof(WT_UPDATE) + size, updp)); if (value == NULL) - WT_UPDATE_DELETED_SET(upd); + WT_UPDATE_DELETED_SET(*updp); else { - upd->size = WT_STORE_SIZE(size); - memcpy(WT_UPDATE_DATA(upd), value->data, size); + (*updp)->size = WT_STORE_SIZE(size); + memcpy(WT_UPDATE_DATA(*updp), value->data, size); } - *updp = upd; - *sizep = WT_UPDATE_MEMSIZE(upd); + *sizep = WT_UPDATE_MEMSIZE(*updp); return (0); } diff --git a/src/docs/wtperf.dox b/src/docs/wtperf.dox index 100857a985d..e541d8df873 100644 --- a/src/docs/wtperf.dox +++ b/src/docs/wtperf.dox @@ -233,8 +233,8 @@ be 'threads=((count=2,reads=1)(count=8,reads=1,inserts=2,updates=1))' which would create 2 threads doing nothing but reads and 8 threads each doing 50% inserts and 25% reads and updates. Allowed configuration values are 'count', 'throttle', 'reads', 'inserts', -'updates'. There are also behavior modifiers, supported modifiers are -'ops_per_txn' +'updates', 'truncate', 'truncate_pct' and 'truncate_count'. There are +also behavior modifiers, supported modifiers are 'ops_per_txn' @par transaction_config (string, default=) transaction configuration string, relevant when populate_opts_per_txn is nonzero diff --git a/src/evict/evict_lru.c b/src/evict/evict_lru.c index 6aa61b4137b..dc46c1abf1e 100644 --- a/src/evict/evict_lru.c +++ b/src/evict/evict_lru.c @@ -1245,6 +1245,9 @@ fast: /* If the page can't be evicted, give up. */ WT_RET(__wt_verbose(session, WT_VERB_EVICTSERVER, "select: %p, size %" PRIu64, page, page->memory_footprint)); } + WT_RET_NOTFOUND_OK(ret); + + *slotp += (u_int)(evict - start); /* * If we happen to end up on the root page, clear it. We have to track @@ -1257,16 +1260,12 @@ fast: /* If the page can't be evicted, give up. */ if ((ref = btree->evict_ref) != NULL && (__wt_ref_is_root(ref) || ref->page->read_gen == WT_READGEN_OLDEST)) { btree->evict_ref = NULL; - __wt_page_release(session, ref, WT_READ_NO_EVICT); + WT_RET(__wt_page_release(session, ref, WT_READ_NO_EVICT)); } - /* If the walk was interrupted by a locked page, that's okay. */ - if (ret == WT_NOTFOUND) - ret = 0; - - *slotp += (u_int)(evict - start); WT_STAT_FAST_CONN_INCRV(session, cache_eviction_walk, pages_walked); - return (ret); + + return (0); } /* diff --git a/src/include/extern.h b/src/include/extern.h index 992e65851eb..df76c5b8ff6 100644 --- a/src/include/extern.h +++ b/src/include/extern.h @@ -643,8 +643,8 @@ extern uint32_t __wt_nlpo2(uint32_t v); extern uint32_t __wt_log2_int(uint32_t n); extern int __wt_ispo2(uint32_t v); extern uint32_t __wt_rduppo2(uint32_t n, uint32_t po2); -extern void __wt_random_init(uint64_t volatile *rnd_state); -extern uint32_t __wt_random(uint64_t volatile *rnd_state); +extern void __wt_random_init(WT_RAND_STATE volatile *rnd_state); +extern uint32_t __wt_random(WT_RAND_STATE volatile *rnd_state); extern int __wt_buf_grow_worker(WT_SESSION_IMPL *session, WT_ITEM *buf, size_t size); extern int __wt_buf_fmt(WT_SESSION_IMPL *session, WT_ITEM *buf, const char *fmt, ...) WT_GCC_FUNC_DECL_ATTRIBUTE((format (printf, 3, 4))); extern int __wt_buf_catfmt(WT_SESSION_IMPL *session, WT_ITEM *buf, const char *fmt, ...) WT_GCC_FUNC_DECL_ATTRIBUTE((format (printf, 3, 4))); diff --git a/src/include/misc.h b/src/include/misc.h index bcb5e054865..1b2cbf11fc2 100644 --- a/src/include/misc.h +++ b/src/include/misc.h @@ -256,3 +256,11 @@ #define __wt_page_swap(session, held, want, flags) \ __wt_page_swap_func(session, held, want, flags) #endif + +/* Random number generator state. */ +union __wt_rand_state { + uint64_t v; + struct { + uint32_t w, z; + } x; +}; diff --git a/src/include/mutex.h b/src/include/mutex.h index 8e0a2e2d5a6..a08d504a054 100644 --- a/src/include/mutex.h +++ b/src/include/mutex.h @@ -24,24 +24,20 @@ struct __wt_condvar { /* * !!! - * Don't touch this structure without understanding the read/write - * locking functions. + * Don't modify this structure without understanding the read/write locking + * functions. */ -typedef union { /* Read/write lock */ -#ifdef WORDS_BIGENDIAN - WiredTiger read/write locks require modification for big-endian systems. -#else +typedef union { /* Read/write lock */ uint64_t u; struct { - uint32_t us; + uint32_t wr; /* Writers and readers */ } i; struct { - uint16_t writers; - uint16_t readers; - uint16_t users; - uint16_t pad; + uint16_t writers; /* Now serving for writers */ + uint16_t readers; /* Now serving for readers */ + uint16_t users; /* Next available ticket number */ + uint16_t __notused; /* Padding */ } s; -#endif } wt_rwlock_t; /* diff --git a/src/include/serial.i b/src/include/serial.i index 9e6b0f7916c..9e5acde9616 100644 --- a/src/include/serial.i +++ b/src/include/serial.i @@ -30,6 +30,32 @@ __page_write_gen_wrapped_check(WT_PAGE *page) } /* + * __insert_simple_func -- + * Worker function to add a WT_INSERT entry to the middle of a skiplist. + */ +static inline int +__insert_simple_func(WT_SESSION_IMPL *session, + WT_INSERT ***ins_stack, WT_INSERT *new_ins, u_int skipdepth) +{ + u_int i; + + WT_UNUSED(session); + + /* + * Update the skiplist elements referencing the new WT_INSERT item. + * If we fail connecting one of the upper levels in the skiplist, + * return success: the levels we updated are correct and sufficient. + * Even though we don't get the benefit of the memory we allocated, + * we can't roll back. + */ + for (i = 0; i < skipdepth; i++) + if (!WT_ATOMIC_CAS8(*ins_stack[i], new_ins->next[i], new_ins)) + return (i == 0 ? WT_RESTART : 0); + + return (0); +} + +/* * __insert_serial_func -- * Worker function to add a WT_INSERT entry to a skiplist. */ @@ -42,31 +68,20 @@ __insert_serial_func(WT_SESSION_IMPL *session, WT_INSERT_HEAD *ins_head, WT_UNUSED(session); /* - * Confirm we are still in the expected position, and no item has been - * added where our insert belongs. Take extra care at the beginning - * and end of the list (at each level): retry if we race there. + * Update the skiplist elements referencing the new WT_INSERT item. * - * !!! - * Note the test for ins_stack[0] == NULL: that's the test for an - * uninitialized cursor, ins_stack[0] is cleared as part of - * initializing a cursor for a search. + * Confirm we are still in the expected position, and no item has been + * added where our insert belongs. If we fail connecting one of the + * upper levels in the skiplist, return success: the levels we updated + * are correct and sufficient. Even though we don't get the benefit of + * the memory we allocated, we can't roll back. */ for (i = 0; i < skipdepth; i++) { - if (ins_stack[i] == NULL || - *ins_stack[i] != new_ins->next[i]) - return (WT_RESTART); - if (new_ins->next[i] == NULL && - ins_head->tail[i] != NULL && - ins_stack[i] != &ins_head->tail[i]->next[i]) - return (WT_RESTART); - } - - /* Update the skiplist elements referencing the new WT_INSERT item. */ - for (i = 0; i < skipdepth; i++) { + if (!WT_ATOMIC_CAS8(*ins_stack[i], new_ins->next[i], new_ins)) + return (i == 0 ? WT_RESTART : 0); if (ins_head->tail[i] == NULL || ins_stack[i] == &ins_head->tail[i]->next[i]) ins_head->tail[i] = new_ins; - *ins_stack[i] = new_ins; } return (0); @@ -128,12 +143,19 @@ __wt_col_append_serial(WT_SESSION_IMPL *session, WT_PAGE *page, WT_INSERT *new_ins = *new_insp; WT_DECL_RET; - /* Clear references to memory we now own. */ - *new_insp = NULL; + /* !!! + * Test for an uninitialized cursor, ins_stack[0] is cleared as part of + * initializing a cursor for a search. + */ + if (ins_stack[0] == NULL) + return (WT_RESTART); /* Check for page write generation wrap. */ WT_RET(__page_write_gen_wrapped_check(page)); + /* Clear references to memory we now own and must free on error. */ + *new_insp = NULL; + /* Acquire the page's spinlock, call the worker function. */ WT_PAGE_LOCK(session, page); ret = __col_append_serial_func( @@ -171,21 +193,39 @@ __wt_insert_serial(WT_SESSION_IMPL *session, WT_PAGE *page, { WT_INSERT *new_ins = *new_insp; WT_DECL_RET; + int simple; + u_int i; - /* Clear references to memory we now own. */ - *new_insp = NULL; + /* !!! + * Test for an uninitialized cursor, ins_stack[0] is cleared as part of + * initializing a cursor for a search. + */ + if (ins_stack[0] == NULL) + return (WT_RESTART); /* Check for page write generation wrap. */ WT_RET(__page_write_gen_wrapped_check(page)); - /* Acquire the page's spinlock, call the worker function. */ - WT_PAGE_LOCK(session, page); - ret = __insert_serial_func( - session, ins_head, ins_stack, new_ins, skipdepth); - WT_PAGE_UNLOCK(session, page); + /* Clear references to memory we now own and must free on error. */ + *new_insp = NULL; + + simple = 1; + for (i = 0; i < skipdepth; i++) + if (new_ins->next[i] == NULL) + simple = 0; + + if (simple) + ret = __insert_simple_func( + session, ins_stack, new_ins, skipdepth); + else { + WT_PAGE_LOCK(session, page); + ret = __insert_serial_func( + session, ins_head, ins_stack, new_ins, skipdepth); + WT_PAGE_UNLOCK(session, page); + } - /* Free unused memory on error. */ if (ret != 0) { + /* Free unused memory on error. */ __wt_free(session, new_ins); return (ret); } @@ -215,17 +255,15 @@ __wt_update_serial(WT_SESSION_IMPL *session, WT_PAGE *page, WT_DECL_RET; WT_UPDATE *obsolete, *upd = *updp; - /* Clear references to memory we now own. */ - *updp = NULL; - /* Check for page write generation wrap. */ WT_RET(__page_write_gen_wrapped_check(page)); + /* Clear references to memory we now own and must free on error. */ + *updp = NULL; + /* * Swap the update into place. If that fails, a new update was added - * after our search, we raced. Check if our update is still permitted, - * and if it is, do a full-barrier to ensure the update's next pointer - * is set before we update the linked list and try again. + * after our search, we raced. Check if our update is still permitted. */ while (!WT_ATOMIC_CAS8(*srch_upd, upd->next, upd)) { if ((ret = __wt_txn_update_check( @@ -234,7 +272,6 @@ __wt_update_serial(WT_SESSION_IMPL *session, WT_PAGE *page, __wt_free(session, upd); return (ret); } - WT_WRITE_BARRIER(); } /* diff --git a/src/include/session.h b/src/include/session.h index bf1aa98d8d3..f32da177bf9 100644 --- a/src/include/session.h +++ b/src/include/session.h @@ -148,7 +148,7 @@ struct WT_COMPILER_TYPE_ALIGN(WT_CACHE_LINE_ALIGNMENT) __wt_session_impl { #define WT_SESSION_CLEAR_SIZE(s) \ (WT_PTRDIFF(&(s)->rnd, s)) - uint64_t rnd; /* Random number generation state */ + WT_RAND_STATE rnd; /* Random number generation state */ /* Hashed handle reference list array */ SLIST_HEAD(__dhandles_hash, __wt_data_handle_cache) *dhhash; diff --git a/src/include/txn.i b/src/include/txn.i index a9b54d26e47..a8e052ec5eb 100644 --- a/src/include/txn.i +++ b/src/include/txn.i @@ -377,7 +377,8 @@ __wt_txn_id_check(WT_SESSION_IMPL *session) do { txn_state->id = txn->id = txn_global->current; } while (!WT_ATOMIC_CAS8( - txn_global->current, txn->id, txn->id + 1)); + txn_global->current, txn->id, txn->id + 1) || + WT_TXNID_LT(txn->id, txn_global->last_running)); /* * If we have used 64-bits of transaction IDs, there is nothing diff --git a/src/include/wt_internal.h b/src/include/wt_internal.h index b876a2d032d..64e29e104bc 100644 --- a/src/include/wt_internal.h +++ b/src/include/wt_internal.h @@ -266,6 +266,8 @@ struct __wt_upd_skipped; typedef struct __wt_upd_skipped WT_UPD_SKIPPED; struct __wt_update; typedef struct __wt_update WT_UPDATE; +union __wt_rand_state; + typedef union __wt_rand_state WT_RAND_STATE; /* * Forward type declarations for internal types: END * DO NOT EDIT: automatically built by dist/s_typedef. diff --git a/src/os_posix/os_mtx_rw.c b/src/os_posix/os_mtx_rw.c index cdd4f8a24e1..df558b12bef 100644 --- a/src/os_posix/os_mtx_rw.c +++ b/src/os_posix/os_mtx_rw.c @@ -38,6 +38,78 @@ * Joseph Seigh. Note that a similar (but not identical) algorithm was published * by John Mellor-Crummey and Michael Scott in their landmark paper "Scalable * Reader-Writer Synchronization for Shared-Memory Multiprocessors". + * + * The following is an explanation of this code. First, the underlying lock + * structure. + * + * struct { + * uint16_t writers; Now serving for writers + * uint16_t readers; Now serving for readers + * uint16_t users; Next available ticket number + * uint16_t __notused; Padding + * } + * + * First, imagine a store's 'take a number' ticket algorithm. A customer takes + * a unique ticket number and customers are served in ticket order. In the data + * structure, 'writers' is the next writer to be served, 'readers' is the next + * reader to be served, and 'users' is the next available ticket number. + * + * Next, consider exclusive (write) locks. The 'now serving' number for writers + * is 'writers'. To lock, 'take a number' and wait until that number is being + * served; more specifically, atomically copy and increment the current value of + * 'users', and then wait until 'writers' equals that copied number. + * + * Shared (read) locks are similar. Like writers, readers atomically get the + * next number available. However, instead of waiting for 'writers' to equal + * their number, they wait for 'readers' to equal their number. + * + * This has the effect of queuing lock requests in the order they arrive + * (incidentally avoiding starvation). + * + * Each lock/unlock pair requires incrementing both 'readers' and 'writers'. + * In the case of a reader, the 'readers' increment happens when the reader + * acquires the lock (to allow read-lock sharing), and the 'writers' increment + * happens when the reader releases the lock. In the case of a writer, both + * 'readers' and 'writers' are incremented when the writer releases the lock. + * + * For example, consider the following read (R) and write (W) lock requests: + * + * writers readers users + * 0 0 0 + * R: ticket 0, readers match OK 0 1 1 + * R: ticket 1, readers match OK 0 2 2 + * R: ticket 2, readers match OK 0 3 3 + * W: ticket 3, writers no match block 0 3 4 + * R: ticket 2, unlock 1 3 4 + * R: ticket 0, unlock 2 3 4 + * R: ticket 1, unlock 3 3 4 + * W: ticket 3, writers match OK 3 3 4 + * + * Note the writer blocks until 'writers' equals its ticket number and it does + * not matter if readers unlock in order or not. + * + * Readers or writers entering the system after the write lock is queued block, + * and the next ticket holder (reader or writer) will unblock when the writer + * unlocks. An example, continuing from the last line of the above example: + * + * writers readers users + * W: ticket 3, writers match OK 3 3 4 + * R: ticket 4, readers no match block 3 3 5 + * R: ticket 5, readers no match block 3 3 6 + * W: ticket 6, writers no match block 3 3 7 + * W: ticket 3, unlock 4 4 7 + * R: ticket 4, readers match OK 4 5 7 + * R: ticket 5, readers match OK 4 6 7 + * + * The 'users' field is a 2-byte value so the available ticket number wraps at + * 64K requests. If a thread's lock request is not granted until the 'users' + * field cycles and the same ticket is taken by another thread, we could grant + * a lock to two separate threads at the same time, and bad things happen: two + * writer threads or a reader thread and a writer thread would run in parallel, + * and lock waiters could be skipped if the unlocks race. This is unlikely, it + * only happens if a lock request is blocked by 64K other requests. The fix is + * to grow the lock structure fields, but the largest atomic instruction we have + * is 8 bytes, the structure has no room to grow. */ #include "wt_internal.h" @@ -69,20 +141,31 @@ __wt_rwlock_alloc( int __wt_try_readlock(WT_SESSION_IMPL *session, WT_RWLOCK *rwlock) { - wt_rwlock_t *l; - uint64_t old, new, pad, users, writers; + wt_rwlock_t *l, new, old; WT_RET(__wt_verbose( session, WT_VERB_MUTEX, "rwlock: try_readlock %s", rwlock->name)); WT_STAT_FAST_CONN_INCR(session, rwlock_read); l = &rwlock->rwlock; - pad = l->s.pad; - users = l->s.users; - writers = l->s.writers; - old = (pad << 48) + (users << 32) + (users << 16) + writers; - new = (pad << 48) + ((users + 1) << 32) + ((users + 1) << 16) + writers; - return (WT_ATOMIC_CAS8(l->u, old, new) ? 0 : EBUSY); + new = old = *l; + + /* + * This read lock can only be granted if the lock was last granted to + * a reader and there are no readers or writers blocked on the lock, + * that is, if this thread's ticket would be the next ticket granted. + * Do the cheap test to see if this can possibly succeed (and confirm + * the lock is in the correct state to grant this read lock). + */ + if (old.s.readers != old.s.users) + return (EBUSY); + + /* + * The replacement lock value is a result of allocating a new ticket and + * incrementing the reader value to match it. + */ + new.s.readers = new.s.users = old.s.users + 1; + return (WT_ATOMIC_CAS8(l->u, old.u, new.u) ? 0 : EBUSY); } /* @@ -93,8 +176,7 @@ int __wt_readlock(WT_SESSION_IMPL *session, WT_RWLOCK *rwlock) { wt_rwlock_t *l; - uint64_t me; - uint16_t val; + uint16_t ticket; int pause_cnt; WT_RET(__wt_verbose( @@ -102,17 +184,22 @@ __wt_readlock(WT_SESSION_IMPL *session, WT_RWLOCK *rwlock) WT_STAT_FAST_CONN_INCR(session, rwlock_read); l = &rwlock->rwlock; - me = WT_ATOMIC_FETCH_ADD8(l->u, (uint64_t)1 << 32); - val = (uint16_t)(me >> 32); - for (pause_cnt = 0; val != l->s.readers;) { + + /* + * Possibly wrap: if we have more than 64K lockers waiting, the ticket + * value will wrap and two lockers will simultaneously be granted the + * lock. + */ + ticket = WT_ATOMIC_FETCH_ADD2(l->s.users, 1); + for (pause_cnt = 0; ticket != l->s.readers;) { /* * We failed to get the lock; pause before retrying and if we've * paused enough, sleep so we don't burn CPU to no purpose. This * situation happens if there are more threads than cores in the - * system and we're thrashing on shared resources. Regardless, - * don't sleep long, all we need is to schedule the other reader - * threads to complete a few more instructions and increment the - * reader count. + * system and we're thrashing on shared resources. + * + * Don't sleep long when waiting on a read lock, hopefully we're + * waiting on another read thread to increment the reader count. */ if (++pause_cnt < 1000) WT_PAUSE(); @@ -120,6 +207,10 @@ __wt_readlock(WT_SESSION_IMPL *session, WT_RWLOCK *rwlock) __wt_sleep(0, 10); } + /* + * We're the only writer of the readers field, so the update does not + * need to be atomic. + */ ++l->s.readers; return (0); @@ -138,6 +229,11 @@ __wt_readunlock(WT_SESSION_IMPL *session, WT_RWLOCK *rwlock) session, WT_VERB_MUTEX, "rwlock: read unlock %s", rwlock->name)); l = &rwlock->rwlock; + + /* + * Increment the writers value (other readers are doing the same, make + * sure we don't race). + */ WT_ATOMIC_ADD2(l->s.writers, 1); return (0); @@ -150,20 +246,28 @@ __wt_readunlock(WT_SESSION_IMPL *session, WT_RWLOCK *rwlock) int __wt_try_writelock(WT_SESSION_IMPL *session, WT_RWLOCK *rwlock) { - wt_rwlock_t *l; - uint64_t old, new, pad, readers, users; + wt_rwlock_t *l, new, old; WT_RET(__wt_verbose( session, WT_VERB_MUTEX, "rwlock: try_writelock %s", rwlock->name)); WT_STAT_FAST_CONN_INCR(session, rwlock_write); l = &rwlock->rwlock; - pad = l->s.pad; - readers = l->s.readers; - users = l->s.users; - old = (pad << 48) + (users << 32) + (readers << 16) + users; - new = (pad << 48) + ((users + 1) << 32) + (readers << 16) + users; - return (WT_ATOMIC_CAS8(l->u, old, new) ? 0 : EBUSY); + old = new = *l; + + /* + * This write lock can only be granted if the lock was last granted to + * a writer and there are no readers or writers blocked on the lock, + * that is, if this thread's ticket would be the next ticket granted. + * Do the cheap test to see if this can possibly succeed (and confirm + * the lock is in the correct state to grant this write lock). + */ + if (old.s.writers != old.s.users) + return (EBUSY); + + /* The replacement lock value is a result of allocating a new ticket. */ + ++new.s.users; + return (WT_ATOMIC_CAS8(l->u, old.u, new.u) ? 0 : EBUSY); } /* @@ -174,23 +278,33 @@ int __wt_writelock(WT_SESSION_IMPL *session, WT_RWLOCK *rwlock) { wt_rwlock_t *l; - uint64_t me; - uint16_t val; + uint16_t ticket; + int pause_cnt; WT_RET(__wt_verbose( session, WT_VERB_MUTEX, "rwlock: writelock %s", rwlock->name)); WT_STAT_FAST_CONN_INCR(session, rwlock_write); + l = &rwlock->rwlock; + /* - * Possibly wrap: if we have more than 64K lockers waiting, the count - * of writers will wrap and two lockers will simultaneously be granted - * the write lock. + * Possibly wrap: if we have more than 64K lockers waiting, the ticket + * value will wrap and two lockers will simultaneously be granted the + * lock. */ - l = &rwlock->rwlock; - me = WT_ATOMIC_FETCH_ADD8(l->u, (uint64_t)1 << 32); - val = (uint16_t)(me >> 32); - while (val != l->s.writers) - WT_PAUSE(); + ticket = WT_ATOMIC_FETCH_ADD2(l->s.users, 1); + for (pause_cnt = 0; ticket != l->s.writers;) { + /* + * We failed to get the lock; pause before retrying and if we've + * paused enough, sleep so we don't burn CPU to no purpose. This + * situation happens if there are more threads than cores in the + * system and we're thrashing on shared resources. + */ + if (++pause_cnt < 1000) + WT_PAUSE(); + else + __wt_sleep(0, 10); + } return (0); } @@ -211,12 +325,23 @@ __wt_writeunlock(WT_SESSION_IMPL *session, WT_RWLOCK *rwlock) copy = *l; + /* + * We're the only writer of the writers/readers fields, so the update + * does not need to be atomic; we have to update both values at the + * same time though, otherwise we'd potentially race with the thread + * next granted the lock. + * + * Use a memory barrier to ensure the compiler doesn't mess with these + * instructions and rework the code in a way that avoids the update as + * a unit. + */ WT_BARRIER(); ++copy.s.writers; ++copy.s.readers; - l->i.us = copy.i.us; + l->i.wr = copy.i.wr; + return (0); } diff --git a/src/reconcile/rec_write.c b/src/reconcile/rec_write.c index 53a73b44feb..37acb28a00b 100644 --- a/src/reconcile/rec_write.c +++ b/src/reconcile/rec_write.c @@ -343,11 +343,12 @@ __wt_reconcile(WT_SESSION_IMPL *session, WT_PAGE *page; WT_PAGE_MODIFY *mod; WT_RECONCILE *r; - int locked; + int page_lock, scan_lock, split_lock; conn = S2C(session); page = ref->page; mod = page->modify; + page_lock = scan_lock = split_lock = 0; /* We're shouldn't get called with a clean page, that's an error. */ if (!__wt_page_is_modified(page)) @@ -386,22 +387,38 @@ __wt_reconcile(WT_SESSION_IMPL *session, /* * The compaction process looks at the page's modification information; - * if compaction is running, lock the page down. - * - * Otherwise, flip on the scanning flag: obsolete updates cannot be - * freed while reconciliation is in progress. + * if compaction is running, acquire the page's lock. */ - locked = 0; if (conn->compact_in_memory_pass) { - locked = 1; WT_PAGE_LOCK(session, page); - } else + page_lock = 1; + } + + /* + * Reconciliation reads the lists of updates, so obsolete updates cannot + * be discarded while reconciliation is in progress. + */ + for (;;) { + F_CAS_ATOMIC(page, WT_PAGE_SCANNING, ret); + if (ret == 0) + break; + __wt_yield(); + } + scan_lock = 1; + + /* + * Mark internal pages as splitting to ensure we don't deadlock when + * performing an in-memory split during a checkpoint. + */ + if (WT_PAGE_IS_INTERNAL(page)) { for (;;) { - F_CAS_ATOMIC(page, WT_PAGE_SCANNING, ret); + F_CAS_ATOMIC(page, WT_PAGE_SPLIT_LOCKED, ret); if (ret == 0) break; __wt_yield(); } + split_lock = 1; + } /* Reconcile the page. */ switch (page->type) { @@ -434,11 +451,13 @@ __wt_reconcile(WT_SESSION_IMPL *session, else WT_TRET(__rec_write_wrapup_err(session, r, page)); - /* Release the page lock if we're holding one. */ - if (locked) - WT_PAGE_UNLOCK(session, page); - else + /* Release the locks we're holding. */ + if (split_lock) + F_CLR_ATOMIC(page, WT_PAGE_SPLIT_LOCKED); + if (scan_lock) F_CLR_ATOMIC(page, WT_PAGE_SCANNING); + if (page_lock) + WT_PAGE_UNLOCK(session, page); /* * Clean up the boundary structures: some workloads result in millions @@ -3266,18 +3285,6 @@ __rec_col_int(WT_SESSION_IMPL *session, WT_RECONCILE *r, WT_PAGE *page) WT_RET(__rec_split_init( session, r, page, page->pg_intl_recno, btree->maxintlpage)); - /* - * We need to mark this page as splitting, as this may be an in-memory - * split during a checkpoint. - */ - for (;;) { - F_CAS_ATOMIC(page, WT_PAGE_SPLIT_LOCKED, ret); - if (ret == 0) { - break; - } - __wt_yield(); - } - /* For each entry in the in-memory page... */ WT_INTL_FOREACH_BEGIN(session, page, ref) { /* Update the starting record number in case we split. */ @@ -3360,8 +3367,6 @@ __rec_col_int(WT_SESSION_IMPL *session, WT_RECONCILE *r, WT_PAGE *page) __rec_copy_incr(session, r, val); } WT_INTL_FOREACH_END; - F_CLR_ATOMIC(page, WT_PAGE_SPLIT_LOCKED); - /* Write the remnant page. */ return (__rec_split_finish(session, r)); @@ -4094,18 +4099,6 @@ __rec_row_int(WT_SESSION_IMPL *session, WT_RECONCILE *r, WT_PAGE *page) */ r->cell_zero = 1; - /* - * We need to mark this page as splitting in order to ensure we don't - * deadlock when performing an in-memory split during a checkpoint. - */ - for (;;) { - F_CAS_ATOMIC(page, WT_PAGE_SPLIT_LOCKED, ret); - if (ret == 0) { - break; - } - __wt_yield(); - } - /* For each entry in the in-memory page... */ WT_INTL_FOREACH_BEGIN(session, page, ref) { /* @@ -4264,8 +4257,6 @@ __rec_row_int(WT_SESSION_IMPL *session, WT_RECONCILE *r, WT_PAGE *page) __rec_key_state_update(r, ovfl_key); } WT_INTL_FOREACH_END; - F_CLR_ATOMIC(page, WT_PAGE_SPLIT_LOCKED); - /* Write the remnant page. */ return (__rec_split_finish(session, r)); diff --git a/src/session/session_api.c b/src/session/session_api.c index ef9735a8b98..e7ac9d9b365 100644 --- a/src/session/session_api.c +++ b/src/session/session_api.c @@ -800,7 +800,7 @@ __session_commit_transaction(WT_SESSION *wt_session, const char *config) WT_STAT_FAST_CONN_INCR(session, txn_commit); txn = &session->txn; - if (F_ISSET(txn, WT_TXN_ERROR)) { + if (F_ISSET(txn, WT_TXN_ERROR) && txn->mod_count != 0) { __wt_errx(session, "failed transaction requires rollback"); ret = EINVAL; } diff --git a/src/support/rand.c b/src/support/rand.c index 7dfb98c5ca4..caac04d3529 100644 --- a/src/support/rand.c +++ b/src/support/rand.c @@ -41,18 +41,18 @@ * of the values to avoid that, and read/write in atomic, 8B chunks. */ #undef M_W -#define M_W(p) ((uint32_t *)&(p))[0] +#define M_W(r) r.x.w #undef M_Z -#define M_Z(p) ((uint32_t *)&(p))[1] +#define M_Z(r) r.x.z /* * __wt_random_init -- * Initialize return of a 32-bit pseudo-random number. */ void -__wt_random_init(uint64_t volatile * rnd_state) +__wt_random_init(WT_RAND_STATE volatile * rnd_state) { - uint64_t rnd; + WT_RAND_STATE rnd; M_W(rnd) = 521288629; M_Z(rnd) = 362436069; @@ -64,9 +64,9 @@ __wt_random_init(uint64_t volatile * rnd_state) * Return a 32-bit pseudo-random number. */ uint32_t -__wt_random(uint64_t volatile * rnd_state) +__wt_random(WT_RAND_STATE volatile * rnd_state) { - uint64_t rnd; + WT_RAND_STATE rnd; uint32_t w, z; /* diff --git a/src/txn/txn.c b/src/txn/txn.c index 7f8a944acbd..a9566876c8a 100644 --- a/src/txn/txn.c +++ b/src/txn/txn.c @@ -182,10 +182,6 @@ __wt_txn_get_snapshot(WT_SESSION_IMPL *session) WT_ASSERT(session, prev_oldest_id == txn_global->oldest_id); txn_state->snap_min = snap_min; - /* Update the last running ID if we have a much newer value. */ - if (snap_min > txn_global->last_running + 100) - txn_global->last_running = snap_min; - WT_ASSERT(session, txn_global->scan_count > 0); (void)WT_ATOMIC_SUB4(txn_global->scan_count, 1); @@ -287,30 +283,38 @@ __wt_txn_update_oldest(WT_SESSION_IMPL *session, int force) oldest_id = id; /* Update the last running ID. */ - if (WT_TXNID_LT(txn_global->last_running, snap_min)) { - txn_global->last_running = snap_min; - last_running_moved = 1; - } else - last_running_moved = 0; + last_running_moved = WT_TXNID_LT(txn_global->last_running, snap_min); /* Update the oldest ID. */ - if (WT_TXNID_LT(prev_oldest_id, oldest_id) && + if ((WT_TXNID_LT(prev_oldest_id, oldest_id) || last_running_moved) && WT_ATOMIC_CAS4(txn_global->scan_count, 1, -1)) { WT_ORDERED_READ(session_cnt, conn->session_cnt); for (i = 0, s = txn_global->states; i < session_cnt; i++, s++) { if ((id = s->id) != WT_TXN_NONE && - WT_TXNID_LT(id, oldest_id)) + WT_TXNID_LT(id, snap_min)) oldest_id = id; if ((id = s->snap_min) != WT_TXN_NONE && WT_TXNID_LT(id, oldest_id)) oldest_id = id; } - /* Make sure the ID doesn't move past any named snapshots. */ - WT_ASSERT(session, - (id = txn_global->nsnap_oldest_id) == WT_TXN_NONE || - !WT_TXNID_LT(id, oldest_id)); + if (WT_TXNID_LT(snap_min, oldest_id)) + oldest_id = snap_min; +#ifdef HAVE_DIAGNOSTIC + /* + * Make sure the ID doesn't move past any named snapshots. + * + * Don't include the read/assignment in the assert statement. + * Coverity complains if there are assignments only done in + * diagnostic builds, and when the read is from a volatile. + */ + id = txn_global->nsnap_oldest_id; + WT_ASSERT(session, + id == WT_TXN_NONE || !WT_TXNID_LT(id, oldest_id)); +#endif + if (WT_TXNID_LT(txn_global->last_running, snap_min)) + txn_global->last_running = snap_min; if (WT_TXNID_LT(txn_global->oldest_id, oldest_id)) txn_global->oldest_id = oldest_id; txn_global->scan_count = 0; @@ -412,6 +416,9 @@ __wt_txn_release(WT_SESSION_IMPL *session) txn_global->checkpoint_id = 0; txn_global->checkpoint_pinned = WT_TXN_NONE; } else if (F_ISSET(txn, WT_TXN_HAS_ID)) { + WT_ASSERT(session, + !WT_TXNID_LT(txn->id, txn_global->last_running)); + WT_ASSERT(session, txn_state->id != WT_TXN_NONE && txn->id != WT_TXN_NONE); WT_PUBLISH(txn_state->id, WT_TXN_NONE); @@ -462,7 +469,7 @@ __wt_txn_commit(WT_SESSION_IMPL *session, const char *cfg[]) txn = &session->txn; conn = S2C(session); - WT_ASSERT(session, !F_ISSET(txn, WT_TXN_ERROR)); + WT_ASSERT(session, !F_ISSET(txn, WT_TXN_ERROR) || txn->mod_count == 0); if (!F_ISSET(txn, WT_TXN_RUNNING)) WT_RET_MSG(session, EINVAL, "No transaction is active"); @@ -586,6 +593,7 @@ __wt_txn_rollback(WT_SESSION_IMPL *session, const char *cfg[]) switch (op->type) { case WT_TXN_OP_BASIC: case WT_TXN_OP_INMEM: + WT_ASSERT(session, op->u.upd->txnid == txn->id); op->u.upd->txnid = WT_TXN_ABORTED; break; case WT_TXN_OP_REF: diff --git a/test/checkpoint/workers.c b/test/checkpoint/workers.c index 664acd6183d..5cd2ef4e97b 100644 --- a/test/checkpoint/workers.c +++ b/test/checkpoint/workers.c @@ -172,7 +172,7 @@ real_worker(void) { WT_CURSOR **cursors; WT_SESSION *session; - uint64_t rnd; + WT_RAND_STATE rnd; u_int i, keyno; int j, ret, t_ret; diff --git a/test/fops/fops.c b/test/fops/fops.c index e86795c4fc0..0a83e8511f4 100644 --- a/test/fops/fops.c +++ b/test/fops/fops.c @@ -98,7 +98,7 @@ fop(void *arg) { STATS *s; uintptr_t id; - uint64_t rnd; + WT_RAND_STATE rnd; u_int i; id = (uintptr_t)arg; diff --git a/test/format/format.h b/test/format/format.h index 6cbfd882616..4ec2734aee9 100644 --- a/test/format/format.h +++ b/test/format/format.h @@ -151,7 +151,7 @@ typedef struct { pthread_rwlock_t backup_lock; /* Hot backup running */ - uint64_t rnd; /* Global RNG state */ + WT_RAND_STATE rnd; /* Global RNG state */ /* * We have a list of records that are appended, but not yet "resolved", @@ -269,7 +269,7 @@ typedef struct { extern GLOBAL g; typedef struct { - uint64_t rnd; /* thread RNG state */ + WT_RAND_STATE rnd; /* thread RNG state */ uint64_t search; /* operations */ uint64_t insert; @@ -312,15 +312,15 @@ void config_setup(void); void config_single(const char *, int); void fclose_and_clear(FILE **); void key_gen(uint8_t *, size_t *, uint64_t); -void key_gen_insert(uint64_t *, uint8_t *, size_t *, uint64_t); +void key_gen_insert(WT_RAND_STATE *, uint8_t *, size_t *, uint64_t); void key_gen_setup(uint8_t **); void key_len_setup(void); void *lrt(void *); void path_setup(const char *); -uint32_t rng(uint64_t *); +uint32_t rng(WT_RAND_STATE *); void track(const char *, uint64_t, TINFO *); -void val_gen(uint64_t *, uint8_t *, size_t *, uint64_t); -void val_gen_setup(uint64_t *, uint8_t **); +void val_gen(WT_RAND_STATE *, uint8_t *, size_t *, uint64_t); +void val_gen_setup(WT_RAND_STATE *, uint8_t **); void wts_close(void); void wts_create(void); void wts_dump(const char *, int); @@ -343,7 +343,7 @@ __attribute__((__noreturn__)) * Return a random value between a min/max pair. */ static inline uint32_t -mmrand(uint64_t *rnd, u_int min, u_int max) +mmrand(WT_RAND_STATE *rnd, u_int min, u_int max) { return (rng(rnd) % (((max) + 1) - (min)) + (min)); } diff --git a/test/format/ops.c b/test/format/ops.c index b9606ad8f36..7d3b22175ca 100644 --- a/test/format/ops.c +++ b/test/format/ops.c @@ -202,7 +202,7 @@ wts_ops(int lastrun) * Return the current session configuration. */ static const char * -ops_session_config(uint64_t *rnd) +ops_session_config(WT_RAND_STATE *rnd) { u_int v; diff --git a/test/format/util.c b/test/format/util.c index 8d077f6caa7..9d28b7a81bc 100644 --- a/test/format/util.c +++ b/test/format/util.c @@ -33,7 +33,7 @@ #endif static inline uint32_t -kv_len(uint64_t *rnd, uint64_t keyno, uint32_t min, uint32_t max) +kv_len(WT_RAND_STATE *rnd, uint64_t keyno, uint32_t min, uint32_t max) { /* * Focus on relatively small key/value items, admitting the possibility @@ -116,7 +116,7 @@ key_gen(uint8_t *key, size_t *sizep, uint64_t keyno) } void -key_gen_insert(uint64_t *rnd, uint8_t *key, size_t *sizep, uint64_t keyno) +key_gen_insert(WT_RAND_STATE *rnd, uint8_t *key, size_t *sizep, uint64_t keyno) { key_gen_common(key, sizep, keyno, (int)mmrand(rnd, 1, 15)); } @@ -124,7 +124,7 @@ key_gen_insert(uint64_t *rnd, uint8_t *key, size_t *sizep, uint64_t keyno) static uint32_t val_dup_data_len; /* Length of duplicate data items */ void -val_gen_setup(uint64_t *rnd, uint8_t **valp) +val_gen_setup(WT_RAND_STATE *rnd, uint8_t **valp) { uint8_t *val; size_t i, len; @@ -151,7 +151,7 @@ val_gen_setup(uint64_t *rnd, uint8_t **valp) } void -val_gen(uint64_t *rnd, uint8_t *val, size_t *sizep, uint64_t keyno) +val_gen(WT_RAND_STATE *rnd, uint8_t *val, size_t *sizep, uint64_t keyno) { /* * Fixed-length records: take the low N bits from the last digit of @@ -361,7 +361,7 @@ path_setup(const char *home) * Return a random number. */ uint32_t -rng(uint64_t *rnd) +rng(WT_RAND_STATE *rnd) { char buf[64]; uint32_t r; diff --git a/test/suite/test_txn12.py b/test/suite/test_txn12.py new file mode 100644 index 00000000000..0901811535e --- /dev/null +++ b/test/suite/test_txn12.py @@ -0,0 +1,70 @@ +#!/usr/bin/env python +# +# Public Domain 2014-2015 MongoDB, Inc. +# Public Domain 2008-2014 WiredTiger, Inc. +# +# This is free and unencumbered software released into the public domain. +# +# Anyone is free to copy, modify, publish, use, compile, sell, or +# distribute this software, either in source code form or as a compiled +# binary, for any purpose, commercial or non-commercial, and by any +# means. +# +# In jurisdictions that recognize copyright laws, the author or authors +# of this software dedicate any and all copyright interest in the +# software to the public domain. We make this dedication for the benefit +# of the public at large and to the detriment of our heirs and +# successors. We intend this dedication to be an overt act of +# relinquishment in perpetuity of all present and future rights to this +# software under copyright law. +# +# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, +# EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF +# MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. +# IN NO EVENT SHALL THE AUTHORS BE LIABLE FOR ANY CLAIM, DAMAGES OR +# OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, +# ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR +# OTHER DEALINGS IN THE SOFTWARE. + +import wiredtiger, wttest +from suite_subprocess import suite_subprocess +from wiredtiger import stat +from wtscenario import multiply_scenarios, number_scenarios + +# test_txn12.py +# test of commit following failed op in a read only transaction. +class test_txn12(wttest.WiredTigerTestCase, suite_subprocess): + name = 'test_txn12' + uri = 'table:' + name + create_params = 'key_format=i,value_format=i' + + # Test that read-only transactions can commit following a failure. + def test_txn12(self): + + # Setup the session and table. + session = self.conn.open_session(None) + session.create(self.uri, self.create_params) + session.begin_transaction("isolation=snapshot") + + # Create a read only transaction. + c = session.open_cursor(self.uri, None) + c.next() + msg = '/next_random.*boolean/' + self.assertRaisesWithMessage(wiredtiger.WiredTigerError, + lambda:session.open_cursor(self.uri, None, "next_random=bar"), msg) + # This commit should succeed as we have done no writes. + session.commit_transaction() + + # Create a read/write transaction. + session.begin_transaction("isolation=snapshot") + c = session.open_cursor(self.uri, None) + c[123] = 123 + self.assertRaisesWithMessage(wiredtiger.WiredTigerError, + lambda:session.open_cursor(self.uri, None, "next_random=bar"), msg) + # This commit should fail as we have written something + self.assertRaisesWithMessage(wiredtiger.WiredTigerError, + lambda:session.commit_transaction(), '/requires rollback/') + +if __name__ == '__main__': + wttest.run() + diff --git a/test/thread/rw.c b/test/thread/rw.c index 402789dd2a5..c9e2e78ec35 100644 --- a/test/thread/rw.c +++ b/test/thread/rw.c @@ -36,7 +36,7 @@ typedef struct { char *name; /* object name */ u_int nops; /* Thread op count */ - uint64_t rnd; /* RNG */ + WT_RAND_STATE rnd; /* RNG */ int remove; /* cursor.remove */ int update; /* cursor.update */ |