 bench/wtperf/Makefile.am                            |   2
 bench/wtperf/config.c                               |  49
 bench/wtperf/runners/mongodb-oplog.wtperf           |  11
 bench/wtperf/runners/truncate-btree-populate.wtperf |   7
 bench/wtperf/runners/truncate-btree-workload.wtperf |   9
 bench/wtperf/track.c                                |   5
 bench/wtperf/wtperf.c                               | 133
 bench/wtperf/wtperf.h                               |  58
 bench/wtperf/wtperf_opt.i                           |   3
 bench/wtperf/wtperf_truncate.c                      | 216
 dist/s_define.list                                  |   2
 dist/s_string.ok                                    |   1
 examples/c/ex_cursor.c                              |   2
 src/btree/bt_walk.c                                 |  23
 src/btree/row_modify.c                              |  12
 src/docs/wtperf.dox                                 |   4
 src/evict/evict_lru.c                               |  13
 src/include/extern.h                                |   4
 src/include/misc.h                                  |   8
 src/include/mutex.h                                 |  20
 src/include/serial.i                                | 109
 src/include/session.h                               |   2
 src/include/txn.i                                   |   3
 src/include/wt_internal.h                           |   2
 src/os_posix/os_mtx_rw.c                            | 197
 src/reconcile/rec_write.c                           |  73
 src/session/session_api.c                           |   2
 src/support/rand.c                                  |  12
 src/txn/txn.c                                       |  40
 test/checkpoint/workers.c                           |   2
 test/fops/fops.c                                    |   2
 test/format/format.h                                |  14
 test/format/ops.c                                   |   2
 test/format/util.c                                  |  10
 test/suite/test_txn12.py                            |  70
 test/thread/rw.c                                    |   2
 36 files changed, 889 insertions(+), 235 deletions(-)
diff --git a/bench/wtperf/Makefile.am b/bench/wtperf/Makefile.am
index 0630a27f640..15f151d84b2 100644
--- a/bench/wtperf/Makefile.am
+++ b/bench/wtperf/Makefile.am
@@ -5,7 +5,7 @@ LDADD = $(top_builddir)/libwiredtiger.la -lm
noinst_PROGRAMS = wtperf
wtperf_LDFLAGS = -static
wtperf_SOURCES =\
- config.c misc.c track.c wtperf.c wtperf.h wtperf_opt.i
+ config.c misc.c track.c wtperf.c wtperf_truncate.c wtperf.h wtperf_opt.i
TESTS = smoke.sh
AM_TESTS_ENVIRONMENT = rm -rf WT_TEST ; mkdir WT_TEST ;
diff --git a/bench/wtperf/config.c b/bench/wtperf/config.c
index 47e052d6055..721b41432cb 100644
--- a/bench/wtperf/config.c
+++ b/bench/wtperf/config.c
@@ -95,6 +95,8 @@ config_assign(CONFIG *dest, const CONFIG *src)
*pstr = newstr;
}
}
+
+ STAILQ_INIT(&dest->stone_head);
return (0);
}
@@ -122,6 +124,7 @@ config_free(CONFIG *cfg)
free(cfg->uris);
}
+ cleanup_truncate_config(cfg);
free(cfg->ckptthreads);
free(cfg->popthreads);
free(cfg->base_uri);
@@ -243,6 +246,28 @@ config_threads(CONFIG *cfg, const char *config, size_t len)
goto err;
continue;
}
+ if (STRING_MATCH("truncate", k.str, k.len)) {
+ if ((workp->truncate = v.val) != 1)
+ goto err;
+ /* There can only be one Truncate thread. */
+ if (cfg->has_truncate != 0) {
+ goto err;
+ }
+ cfg->has_truncate = 1;
+ continue;
+ }
+ if (STRING_MATCH("truncate_pct", k.str, k.len)) {
+ if (v.val <= 0)
+ goto err;
+ workp->truncate_pct = (uint64_t)v.val;
+ continue;
+ }
+ if (STRING_MATCH("truncate_count", k.str, k.len)) {
+ if (v.val <= 0)
+ goto err;
+ workp->truncate_count = (uint64_t)v.val;
+ continue;
+ }
goto err;
}
if (ret == WT_NOTFOUND)
@@ -253,9 +278,21 @@ config_threads(CONFIG *cfg, const char *config, size_t len)
scan = NULL;
if (ret != 0)
goto err;
-
- if (workp->insert == 0 &&
- workp->read == 0 && workp->update == 0)
+ if (workp->insert == 0 && workp->read == 0 &&
+ workp->update == 0 && workp->truncate == 0)
+ goto err;
+ /* Why run with truncate if we don't want any truncation? */
+ if (workp->truncate != 0 &&
+ workp->truncate_pct == 0 && workp->truncate_count == 0)
+ goto err;
+ if (workp->truncate != 0 &&
+ (workp->truncate_pct < 1 || workp->truncate_pct > 99))
+ goto err;
+ /* Truncate should have its own exclusive thread. */
+ if (workp->truncate != 0 && workp->threads > 1)
+ goto err;
+ if (workp->truncate != 0 &&
+ (workp->insert > 0 || workp->read > 0 || workp->update > 0))
goto err;
cfg->workers_cnt += (u_int)workp->threads;
}
@@ -640,9 +677,11 @@ config_print(CONFIG *cfg)
for (i = 0, workp = cfg->workload;
i < cfg->workload_cnt; ++i, ++workp)
printf("\t\t%" PRId64 " threads (inserts=%" PRId64
- ", reads=%" PRId64 ", updates=%" PRId64 ")\n",
+ ", reads=%" PRId64 ", updates=%" PRId64
+ ", truncates=% " PRId64 ")\n",
workp->threads,
- workp->insert, workp->read, workp->update);
+ workp->insert, workp->read,
+ workp->update, workp->truncate);
}
printf("\t" "Checkpoint threads, interval: %" PRIu32 ", %" PRIu32 "\n",
diff --git a/bench/wtperf/runners/mongodb-oplog.wtperf b/bench/wtperf/runners/mongodb-oplog.wtperf
new file mode 100644
index 00000000000..34235f04518
--- /dev/null
+++ b/bench/wtperf/runners/mongodb-oplog.wtperf
@@ -0,0 +1,11 @@
+# wtperf options file to simulate populating a MongoDB oplog
+conn_config="cache_size=2GB,checkpoint=(wait=60)"
+table_config="type=file"
+# Start with a small set of inserts in the populate phase.
+icount=50000
+report_interval=5
+run_time=500
+populate_threads=1
+# Setup three threads to insert into the oplog
+# Setup one thread to be doing truncates from the oplog
+threads=((count=3,inserts=1,throttle=2000),(count=1,truncate=1,truncate_pct=10,truncate_count=50000))
diff --git a/bench/wtperf/runners/truncate-btree-populate.wtperf b/bench/wtperf/runners/truncate-btree-populate.wtperf
new file mode 100644
index 00000000000..4e4ae7500f0
--- /dev/null
+++ b/bench/wtperf/runners/truncate-btree-populate.wtperf
@@ -0,0 +1,7 @@
+# Truncate workload population phase
+conn_config="cache_size=2GB,checkpoint=(wait=60)"
+table_config="type=file"
+# Start with a small set of inserts in the populate phase.
+icount=50000
+report_interval=5
+populate_threads=1
diff --git a/bench/wtperf/runners/truncate-btree-workload.wtperf b/bench/wtperf/runners/truncate-btree-workload.wtperf
new file mode 100644
index 00000000000..55e01dcd0dc
--- /dev/null
+++ b/bench/wtperf/runners/truncate-btree-workload.wtperf
@@ -0,0 +1,9 @@
+# Truncate workload, work phase
+conn_config="cache_size=2GB,checkpoint=(wait=60)"
+table_config="type=file"
+create=false
+report_interval=5
+run_time=500
+# Setup three threads to insert into the oplog
+# Setup one thread to be doing truncates from the oplog
+threads=((count=3,inserts=1,throttle=2000),(count=1,truncate=1,truncate_pct=10,truncate_count=50000))
diff --git a/bench/wtperf/track.c b/bench/wtperf/track.c
index 8ea4201246a..75f5a012a94 100644
--- a/bench/wtperf/track.c
+++ b/bench/wtperf/track.c
@@ -98,6 +98,11 @@ sum_read_ops(CONFIG *cfg)
return (sum_ops(cfg, offsetof(CONFIG_THREAD, read)));
}
uint64_t
+sum_truncate_ops(CONFIG *cfg)
+{
+ return (sum_ops(cfg, offsetof(CONFIG_THREAD, truncate)));
+}
+uint64_t
sum_update_ops(CONFIG *cfg)
{
return (sum_ops(cfg, offsetof(CONFIG_THREAD, update)));
diff --git a/bench/wtperf/wtperf.c b/bench/wtperf/wtperf.c
index 543a125d4d4..f079d6272d7 100644
--- a/bench/wtperf/wtperf.c
+++ b/bench/wtperf/wtperf.c
@@ -50,6 +50,7 @@ static const CONFIG default_cfg = {
0, /* checkpoint operations */
0, /* insert operations */
0, /* read operations */
+ 0, /* truncate operations */
0, /* update operations */
0, /* insert key */
0, /* checkpoint in progress */
@@ -57,6 +58,8 @@ static const CONFIG default_cfg = {
0, /* notify threads to stop */
0, /* in warmup phase */
0, /* total seconds running */
+ 0, /* has truncate */
+ {NULL, NULL}, /* the truncate queue */
#define OPT_DEFINE_DEFAULT
#include "wtperf_opt.i"
@@ -100,15 +103,6 @@ get_next_incr(CONFIG *cfg)
return (WT_ATOMIC_ADD8(cfg->insert_key, 1));
}
-static inline void
-generate_key(CONFIG *cfg, char *key_buf, uint64_t keyno)
-{
- /*
- * Don't change to snprintf, sprintf is faster in some tests.
- */
- sprintf(key_buf, "%0*" PRIu64, cfg->key_sz - 1, keyno);
-}
-
static void
randomize_value(CONFIG_THREAD *thread, char *value_buf)
{
@@ -258,6 +252,8 @@ op_name(uint8_t *op)
return ("insert_rmw");
case WORKER_READ:
return ("read");
+ case WORKER_TRUNCATE:
+ return ("truncate");
case WORKER_UPDATE:
return ("update");
default:
@@ -389,7 +385,7 @@ worker(void *arg)
size_t i;
uint64_t next_val, usecs;
uint8_t *op, *op_end;
- int measure_latency, ret;
+ int measure_latency, ret, truncated;
char *value_buf, *key_buf, *value;
char buf[512];
@@ -444,6 +440,11 @@ worker(void *arg)
goto err;
}
+ /* Setup for truncate */
+ if (thread->workload->truncate != 0)
+ if ((ret = setup_truncate(cfg, thread, session)) != 0)
+ goto err;
+
key_buf = thread->key_buf;
value_buf = thread->value_buf;
@@ -486,6 +487,10 @@ worker(void *arg)
if (wtperf_value_range(cfg) < next_val)
continue;
break;
+ case WORKER_TRUNCATE:
+ /* Required but not used. */
+ next_val = wtperf_rand(thread);
+ break;
default:
goto err; /* can't happen */
}
@@ -502,10 +507,9 @@ worker(void *arg)
* is 0, to avoid first time latency spikes.
*/
measure_latency =
- cfg->sample_interval != 0 && trk->ops != 0 && (
- trk->ops % cfg->sample_rate == 0);
- if (measure_latency &&
- (ret = __wt_epoch(NULL, &start)) != 0) {
+ cfg->sample_interval != 0 && trk != NULL &&
+ trk->ops != 0 && (trk->ops % cfg->sample_rate == 0);
+ if (measure_latency && (ret = __wt_epoch(NULL, &start)) != 0) {
lprintf(cfg, ret, 0, "Get time call failed");
goto err;
}
@@ -548,6 +552,18 @@ worker(void *arg)
if ((ret = cursor->insert(cursor)) == 0)
break;
goto op_err;
+ case WORKER_TRUNCATE:
+ if ((ret = run_truncate(
+ cfg, thread, cursor, session, &truncated)) == 0) {
+ if (truncated)
+ trk = &thread->truncate;
+ else
+ trk = &thread->truncate_sleep;
+ /* Pause between truncate attempts */
+ (void)usleep(1000);
+ break;
+ }
+ goto op_err;
case WORKER_UPDATE:
if ((ret = cursor->search(cursor)) == 0) {
if ((ret = cursor->get_value(
@@ -711,16 +727,33 @@ run_mix_schedule(CONFIG *cfg, WORKLOAD *workp)
{
int64_t pct;
- /* Confirm reads, inserts and updates cannot all be zero. */
- if (workp->insert == 0 && workp->read == 0 && workp->update == 0) {
+ /* Confirm reads, inserts, truncates and updates cannot all be zero. */
+ if (workp->insert == 0 && workp->read == 0 &&
+ workp->truncate == 0 && workp->update == 0) {
lprintf(cfg, EINVAL, 0, "no operations scheduled");
return (EINVAL);
}
/*
+ * Handle truncate first - it's a special case that can't be used in
+ * a mixed workload.
+ */
+ if (workp->truncate != 0) {
+ if (workp->insert != 0 ||
+ workp->read != 0 || workp->update != 0) {
+ lprintf(cfg, EINVAL, 0,
+ "Can't configure truncate in a mixed workload");
+ return (EINVAL);
+ }
+ memset(workp->ops, WORKER_TRUNCATE, sizeof(workp->ops));
+ return (0);
+ }
+
+ /*
* Check for a simple case where the thread is only doing insert or
- * update operations (because the default operation for a job-mix is
- * read, the subsequent code works fine if only reads are specified).
+ * update operations (because the default operation for a
+ * job-mix is read, the subsequent code works fine if only reads are
+ * specified).
*/
if (workp->insert != 0 && workp->read == 0 && workp->update == 0) {
memset(workp->ops,
@@ -840,10 +873,9 @@ populate_thread(void *arg)
cursor = cursors[op % cfg->table_count];
generate_key(cfg, key_buf, op);
measure_latency =
- cfg->sample_interval != 0 && trk->ops != 0 && (
- trk->ops % cfg->sample_rate == 0);
- if (measure_latency &&
- (ret = __wt_epoch(NULL, &start)) != 0) {
+ cfg->sample_interval != 0 &&
+ trk->ops != 0 && (trk->ops % cfg->sample_rate == 0);
+ if (measure_latency && (ret = __wt_epoch(NULL, &start)) != 0) {
lprintf(cfg, ret, 0, "Get time call failed");
goto err;
}
@@ -961,10 +993,9 @@ populate_async(void *arg)
* the time to process by workers.
*/
measure_latency =
- cfg->sample_interval != 0 && trk->ops != 0 && (
- trk->ops % cfg->sample_rate == 0);
- if (measure_latency &&
- (ret = __wt_epoch(NULL, &start)) != 0) {
+ cfg->sample_interval != 0 &&
+ trk->ops != 0 && (trk->ops % cfg->sample_rate == 0);
+ if (measure_latency && (ret = __wt_epoch(NULL, &start)) != 0) {
lprintf(cfg, ret, 0, "Get time call failed");
goto err;
}
@@ -1006,8 +1037,7 @@ populate_async(void *arg)
goto err;
if (measure_latency) {
if ((ret = __wt_epoch(NULL, &stop)) != 0) {
- lprintf(cfg, ret, 0,
- "Get time call failed");
+ lprintf(cfg, ret, 0, "Get time call failed");
goto err;
}
++trk->latency_ops;
@@ -1433,16 +1463,19 @@ execute_workload(CONFIG *cfg)
{
CONFIG_THREAD *threads;
WORKLOAD *workp;
- uint64_t last_ckpts, last_inserts, last_reads, last_updates;
+ uint64_t last_ckpts, last_inserts, last_reads, last_truncates;
+ uint64_t last_updates;
uint32_t interval, run_ops, run_time;
u_int i;
int ret, t_ret;
void *(*pfunc)(void *);
cfg->insert_key = 0;
- cfg->insert_ops = cfg->read_ops = cfg->update_ops = 0;
+ cfg->insert_ops = cfg->read_ops = cfg->truncate_ops = 0;
+ cfg->update_ops = 0;
- last_ckpts = last_inserts = last_reads = last_updates = 0;
+ last_ckpts = last_inserts = last_reads = last_truncates = 0;
+ last_updates = 0;
ret = 0;
if (cfg->warmup != 0)
@@ -1467,9 +1500,9 @@ execute_workload(CONFIG *cfg)
workp = cfg->workload; i < cfg->workload_cnt; ++i, ++workp) {
lprintf(cfg, 0, 1,
"Starting workload #%d: %" PRId64 " threads, inserts=%"
- PRId64 ", reads=%" PRId64 ", updates=%" PRId64,
- i + 1,
- workp->threads, workp->insert, workp->read, workp->update);
+ PRId64 ", reads=%" PRId64 ", updates=%" PRId64
+ ", truncate=%" PRId64, i + 1, workp->threads, workp->insert,
+ workp->read, workp->update, workp->truncate);
/* Figure out the workload's schedule. */
if ((ret = run_mix_schedule(cfg, workp)) != 0)
@@ -1509,6 +1542,7 @@ execute_workload(CONFIG *cfg)
cfg->insert_ops = sum_insert_ops(cfg);
cfg->read_ops = sum_read_ops(cfg);
cfg->update_ops = sum_update_ops(cfg);
+ cfg->truncate_ops = sum_truncate_ops(cfg);
/* If we're checking total operations, see if we're done. */
if (run_ops != 0 && run_ops <=
@@ -1523,16 +1557,18 @@ execute_workload(CONFIG *cfg)
lprintf(cfg, 0, 1,
"%" PRIu64 " reads, %" PRIu64 " inserts, %" PRIu64
- " updates, %" PRIu64 " checkpoints in %" PRIu32
- " secs (%" PRIu32 " total secs)",
+ " updates, %" PRIu64 " truncates, %" PRIu64
+ " checkpoints in %" PRIu32 " secs (%" PRIu32 " total secs)",
cfg->read_ops - last_reads,
cfg->insert_ops - last_inserts,
cfg->update_ops - last_updates,
+ cfg->truncate_ops - last_truncates,
cfg->ckpt_ops - last_ckpts,
cfg->report_interval, cfg->totalsec);
last_reads = cfg->read_ops;
last_inserts = cfg->insert_ops;
last_updates = cfg->update_ops;
+ last_truncates = cfg->truncate_ops;
last_ckpts = cfg->ckpt_ops;
}
@@ -1915,6 +1951,7 @@ start_run(CONFIG *cfg)
/* One final summation of the operations we've completed. */
cfg->read_ops = sum_read_ops(cfg);
cfg->insert_ops = sum_insert_ops(cfg);
+ cfg->truncate_ops = sum_truncate_ops(cfg);
cfg->update_ops = sum_update_ops(cfg);
cfg->ckpt_ops = sum_ckpt_ops(cfg);
total_ops = cfg->read_ops + cfg->insert_ops + cfg->update_ops;
@@ -1930,6 +1967,11 @@ start_run(CONFIG *cfg)
cfg->insert_ops, (cfg->insert_ops * 100) / total_ops,
cfg->insert_ops / cfg->run_time);
lprintf(cfg, 0, 1,
+ "Executed %" PRIu64 " truncate operations (%" PRIu64
+ "%%) %" PRIu64 " ops/sec",
+ cfg->truncate_ops, (cfg->truncate_ops * 100) / total_ops,
+ cfg->truncate_ops / cfg->run_time);
+ lprintf(cfg, 0, 1,
"Executed %" PRIu64 " update operations (%" PRIu64
"%%) %" PRIu64 " ops/sec",
cfg->update_ops, (cfg->update_ops * 100) / total_ops,
@@ -2087,8 +2129,13 @@ main(int argc, char *argv[])
* If the user wants compaction, then we also enable async for
* the compact operation, but not for the workloads.
*/
- if (cfg->async_threads > 0)
+ if (cfg->async_threads > 0) {
+ if (cfg->has_truncate > 0) {
+ lprintf(cfg, 1, 0, "Cannot run truncate and async\n");
+ goto err;
+ }
cfg->use_asyncops = 1;
+ }
if (cfg->compact && cfg->async_threads == 0)
cfg->async_threads = 2;
if (cfg->async_threads > 0) {
@@ -2110,6 +2157,18 @@ main(int argc, char *argv[])
if ((ret = config_compress(cfg)) != 0)
goto err;
+ /* You can't have truncate on a random collection. */
+ if (cfg->has_truncate && cfg->random_range) {
+ lprintf(cfg, 1, 0, "Cannot run truncate and random_range\n");
+ goto err;
+ }
+
+ /* We can't run truncate with more than one table. */
+ if (cfg->has_truncate && cfg->table_count > 1) {
+ lprintf(cfg, 1, 0, "Cannot truncate more than 1 table\n");
+ goto err;
+ }
+
/* Build the URI from the table name. */
req_len = strlen("table:") +
strlen(HELIUM_NAME) + strlen(cfg->table_name) + 2;
diff --git a/bench/wtperf/wtperf.h b/bench/wtperf/wtperf.h
index 874cdc499b1..58dc65388ae 100644
--- a/bench/wtperf/wtperf.h
+++ b/bench/wtperf/wtperf.h
@@ -26,6 +26,9 @@
* OTHER DEALINGS IN THE SOFTWARE.
*/
+#ifndef HAVE_WTPERF_H
+#define HAVE_WTPERF_H
+
#ifndef _WIN32
#include <sys/time.h>
#endif
@@ -90,14 +93,39 @@ typedef struct {
int64_t throttle; /* Maximum operations/second */
/* Number of operations per transaction. Zero for autocommit */
int64_t ops_per_txn;
+ int64_t truncate; /* Truncate ratio */
+ uint64_t truncate_pct; /* Truncate Percent */
+ uint64_t truncate_count; /* Truncate Count */
#define WORKER_INSERT 1 /* Insert */
#define WORKER_INSERT_RMW 2 /* Insert with read-modify-write */
#define WORKER_READ 3 /* Read */
-#define WORKER_UPDATE 4 /* Update */
+#define WORKER_TRUNCATE 4 /* Truncate */
+#define WORKER_UPDATE 5 /* Update */
uint8_t ops[100]; /* Operation schedule */
} WORKLOAD;
+/* Steering items for the truncate workload */
+typedef struct __truncate_struct TRUNCATE_CONFIG;
+struct __truncate_struct {
+ uint64_t stone_gap;
+ uint64_t needed_stones;
+ uint64_t final_stone_gap;
+ uint64_t expected_total;
+ uint64_t total_inserts;
+ uint64_t last_total_inserts;
+ uint64_t num_stones;
+ uint64_t last_key;
+};
+
+/* Queue entry for use with the Truncate Logic */
+struct __truncate_queue_entry {
+ char *key; /* Truncation point */
+ uint64_t diff; /* Number of items to be truncated */
+ STAILQ_ENTRY(__truncate_queue_entry) q;
+};
+typedef struct __truncate_queue_entry TRUNCATE_QUEUE_ENTRY;
+
#define LOG_PARTIAL_CONFIG ",log=(enabled=false)"
/*
* NOTE: If you add any fields to this structure here, you must also add
@@ -135,6 +163,7 @@ struct __config { /* Configuration structure */
uint64_t ckpt_ops; /* checkpoint operations */
uint64_t insert_ops; /* insert operations */
uint64_t read_ops; /* read operations */
+ uint64_t truncate_ops; /* truncate operations */
uint64_t update_ops; /* update operations */
uint64_t insert_key; /* insert key */
@@ -146,6 +175,11 @@ struct __config { /* Configuration structure */
volatile uint32_t totalsec; /* total seconds running */
+ u_int has_truncate; /* if there is a truncate workload */
+
+ /* Queue head for use with the Truncate Logic */
+ STAILQ_HEAD(__truncate_qh, __truncate_queue_entry) stone_head;
+
/* Fields changeable on command line are listed in wtperf_opt.i */
#define OPT_DECLARE_STRUCT
#include "wtperf_opt.i"
@@ -211,7 +245,7 @@ typedef struct {
struct __config_thread { /* Per-thread structure */
CONFIG *cfg; /* Enclosing configuration */
- uint64_t rnd; /* Random number generation state */
+ WT_RAND_STATE rnd; /* Random number generation state */
pthread_t handle; /* Handle */
@@ -223,8 +257,13 @@ struct __config_thread { /* Per-thread structure */
TRACK insert; /* Insert operations */
TRACK read; /* Read operations */
TRACK update; /* Update operations */
+ TRACK truncate; /* Truncate operations */
+ TRACK truncate_sleep; /* Truncate sleep operations */
+ TRUNCATE_CONFIG trunc_cfg; /* Truncate configuration */
+
};
+void cleanup_truncate_config(CONFIG *);
int config_assign(CONFIG *, const CONFIG *);
int config_compress(CONFIG *);
void config_free(CONFIG *);
@@ -238,11 +277,15 @@ void latency_read(CONFIG *, uint32_t *, uint32_t *, uint32_t *);
void latency_update(CONFIG *, uint32_t *, uint32_t *, uint32_t *);
void latency_print(CONFIG *);
int enomem(const CONFIG *);
+int run_truncate(
+ CONFIG *, CONFIG_THREAD *, WT_CURSOR *, WT_SESSION *, int *);
int setup_log_file(CONFIG *);
+int setup_truncate(CONFIG *, CONFIG_THREAD *, WT_SESSION *);
uint64_t sum_ckpt_ops(CONFIG *);
uint64_t sum_insert_ops(CONFIG *);
uint64_t sum_pop_ops(CONFIG *);
uint64_t sum_read_ops(CONFIG *);
+uint64_t sum_truncate_ops(CONFIG *);
uint64_t sum_update_ops(CONFIG *);
void usage(void);
@@ -251,3 +294,14 @@ void lprintf(const CONFIG *, int err, uint32_t, const char *, ...)
__attribute__((format (printf, 4, 5)))
#endif
;
+
+static inline void
+generate_key(CONFIG *cfg, char *key_buf, uint64_t keyno)
+{
+ /*
+ * Don't change to snprintf, sprintf is faster in some tests.
+ */
+ sprintf(key_buf, "%0*" PRIu64, cfg->key_sz - 1, keyno);
+}
+
+#endif
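
The generate_key() helper moves into the header above so the truncate code can share it with decode_key() in wtperf_truncate.c. A small standalone round trip (not wtperf code; the key_sz value is hypothetical) shows why the zero-padded format matters: lexicographic key order matches numeric order, so a stone key computed from a number can be searched for and used directly as a truncation point.

#include <inttypes.h>
#include <stdint.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>

int
main(void)
{
	char key_buf[64];
	int key_sz = 20;			/* hypothetical key_sz setting */
	uint64_t keyno = 12345;

	/* Encode the way generate_key() does: zero-padded decimal. */
	sprintf(key_buf, "%0*" PRIu64, key_sz - 1, keyno);
	printf("encoded: %s\n", key_buf);

	/* Decode the way decode_key() does. */
	printf("decoded: %" PRIu64 "\n",
	    (uint64_t)strtoull(key_buf, NULL, 10));

	/* Zero padding keeps string order equal to numeric order. */
	printf("ordered: %d\n",
	    strcmp("0000000000000012345", "0000000000000012346") < 0);
	return (0);
}
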
diff --git a/bench/wtperf/wtperf_opt.i b/bench/wtperf/wtperf_opt.i
index 6cb39ac3cc4..7e29aa0f3c2 100644
--- a/bench/wtperf/wtperf_opt.i
+++ b/bench/wtperf/wtperf_opt.i
@@ -167,7 +167,8 @@ DEF_OPT_AS_STRING(threads, "", "workload configuration: each 'count' "
"'threads=((count=2,reads=1)(count=8,reads=1,inserts=2,updates=1))' "
"which would create 2 threads doing nothing but reads and 8 threads "
"each doing 50% inserts and 25% reads and updates. Allowed configuration "
- "values are 'count', 'throttle', 'reads', 'inserts', 'updates'. There are "
+ "values are 'count', 'throttle', 'reads', 'inserts', 'updates', 'truncate',"
+ " 'truncate_pct' and 'truncate_count'. There are "
"also behavior modifiers, supported modifiers are 'ops_per_txn'")
DEF_OPT_AS_CONFIG_STRING(transaction_config, "",
"transaction configuration string, relevant when populate_opts_per_txn "
diff --git a/bench/wtperf/wtperf_truncate.c b/bench/wtperf/wtperf_truncate.c
new file mode 100644
index 00000000000..0d5d1045e1e
--- /dev/null
+++ b/bench/wtperf/wtperf_truncate.c
@@ -0,0 +1,216 @@
+/*-
+ * Public Domain 2014-2015 MongoDB, Inc.
+ * Public Domain 2008-2014 WiredTiger, Inc.
+ *
+ * This is free and unencumbered software released into the public domain.
+ *
+ * Anyone is free to copy, modify, publish, use, compile, sell, or
+ * distribute this software, either in source code form or as a compiled
+ * binary, for any purpose, commercial or non-commercial, and by any
+ * means.
+ *
+ * In jurisdictions that recognize copyright laws, the author or authors
+ * of this software dedicate any and all copyright interest in the
+ * software to the public domain. We make this dedication for the benefit
+ * of the public at large and to the detriment of our heirs and
+ * successors. We intend this dedication to be an overt act of
+ * relinquishment in perpetuity of all present and future rights to this
+ * software under copyright law.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
+ * EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
+ * MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT.
+ * IN NO EVENT SHALL THE AUTHORS BE LIABLE FOR ANY CLAIM, DAMAGES OR
+ * OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE,
+ * ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR
+ * OTHER DEALINGS IN THE SOFTWARE.
+ */
+
+#include "wtperf.h"
+
+static inline uint64_t
+decode_key(char *key_buf)
+{
+ return (strtoull(key_buf, NULL, 10));
+}
+
+int
+setup_truncate(CONFIG *cfg, CONFIG_THREAD *thread, WT_SESSION *session) {
+
+ TRUNCATE_CONFIG *trunc_cfg;
+ TRUNCATE_QUEUE_ENTRY *truncate_item;
+ WORKLOAD *workload;
+ WT_CURSOR *cursor;
+ char *key, *truncate_key;
+ int ret;
+ uint64_t end_point, final_stone_gap, i, start_point;
+
+ end_point = final_stone_gap = start_point = 0;
+ trunc_cfg = &thread->trunc_cfg;
+ workload = thread->workload;
+
+ /* We are limited to only one table when running truncate. */
+ if ((ret = session->open_cursor(
+ session, cfg->uris[0], NULL, NULL, &cursor)) != 0)
+ goto err;
+
+ /* How many entries between each stone. */
+ trunc_cfg->stone_gap =
+ (workload->truncate_count * workload->truncate_pct) / 100;
+ /* How many stones we need. */
+ trunc_cfg->needed_stones =
+ workload->truncate_count / trunc_cfg->stone_gap;
+
+ final_stone_gap = trunc_cfg->stone_gap;
+
+ /* Reset this value for use again. */
+ trunc_cfg->stone_gap = 0;
+
+ /*
+ * Here we check if there is data in the collection. If there is
+ * data available, then we need to setup some initial truncation
+ * stones.
+ */
+ if ((ret = cursor->next(cursor)) != 0 ||
+ (ret = cursor->get_key(cursor, &key)) != 0) {
+ lprintf(cfg, ret, 0, "truncate setup start: failed");
+ goto err;
+ }
+
+ start_point = decode_key(key);
+ if ((cursor->reset(cursor)) != 0 || (ret = cursor->prev(cursor)) != 0 ||
+ (ret = cursor->get_key(cursor, &key)) != 0) {
+ lprintf(cfg, ret, 0, "truncate setup end: failed");
+ goto err;
+ }
+ end_point = decode_key(key);
+
+ /* Assign stones if there are enough documents. */
+ if (start_point + trunc_cfg->needed_stones > end_point)
+ trunc_cfg->stone_gap = 0;
+ else
+ trunc_cfg->stone_gap =
+ (end_point - start_point) / trunc_cfg->needed_stones;
+
+ /* If we have enough data allocate some stones. */
+ if (trunc_cfg->stone_gap != 0) {
+ trunc_cfg->expected_total = (end_point - start_point);
+ for (i = 1; i <= trunc_cfg->needed_stones; i++) {
+ truncate_key = calloc(cfg->key_sz, 1);
+ if (truncate_key == NULL) {
+ ret = enomem(cfg);
+ goto err;
+ }
+ truncate_item = calloc(sizeof(TRUNCATE_QUEUE_ENTRY), 1);
+ if (truncate_item == NULL) {
+ free(truncate_key);
+ ret = enomem(cfg);
+ goto err;
+ }
+ generate_key(
+ cfg, truncate_key, trunc_cfg->stone_gap * i);
+ truncate_item->key = truncate_key;
+ truncate_item->diff =
+ (trunc_cfg->stone_gap * i) - trunc_cfg->last_key;
+ STAILQ_INSERT_TAIL( &cfg->stone_head, truncate_item, q);
+ trunc_cfg->last_key = trunc_cfg->stone_gap * i;
+ trunc_cfg->num_stones++;
+ }
+ }
+ trunc_cfg->stone_gap = final_stone_gap;
+
+err: if ((ret = cursor->close(cursor)) != 0) {
+ lprintf(cfg, ret, 0, "truncate setup: cursor close failed");
+ }
+ return (ret);
+}
+
+int
+run_truncate(CONFIG *cfg, CONFIG_THREAD *thread,
+ WT_CURSOR *cursor, WT_SESSION *session, int *truncatedp) {
+
+ TRUNCATE_CONFIG *trunc_cfg;
+ TRUNCATE_QUEUE_ENTRY *truncate_item;
+ char *truncate_key;
+ int ret, t_ret;
+
+ ret = 0;
+ trunc_cfg = &thread->trunc_cfg;
+
+ *truncatedp = 0;
+ /* Update the total inserts */
+ trunc_cfg->total_inserts = sum_insert_ops(cfg);
+ trunc_cfg->expected_total +=
+ (trunc_cfg->total_inserts - trunc_cfg->last_total_inserts);
+ trunc_cfg->last_total_inserts = trunc_cfg->total_inserts;
+
+ /* We are done if there isn't enough data to trigger a new milestone. */
+ if (trunc_cfg->expected_total <= trunc_cfg->needed_stones)
+ return (0);
+
+ while (trunc_cfg->num_stones < trunc_cfg->needed_stones) {
+ trunc_cfg->last_key += trunc_cfg->stone_gap;
+ truncate_key = calloc(cfg->key_sz, 1);
+ if (truncate_key == NULL) {
+ lprintf(cfg, ENOMEM, 0,
+ "truncate: couldn't allocate key array");
+ return (ENOMEM);
+ }
+ truncate_item = calloc(sizeof(TRUNCATE_QUEUE_ENTRY), 1);
+ if (truncate_item == NULL) {
+ free(truncate_key);
+ lprintf(cfg, ENOMEM, 0,
+ "truncate: couldn't allocate item");
+ return (ENOMEM);
+ }
+ generate_key(cfg, truncate_key, trunc_cfg->last_key);
+ truncate_item->key = truncate_key;
+ truncate_item->diff = trunc_cfg->stone_gap;
+ STAILQ_INSERT_TAIL(&cfg->stone_head, truncate_item, q);
+ trunc_cfg->num_stones++;
+ }
+
+ /* We are done if there isn't enough data to trigger a truncate. */
+ if (trunc_cfg->num_stones == 0 ||
+ trunc_cfg->expected_total <= thread->workload->truncate_count)
+ return (0);
+
+ truncate_item = STAILQ_FIRST(&cfg->stone_head);
+ trunc_cfg->num_stones--;
+ STAILQ_REMOVE_HEAD(&cfg->stone_head, q);
+ cursor->set_key(cursor, truncate_item->key);
+ if ((ret = cursor->search(cursor)) != 0) {
+ lprintf(cfg, ret, 0, "Truncate search: failed");
+ goto err;
+ }
+
+ if ((ret = session->truncate(session, NULL, NULL, cursor, NULL)) != 0) {
+ lprintf(cfg, ret, 0, "Truncate: failed");
+ goto err;
+ }
+
+ *truncatedp = 1;
+ trunc_cfg->expected_total -= truncate_item->diff;
+
+err: free(truncate_item->key);
+ free(truncate_item);
+ t_ret = cursor->reset(cursor);
+ if (t_ret != 0)
+ lprintf(cfg, t_ret, 0, "Cursor reset failed");
+ if (ret == 0 && t_ret != 0)
+ ret = t_ret;
+ return (ret);
+}
+
+void
+cleanup_truncate_config(CONFIG *cfg) {
+ TRUNCATE_QUEUE_ENTRY *truncate_item;
+
+ while (!STAILQ_EMPTY(&cfg->stone_head)) {
+ truncate_item = STAILQ_FIRST(&cfg->stone_head);
+ STAILQ_REMOVE_HEAD(&cfg->stone_head, q);
+ free(truncate_item->key);
+ free(truncate_item);
+ }
+}
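
To make the stone arithmetic in setup_truncate() easier to follow, here is a minimal standalone sketch of the same spacing calculation with made-up workload values (truncate_count=50000, truncate_pct=10, a populated key range of 1..50000). It is an illustration of the logic only, not the wtperf code.

#include <inttypes.h>
#include <stdint.h>
#include <stdio.h>

int
main(void)
{
	uint64_t truncate_count = 50000;	/* hypothetical option values */
	uint64_t truncate_pct = 10;
	uint64_t start_key = 1, end_key = 50000;	/* populated range */
	uint64_t stone_gap, needed_stones, i;

	/* Each truncate removes truncate_pct percent of truncate_count. */
	stone_gap = (truncate_count * truncate_pct) / 100;
	needed_stones = truncate_count / stone_gap;

	/* Spread the initial stones evenly across the existing data. */
	if (start_key + needed_stones > end_key)
		stone_gap = 0;
	else
		stone_gap = (end_key - start_key) / needed_stones;

	for (i = 1; stone_gap != 0 && i <= needed_stones; i++)
		printf("stone %" PRIu64 " at key %" PRIu64 "\n",
		    i, stone_gap * i);
	return (0);
}

Once the workload is running, run_truncate() keeps the queue topped up at stone_gap intervals and truncates up to the oldest stone whenever expected_total exceeds truncate_count.
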
diff --git a/dist/s_define.list b/dist/s_define.list
index 623a34447a8..a9ae2a10006 100644
--- a/dist/s_define.list
+++ b/dist/s_define.list
@@ -21,8 +21,8 @@ WT_ATOMIC_ADD2
WT_ATOMIC_CAS1
WT_ATOMIC_CAS2
WT_ATOMIC_FETCH_ADD1
-WT_ATOMIC_FETCH_ADD2
WT_ATOMIC_FETCH_ADD4
+WT_ATOMIC_FETCH_ADD8
WT_ATOMIC_STORE1
WT_ATOMIC_STORE2
WT_ATOMIC_SUB1
diff --git a/dist/s_string.ok b/dist/s_string.ok
index 91407361bcb..0853ce971b8 100644
--- a/dist/s_string.ok
+++ b/dist/s_string.ok
@@ -746,6 +746,7 @@ nop
noraw
notfound
notsup
+notused
nset
nsnap
nul
diff --git a/examples/c/ex_cursor.c b/examples/c/ex_cursor.c
index 112e38f9a2e..35597ca7b8e 100644
--- a/examples/c/ex_cursor.c
+++ b/examples/c/ex_cursor.c
@@ -93,7 +93,7 @@ cursor_search(WT_CURSOR *cursor)
cursor->set_key(cursor, "foo");
- if ((ret = cursor->search(cursor)) != 0)
+ if ((ret = cursor->search(cursor)) == 0)
ret = cursor->get_value(cursor, &value);
return (ret);
diff --git a/src/btree/bt_walk.c b/src/btree/bt_walk.c
index f257a955801..2705f371fb5 100644
--- a/src/btree/bt_walk.c
+++ b/src/btree/bt_walk.c
@@ -81,10 +81,11 @@ __wt_tree_walk(WT_SESSION_IMPL *session,
WT_PAGE *page;
WT_PAGE_INDEX *pindex;
WT_REF *couple, *couple_orig, *ref;
- int prev, skip;
+ int empty_internal, prev, skip;
uint32_t slot;
btree = S2BT(session);
+ empty_internal = 0;
/*
* Tree walks are special: they look inside page structures that splits
@@ -171,6 +172,15 @@ ascend: /*
(!prev && slot == pindex->entries - 1)) {
ref = ref->home->pg_intl_parent_ref;
+ /*
+ * If we got all the way through an internal page and
+ * all of the child pages were deleted, evict it.
+ */
+ if (empty_internal) {
+ __wt_page_evict_soon(ref->page);
+ empty_internal = 0;
+ }
+
/* Optionally skip internal pages. */
if (LF_ISSET(WT_READ_SKIP_INTL))
goto ascend;
@@ -226,6 +236,13 @@ ascend: /*
if (ref->pindex_hint != slot)
ref->pindex_hint = slot;
+ /*
+ * If we see any child states other than deleted, the
+ * page isn't empty.
+ */
+ if (ref->state != WT_REF_DELETED)
+ empty_internal = 0;
+
if (LF_ISSET(WT_READ_CACHE)) {
/*
* Only look at unlocked pages in memory:
@@ -338,10 +355,10 @@ ascend: /*
*/
descend: couple = ref;
page = ref->page;
- if (page->type == WT_PAGE_ROW_INT ||
- page->type == WT_PAGE_COL_INT) {
+ if (WT_PAGE_IS_INTERNAL(page)) {
WT_INTL_INDEX_GET(session, page, pindex);
slot = prev ? pindex->entries - 1 : 0;
+ empty_internal = 1;
} else {
*refp = ref;
goto done;
diff --git a/src/btree/row_modify.c b/src/btree/row_modify.c
index f0a10cdf528..62177b7e4c7 100644
--- a/src/btree/row_modify.c
+++ b/src/btree/row_modify.c
@@ -263,7 +263,6 @@ int
__wt_update_alloc(
WT_SESSION_IMPL *session, WT_ITEM *value, WT_UPDATE **updp, size_t *sizep)
{
- WT_UPDATE *upd;
size_t size;
/*
@@ -271,16 +270,15 @@ __wt_update_alloc(
* the value into place.
*/
size = value == NULL ? 0 : value->size;
- WT_RET(__wt_calloc(session, 1, sizeof(WT_UPDATE) + size, &upd));
+ WT_RET(__wt_calloc(session, 1, sizeof(WT_UPDATE) + size, updp));
if (value == NULL)
- WT_UPDATE_DELETED_SET(upd);
+ WT_UPDATE_DELETED_SET(*updp);
else {
- upd->size = WT_STORE_SIZE(size);
- memcpy(WT_UPDATE_DATA(upd), value->data, size);
+ (*updp)->size = WT_STORE_SIZE(size);
+ memcpy(WT_UPDATE_DATA(*updp), value->data, size);
}
- *updp = upd;
- *sizep = WT_UPDATE_MEMSIZE(upd);
+ *sizep = WT_UPDATE_MEMSIZE(*updp);
return (0);
}
diff --git a/src/docs/wtperf.dox b/src/docs/wtperf.dox
index 100857a985d..e541d8df873 100644
--- a/src/docs/wtperf.dox
+++ b/src/docs/wtperf.dox
@@ -233,8 +233,8 @@ be 'threads=((count=2,reads=1)(count=8,reads=1,inserts=2,updates=1))'
which would create 2 threads doing nothing but reads and 8 threads
each doing 50% inserts and 25% reads and updates. Allowed
configuration values are 'count', 'throttle', 'reads', 'inserts',
-'updates'. There are also behavior modifiers, supported modifiers are
-'ops_per_txn'
+'updates', 'truncate', 'truncate_pct' and 'truncate_count'. There are
+also behavior modifiers, supported modifiers are 'ops_per_txn'
@par transaction_config (string, default=)
transaction configuration string, relevant when populate_opts_per_txn
is nonzero
diff --git a/src/evict/evict_lru.c b/src/evict/evict_lru.c
index 6aa61b4137b..dc46c1abf1e 100644
--- a/src/evict/evict_lru.c
+++ b/src/evict/evict_lru.c
@@ -1245,6 +1245,9 @@ fast: /* If the page can't be evicted, give up. */
WT_RET(__wt_verbose(session, WT_VERB_EVICTSERVER,
"select: %p, size %" PRIu64, page, page->memory_footprint));
}
+ WT_RET_NOTFOUND_OK(ret);
+
+ *slotp += (u_int)(evict - start);
/*
* If we happen to end up on the root page, clear it. We have to track
@@ -1257,16 +1260,12 @@ fast: /* If the page can't be evicted, give up. */
if ((ref = btree->evict_ref) != NULL && (__wt_ref_is_root(ref) ||
ref->page->read_gen == WT_READGEN_OLDEST)) {
btree->evict_ref = NULL;
- __wt_page_release(session, ref, WT_READ_NO_EVICT);
+ WT_RET(__wt_page_release(session, ref, WT_READ_NO_EVICT));
}
- /* If the walk was interrupted by a locked page, that's okay. */
- if (ret == WT_NOTFOUND)
- ret = 0;
-
- *slotp += (u_int)(evict - start);
WT_STAT_FAST_CONN_INCRV(session, cache_eviction_walk, pages_walked);
- return (ret);
+
+ return (0);
}
/*
diff --git a/src/include/extern.h b/src/include/extern.h
index 992e65851eb..df76c5b8ff6 100644
--- a/src/include/extern.h
+++ b/src/include/extern.h
@@ -643,8 +643,8 @@ extern uint32_t __wt_nlpo2(uint32_t v);
extern uint32_t __wt_log2_int(uint32_t n);
extern int __wt_ispo2(uint32_t v);
extern uint32_t __wt_rduppo2(uint32_t n, uint32_t po2);
-extern void __wt_random_init(uint64_t volatile *rnd_state);
-extern uint32_t __wt_random(uint64_t volatile *rnd_state);
+extern void __wt_random_init(WT_RAND_STATE volatile *rnd_state);
+extern uint32_t __wt_random(WT_RAND_STATE volatile *rnd_state);
extern int __wt_buf_grow_worker(WT_SESSION_IMPL *session, WT_ITEM *buf, size_t size);
extern int __wt_buf_fmt(WT_SESSION_IMPL *session, WT_ITEM *buf, const char *fmt, ...) WT_GCC_FUNC_DECL_ATTRIBUTE((format (printf, 3, 4)));
extern int __wt_buf_catfmt(WT_SESSION_IMPL *session, WT_ITEM *buf, const char *fmt, ...) WT_GCC_FUNC_DECL_ATTRIBUTE((format (printf, 3, 4)));
diff --git a/src/include/misc.h b/src/include/misc.h
index bcb5e054865..1b2cbf11fc2 100644
--- a/src/include/misc.h
+++ b/src/include/misc.h
@@ -256,3 +256,11 @@
#define __wt_page_swap(session, held, want, flags) \
__wt_page_swap_func(session, held, want, flags)
#endif
+
+/* Random number generator state. */
+union __wt_rand_state {
+ uint64_t v;
+ struct {
+ uint32_t w, z;
+ } x;
+};
diff --git a/src/include/mutex.h b/src/include/mutex.h
index 8e0a2e2d5a6..a08d504a054 100644
--- a/src/include/mutex.h
+++ b/src/include/mutex.h
@@ -24,24 +24,20 @@ struct __wt_condvar {
/*
* !!!
- * Don't touch this structure without understanding the read/write
- * locking functions.
+ * Don't modify this structure without understanding the read/write locking
+ * functions.
*/
-typedef union { /* Read/write lock */
-#ifdef WORDS_BIGENDIAN
- WiredTiger read/write locks require modification for big-endian systems.
-#else
+typedef union { /* Read/write lock */
uint64_t u;
struct {
- uint32_t us;
+ uint32_t wr; /* Writers and readers */
} i;
struct {
- uint16_t writers;
- uint16_t readers;
- uint16_t users;
- uint16_t pad;
+ uint16_t writers; /* Now serving for writers */
+ uint16_t readers; /* Now serving for readers */
+ uint16_t users; /* Next available ticket number */
+ uint16_t __notused; /* Padding */
} s;
-#endif
} wt_rwlock_t;
/*
diff --git a/src/include/serial.i b/src/include/serial.i
index 9e6b0f7916c..9e5acde9616 100644
--- a/src/include/serial.i
+++ b/src/include/serial.i
@@ -30,6 +30,32 @@ __page_write_gen_wrapped_check(WT_PAGE *page)
}
/*
+ * __insert_simple_func --
+ * Worker function to add a WT_INSERT entry to the middle of a skiplist.
+ */
+static inline int
+__insert_simple_func(WT_SESSION_IMPL *session,
+ WT_INSERT ***ins_stack, WT_INSERT *new_ins, u_int skipdepth)
+{
+ u_int i;
+
+ WT_UNUSED(session);
+
+ /*
+ * Update the skiplist elements referencing the new WT_INSERT item.
+ * If we fail connecting one of the upper levels in the skiplist,
+ * return success: the levels we updated are correct and sufficient.
+ * Even though we don't get the benefit of the memory we allocated,
+ * we can't roll back.
+ */
+ for (i = 0; i < skipdepth; i++)
+ if (!WT_ATOMIC_CAS8(*ins_stack[i], new_ins->next[i], new_ins))
+ return (i == 0 ? WT_RESTART : 0);
+
+ return (0);
+}
+
+/*
* __insert_serial_func --
* Worker function to add a WT_INSERT entry to a skiplist.
*/
@@ -42,31 +68,20 @@ __insert_serial_func(WT_SESSION_IMPL *session, WT_INSERT_HEAD *ins_head,
WT_UNUSED(session);
/*
- * Confirm we are still in the expected position, and no item has been
- * added where our insert belongs. Take extra care at the beginning
- * and end of the list (at each level): retry if we race there.
+ * Update the skiplist elements referencing the new WT_INSERT item.
*
- * !!!
- * Note the test for ins_stack[0] == NULL: that's the test for an
- * uninitialized cursor, ins_stack[0] is cleared as part of
- * initializing a cursor for a search.
+ * Confirm we are still in the expected position, and no item has been
+ * added where our insert belongs. If we fail connecting one of the
+ * upper levels in the skiplist, return success: the levels we updated
+ * are correct and sufficient. Even though we don't get the benefit of
+ * the memory we allocated, we can't roll back.
*/
for (i = 0; i < skipdepth; i++) {
- if (ins_stack[i] == NULL ||
- *ins_stack[i] != new_ins->next[i])
- return (WT_RESTART);
- if (new_ins->next[i] == NULL &&
- ins_head->tail[i] != NULL &&
- ins_stack[i] != &ins_head->tail[i]->next[i])
- return (WT_RESTART);
- }
-
- /* Update the skiplist elements referencing the new WT_INSERT item. */
- for (i = 0; i < skipdepth; i++) {
+ if (!WT_ATOMIC_CAS8(*ins_stack[i], new_ins->next[i], new_ins))
+ return (i == 0 ? WT_RESTART : 0);
if (ins_head->tail[i] == NULL ||
ins_stack[i] == &ins_head->tail[i]->next[i])
ins_head->tail[i] = new_ins;
- *ins_stack[i] = new_ins;
}
return (0);
@@ -128,12 +143,19 @@ __wt_col_append_serial(WT_SESSION_IMPL *session, WT_PAGE *page,
WT_INSERT *new_ins = *new_insp;
WT_DECL_RET;
- /* Clear references to memory we now own. */
- *new_insp = NULL;
+ /* !!!
+ * Test for an uninitialized cursor, ins_stack[0] is cleared as part of
+ * initializing a cursor for a search.
+ */
+ if (ins_stack[0] == NULL)
+ return (WT_RESTART);
/* Check for page write generation wrap. */
WT_RET(__page_write_gen_wrapped_check(page));
+ /* Clear references to memory we now own and must free on error. */
+ *new_insp = NULL;
+
/* Acquire the page's spinlock, call the worker function. */
WT_PAGE_LOCK(session, page);
ret = __col_append_serial_func(
@@ -171,21 +193,39 @@ __wt_insert_serial(WT_SESSION_IMPL *session, WT_PAGE *page,
{
WT_INSERT *new_ins = *new_insp;
WT_DECL_RET;
+ int simple;
+ u_int i;
- /* Clear references to memory we now own. */
- *new_insp = NULL;
+ /* !!!
+ * Test for an uninitialized cursor, ins_stack[0] is cleared as part of
+ * initializing a cursor for a search.
+ */
+ if (ins_stack[0] == NULL)
+ return (WT_RESTART);
/* Check for page write generation wrap. */
WT_RET(__page_write_gen_wrapped_check(page));
- /* Acquire the page's spinlock, call the worker function. */
- WT_PAGE_LOCK(session, page);
- ret = __insert_serial_func(
- session, ins_head, ins_stack, new_ins, skipdepth);
- WT_PAGE_UNLOCK(session, page);
+ /* Clear references to memory we now own and must free on error. */
+ *new_insp = NULL;
+
+ simple = 1;
+ for (i = 0; i < skipdepth; i++)
+ if (new_ins->next[i] == NULL)
+ simple = 0;
+
+ if (simple)
+ ret = __insert_simple_func(
+ session, ins_stack, new_ins, skipdepth);
+ else {
+ WT_PAGE_LOCK(session, page);
+ ret = __insert_serial_func(
+ session, ins_head, ins_stack, new_ins, skipdepth);
+ WT_PAGE_UNLOCK(session, page);
+ }
- /* Free unused memory on error. */
if (ret != 0) {
+ /* Free unused memory on error. */
__wt_free(session, new_ins);
return (ret);
}
@@ -215,17 +255,15 @@ __wt_update_serial(WT_SESSION_IMPL *session, WT_PAGE *page,
WT_DECL_RET;
WT_UPDATE *obsolete, *upd = *updp;
- /* Clear references to memory we now own. */
- *updp = NULL;
-
/* Check for page write generation wrap. */
WT_RET(__page_write_gen_wrapped_check(page));
+ /* Clear references to memory we now own and must free on error. */
+ *updp = NULL;
+
/*
* Swap the update into place. If that fails, a new update was added
- * after our search, we raced. Check if our update is still permitted,
- * and if it is, do a full-barrier to ensure the update's next pointer
- * is set before we update the linked list and try again.
+ * after our search, we raced. Check if our update is still permitted.
*/
while (!WT_ATOMIC_CAS8(*srch_upd, upd->next, upd)) {
if ((ret = __wt_txn_update_check(
@@ -234,7 +272,6 @@ __wt_update_serial(WT_SESSION_IMPL *session, WT_PAGE *page,
__wt_free(session, upd);
return (ret);
}
- WT_WRITE_BARRIER();
}
/*
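
The net effect of the serial.i changes is that a skiplist insert whose stack points into the middle of each level no longer takes the page lock: each level is spliced with a compare-and-swap, a race at level 0 forces a retry, and a race at an upper level is simply tolerated. A rough standalone sketch of that splice pattern, using C11 atomics and hypothetical types rather than the WT_ATOMIC_* macros and WT_INSERT structures:

#include <stdatomic.h>
#include <stddef.h>

#define MAX_DEPTH 10

struct node {
	_Atomic(struct node *) next[MAX_DEPTH];
};

/*
 * ins_stack[i] points at the slot to update on level i and
 * new_node->next[i] already holds the expected successor (filled in by
 * the preceding search).  Return -1 if the caller must restart the
 * search, 0 on success (possibly with some upper levels left unlinked).
 */
static int
skiplist_splice(_Atomic(struct node *) **ins_stack,
    struct node *new_node, unsigned depth)
{
	struct node *expected;
	unsigned i;

	for (i = 0; i < depth; i++) {
		expected = new_node->next[i];
		if (!atomic_compare_exchange_strong(
		    ins_stack[i], &expected, new_node))
			return (i == 0 ? -1 : 0);
	}
	return (0);
}

The full __insert_serial_func still runs under the page lock whenever some next pointer is NULL (the insert lands at the end of a level), because the skiplist tail pointers have to be maintained as well.
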
diff --git a/src/include/session.h b/src/include/session.h
index bf1aa98d8d3..f32da177bf9 100644
--- a/src/include/session.h
+++ b/src/include/session.h
@@ -148,7 +148,7 @@ struct WT_COMPILER_TYPE_ALIGN(WT_CACHE_LINE_ALIGNMENT) __wt_session_impl {
#define WT_SESSION_CLEAR_SIZE(s) \
(WT_PTRDIFF(&(s)->rnd, s))
- uint64_t rnd; /* Random number generation state */
+ WT_RAND_STATE rnd; /* Random number generation state */
/* Hashed handle reference list array */
SLIST_HEAD(__dhandles_hash, __wt_data_handle_cache) *dhhash;
diff --git a/src/include/txn.i b/src/include/txn.i
index a9b54d26e47..a8e052ec5eb 100644
--- a/src/include/txn.i
+++ b/src/include/txn.i
@@ -377,7 +377,8 @@ __wt_txn_id_check(WT_SESSION_IMPL *session)
do {
txn_state->id = txn->id = txn_global->current;
} while (!WT_ATOMIC_CAS8(
- txn_global->current, txn->id, txn->id + 1));
+ txn_global->current, txn->id, txn->id + 1) ||
+ WT_TXNID_LT(txn->id, txn_global->last_running));
/*
* If we have used 64-bits of transaction IDs, there is nothing
diff --git a/src/include/wt_internal.h b/src/include/wt_internal.h
index b876a2d032d..64e29e104bc 100644
--- a/src/include/wt_internal.h
+++ b/src/include/wt_internal.h
@@ -266,6 +266,8 @@ struct __wt_upd_skipped;
typedef struct __wt_upd_skipped WT_UPD_SKIPPED;
struct __wt_update;
typedef struct __wt_update WT_UPDATE;
+union __wt_rand_state;
+typedef union __wt_rand_state WT_RAND_STATE;
/*
* Forward type declarations for internal types: END
* DO NOT EDIT: automatically built by dist/s_typedef.
diff --git a/src/os_posix/os_mtx_rw.c b/src/os_posix/os_mtx_rw.c
index cdd4f8a24e1..df558b12bef 100644
--- a/src/os_posix/os_mtx_rw.c
+++ b/src/os_posix/os_mtx_rw.c
@@ -38,6 +38,78 @@
* Joseph Seigh. Note that a similar (but not identical) algorithm was published
* by John Mellor-Crummey and Michael Scott in their landmark paper "Scalable
* Reader-Writer Synchronization for Shared-Memory Multiprocessors".
+ *
+ * The following is an explanation of this code. First, the underlying lock
+ * structure.
+ *
+ * struct {
+ * uint16_t writers; Now serving for writers
+ * uint16_t readers; Now serving for readers
+ * uint16_t users; Next available ticket number
+ * uint16_t __notused; Padding
+ * }
+ *
+ * First, imagine a store's 'take a number' ticket algorithm. A customer takes
+ * a unique ticket number and customers are served in ticket order. In the data
+ * structure, 'writers' is the next writer to be served, 'readers' is the next
+ * reader to be served, and 'users' is the next available ticket number.
+ *
+ * Next, consider exclusive (write) locks. The 'now serving' number for writers
+ * is 'writers'. To lock, 'take a number' and wait until that number is being
+ * served; more specifically, atomically copy and increment the current value of
+ * 'users', and then wait until 'writers' equals that copied number.
+ *
+ * Shared (read) locks are similar. Like writers, readers atomically get the
+ * next number available. However, instead of waiting for 'writers' to equal
+ * their number, they wait for 'readers' to equal their number.
+ *
+ * This has the effect of queuing lock requests in the order they arrive
+ * (incidentally avoiding starvation).
+ *
+ * Each lock/unlock pair requires incrementing both 'readers' and 'writers'.
+ * In the case of a reader, the 'readers' increment happens when the reader
+ * acquires the lock (to allow read-lock sharing), and the 'writers' increment
+ * happens when the reader releases the lock. In the case of a writer, both
+ * 'readers' and 'writers' are incremented when the writer releases the lock.
+ *
+ * For example, consider the following read (R) and write (W) lock requests:
+ *
+ * writers readers users
+ * 0 0 0
+ * R: ticket 0, readers match OK 0 1 1
+ * R: ticket 1, readers match OK 0 2 2
+ * R: ticket 2, readers match OK 0 3 3
+ * W: ticket 3, writers no match block 0 3 4
+ * R: ticket 2, unlock 1 3 4
+ * R: ticket 0, unlock 2 3 4
+ * R: ticket 1, unlock 3 3 4
+ * W: ticket 3, writers match OK 3 3 4
+ *
+ * Note the writer blocks until 'writers' equals its ticket number and it does
+ * not matter if readers unlock in order or not.
+ *
+ * Readers or writers entering the system after the write lock is queued block,
+ * and the next ticket holder (reader or writer) will unblock when the writer
+ * unlocks. An example, continuing from the last line of the above example:
+ *
+ * writers readers users
+ * W: ticket 3, writers match OK 3 3 4
+ * R: ticket 4, readers no match block 3 3 5
+ * R: ticket 5, readers no match block 3 3 6
+ * W: ticket 6, writers no match block 3 3 7
+ * W: ticket 3, unlock 4 4 7
+ * R: ticket 4, readers match OK 4 5 7
+ * R: ticket 5, readers match OK 4 6 7
+ *
+ * The 'users' field is a 2-byte value so the available ticket number wraps at
+ * 64K requests. If a thread's lock request is not granted until the 'users'
+ * field cycles and the same ticket is taken by another thread, we could grant
+ * a lock to two separate threads at the same time, and bad things happen: two
+ * writer threads or a reader thread and a writer thread would run in parallel,
+ * and lock waiters could be skipped if the unlocks race. This is unlikely, it
+ * only happens if a lock request is blocked by 64K other requests. The fix is
+ * to grow the lock structure fields, but the largest atomic instruction we have
+ * is 8 bytes, the structure has no room to grow.
*/
#include "wt_internal.h"
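
As an aside, the algorithm in the comment above can be written in a few lines with plain C11 atomics. This is only an illustrative sketch (it spins instead of sleeping and ignores the 64K ticket wrap discussed above), not WiredTiger's implementation, which packs the three counters into a single 8-byte word so the try-lock variants can compare-and-swap the whole lock at once:

#include <stdatomic.h>
#include <stdint.h>

typedef struct {
	_Atomic uint16_t writers;	/* Now serving for writers */
	_Atomic uint16_t readers;	/* Now serving for readers */
	_Atomic uint16_t users;		/* Next available ticket number */
} ticket_rwlock;

static void
read_lock(ticket_rwlock *l)
{
	uint16_t ticket = atomic_fetch_add(&l->users, 1);

	while (atomic_load(&l->readers) != ticket)
		;				/* spin */
	atomic_fetch_add(&l->readers, 1);	/* admit the next reader */
}

static void
read_unlock(ticket_rwlock *l)
{
	atomic_fetch_add(&l->writers, 1);	/* hand the ticket to writers */
}

static void
write_lock(ticket_rwlock *l)
{
	uint16_t ticket = atomic_fetch_add(&l->users, 1);

	while (atomic_load(&l->writers) != ticket)
		;				/* spin */
}

static void
write_unlock(ticket_rwlock *l)
{
	/* A writer advances both counters to serve the next waiter. */
	atomic_fetch_add(&l->readers, 1);
	atomic_fetch_add(&l->writers, 1);
}

With sequentially consistent atomics the memory ordering takes care of itself; the real code relies on the WT_ATOMIC_* macros and explicit barriers instead.
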
@@ -69,20 +141,31 @@ __wt_rwlock_alloc(
int
__wt_try_readlock(WT_SESSION_IMPL *session, WT_RWLOCK *rwlock)
{
- wt_rwlock_t *l;
- uint64_t old, new, pad, users, writers;
+ wt_rwlock_t *l, new, old;
WT_RET(__wt_verbose(
session, WT_VERB_MUTEX, "rwlock: try_readlock %s", rwlock->name));
WT_STAT_FAST_CONN_INCR(session, rwlock_read);
l = &rwlock->rwlock;
- pad = l->s.pad;
- users = l->s.users;
- writers = l->s.writers;
- old = (pad << 48) + (users << 32) + (users << 16) + writers;
- new = (pad << 48) + ((users + 1) << 32) + ((users + 1) << 16) + writers;
- return (WT_ATOMIC_CAS8(l->u, old, new) ? 0 : EBUSY);
+ new = old = *l;
+
+ /*
+ * This read lock can only be granted if the lock was last granted to
+ * a reader and there are no readers or writers blocked on the lock,
+ * that is, if this thread's ticket would be the next ticket granted.
+ * Do the cheap test to see if this can possibly succeed (and confirm
+ * the lock is in the correct state to grant this read lock).
+ */
+ if (old.s.readers != old.s.users)
+ return (EBUSY);
+
+ /*
+ * The replacement lock value is a result of allocating a new ticket and
+ * incrementing the reader value to match it.
+ */
+ new.s.readers = new.s.users = old.s.users + 1;
+ return (WT_ATOMIC_CAS8(l->u, old.u, new.u) ? 0 : EBUSY);
}
/*
@@ -93,8 +176,7 @@ int
__wt_readlock(WT_SESSION_IMPL *session, WT_RWLOCK *rwlock)
{
wt_rwlock_t *l;
- uint64_t me;
- uint16_t val;
+ uint16_t ticket;
int pause_cnt;
WT_RET(__wt_verbose(
@@ -102,17 +184,22 @@ __wt_readlock(WT_SESSION_IMPL *session, WT_RWLOCK *rwlock)
WT_STAT_FAST_CONN_INCR(session, rwlock_read);
l = &rwlock->rwlock;
- me = WT_ATOMIC_FETCH_ADD8(l->u, (uint64_t)1 << 32);
- val = (uint16_t)(me >> 32);
- for (pause_cnt = 0; val != l->s.readers;) {
+
+ /*
+ * Possibly wrap: if we have more than 64K lockers waiting, the ticket
+ * value will wrap and two lockers will simultaneously be granted the
+ * lock.
+ */
+ ticket = WT_ATOMIC_FETCH_ADD2(l->s.users, 1);
+ for (pause_cnt = 0; ticket != l->s.readers;) {
/*
* We failed to get the lock; pause before retrying and if we've
* paused enough, sleep so we don't burn CPU to no purpose. This
* situation happens if there are more threads than cores in the
- * system and we're thrashing on shared resources. Regardless,
- * don't sleep long, all we need is to schedule the other reader
- * threads to complete a few more instructions and increment the
- * reader count.
+ * system and we're thrashing on shared resources.
+ *
+ * Don't sleep long when waiting on a read lock, hopefully we're
+ * waiting on another read thread to increment the reader count.
*/
if (++pause_cnt < 1000)
WT_PAUSE();
@@ -120,6 +207,10 @@ __wt_readlock(WT_SESSION_IMPL *session, WT_RWLOCK *rwlock)
__wt_sleep(0, 10);
}
+ /*
+ * We're the only writer of the readers field, so the update does not
+ * need to be atomic.
+ */
++l->s.readers;
return (0);
@@ -138,6 +229,11 @@ __wt_readunlock(WT_SESSION_IMPL *session, WT_RWLOCK *rwlock)
session, WT_VERB_MUTEX, "rwlock: read unlock %s", rwlock->name));
l = &rwlock->rwlock;
+
+ /*
+ * Increment the writers value (other readers are doing the same, make
+ * sure we don't race).
+ */
WT_ATOMIC_ADD2(l->s.writers, 1);
return (0);
@@ -150,20 +246,28 @@ __wt_readunlock(WT_SESSION_IMPL *session, WT_RWLOCK *rwlock)
int
__wt_try_writelock(WT_SESSION_IMPL *session, WT_RWLOCK *rwlock)
{
- wt_rwlock_t *l;
- uint64_t old, new, pad, readers, users;
+ wt_rwlock_t *l, new, old;
WT_RET(__wt_verbose(
session, WT_VERB_MUTEX, "rwlock: try_writelock %s", rwlock->name));
WT_STAT_FAST_CONN_INCR(session, rwlock_write);
l = &rwlock->rwlock;
- pad = l->s.pad;
- readers = l->s.readers;
- users = l->s.users;
- old = (pad << 48) + (users << 32) + (readers << 16) + users;
- new = (pad << 48) + ((users + 1) << 32) + (readers << 16) + users;
- return (WT_ATOMIC_CAS8(l->u, old, new) ? 0 : EBUSY);
+ old = new = *l;
+
+ /*
+ * This write lock can only be granted if the lock was last granted to
+ * a writer and there are no readers or writers blocked on the lock,
+ * that is, if this thread's ticket would be the next ticket granted.
+ * Do the cheap test to see if this can possibly succeed (and confirm
+ * the lock is in the correct state to grant this write lock).
+ */
+ if (old.s.writers != old.s.users)
+ return (EBUSY);
+
+ /* The replacement lock value is a result of allocating a new ticket. */
+ ++new.s.users;
+ return (WT_ATOMIC_CAS8(l->u, old.u, new.u) ? 0 : EBUSY);
}
/*
@@ -174,23 +278,33 @@ int
__wt_writelock(WT_SESSION_IMPL *session, WT_RWLOCK *rwlock)
{
wt_rwlock_t *l;
- uint64_t me;
- uint16_t val;
+ uint16_t ticket;
+ int pause_cnt;
WT_RET(__wt_verbose(
session, WT_VERB_MUTEX, "rwlock: writelock %s", rwlock->name));
WT_STAT_FAST_CONN_INCR(session, rwlock_write);
+ l = &rwlock->rwlock;
+
/*
- * Possibly wrap: if we have more than 64K lockers waiting, the count
- * of writers will wrap and two lockers will simultaneously be granted
- * the write lock.
+ * Possibly wrap: if we have more than 64K lockers waiting, the ticket
+ * value will wrap and two lockers will simultaneously be granted the
+ * lock.
*/
- l = &rwlock->rwlock;
- me = WT_ATOMIC_FETCH_ADD8(l->u, (uint64_t)1 << 32);
- val = (uint16_t)(me >> 32);
- while (val != l->s.writers)
- WT_PAUSE();
+ ticket = WT_ATOMIC_FETCH_ADD2(l->s.users, 1);
+ for (pause_cnt = 0; ticket != l->s.writers;) {
+ /*
+ * We failed to get the lock; pause before retrying and if we've
+ * paused enough, sleep so we don't burn CPU to no purpose. This
+ * situation happens if there are more threads than cores in the
+ * system and we're thrashing on shared resources.
+ */
+ if (++pause_cnt < 1000)
+ WT_PAUSE();
+ else
+ __wt_sleep(0, 10);
+ }
return (0);
}
@@ -211,12 +325,23 @@ __wt_writeunlock(WT_SESSION_IMPL *session, WT_RWLOCK *rwlock)
copy = *l;
+ /*
+ * We're the only writer of the writers/readers fields, so the update
+ * does not need to be atomic; we have to update both values at the
+ * same time though, otherwise we'd potentially race with the thread
+ * next granted the lock.
+ *
+ * Use a memory barrier to ensure the compiler doesn't mess with these
+ * instructions and rework the code in a way that avoids the update as
+ * a unit.
+ */
WT_BARRIER();
++copy.s.writers;
++copy.s.readers;
- l->i.us = copy.i.us;
+ l->i.wr = copy.i.wr;
+
return (0);
}
diff --git a/src/reconcile/rec_write.c b/src/reconcile/rec_write.c
index 53a73b44feb..37acb28a00b 100644
--- a/src/reconcile/rec_write.c
+++ b/src/reconcile/rec_write.c
@@ -343,11 +343,12 @@ __wt_reconcile(WT_SESSION_IMPL *session,
WT_PAGE *page;
WT_PAGE_MODIFY *mod;
WT_RECONCILE *r;
- int locked;
+ int page_lock, scan_lock, split_lock;
conn = S2C(session);
page = ref->page;
mod = page->modify;
+ page_lock = scan_lock = split_lock = 0;
/* We shouldn't get called with a clean page, that's an error. */
if (!__wt_page_is_modified(page))
@@ -386,22 +387,38 @@ __wt_reconcile(WT_SESSION_IMPL *session,
/*
* The compaction process looks at the page's modification information;
- * if compaction is running, lock the page down.
- *
- * Otherwise, flip on the scanning flag: obsolete updates cannot be
- * freed while reconciliation is in progress.
+ * if compaction is running, acquire the page's lock.
*/
- locked = 0;
if (conn->compact_in_memory_pass) {
- locked = 1;
WT_PAGE_LOCK(session, page);
- } else
+ page_lock = 1;
+ }
+
+ /*
+ * Reconciliation reads the lists of updates, so obsolete updates cannot
+ * be discarded while reconciliation is in progress.
+ */
+ for (;;) {
+ F_CAS_ATOMIC(page, WT_PAGE_SCANNING, ret);
+ if (ret == 0)
+ break;
+ __wt_yield();
+ }
+ scan_lock = 1;
+
+ /*
+ * Mark internal pages as splitting to ensure we don't deadlock when
+ * performing an in-memory split during a checkpoint.
+ */
+ if (WT_PAGE_IS_INTERNAL(page)) {
for (;;) {
- F_CAS_ATOMIC(page, WT_PAGE_SCANNING, ret);
+ F_CAS_ATOMIC(page, WT_PAGE_SPLIT_LOCKED, ret);
if (ret == 0)
break;
__wt_yield();
}
+ split_lock = 1;
+ }
/* Reconcile the page. */
switch (page->type) {
@@ -434,11 +451,13 @@ __wt_reconcile(WT_SESSION_IMPL *session,
else
WT_TRET(__rec_write_wrapup_err(session, r, page));
- /* Release the page lock if we're holding one. */
- if (locked)
- WT_PAGE_UNLOCK(session, page);
- else
+ /* Release the locks we're holding. */
+ if (split_lock)
+ F_CLR_ATOMIC(page, WT_PAGE_SPLIT_LOCKED);
+ if (scan_lock)
F_CLR_ATOMIC(page, WT_PAGE_SCANNING);
+ if (page_lock)
+ WT_PAGE_UNLOCK(session, page);
/*
* Clean up the boundary structures: some workloads result in millions
@@ -3266,18 +3285,6 @@ __rec_col_int(WT_SESSION_IMPL *session, WT_RECONCILE *r, WT_PAGE *page)
WT_RET(__rec_split_init(
session, r, page, page->pg_intl_recno, btree->maxintlpage));
- /*
- * We need to mark this page as splitting, as this may be an in-memory
- * split during a checkpoint.
- */
- for (;;) {
- F_CAS_ATOMIC(page, WT_PAGE_SPLIT_LOCKED, ret);
- if (ret == 0) {
- break;
- }
- __wt_yield();
- }
-
/* For each entry in the in-memory page... */
WT_INTL_FOREACH_BEGIN(session, page, ref) {
/* Update the starting record number in case we split. */
@@ -3360,8 +3367,6 @@ __rec_col_int(WT_SESSION_IMPL *session, WT_RECONCILE *r, WT_PAGE *page)
__rec_copy_incr(session, r, val);
} WT_INTL_FOREACH_END;
- F_CLR_ATOMIC(page, WT_PAGE_SPLIT_LOCKED);
-
/* Write the remnant page. */
return (__rec_split_finish(session, r));
@@ -4094,18 +4099,6 @@ __rec_row_int(WT_SESSION_IMPL *session, WT_RECONCILE *r, WT_PAGE *page)
*/
r->cell_zero = 1;
- /*
- * We need to mark this page as splitting in order to ensure we don't
- * deadlock when performing an in-memory split during a checkpoint.
- */
- for (;;) {
- F_CAS_ATOMIC(page, WT_PAGE_SPLIT_LOCKED, ret);
- if (ret == 0) {
- break;
- }
- __wt_yield();
- }
-
/* For each entry in the in-memory page... */
WT_INTL_FOREACH_BEGIN(session, page, ref) {
/*
@@ -4264,8 +4257,6 @@ __rec_row_int(WT_SESSION_IMPL *session, WT_RECONCILE *r, WT_PAGE *page)
__rec_key_state_update(r, ovfl_key);
} WT_INTL_FOREACH_END;
- F_CLR_ATOMIC(page, WT_PAGE_SPLIT_LOCKED);
-
/* Write the remnant page. */
return (__rec_split_finish(session, r));
diff --git a/src/session/session_api.c b/src/session/session_api.c
index ef9735a8b98..e7ac9d9b365 100644
--- a/src/session/session_api.c
+++ b/src/session/session_api.c
@@ -800,7 +800,7 @@ __session_commit_transaction(WT_SESSION *wt_session, const char *config)
WT_STAT_FAST_CONN_INCR(session, txn_commit);
txn = &session->txn;
- if (F_ISSET(txn, WT_TXN_ERROR)) {
+ if (F_ISSET(txn, WT_TXN_ERROR) && txn->mod_count != 0) {
__wt_errx(session, "failed transaction requires rollback");
ret = EINVAL;
}
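
The one-line change above relaxes commit so a transaction flagged with WT_TXN_ERROR is only forced to roll back when it actually made modifications (txn->mod_count != 0); a read-only transaction that hit a failed operation may now commit. The new test/suite/test_txn12.py later in this diff exercises this through the Python API; a rough equivalent through the public C API might look like the sketch below, where the home directory and table name are hypothetical and error handling is pared down to the two calls that matter:

    #include <stdlib.h>
    #include <wiredtiger.h>

    int
    main(void)
    {
        WT_CONNECTION *conn;
        WT_SESSION *session;
        WT_CURSOR *c, *bad;

        /* "WT_HOME" must be an existing, empty directory. */
        if (wiredtiger_open("WT_HOME", NULL, "create", &conn) != 0)
            return (EXIT_FAILURE);
        conn->open_session(conn, NULL, NULL, &session);
        session->create(session, "table:example", "key_format=i,value_format=i");

        session->begin_transaction(session, "isolation=snapshot");
        session->open_cursor(session, "table:example", NULL, NULL, &c);
        (void)c->next(c);                   /* read-only work, no modifications */

        /* A bad cursor configuration fails and flags the transaction... */
        (void)session->open_cursor(
            session, "table:example", NULL, "next_random=bar", &bad);

        /* ...but with nothing to roll back, the commit now succeeds. */
        if (session->commit_transaction(session, NULL) != 0)
            return (EXIT_FAILURE);

        return (conn->close(conn, NULL) == 0 ? EXIT_SUCCESS : EXIT_FAILURE);
    }
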
diff --git a/src/support/rand.c b/src/support/rand.c
index 7dfb98c5ca4..caac04d3529 100644
--- a/src/support/rand.c
+++ b/src/support/rand.c
@@ -41,18 +41,18 @@
* of the values to avoid that, and read/write in atomic, 8B chunks.
*/
#undef M_W
-#define M_W(p) ((uint32_t *)&(p))[0]
+#define M_W(r) r.x.w
#undef M_Z
-#define M_Z(p) ((uint32_t *)&(p))[1]
+#define M_Z(r) r.x.z
/*
* __wt_random_init --
* Initialize return of a 32-bit pseudo-random number.
*/
void
-__wt_random_init(uint64_t volatile * rnd_state)
+__wt_random_init(WT_RAND_STATE volatile * rnd_state)
{
- uint64_t rnd;
+ WT_RAND_STATE rnd;
M_W(rnd) = 521288629;
M_Z(rnd) = 362436069;
@@ -64,9 +64,9 @@ __wt_random_init(uint64_t volatile * rnd_state)
* Return a 32-bit pseudo-random number.
*/
uint32_t
-__wt_random(uint64_t volatile * rnd_state)
+__wt_random(WT_RAND_STATE volatile * rnd_state)
{
- uint64_t rnd;
+ WT_RAND_STATE rnd;
uint32_t w, z;
/*
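
The rand.c change replaces the pointer-cast M_W/M_Z macros with named members of a WT_RAND_STATE type, so the two 32-bit halves of the multiply-with-carry state are reached through the type system rather than by casting a uint64_t. The real definition lives in the WiredTiger headers changed alongside this file; a plausible shape, consistent with the r.x.w / r.x.z accesses above and with the surrounding comment about reading and writing the state in 8-byte chunks, is sketched here (the union and function names are illustrative):

    #include <stdint.h>

    /* Illustrative shape only; not the actual WT_RAND_STATE definition. */
    typedef union {
        uint64_t v;             /* whole state: one aligned 8-byte load/store */
        struct {
            uint32_t w;         /* multiply-with-carry "w" word */
            uint32_t z;         /* multiply-with-carry "z" word */
        } x;
    } EXAMPLE_RAND_STATE;

    /* Seeding then follows the __wt_random_init body above. */
    static void
    example_random_init(EXAMPLE_RAND_STATE volatile *rnd_state)
    {
        EXAMPLE_RAND_STATE rnd;

        rnd.x.w = 521288629;
        rnd.x.z = 362436069;
        *rnd_state = rnd;       /* copy the whole state back in one assignment */
    }
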
diff --git a/src/txn/txn.c b/src/txn/txn.c
index 7f8a944acbd..a9566876c8a 100644
--- a/src/txn/txn.c
+++ b/src/txn/txn.c
@@ -182,10 +182,6 @@ __wt_txn_get_snapshot(WT_SESSION_IMPL *session)
WT_ASSERT(session, prev_oldest_id == txn_global->oldest_id);
txn_state->snap_min = snap_min;
- /* Update the last running ID if we have a much newer value. */
- if (snap_min > txn_global->last_running + 100)
- txn_global->last_running = snap_min;
-
WT_ASSERT(session, txn_global->scan_count > 0);
(void)WT_ATOMIC_SUB4(txn_global->scan_count, 1);
@@ -287,30 +283,38 @@ __wt_txn_update_oldest(WT_SESSION_IMPL *session, int force)
oldest_id = id;
/* Update the last running ID. */
- if (WT_TXNID_LT(txn_global->last_running, snap_min)) {
- txn_global->last_running = snap_min;
- last_running_moved = 1;
- } else
- last_running_moved = 0;
+ last_running_moved = WT_TXNID_LT(txn_global->last_running, snap_min);
/* Update the oldest ID. */
- if (WT_TXNID_LT(prev_oldest_id, oldest_id) &&
+ if ((WT_TXNID_LT(prev_oldest_id, oldest_id) || last_running_moved) &&
WT_ATOMIC_CAS4(txn_global->scan_count, 1, -1)) {
WT_ORDERED_READ(session_cnt, conn->session_cnt);
for (i = 0, s = txn_global->states; i < session_cnt; i++, s++) {
if ((id = s->id) != WT_TXN_NONE &&
- WT_TXNID_LT(id, oldest_id))
+ WT_TXNID_LT(id, snap_min))
oldest_id = id;
if ((id = s->snap_min) != WT_TXN_NONE &&
WT_TXNID_LT(id, oldest_id))
oldest_id = id;
}
- /* Make sure the ID doesn't move past any named snapshots. */
- WT_ASSERT(session,
- (id = txn_global->nsnap_oldest_id) == WT_TXN_NONE ||
- !WT_TXNID_LT(id, oldest_id));
+ if (WT_TXNID_LT(snap_min, oldest_id))
+ oldest_id = snap_min;
+#ifdef HAVE_DIAGNOSTIC
+ /*
+ * Make sure the ID doesn't move past any named snapshots.
+ *
+ * Don't include the read/assignment in the assert statement.
+ * Coverity complains about assignments that happen only in
+ * diagnostic builds, and about reads from volatile variables.
+ */
+ id = txn_global->nsnap_oldest_id;
+ WT_ASSERT(session,
+ id == WT_TXN_NONE || !WT_TXNID_LT(id, oldest_id));
+#endif
+ if (WT_TXNID_LT(txn_global->last_running, snap_min))
+ txn_global->last_running = snap_min;
if (WT_TXNID_LT(txn_global->oldest_id, oldest_id))
txn_global->oldest_id = oldest_id;
txn_global->scan_count = 0;
@@ -412,6 +416,9 @@ __wt_txn_release(WT_SESSION_IMPL *session)
txn_global->checkpoint_id = 0;
txn_global->checkpoint_pinned = WT_TXN_NONE;
} else if (F_ISSET(txn, WT_TXN_HAS_ID)) {
+ WT_ASSERT(session,
+ !WT_TXNID_LT(txn->id, txn_global->last_running));
+
WT_ASSERT(session, txn_state->id != WT_TXN_NONE &&
txn->id != WT_TXN_NONE);
WT_PUBLISH(txn_state->id, WT_TXN_NONE);
@@ -462,7 +469,7 @@ __wt_txn_commit(WT_SESSION_IMPL *session, const char *cfg[])
txn = &session->txn;
conn = S2C(session);
- WT_ASSERT(session, !F_ISSET(txn, WT_TXN_ERROR));
+ WT_ASSERT(session, !F_ISSET(txn, WT_TXN_ERROR) || txn->mod_count == 0);
if (!F_ISSET(txn, WT_TXN_RUNNING))
WT_RET_MSG(session, EINVAL, "No transaction is active");
@@ -586,6 +593,7 @@ __wt_txn_rollback(WT_SESSION_IMPL *session, const char *cfg[])
switch (op->type) {
case WT_TXN_OP_BASIC:
case WT_TXN_OP_INMEM:
+ WT_ASSERT(session, op->u.upd->txnid == txn->id);
op->u.upd->txnid = WT_TXN_ABORTED;
break;
case WT_TXN_OP_REF:
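
The txn.c hunks rework __wt_txn_update_oldest so that last_running is only published inside the section protected by the scan_count compare-and-swap, and so the computed oldest ID is clamped to the scanning session's snap_min. A standalone restatement of that scan follows, assuming WT_TXNID_LT is an ordinary less-than comparison on transaction IDs; the struct and names are illustrative, not the real WT_TXN_STATE:

    #include <stdint.h>

    #define EX_TXN_NONE 0                       /* illustrative "no transaction" */
    #define EX_TXNID_LT(t1, t2) ((t1) < (t2))   /* assumed ID ordering */

    struct ex_txn_state {
        uint64_t id;        /* running transaction's ID, or EX_TXN_NONE */
        uint64_t snap_min;  /* oldest ID in that transaction's snapshot */
    };

    /* Restatement of the oldest-ID scan in the hunk above. */
    static uint64_t
    ex_oldest_id(const struct ex_txn_state *states, uint32_t n_sessions,
        uint64_t oldest_id, uint64_t snap_min)
    {
        uint64_t id;
        uint32_t i;

        for (i = 0; i < n_sessions; i++) {
            /* A running transaction older than our snap_min. */
            if ((id = states[i].id) != EX_TXN_NONE &&
                EX_TXNID_LT(id, snap_min))
                oldest_id = id;
            /* A snapshot pinning something older than the candidate. */
            if ((id = states[i].snap_min) != EX_TXN_NONE &&
                EX_TXNID_LT(id, oldest_id))
                oldest_id = id;
        }
        /* Never let the result advance past the caller's snap_min. */
        if (EX_TXNID_LT(snap_min, oldest_id))
            oldest_id = snap_min;
        return (oldest_id);
    }
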
diff --git a/test/checkpoint/workers.c b/test/checkpoint/workers.c
index 664acd6183d..5cd2ef4e97b 100644
--- a/test/checkpoint/workers.c
+++ b/test/checkpoint/workers.c
@@ -172,7 +172,7 @@ real_worker(void)
{
WT_CURSOR **cursors;
WT_SESSION *session;
- uint64_t rnd;
+ WT_RAND_STATE rnd;
u_int i, keyno;
int j, ret, t_ret;
diff --git a/test/fops/fops.c b/test/fops/fops.c
index e86795c4fc0..0a83e8511f4 100644
--- a/test/fops/fops.c
+++ b/test/fops/fops.c
@@ -98,7 +98,7 @@ fop(void *arg)
{
STATS *s;
uintptr_t id;
- uint64_t rnd;
+ WT_RAND_STATE rnd;
u_int i;
id = (uintptr_t)arg;
diff --git a/test/format/format.h b/test/format/format.h
index 6cbfd882616..4ec2734aee9 100644
--- a/test/format/format.h
+++ b/test/format/format.h
@@ -151,7 +151,7 @@ typedef struct {
pthread_rwlock_t backup_lock; /* Hot backup running */
- uint64_t rnd; /* Global RNG state */
+ WT_RAND_STATE rnd; /* Global RNG state */
/*
* We have a list of records that are appended, but not yet "resolved",
@@ -269,7 +269,7 @@ typedef struct {
extern GLOBAL g;
typedef struct {
- uint64_t rnd; /* thread RNG state */
+ WT_RAND_STATE rnd; /* thread RNG state */
uint64_t search; /* operations */
uint64_t insert;
@@ -312,15 +312,15 @@ void config_setup(void);
void config_single(const char *, int);
void fclose_and_clear(FILE **);
void key_gen(uint8_t *, size_t *, uint64_t);
-void key_gen_insert(uint64_t *, uint8_t *, size_t *, uint64_t);
+void key_gen_insert(WT_RAND_STATE *, uint8_t *, size_t *, uint64_t);
void key_gen_setup(uint8_t **);
void key_len_setup(void);
void *lrt(void *);
void path_setup(const char *);
-uint32_t rng(uint64_t *);
+uint32_t rng(WT_RAND_STATE *);
void track(const char *, uint64_t, TINFO *);
-void val_gen(uint64_t *, uint8_t *, size_t *, uint64_t);
-void val_gen_setup(uint64_t *, uint8_t **);
+void val_gen(WT_RAND_STATE *, uint8_t *, size_t *, uint64_t);
+void val_gen_setup(WT_RAND_STATE *, uint8_t **);
void wts_close(void);
void wts_create(void);
void wts_dump(const char *, int);
@@ -343,7 +343,7 @@ __attribute__((__noreturn__))
* Return a random value between a min/max pair.
*/
static inline uint32_t
-mmrand(uint64_t *rnd, u_int min, u_int max)
+mmrand(WT_RAND_STATE *rnd, u_int min, u_int max)
{
return (rng(rnd) % (((max) + 1) - (min)) + (min));
}
diff --git a/test/format/ops.c b/test/format/ops.c
index b9606ad8f36..7d3b22175ca 100644
--- a/test/format/ops.c
+++ b/test/format/ops.c
@@ -202,7 +202,7 @@ wts_ops(int lastrun)
* Return the current session configuration.
*/
static const char *
-ops_session_config(uint64_t *rnd)
+ops_session_config(WT_RAND_STATE *rnd)
{
u_int v;
diff --git a/test/format/util.c b/test/format/util.c
index 8d077f6caa7..9d28b7a81bc 100644
--- a/test/format/util.c
+++ b/test/format/util.c
@@ -33,7 +33,7 @@
#endif
static inline uint32_t
-kv_len(uint64_t *rnd, uint64_t keyno, uint32_t min, uint32_t max)
+kv_len(WT_RAND_STATE *rnd, uint64_t keyno, uint32_t min, uint32_t max)
{
/*
* Focus on relatively small key/value items, admitting the possibility
@@ -116,7 +116,7 @@ key_gen(uint8_t *key, size_t *sizep, uint64_t keyno)
}
void
-key_gen_insert(uint64_t *rnd, uint8_t *key, size_t *sizep, uint64_t keyno)
+key_gen_insert(WT_RAND_STATE *rnd, uint8_t *key, size_t *sizep, uint64_t keyno)
{
key_gen_common(key, sizep, keyno, (int)mmrand(rnd, 1, 15));
}
@@ -124,7 +124,7 @@ key_gen_insert(uint64_t *rnd, uint8_t *key, size_t *sizep, uint64_t keyno)
static uint32_t val_dup_data_len; /* Length of duplicate data items */
void
-val_gen_setup(uint64_t *rnd, uint8_t **valp)
+val_gen_setup(WT_RAND_STATE *rnd, uint8_t **valp)
{
uint8_t *val;
size_t i, len;
@@ -151,7 +151,7 @@ val_gen_setup(uint64_t *rnd, uint8_t **valp)
}
void
-val_gen(uint64_t *rnd, uint8_t *val, size_t *sizep, uint64_t keyno)
+val_gen(WT_RAND_STATE *rnd, uint8_t *val, size_t *sizep, uint64_t keyno)
{
/*
* Fixed-length records: take the low N bits from the last digit of
@@ -361,7 +361,7 @@ path_setup(const char *home)
* Return a random number.
*/
uint32_t
-rng(uint64_t *rnd)
+rng(WT_RAND_STATE *rnd)
{
char buf[64];
uint32_t r;
diff --git a/test/suite/test_txn12.py b/test/suite/test_txn12.py
new file mode 100644
index 00000000000..0901811535e
--- /dev/null
+++ b/test/suite/test_txn12.py
@@ -0,0 +1,70 @@
+#!/usr/bin/env python
+#
+# Public Domain 2014-2015 MongoDB, Inc.
+# Public Domain 2008-2014 WiredTiger, Inc.
+#
+# This is free and unencumbered software released into the public domain.
+#
+# Anyone is free to copy, modify, publish, use, compile, sell, or
+# distribute this software, either in source code form or as a compiled
+# binary, for any purpose, commercial or non-commercial, and by any
+# means.
+#
+# In jurisdictions that recognize copyright laws, the author or authors
+# of this software dedicate any and all copyright interest in the
+# software to the public domain. We make this dedication for the benefit
+# of the public at large and to the detriment of our heirs and
+# successors. We intend this dedication to be an overt act of
+# relinquishment in perpetuity of all present and future rights to this
+# software under copyright law.
+#
+# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
+# EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
+# MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT.
+# IN NO EVENT SHALL THE AUTHORS BE LIABLE FOR ANY CLAIM, DAMAGES OR
+# OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE,
+# ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR
+# OTHER DEALINGS IN THE SOFTWARE.
+
+import wiredtiger, wttest
+from suite_subprocess import suite_subprocess
+from wiredtiger import stat
+from wtscenario import multiply_scenarios, number_scenarios
+
+# test_txn12.py
+# Test commit following a failed operation in a read-only transaction.
+class test_txn12(wttest.WiredTigerTestCase, suite_subprocess):
+ name = 'test_txn12'
+ uri = 'table:' + name
+ create_params = 'key_format=i,value_format=i'
+
+ # Test that read-only transactions can commit following a failure.
+ def test_txn12(self):
+
+ # Set up the session and table.
+ session = self.conn.open_session(None)
+ session.create(self.uri, self.create_params)
+ session.begin_transaction("isolation=snapshot")
+
+ # Create a read-only transaction.
+ c = session.open_cursor(self.uri, None)
+ c.next()
+ msg = '/next_random.*boolean/'
+ self.assertRaisesWithMessage(wiredtiger.WiredTigerError,
+ lambda:session.open_cursor(self.uri, None, "next_random=bar"), msg)
+ # This commit should succeed as we have done no writes.
+ session.commit_transaction()
+
+ # Create a read/write transaction.
+ session.begin_transaction("isolation=snapshot")
+ c = session.open_cursor(self.uri, None)
+ c[123] = 123
+ self.assertRaisesWithMessage(wiredtiger.WiredTigerError,
+ lambda:session.open_cursor(self.uri, None, "next_random=bar"), msg)
+ # This commit should fail because we have written something.
+ self.assertRaisesWithMessage(wiredtiger.WiredTigerError,
+ lambda:session.commit_transaction(), '/requires rollback/')
+
+if __name__ == '__main__':
+ wttest.run()
+
diff --git a/test/thread/rw.c b/test/thread/rw.c
index 402789dd2a5..c9e2e78ec35 100644
--- a/test/thread/rw.c
+++ b/test/thread/rw.c
@@ -36,7 +36,7 @@ typedef struct {
char *name; /* object name */
u_int nops; /* Thread op count */
- uint64_t rnd; /* RNG */
+ WT_RAND_STATE rnd; /* RNG */
int remove; /* cursor.remove */
int update; /* cursor.update */