-rw-r--r--  src/third_party/wiredtiger/bench/workgen/workgen.cxx | 423
-rw-r--r--  src/third_party/wiredtiger/bench/workgen/workgen.h | 42
-rw-r--r--  src/third_party/wiredtiger/bench/workgen/workgen_int.h | 42
-rwxr-xr-x (was -rw-r--r--)  src/third_party/wiredtiger/bench/workgen/wtperf.py | 24
-rw-r--r--  src/third_party/wiredtiger/bench/wtperf/runners/checkpoint-latency.wtperf | 25
-rw-r--r--  src/third_party/wiredtiger/dist/s_string.ok | 1
-rw-r--r--  src/third_party/wiredtiger/import.data | 2
-rw-r--r--  src/third_party/wiredtiger/src/btree/bt_cursor.c | 145
-rw-r--r--  src/third_party/wiredtiger/src/btree/bt_read.c | 34
-rw-r--r--  src/third_party/wiredtiger/src/cache/cache_las.c | 6
-rw-r--r--  src/third_party/wiredtiger/src/cursor/cur_file.c | 17
-rw-r--r--  src/third_party/wiredtiger/src/include/extern.h | 4
-rw-r--r--  src/third_party/wiredtiger/src/include/misc.h | 7
-rw-r--r--  src/third_party/wiredtiger/src/include/txn.i | 17
-rw-r--r--  src/third_party/wiredtiger/src/reconcile/rec_write.c | 18
-rw-r--r--  src/third_party/wiredtiger/src/txn/txn.c | 10
-rw-r--r--  src/third_party/wiredtiger/test/csuite/timestamp_abort/main.c | 202
-rw-r--r--  src/third_party/wiredtiger/test/suite/test_prepare_lookaside01.py | 4
18 files changed, 608 insertions, 415 deletions
diff --git a/src/third_party/wiredtiger/bench/workgen/workgen.cxx b/src/third_party/wiredtiger/bench/workgen/workgen.cxx
index 9ae63682f9c..39aacb89dc8 100644
--- a/src/third_party/wiredtiger/bench/workgen/workgen.cxx
+++ b/src/third_party/wiredtiger/bench/workgen/workgen.cxx
@@ -257,7 +257,7 @@ int ContextInternal::create_all() {
if (_runtime_alloced < _tint_last) {
// The array references are 1-based, we'll waste one entry.
TableRuntime *new_table_runtime = new TableRuntime[_tint_last + 1];
- for (int i = 0; i < _runtime_alloced; i++)
+ for (uint32_t i = 0; i < _runtime_alloced; i++)
new_table_runtime[i + 1] = _table_runtime[i + 1];
delete _table_runtime;
_table_runtime = new_table_runtime;
@@ -318,12 +318,13 @@ int Monitor::run() {
new_totals.add(tr->_stats, true);
Stats interval(new_totals);
interval.subtract(prev_totals);
- interval.smooth(prev_interval);
int interval_secs = options->sample_interval;
uint64_t cur_reads = interval.read.ops / interval_secs;
uint64_t cur_inserts = interval.insert.ops / interval_secs;
uint64_t cur_updates = interval.update.ops / interval_secs;
+ bool checkpointing = new_totals.checkpoint.ops_in_progress > 0 ||
+ interval.checkpoint.ops > 0;
uint64_t totalsec = ts_sec(t - _wrunner._start);
(*_out) << time_buf
@@ -331,7 +332,7 @@ int Monitor::run() {
<< "," << cur_reads
<< "," << cur_inserts
<< "," << cur_updates
- << "," << 'N' // checkpoint in progress
+ << "," << (checkpointing ? 'Y' : 'N')
<< "," << interval.read.average_latency()
<< "," << interval.read.min_latency
<< "," << interval.read.max_latency
@@ -348,13 +349,22 @@ int Monitor::run() {
(void)strftime(time_buf, sizeof(time_buf),
WORKGEN_TIMESTAMP_JSON, tm);
-#define TRACK_JSON(name, t) \
- "\"" << (name) << "\":{" \
- << "\"ops per sec\":" << ((t).ops / interval_secs) \
- << ",\"average latency\":" << (t).average_latency() \
- << ",\"min latency\":" << (t).min_latency \
- << ",\"max latency\":" << (t).max_latency \
- << "}"
+ // Note: we could allow this to be configurable.
+ int percentiles[4] = {50, 95, 99, 0};
+
+#define TRACK_JSON(f, name, t, percentiles, extra) \
+ do { \
+ int _i; \
+ (f) << "\"" << (name) << "\":{" << extra \
+ << "\"ops per sec\":" << ((t).ops / interval_secs) \
+ << ",\"average latency\":" << (t).average_latency() \
+ << ",\"min latency\":" << (t).min_latency \
+ << ",\"max latency\":" << (t).max_latency; \
+ for (_i = 0; (percentiles)[_i] != 0; _i++) \
+ (f) << ",\"" << (percentiles)[_i] << "% latency\":" \
+ << (t).percentile_latency(percentiles[_i]); \
+ (f) << "}"; \
+ } while(0)
(*_json) << "{";
if (first) {
@@ -362,11 +372,16 @@ int Monitor::run() {
first = false;
}
(*_json) << "\"localTime\":\"" << time_buf
- << "\",\"workgen\":{"
- << TRACK_JSON("read", interval.read) << ","
- << TRACK_JSON("insert", interval.insert) << ","
- << TRACK_JSON("update", interval.update)
- << "}}" << std::endl;
+ << "\",\"workgen\":{";
+ TRACK_JSON(*_json, "read", interval.read, percentiles, "");
+ (*_json) << ",";
+ TRACK_JSON(*_json, "insert", interval.insert, percentiles, "");
+ (*_json) << ",";
+ TRACK_JSON(*_json, "update", interval.update, percentiles, "");
+ (*_json) << ",";
+ TRACK_JSON(*_json, "checkpoint", interval.checkpoint, percentiles,
+ "\"active\":" << (checkpointing ? "1," : "0,"));
+ (*_json) << "}}" << std::endl;
}
uint64_t read_max = interval.read.max_latency;
@@ -553,7 +568,7 @@ void ThreadRunner::op_create_all(Operation *op, size_t &keysize,
tint_t tint;
op->create_all();
- if (op->_optype != Operation::OP_NONE) {
+ if (op->is_table_op()) {
op->kv_compute_max(true, false);
if (OP_HAS_VALUE(op))
op->kv_compute_max(false, op->_table.options.random_value);
@@ -678,7 +693,7 @@ int ThreadRunner::op_run(Operation *op) {
&_throttle_limit));
_throttle_ops = 0;
}
- if (op->_optype != Operation::OP_NONE)
+ if (op->is_table_op())
++_throttle_ops;
}
@@ -691,6 +706,10 @@ int ThreadRunner::op_run(Operation *op) {
// (and most likely when the threads are first beginning). Any
// WT_NOTFOUND returns are allowed and get their own statistic bumped.
switch (op->_optype) {
+ case Operation::OP_CHECKPOINT:
+ recno = 0;
+ track = &_stats.checkpoint;
+ break;
case Operation::OP_INSERT:
track = &_stats.insert;
if (op->_key._keytype == Key::KEYGEN_APPEND ||
@@ -700,6 +719,10 @@ int ThreadRunner::op_run(Operation *op) {
else
recno = op_get_key_recno(op, range, tint);
break;
+ case Operation::OP_NONE:
+ case Operation::OP_NOOP:
+ recno = 0;
+ break;
case Operation::OP_REMOVE:
track = &_stats.remove;
recno = op_get_key_recno(op, range, tint);
@@ -712,12 +735,11 @@ int ThreadRunner::op_run(Operation *op) {
track = &_stats.update;
recno = op_get_key_recno(op, range, tint);
break;
- case Operation::OP_NONE:
+ case Operation::OP_SLEEP:
recno = 0;
break;
}
- if ((op->_flags & WORKGEN_OP_REOPEN) != 0 &&
- op->_optype != Operation::OP_NONE) {
+ if ((op->_internal->_flags & WORKGEN_OP_REOPEN) != 0) {
WT_ERR(_session->open_cursor(_session, op->_table._uri.c_str(), NULL,
NULL, &cursor));
own_cursor = true;
@@ -732,6 +754,10 @@ int ThreadRunner::op_run(Operation *op) {
timespec start;
if (measure_latency)
workgen_epoch(&start);
+ // Whether or not we are measuring latency, we track how many operations
+ // are in progress or have completed.
+ if (track != NULL)
+ track->begin();
if (op->_transaction != NULL) {
if (_in_transaction)
@@ -740,7 +766,7 @@ int ThreadRunner::op_run(Operation *op) {
op->_transaction->_begin_config.c_str());
_in_transaction = true;
}
- if (op->_optype != Operation::OP_NONE) {
+ if (op->is_table_op()) {
op->kv_gen(this, true, 100, recno, _keybuf);
cursor->set_key(cursor, _keybuf);
if (OP_HAS_VALUE(op)) {
@@ -771,15 +797,18 @@ int ThreadRunner::op_run(Operation *op) {
ret = 0; // WT_NOTFOUND allowed.
}
cursor->reset(cursor);
- }
+ } else
+ WT_ERR(op->_internal->run(this, _session));
+
if (measure_latency) {
timespec stop;
workgen_epoch(&stop);
- track->incr_with_latency(ts_us(stop - start));
+ track->complete_with_latency(ts_us(stop - start));
} else if (track != NULL)
- track->incr();
+ track->complete();
if (op->_group != NULL)
+ VERBOSE(*this, "GROUP operation " << op->_repeatgroup << " times");
for (int count = 0; !_stop && count < op->_repeatgroup; count++)
for (std::vector<Operation>::iterator i = op->_group->begin();
i != op->_group->end(); i++)
@@ -932,45 +961,51 @@ void Thread::describe(std::ostream &os) const {
}
Operation::Operation() :
- _optype(OP_NONE), _table(), _key(), _value(), _config(), _transaction(NULL),
- _group(NULL), _repeatgroup(0), _flags(0),
- _keysize(0), _valuesize(0), _keymax(0), _valuemax(0) {
+ _optype(OP_NONE), _internal(NULL), _table(), _key(), _value(), _config(),
+ _transaction(NULL), _group(NULL), _repeatgroup(0) {
+ init_internal(NULL);
}
Operation::Operation(OpType optype, Table table, Key key, Value value) :
- _optype(optype), _table(table), _key(key), _value(value), _config(),
- _transaction(NULL), _group(NULL), _repeatgroup(0), _flags(0),
- _keysize(0), _valuesize(0), _keymax(0), _valuemax(0) {
+ _optype(optype), _internal(NULL), _table(table), _key(key), _value(value),
+ _config(), _transaction(NULL), _group(NULL), _repeatgroup(0) {
+ init_internal(NULL);
size_check();
}
Operation::Operation(OpType optype, Table table, Key key) :
- _optype(optype), _table(table), _key(key), _value(), _config(),
- _transaction(NULL), _group(NULL), _repeatgroup(0), _flags(0),
- _keysize(0), _valuesize(0), _keymax(0), _valuemax(0) {
+ _optype(optype), _internal(NULL), _table(table), _key(key), _value(),
+ _config(), _transaction(NULL), _group(NULL), _repeatgroup(0) {
+ init_internal(NULL);
size_check();
}
Operation::Operation(OpType optype, Table table) :
- _optype(optype), _table(table), _key(), _value(), _config(),
- _transaction(NULL), _group(NULL), _repeatgroup(0), _flags(0),
- _keysize(0), _valuesize(0), _keymax(0), _valuemax(0) {
+ _optype(optype), _internal(NULL), _table(table), _key(), _value(),
+ _config(), _transaction(NULL), _group(NULL), _repeatgroup(0) {
+ init_internal(NULL);
size_check();
}
Operation::Operation(const Operation &other) :
- _optype(other._optype), _table(other._table), _key(other._key),
- _value(other._value), _config(other._config),
+ _optype(other._optype), _internal(NULL), _table(other._table),
+ _key(other._key), _value(other._value), _config(other._config),
_transaction(other._transaction), _group(other._group),
- _repeatgroup(other._repeatgroup), _flags(other._flags),
- _keysize(other._keysize), _valuesize(other._valuesize),
- _keymax(other._keymax), _valuemax(other._valuemax) {
+ _repeatgroup(other._repeatgroup) {
// Creation and destruction of _group and _transaction is managed
// by Python.
+ init_internal(other._internal);
+}
+
+Operation::Operation(OpType optype, const char *config) :
+ _optype(optype), _internal(NULL), _table(), _key(), _value(),
+ _config(config), _transaction(NULL), _group(NULL), _repeatgroup(0) {
+ init_internal(NULL);
}
Operation::~Operation() {
// Creation and destruction of _group, _transaction is managed by Python.
+ delete _internal;
}
Operation& Operation::operator=(const Operation &other) {
@@ -981,28 +1016,62 @@ Operation& Operation::operator=(const Operation &other) {
_transaction = other._transaction;
_group = other._group;
_repeatgroup = other._repeatgroup;
- _keysize = other._keysize;
- _valuesize = other._valuesize;
- _keymax = other._keymax;
- _valuemax = other._valuemax;
+ delete _internal;
+ _internal = NULL;
+ init_internal(other._internal);
return (*this);
}
-void Operation::create_all() {
- size_check();
+void Operation::init_internal(OperationInternal *other) {
+ ASSERT(_internal == NULL);
- _flags = 0;
- if (!_config.empty()) {
- if (_config == "reopen")
- _flags |= WORKGEN_OP_REOPEN;
+ switch (_optype) {
+ case OP_CHECKPOINT:
+ if (other == NULL)
+ _internal = new CheckpointOperationInternal();
+ else
+ _internal = new CheckpointOperationInternal(
+ *(CheckpointOperationInternal *)other);
+ break;
+ case OP_INSERT:
+ case OP_REMOVE:
+ case OP_SEARCH:
+ case OP_UPDATE:
+ if (other == NULL)
+ _internal = new TableOperationInternal();
+ else
+ _internal = new TableOperationInternal(
+ *(TableOperationInternal *)other);
+ break;
+ case OP_NONE:
+ case OP_NOOP:
+ if (other == NULL)
+ _internal = new OperationInternal();
+ else
+ _internal = new OperationInternal(*other);
+ break;
+ case OP_SLEEP:
+ if (other == NULL)
+ _internal = new SleepOperationInternal();
else
- THROW("operation has illegal config: \"" << _config << "\"");
+ _internal = new SleepOperationInternal(
+ *(SleepOperationInternal *)other);
+ break;
+ default:
+ ASSERT(false);
}
}
+void Operation::create_all() {
+ size_check();
+
+ _internal->_flags = 0;
+ _internal->parse_config(_config);
+}
+
void Operation::describe(std::ostream &os) const {
os << "Operation: " << _optype;
- if (_optype != OP_NONE) {
+ if (is_table_op()) {
os << ", "; _table.describe(os);
os << ", "; _key.describe(os);
os << ", "; _value.describe(os);
@@ -1029,33 +1098,44 @@ void Operation::describe(std::ostream &os) const {
}
void Operation::get_static_counts(Stats &stats, int multiplier) {
- switch (_optype) {
- case OP_NONE:
- break;
- case OP_INSERT:
- stats.insert.ops += multiplier;
- break;
- case OP_REMOVE:
- stats.remove.ops += multiplier;
- break;
- case OP_SEARCH:
- stats.read.ops += multiplier;
- break;
- case OP_UPDATE:
- stats.update.ops += multiplier;
- break;
- default:
- ASSERT(false);
- }
+ if (is_table_op())
+ switch (_optype) {
+ case OP_INSERT:
+ stats.insert.ops += multiplier;
+ break;
+ case OP_REMOVE:
+ stats.remove.ops += multiplier;
+ break;
+ case OP_SEARCH:
+ stats.read.ops += multiplier;
+ break;
+ case OP_UPDATE:
+ stats.update.ops += multiplier;
+ break;
+ default:
+ ASSERT(false);
+ }
+ else if (_optype == OP_CHECKPOINT)
+ stats.checkpoint.ops += multiplier;
+
if (_group != NULL)
for (std::vector<Operation>::iterator i = _group->begin();
i != _group->end(); i++)
i->get_static_counts(stats, multiplier * _repeatgroup);
}
+bool Operation::is_table_op() const {
+ return (_optype == OP_INSERT || _optype == OP_REMOVE ||
+ _optype == OP_SEARCH || _optype == OP_UPDATE);
+}
+
void Operation::kv_compute_max(bool iskey, bool has_random) {
uint64_t max;
int size;
+ TableOperationInternal *internal;
+
+ ASSERT(is_table_op());
+ internal = (TableOperationInternal *)_internal;
size = iskey ? _key._size : _value._size;
if (size == 0)
@@ -1076,31 +1156,40 @@ void Operation::kv_compute_max(bool iskey, bool has_random) {
max = 0;
if (iskey) {
- _keysize = size;
- _keymax = max;
+ internal->_keysize = size;
+ internal->_keymax = max;
} else {
- _valuesize = size;
- _valuemax = max;
+ internal->_valuesize = size;
+ internal->_valuemax = max;
}
}
void Operation::kv_size_buffer(bool iskey, size_t &maxsize) const {
+ TableOperationInternal *internal;
+
+ ASSERT(is_table_op());
+ internal = (TableOperationInternal *)_internal;
+
if (iskey) {
- if ((size_t)_keysize > maxsize)
- maxsize = _keysize;
+ if ((size_t)internal->_keysize > maxsize)
+ maxsize = internal->_keysize;
} else {
- if ((size_t)_valuesize > maxsize)
- maxsize = _valuesize;
+ if ((size_t)internal->_valuesize > maxsize)
+ maxsize = internal->_valuesize;
}
}
void Operation::kv_gen(ThreadRunner *runner, bool iskey,
uint64_t compressibility, uint64_t n, char *result) const {
- uint64_t max;
- int size;
+ TableOperationInternal *internal;
+ uint_t max;
+ uint_t size;
- size = iskey ? _keysize : _valuesize;
- max = iskey ? _keymax : _valuemax;
+ ASSERT(is_table_op());
+ internal = (TableOperationInternal *)_internal;
+
+ size = iskey ? internal->_keysize : internal->_valuesize;
+ max = iskey ? internal->_keymax : internal->_valuemax;
if (n > max)
THROW((iskey ? "Key" : "Value") << " (" << n
<< ") too large for size (" << size << ")");
@@ -1124,13 +1213,13 @@ void Operation::kv_gen(ThreadRunner *runner, bool iskey,
* That means that 75% of the string will be random numbers, and 25%
* will be easily compressible zero-fill.
*/
- uint64_t random_len = size - ((size * compressibility) / 100);
+ uint_t random_len = size - ((size * compressibility) / 100);
/* Never overwrite the record number identifier */
if (random_len > size - 20)
random_len = size - 20;
- for (int i = 0; i < random_len; ++i)
+ for (uint64_t i = 0; i < random_len; ++i)
/*
* TODO: It'd be nice to use workgen_rand here, but this class
* is without the context of a runner thread, so it's not easy
@@ -1141,22 +1230,60 @@ void Operation::kv_gen(ThreadRunner *runner, bool iskey,
}
void Operation::size_check() const {
- if (_optype != OP_NONE && _key._size == 0 && _table.options.key_size == 0)
- THROW("operation requires a key size");
- if (OP_HAS_VALUE(this) && _value._size == 0 &&
- _table.options.value_size == 0)
- THROW("operation requires a value size");
+ if (is_table_op()) {
+ if (_key._size == 0 && _table.options.key_size == 0)
+ THROW("operation requires a key size");
+ if (OP_HAS_VALUE(this) && _value._size == 0 &&
+ _table.options.value_size == 0)
+ THROW("operation requires a value size");
+ }
}
-Track::Track(bool latency_tracking) : ops(0), latency_ops(0), latency(0),
- min_latency(0), max_latency(0), us(NULL), ms(NULL), sec(NULL) {
- track_latency(latency_tracking);
+int CheckpointOperationInternal::run(ThreadRunner *runner, WT_SESSION *session)
+{
+ return (session->checkpoint(session, NULL));
+}
+
+void SleepOperationInternal::parse_config(const std::string &config)
+{
+ int amount = 0;
+ const char *configp;
+ char *endp;
+
+ configp = config.c_str();
+ _sleepvalue = strtod(configp, &endp);
+ if (configp == endp || *endp != '\0' || _sleepvalue < 0.0)
+ THROW("sleep operation requires a configuration string as "
+ "a non-negative float, e.g. '1.5'");
+}
+
+int SleepOperationInternal::run(ThreadRunner *runner, WT_SESSION *session)
+{
+ (void)session; /* not used */
+ sleep(_sleepvalue);
+ return (0);
}
-Track::Track(const Track &other) : ops(other.ops),
- latency_ops(other.latency_ops), latency(other.latency),
- min_latency(other.min_latency), max_latency(other.max_latency),
+void TableOperationInternal::parse_config(const std::string &config)
+{
+ if (!config.empty()) {
+ if (config == "reopen")
+ _flags |= WORKGEN_OP_REOPEN;
+ else
+ THROW("table operation has illegal config: \"" << config << "\"");
+ }
+}
+
+Track::Track(bool latency_tracking) : ops_in_progress(0), ops(0),
+ latency_ops(0), latency(0), bucket_ops(0), min_latency(0), max_latency(0),
us(NULL), ms(NULL), sec(NULL) {
+ track_latency(latency_tracking);
+}
+
+Track::Track(const Track &other) : ops_in_progress(other.ops_in_progress),
+ ops(other.ops), latency_ops(other.latency_ops), latency(other.latency),
+ bucket_ops(other.bucket_ops), min_latency(other.min_latency),
+ max_latency(other.max_latency), us(NULL), ms(NULL), sec(NULL) {
if (other.us != NULL) {
us = new uint32_t[LATENCY_US_BUCKETS];
ms = new uint32_t[LATENCY_MS_BUCKETS];
@@ -1176,6 +1303,7 @@ Track::~Track() {
}
void Track::add(Track &other, bool reset) {
+ ops_in_progress += other.ops_in_progress;
ops += other.ops;
latency_ops += other.latency_ops;
latency += other.latency;
@@ -1198,6 +1326,7 @@ void Track::add(Track &other, bool reset) {
}
void Track::assign(const Track &other) {
+ ops_in_progress = other.ops_in_progress;
ops = other.ops;
latency_ops = other.latency_ops;
latency = other.latency;
@@ -1231,10 +1360,16 @@ uint64_t Track::average_latency() const {
return (latency / latency_ops);
}
+void Track::begin() {
+ ops_in_progress++;
+}
+
void Track::clear() {
+ ops_in_progress = 0;
ops = 0;
latency_ops = 0;
latency = 0;
+ bucket_ops = 0;
min_latency = 0;
max_latency = 0;
if (us != NULL) {
@@ -1244,13 +1379,15 @@ void Track::clear() {
}
}
-void Track::incr() {
+void Track::complete() {
+ --ops_in_progress;
ops++;
}
-void Track::incr_with_latency(uint64_t usecs) {
+void Track::complete_with_latency(uint64_t usecs) {
ASSERT(us != NULL);
+ --ops_in_progress;
ops++;
latency_ops++;
latency += usecs;
@@ -1277,7 +1414,51 @@ void Track::incr_with_latency(uint64_t usecs) {
sec[LATENCY_SEC_BUCKETS - 1]++;
}
+// Return the latency below which the given percentage of operations fall.
+// E.g. for percent == 95, returns the latency for which 95% of latencies
+// are faster (lower), and 5% are slower (higher).
+uint64_t Track::percentile_latency(int percent) const {
+ // Get the total number of operations in the latency buckets.
+ // We can't reliably use latency_ops, because this struct was
+ // added up from Track structures that were being copied while
+ // being updated.
+ uint64_t total = 0;
+ for (int i = 0; i < LATENCY_SEC_BUCKETS; i++)
+ total += sec[i];
+ for (int i = 0; i < LATENCY_MS_BUCKETS; i++)
+ total += ms[i];
+ for (int i = 0; i < LATENCY_US_BUCKETS; i++)
+ total += us[i];
+ if (total == 0)
+ return (0);
+
+ // Optimized for percent values over 50: we start counting from above.
+ uint64_t n = 0;
+ uint64_t k = (100 - percent) * total / 100;
+ if (k == 0)
+ return (0);
+ for (int i = LATENCY_SEC_BUCKETS - 1; i >= 0; --i) {
+ n += sec[i];
+ if (n >= k)
+ return (sec_to_us(i));
+ }
+ for (int i = LATENCY_MS_BUCKETS - 1; i >= 0; --i) {
+ n += ms[i];
+ if (n >= k)
+ return (ms_to_us(i));
+ }
+ for (int i = LATENCY_US_BUCKETS - 1; i >= 0; --i) {
+ n += us[i];
+ if (n >= k)
+ return (100 * i);
+ }
+ // We should have accounted for all the buckets.
+ ASSERT(false);
+ return (0);
+}
+
void Track::subtract(const Track &other) {
+ ops_in_progress -= other.ops_in_progress;
ops -= other.ops;
latency_ops -= other.latency_ops;
latency -= other.latency;
@@ -1294,19 +1475,6 @@ void Track::subtract(const Track &other) {
}
}
-// If there are no entries in this Track, take them from
-// a previous Track. Used to smooth graphs. We don't worry
-// about latency buckets here.
-void Track::smooth(const Track &other) {
- if (latency_ops == 0) {
- ops = other.ops;
- latency = other.latency;
- latency_ops = other.latency_ops;
- min_latency = other.min_latency;
- max_latency = other.max_latency;
- }
-}
-
void Track::track_latency(bool newval) {
if (newval) {
if (us == NULL) {
@@ -1351,18 +1519,20 @@ void Track::_get_sec(long *result) {
memset(result, 0, sizeof(long) * LATENCY_SEC_BUCKETS);
}
-Stats::Stats(bool latency) : insert(latency), not_found(latency),
- read(latency), remove(latency), update(latency), truncate(latency) {
+Stats::Stats(bool latency) : checkpoint(latency), insert(latency),
+ not_found(latency), read(latency), remove(latency), update(latency),
+ truncate(latency) {
}
-Stats::Stats(const Stats &other) : insert(other.insert),
- not_found(other.not_found), read(other.read), remove(other.remove),
- update(other.update), truncate(other.truncate) {
+Stats::Stats(const Stats &other) : checkpoint(other.checkpoint),
+ insert(other.insert), not_found(other.not_found), read(other.read),
+ remove(other.remove), update(other.update), truncate(other.truncate) {
}
Stats::~Stats() {}
void Stats::add(Stats &other, bool reset) {
+ checkpoint.add(other.checkpoint, reset);
insert.add(other.insert, reset);
not_found.add(other.not_found, reset);
read.add(other.read, reset);
@@ -1372,6 +1542,7 @@ void Stats::add(Stats &other, bool reset) {
}
void Stats::assign(const Stats &other) {
+ checkpoint.assign(other.checkpoint);
insert.assign(other.insert);
not_found.assign(other.not_found);
read.assign(other.read);
@@ -1381,6 +1552,7 @@ void Stats::assign(const Stats &other) {
}
void Stats::clear() {
+ checkpoint.clear();
insert.clear();
not_found.clear();
read.clear();
@@ -1398,10 +1570,12 @@ void Stats::describe(std::ostream &os) const {
os << ", updates " << update.ops;
os << ", truncates " << truncate.ops;
os << ", removes " << remove.ops;
+ os << ", checkpoints " << checkpoint.ops;
}
void Stats::final_report(std::ostream &os, timespec &totalsecs) const {
uint64_t ops = 0;
+ ops += checkpoint.ops;
ops += read.ops;
ops += not_found.ops;
ops += insert.ops;
@@ -1420,6 +1594,7 @@ void Stats::final_report(std::ostream &os, timespec &totalsecs) const {
FINAL_OUTPUT(os, update.ops, update, ops, totalsecs);
FINAL_OUTPUT(os, truncate.ops, truncate, ops, totalsecs);
FINAL_OUTPUT(os, remove.ops, remove, ops, totalsecs);
+ FINAL_OUTPUT(os, checkpoint.ops, checkpoint, ops, totalsecs);
}
void Stats::report(std::ostream &os) const {
@@ -1430,19 +1605,12 @@ void Stats::report(std::ostream &os) const {
os << ", " << insert.ops << " inserts, ";
os << update.ops << " updates, ";
os << truncate.ops << " truncates, ";
- os << remove.ops << " removes";
-}
-
-void Stats::smooth(const Stats &other) {
- insert.smooth(other.insert);
- not_found.smooth(other.not_found);
- read.smooth(other.read);
- remove.smooth(other.remove);
- update.smooth(other.update);
- truncate.smooth(other.truncate);
+ os << remove.ops << " removes, ";
+ os << checkpoint.ops << " checkpoints";
}
void Stats::subtract(const Stats &other) {
+ checkpoint.subtract(other.checkpoint);
insert.subtract(other.insert);
not_found.subtract(other.not_found);
read.subtract(other.read);
@@ -1452,6 +1620,7 @@ void Stats::subtract(const Stats &other) {
}
void Stats::track_latency(bool latency) {
+ checkpoint.track_latency(latency);
insert.track_latency(latency);
not_found.track_latency(latency);
read.track_latency(latency);
@@ -1507,7 +1676,7 @@ TableInternal::~TableInternal() {}
WorkloadOptions::WorkloadOptions() : max_latency(0),
report_file("workload.stat"), report_interval(0), run_time(0),
- sample_file("sample.json"), sample_interval(0), sample_rate(1), warmup(0),
+ sample_file("monitor.json"), sample_interval(0), sample_rate(1), warmup(0),
_options() {
_options.add_int("max_latency", max_latency,
"prints warning if any latency measured exceeds this number of "
diff --git a/src/third_party/wiredtiger/bench/workgen/workgen.h b/src/third_party/wiredtiger/bench/workgen/workgen.h
index 7de03a90f17..cc93409b388 100644
--- a/src/third_party/wiredtiger/bench/workgen/workgen.h
+++ b/src/third_party/wiredtiger/bench/workgen/workgen.h
@@ -30,6 +30,9 @@
#include <vector>
#include <map>
+// For convenience: A type exposed to Python that cannot be negative.
+typedef unsigned int uint_t;
+
namespace workgen {
struct ContextInternal;
@@ -81,9 +84,11 @@ struct Track {
// Threads maintain the total thread operation and total latency they've
// experienced.
- uint64_t ops; // Total operations */
+ uint64_t ops_in_progress; // Total operations not completed */
+ uint64_t ops; // Total operations completed */
uint64_t latency_ops; // Total ops sampled for latency
uint64_t latency; // Total latency */
+ uint64_t bucket_ops; // Computed for percentile_latency
// Minimum/maximum latency, shared with the monitor thread, that is, the
// monitor thread clears it so it's recalculated again for each period.
@@ -98,10 +103,11 @@ struct Track {
void add(Track&, bool reset = false);
void assign(const Track&);
uint64_t average_latency() const;
+ void begin();
void clear();
- void incr();
- void incr_with_latency(uint64_t usecs);
- void smooth(const Track&);
+ void complete();
+ void complete_with_latency(uint64_t usecs);
+ uint64_t percentile_latency(int percent) const;
void subtract(const Track&);
void track_latency(bool);
bool track_latency() const { return (us != NULL); }
@@ -120,6 +126,7 @@ private:
};
struct Stats {
+ Track checkpoint;
Track insert;
Track not_found;
Track read;
@@ -139,7 +146,6 @@ struct Stats {
void final_report(std::ostream &os, timespec &totalsecs) const;
void report(std::ostream &os) const;
#endif
- void smooth(const Stats&);
void subtract(const Stats&);
void track_latency(bool);
bool track_latency() const { return (insert.track_latency()); }
@@ -170,11 +176,11 @@ struct Context {
// properties are prevented, only existing properties can be set.
//
struct TableOptions {
- int key_size;
- int value_size;
- uint64_t value_compressibility;
+ uint_t key_size;
+ uint_t value_size;
+ uint_t value_compressibility;
bool random_value;
- int range;
+ uint_t range;
TableOptions();
TableOptions(const TableOptions &other);
@@ -276,8 +282,10 @@ struct Value {
struct Operation {
enum OpType {
- OP_NONE, OP_INSERT, OP_REMOVE, OP_SEARCH, OP_UPDATE };
+ OP_CHECKPOINT, OP_INSERT, OP_NONE, OP_NOOP, OP_REMOVE, OP_SEARCH,
+ OP_SLEEP, OP_UPDATE };
OpType _optype;
+ OperationInternal *_internal;
Table _table;
Key _key;
@@ -287,28 +295,22 @@ struct Operation {
std::vector<Operation> *_group;
int _repeatgroup;
-#ifndef SWIG
-#define WORKGEN_OP_REOPEN 0x0001 // reopen cursor for each op
- uint32_t _flags;
-
- int _keysize; // derived from Key._size and Table.options.key_size
- int _valuesize;
- uint64_t _keymax;
- uint64_t _valuemax;
-#endif
-
Operation();
Operation(OpType optype, Table table, Key key, Value value);
Operation(OpType optype, Table table, Key key);
Operation(OpType optype, Table table);
+ // Constructor with string applies to NOOP, SLEEP, CHECKPOINT
+ Operation(OpType optype, const char *config);
Operation(const Operation &other);
~Operation();
void describe(std::ostream &os) const;
#ifndef SWIG
Operation& operator=(const Operation &other);
+ void init_internal(OperationInternal *other);
void create_all();
void get_static_counts(Stats &stats, int multiplier);
+ bool is_table_op() const;
void kv_compute_max(bool iskey, bool has_random);
void kv_gen(ThreadRunner *runner, bool iskey, uint64_t compressibility,
uint64_t n, char *result) const;
diff --git a/src/third_party/wiredtiger/bench/workgen/workgen_int.h b/src/third_party/wiredtiger/bench/workgen/workgen_int.h
index c38f709efa1..dbcde8472b5 100644
--- a/src/third_party/wiredtiger/bench/workgen/workgen_int.h
+++ b/src/third_party/wiredtiger/bench/workgen/workgen_int.h
@@ -176,6 +176,48 @@ struct ContextInternal {
int create_all();
};
+struct OperationInternal {
+#define WORKGEN_OP_REOPEN 0x0001 // reopen cursor for each op
+ uint32_t _flags;
+
+ OperationInternal() : _flags(0) {}
+ OperationInternal(const OperationInternal &other) : _flags(other._flags) {}
+ virtual ~OperationInternal() {}
+ virtual void parse_config(const std::string &config) {}
+ virtual int run(ThreadRunner *runner, WT_SESSION *session) {
+ (void)runner; (void)session; return (0); }
+};
+
+struct CheckpointOperationInternal : OperationInternal {
+ CheckpointOperationInternal() : OperationInternal() {}
+ CheckpointOperationInternal(const CheckpointOperationInternal &other) {}
+ virtual int run(ThreadRunner *runner, WT_SESSION *session);
+};
+
+struct TableOperationInternal : OperationInternal {
+ uint_t _keysize; // derived from Key._size and Table.options.key_size
+ uint_t _valuesize;
+ uint_t _keymax;
+ uint_t _valuemax;
+
+ TableOperationInternal() : OperationInternal(), _keysize(0), _valuesize(0),
+ _keymax(0),_valuemax(0) {}
+ TableOperationInternal(const TableOperationInternal &other) :
+ _keysize(other._keysize), _valuesize(other._valuesize),
+ _keymax(other._keymax), _valuemax(other._valuemax) {}
+ virtual void parse_config(const std::string &config);
+};
+
+struct SleepOperationInternal : OperationInternal {
+ float _sleepvalue;
+
+ SleepOperationInternal() : OperationInternal(), _sleepvalue(0) {}
+ SleepOperationInternal(const SleepOperationInternal &other) :
+ _sleepvalue(other._sleepvalue) {}
+ virtual void parse_config(const std::string &config);
+ virtual int run(ThreadRunner *runner, WT_SESSION *session);
+};
+
struct TableInternal {
tint_t _tint;
uint32_t _context_count;
diff --git a/src/third_party/wiredtiger/bench/workgen/wtperf.py b/src/third_party/wiredtiger/bench/workgen/wtperf.py
index 9da8c37fd3a..b059b31f8db 100644..100755
--- a/src/third_party/wiredtiger/bench/workgen/wtperf.py
+++ b/src/third_party/wiredtiger/bench/workgen/wtperf.py
@@ -76,7 +76,8 @@ class Translator:
self.error(msg)
raise TranslateException(errtype)
- supported_opt_list = [ 'close_conn', 'compression', 'compact',
+ supported_opt_list = [ 'checkpoint_interval', 'checkpoint_threads',
+ 'close_conn', 'compact', 'compression',
'conn_config', 'create', 'icount',
'key_sz', 'log_like_table', 'pareto',
'populate_ops_per_txn', 'populate_threads',
@@ -219,13 +220,14 @@ class Translator:
str(factor) + ' to compensate for log_like operations.\n'
return (new_throttle, comment)
- def parse_threads(self, threads_config):
+ def parse_threads(self, threads_config, checkpoint_threads):
opts = self.options
tdecls = ''
tlist = self.split_config_parens(threads_config)
table_count = self.get_int_opt('table_count', 1)
log_like_table = self.get_boolean_opt('log_like_table', False)
txn_config = self.get_string_opt('transaction_config', '')
+ checkpoint_interval = self.get_int_opt('checkpoint_interval', 120)
run_ops = self.get_int_opt('run_ops', -1)
if log_like_table:
tdecls += 'log_name = "table:log"\n'
@@ -317,6 +319,18 @@ class Translator:
tnames += str(topts.count) + ' * '
tnames += thread_name + ' + '
+ if checkpoint_threads != 0:
+ thread_name = 'checkpoint_thread'
+
+ tdecls += 'ops = Operation(Operation.OP_SLEEP, "' + \
+ str(checkpoint_interval) + \
+ '") + \\\n Operation(Operation.OP_CHECKPOINT, "")\n'
+ tdecls += thread_name + ' = Thread(ops)\n'
+ tdecls += '\n'
+ if checkpoint_threads > 1:
+ tnames += str(checkpoint_threads) + ' * '
+ tnames += thread_name + ' + '
+
tnames = tnames.rstrip(' +')
return (tdecls, tnames)
@@ -552,8 +566,10 @@ class Translator:
s += self.translate_populate()
thread_config = self.get_string_opt('threads', '')
- if thread_config != '':
- (t_create, t_var) = self.parse_threads(thread_config)
+ checkpoint_threads = self.get_int_opt('checkpoint_threads', 0)
+ if thread_config != '' or checkpoint_threads != 0:
+ (t_create, t_var) = self.parse_threads(thread_config,
+ checkpoint_threads)
s += '\n' + t_create
if reopen_connection:
s += '\n# reopen the connection\n'
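The translator change above emits a dedicated checkpoint thread built from the new OP_SLEEP and OP_CHECKPOINT operation types whenever checkpoint_threads is set. A hedged sketch of the kind of workgen script fragment it generates; the table, key/value sizes and thread counts here are examples rather than the translator's literal output.

from workgen import *

table = Table("table:test")

# Checkpoint thread: sleep for checkpoint_interval seconds, then checkpoint.
ckpt_ops = Operation(Operation.OP_SLEEP, "60") + \
           Operation(Operation.OP_CHECKPOINT, "")
checkpoint_thread = Thread(ckpt_ops)

# An insert thread, roughly what threads=((count=8,inserts=1)) translates to.
ins_ops = Operation(Operation.OP_INSERT, table,
                    Key(Key.KEYGEN_APPEND, 20), Value(100))
ins_thread = Thread(ins_ops)

# checkpoint_threads > 1 multiplies the thread, matching the generated "N * name".
threads = 8 * ins_thread + 1 * checkpoint_thread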
diff --git a/src/third_party/wiredtiger/bench/wtperf/runners/checkpoint-latency.wtperf b/src/third_party/wiredtiger/bench/wtperf/runners/checkpoint-latency.wtperf
new file mode 100644
index 00000000000..80a579cc818
--- /dev/null
+++ b/src/third_party/wiredtiger/bench/wtperf/runners/checkpoint-latency.wtperf
@@ -0,0 +1,25 @@
+# A stress configuration to create checkpoints while doing a mix of inserts
+# and reads.
+conn_config="cache_size=1200MB,eviction=(threads_max=8),log=(enabled=false)"
+table_config="leaf_page_max=32k,internal_page_max=16k,allocation_size=4k,split_pct=90,type=file"
+# Enough data for 10x cache. 200k records sized 60k = 12G
+# tables
+checkpoint_interval=60
+checkpoint_threads=1
+create=true
+close_conn=false
+icount=200000
+log_like_table=true
+populate_threads=4
+report_interval=1
+# Run for a longer duration to ensure checkpoints are completing.
+run_time=1200
+sample_interval=1
+sample_rate=1
+# MongoDB always has multiple tables, and checkpoints behave differently when
+# there is more than a single table.
+table_count=10
+threads=((count=8,inserts=1,throttle=125),(count=8,reads=1,throttle=500))
+value_sz=60000
+# Wait for the throughput to stabilize
+warmup=120
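A quick sanity check of the sizing comment in this configuration: 200k records with 60 KB values is roughly ten times the 1200 MB cache.

# Rough arithmetic behind the "Enough data for 10x cache" comment above.
icount = 200000
value_sz = 60000
print(icount * value_sz / 1e9)   # ~12 GB of values vs. a 1.2 GB cache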
diff --git a/src/third_party/wiredtiger/dist/s_string.ok b/src/third_party/wiredtiger/dist/s_string.ok
index ed5f27cdf11..247ad261085 100644
--- a/src/third_party/wiredtiger/dist/s_string.ok
+++ b/src/third_party/wiredtiger/dist/s_string.ok
@@ -265,6 +265,7 @@ NEEDVALUE
NOLL
NOLOCK
NONINFRINGEMENT
+NOOP
NOTFOUND
NOTREACHED
NOVALUE
diff --git a/src/third_party/wiredtiger/import.data b/src/third_party/wiredtiger/import.data
index 553ff5a7b55..813d52dea11 100644
--- a/src/third_party/wiredtiger/import.data
+++ b/src/third_party/wiredtiger/import.data
@@ -1,5 +1,5 @@
{
- "commit": "e7d742daa2d2500cd94a7061f754a1d0c4aa963c",
+ "commit": "d235e0e71ef84c3f9d5a870f08feeff9a7c5581e",
"github": "wiredtiger/wiredtiger.git",
"vendor": "wiredtiger",
"branch": "mongodb-4.2"
diff --git a/src/third_party/wiredtiger/src/btree/bt_cursor.c b/src/third_party/wiredtiger/src/btree/bt_cursor.c
index aa2aa24a7a9..24264ca44a6 100644
--- a/src/third_party/wiredtiger/src/btree/bt_cursor.c
+++ b/src/third_party/wiredtiger/src/btree/bt_cursor.c
@@ -1014,52 +1014,27 @@ err: if (ret == WT_RESTART) {
* Remove a record from the tree.
*/
int
-__wt_btcur_remove(WT_CURSOR_BTREE *cbt)
+__wt_btcur_remove(WT_CURSOR_BTREE *cbt, bool positioned)
{
- enum { NO_POSITION, POSITIONED, SEARCH_POSITION } positioned;
WT_BTREE *btree;
WT_CURFILE_STATE state;
WT_CURSOR *cursor;
WT_DECL_RET;
WT_SESSION_IMPL *session;
uint64_t yield_count, sleep_usecs;
- bool iterating, valid;
+ bool iterating, searched, valid;
btree = cbt->btree;
cursor = &cbt->iface;
session = (WT_SESSION_IMPL *)cursor->session;
yield_count = sleep_usecs = 0;
iterating = F_ISSET(cbt, WT_CBT_ITERATE_NEXT | WT_CBT_ITERATE_PREV);
+ searched = false;
WT_STAT_CONN_INCR(session, cursor_remove);
WT_STAT_DATA_INCR(session, cursor_remove);
WT_STAT_DATA_INCRV(session, cursor_remove_bytes, cursor->key.size);
- /*
- * WT_CURSOR.remove has a unique semantic, the cursor stays positioned
- * if it starts positioned, otherwise clear the cursor on completion.
- *
- * However, if we unpin the page (because the page is in WT_REF_LIMBO or
- * it was selected for forcible eviction), and every item on the page is
- * deleted, eviction can delete the page and our subsequent search will
- * re-instantiate an empty page for us, with no key/value pairs. Cursor
- * remove will search that page and return not-found, which is OK unless
- * cursor-overwrite is configured (which causes cursor remove to return
- * success even if there's no item to delete). In that case, we're
- * supposed to return a positioned cursor, but there's nothing to which
- * we can position, and we'll fail attempting to point the cursor at the
- * key on the page to satisfy the positioned requirement.
- *
- * Do the best we can: If we start with a positioned cursor, and we let
- * go of our pinned page, reset our state to use the search position,
- * that is, use a successful search to return to a "positioned" state.
- * If we start with a positioned cursor, let go of our pinned page, and
- * the search fails, leave the cursor's key set so the cursor appears
- * positioned to the application.
- */
- positioned =
- F_ISSET(cursor, WT_CURSTD_KEY_INT) ? POSITIONED : NO_POSITION;
-
/* Save the cursor state. */
__cursor_state_save(cursor, &state);
@@ -1103,36 +1078,41 @@ __wt_btcur_remove(WT_CURSOR_BTREE *cbt)
goto err;
}
- /*
- * The pinned page goes away if we do a search, including as a result of
- * a restart. Get a local copy of any pinned key and re-save the cursor
- * state: we may retry but eventually fail.
- *
+retry: /*
* Note these steps must be repeatable, we'll continue to take this path
* as long as we encounter WT_RESTART.
+ *
+ * Any pinned page goes away if we do a search, including as a result of
+ * a restart. Get a local copy of any pinned key and re-save the cursor
+ * state: we may retry but eventually fail.
*/
-retry: if (positioned == POSITIONED)
- positioned = SEARCH_POSITION;
WT_ERR(__cursor_localkey(cursor));
__cursor_state_save(cursor, &state);
+ searched = true;
WT_ERR(__cursor_func_init(cbt, true));
if (btree->type == BTREE_ROW) {
- WT_ERR(__cursor_row_search(session, cbt, NULL, false));
+ ret = __cursor_row_search(session, cbt, NULL, false);
+ if (ret == WT_NOTFOUND)
+ goto search_notfound;
+ WT_ERR(ret);
/* Check whether an update would conflict. */
WT_ERR(__curfile_update_check(cbt));
if (cbt->compare != 0)
- WT_ERR(WT_NOTFOUND);
+ goto search_notfound;
WT_ERR(__wt_cursor_valid(cbt, NULL, &valid));
if (!valid)
- WT_ERR(WT_NOTFOUND);
+ goto search_notfound;
ret = __cursor_row_modify(session, cbt, WT_UPDATE_TOMBSTONE);
} else {
- WT_ERR(__cursor_col_search(session, cbt, NULL));
+ ret = __cursor_col_search(session, cbt, NULL);
+ if (ret == WT_NOTFOUND)
+ goto search_notfound;
+ WT_ERR(ret);
/*
* If we find a matching record, check whether an update would
@@ -1147,7 +1127,7 @@ retry: if (positioned == POSITIONED)
WT_ERR(__wt_cursor_valid(cbt, NULL, &valid));
if (cbt->compare != 0 || !valid) {
if (!__cursor_fix_implicit(btree, cbt))
- WT_ERR(WT_NOTFOUND);
+ goto search_notfound;
/*
* Creating a record past the end of the tree in a
* fixed-length column-store implicitly fills the
@@ -1170,58 +1150,57 @@ err: if (ret == WT_RESTART) {
}
if (ret == 0) {
-done: switch (positioned) {
- case NO_POSITION:
- /*
- * Never positioned and we leave it that way, clear any
- * key and reset the cursor.
- */
+ /*
+ * If positioned originally, but we had to do a search, acquire
+ * a position so we can return success.
+ *
+ * If not positioned originally, leave it that way, clear any
+ * key and reset the cursor.
+ */
+ if (positioned) {
+ if (searched)
+ WT_TRET(__wt_key_return(session, cbt));
+ } else {
F_CLR(cursor, WT_CURSTD_KEY_SET);
WT_TRET(__cursor_reset(cbt));
- break;
- case POSITIONED:
- /*
- * Positioned and we used the pinned page, leave the key
- * alone, whatever it is.
- */
- break;
- case SEARCH_POSITION:
- /*
- * Positioned and we did a search anyway, get a key to
- * return.
- */
- WT_TRET(__wt_key_return(session, cbt));
- break;
}
- }
- if (ret != 0) {
- WT_TRET(__cursor_reset(cbt));
- __cursor_state_restore(cursor, &state);
+ /*
+ * Check the return status again as we might have encountered an
+ * error setting the return key or resetting the cursor after an
+ * otherwise successful remove.
+ */
+ if (ret != 0) {
+ WT_TRET(__cursor_reset(cbt));
+ __cursor_state_restore(cursor, &state);
+ }
+ } else {
+ /*
+ * If the cursor is configured for overwrite and search returned
+ * not-found, that is what we want, try to return success. We
+ * can do that as long as it's not an iterating or positioned
+ * cursor. (Iterating or positioned cursors would have been
+ * forced to give up any pinned page, and when the search failed
+ * we've lost the cursor position. Since no subsequent iteration
+ * can succeed, we cannot return success.)
+ */
+ if (0) {
+search_notfound: ret = WT_NOTFOUND;
+ if (!iterating && !positioned &&
+ F_ISSET(cursor, WT_CURSTD_OVERWRITE))
+ ret = 0;
+ }
/*
- * If the record isn't found and the cursor is configured for
- * overwrite, that is what we want, try to return success.
- *
- * We set the return to 0 after testing for success, the clause
- * above dealing with the cursor position is only correct if we
- * were successful. If search failed after positioned is set to
- * SEARCH_POSITION, we cannot return a key. The only action to
- * take is to set the cursor to its original key, which we just
- * did.
- *
- * Finally, if an iterating or positioned cursor was forced to
- * give up its pinned page and then a search failed, we've
- * lost our cursor position. Since no subsequent iteration can
- * succeed, we cannot return success.
+ * Reset the cursor and restore the original cursor key: done
+ * after clearing the return value in the clause immediately
+ * above so we don't lose an error value if cursor reset fails.
*/
- if (ret == WT_NOTFOUND &&
- F_ISSET(cursor, WT_CURSTD_OVERWRITE) &&
- !iterating && positioned == NO_POSITION)
- ret = 0;
+ WT_TRET(__cursor_reset(cbt));
+ __cursor_state_restore(cursor, &state);
}
- /*
+done: /*
* Upper level cursor removes don't expect the cursor value to be set
* after a successful remove (and check in diagnostic mode). Error
* handling may have converted failure to a success, do a final check.
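The rewritten __wt_btcur_remove() keeps the externally visible WT_CURSOR.remove contract: a remove by search key leaves the cursor unpositioned, while a remove from a positioned cursor keeps the position (and key) even if the pinned page had to be given up and the search retried. A small sketch against the public WiredTiger Python API; the home directory, table and keys are examples only.

from wiredtiger import wiredtiger_open

conn = wiredtiger_open("WT_HOME", "create")
session = conn.open_session()
session.create("table:example", "key_format=S,value_format=S")
cursor = session.open_cursor("table:example")
cursor["a"] = "1"
cursor["b"] = "2"

# Remove with a search key: fire-and-forget, the cursor ends up with no
# position and no key.
cursor.set_key("a")
cursor.remove()

# Remove from a positioned cursor: the cursor stays positioned, so the key is
# still available to the application afterwards.
cursor.reset()
cursor.next()                  # position on the remaining record
cursor.remove()
print(cursor.get_key())        # the removed record's key is still set
conn.close()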
diff --git a/src/third_party/wiredtiger/src/btree/bt_read.c b/src/third_party/wiredtiger/src/btree/bt_read.c
index 0d0cf17762c..7229c87df04 100644
--- a/src/third_party/wiredtiger/src/btree/bt_read.c
+++ b/src/third_party/wiredtiger/src/btree/bt_read.c
@@ -113,7 +113,7 @@ __las_page_instantiate_verbose(WT_SESSION_IMPL *session, uint64_t las_pageid)
* Instantiate lookaside update records in a recently read page.
*/
static int
-__las_page_instantiate(WT_SESSION_IMPL *session, WT_REF *ref)
+__las_page_instantiate(WT_SESSION_IMPL *session, WT_REF *ref, bool *preparedp)
{
WT_CACHE *cache;
WT_CURSOR *cursor;
@@ -166,6 +166,7 @@ __las_page_instantiate(WT_SESSION_IMPL *session, WT_REF *ref)
__wt_readlock(session, &cache->las_sweepwalk_lock);
WT_PUBLISH(cache->las_reader, false);
locked = true;
+ *preparedp = false;
for (ret = __wt_las_cursor_position(cursor, las_pageid);
ret == 0;
ret = cursor->next(cursor)) {
@@ -188,6 +189,8 @@ __las_page_instantiate(WT_SESSION_IMPL *session, WT_REF *ref)
total_incr += incr;
upd->txnid = las_txnid;
upd->prepare_state = prepare_state;
+ if (prepare_state == WT_PREPARE_INPROGRESS)
+ *preparedp = true;
#ifdef HAVE_TIMESTAMPS
WT_ASSERT(session, las_timestamp.size == WT_TIMESTAMP_SIZE);
memcpy(&upd->timestamp, las_timestamp.data, las_timestamp.size);
@@ -374,8 +377,8 @@ __evict_force_check(WT_SESSION_IMPL *session, WT_REF *ref)
* page access.
*/
static inline int
-__page_read_lookaside(WT_SESSION_IMPL *session,
- WT_REF *ref, uint32_t previous_state, uint32_t *final_statep)
+__page_read_lookaside(WT_SESSION_IMPL *session, WT_REF *ref,
+ uint32_t previous_state, uint32_t *final_statep, bool *preparedp)
{
/*
* Reading a lookaside ref for the first time, and not requiring the
@@ -400,7 +403,7 @@ __page_read_lookaside(WT_SESSION_IMPL *session,
cache_read_lookaside_delay_checkpoint);
}
- WT_RET(__las_page_instantiate(session, ref));
+ WT_RET(__las_page_instantiate(session, ref, preparedp));
ref->page_las->eviction_to_lookaside = false;
return (0);
}
@@ -419,7 +422,7 @@ __page_read(WT_SESSION_IMPL *session, WT_REF *ref, uint32_t flags)
uint64_t time_start, time_stop;
uint32_t page_flags, final_state, new_state, previous_state;
const uint8_t *addr;
- bool timer;
+ bool prepared, timer;
time_start = time_stop = 0;
@@ -517,6 +520,7 @@ __page_read(WT_SESSION_IMPL *session, WT_REF *ref, uint32_t flags)
F_ISSET(ref->page->dsk, WT_PAGE_LAS_UPDATE));
skip_read:
+ prepared = false;
switch (previous_state) {
case WT_REF_DELETED:
/*
@@ -526,7 +530,7 @@ skip_read:
* then apply the delete.
*/
if (ref->page_las != NULL) {
- WT_ERR(__las_page_instantiate(session, ref));
+ WT_ERR(__las_page_instantiate(session, ref, &prepared));
ref->page_las->eviction_to_lookaside = false;
}
@@ -536,26 +540,26 @@ skip_read:
case WT_REF_LIMBO:
case WT_REF_LOOKASIDE:
WT_ERR(__page_read_lookaside(
- session, ref, previous_state, &final_state));
+ session, ref, previous_state, &final_state, &prepared));
break;
}
/*
* Once the page is instantiated, we no longer need the history in
* lookaside. We leave the lookaside sweep thread to do most cleanup,
- * but it can only remove keys that skew newest (if there are entries
- * in the lookaside newer than the page, they need to be read back into
- * cache or they will be lost).
+ * but it can only remove committed updates and keys that skew newest
+ * (if there are entries in the lookaside newer than the page, they need
+ * to be read back into cache or they will be lost).
*
- * There is no reason for the lookaside remove should fail, but ignore
- * it if for some reason it fails, we've got a valid page.
+ * Prepared updates cannot be removed by the lookaside sweep; remove
+ * them as we read the page back in memory.
*
* Don't free WT_REF.page_las, there may be concurrent readers.
*/
if (final_state == WT_REF_MEM &&
- ref->page_las != NULL && !ref->page_las->skew_newest)
- WT_IGNORE_RET(__wt_las_remove_block(
- session, ref->page_las->las_pageid, false));
+ ref->page_las != NULL && (prepared || !ref->page_las->skew_newest))
+ WT_ERR(__wt_las_remove_block(
+ session, ref->page_las->las_pageid));
WT_PUBLISH(ref->state, final_state);
return (ret);
diff --git a/src/third_party/wiredtiger/src/cache/cache_las.c b/src/third_party/wiredtiger/src/cache/cache_las.c
index f99bb0bbd9d..3ba2f3b3b06 100644
--- a/src/third_party/wiredtiger/src/cache/cache_las.c
+++ b/src/third_party/wiredtiger/src/cache/cache_las.c
@@ -859,8 +859,7 @@ __wt_las_cursor_position(WT_CURSOR *cursor, uint64_t pageid)
* Remove all records for a given page from the lookaside table.
*/
int
-__wt_las_remove_block(
- WT_SESSION_IMPL *session, uint64_t pageid, bool lock_wait)
+__wt_las_remove_block(WT_SESSION_IMPL *session, uint64_t pageid)
{
WT_CONNECTION_IMPL *conn;
WT_CURSOR *cursor;
@@ -878,8 +877,7 @@ __wt_las_remove_block(
*/
__wt_las_cursor(session, &cursor, &session_flags);
- if ((ret = __las_remove_block(
- cursor, pageid, lock_wait, &remove_cnt)) == 0)
+ if ((ret = __las_remove_block(cursor, pageid, true, &remove_cnt)) == 0)
(void)__wt_atomic_add64(
&conn->cache->las_remove_count, remove_cnt);
diff --git a/src/third_party/wiredtiger/src/cursor/cur_file.c b/src/third_party/wiredtiger/src/cursor/cur_file.c
index 80093fceb02..e8922cd86a7 100644
--- a/src/third_party/wiredtiger/src/cursor/cur_file.c
+++ b/src/third_party/wiredtiger/src/cursor/cur_file.c
@@ -389,17 +389,32 @@ __curfile_remove(WT_CURSOR *cursor)
WT_DECL_RET;
WT_SESSION_IMPL *session;
uint64_t time_start, time_stop;
+ bool positioned;
+
+ /*
+ * WT_CURSOR.remove has a unique semantic, the cursor stays positioned
+ * if it starts positioned, otherwise clear the cursor on completion.
+ * Track if starting with a positioned cursor and pass that information
+ * into the underlying Btree remove function so it tries to maintain a
+ * position in the tree. This is complicated by the loop in this code
+ * that restarts operations if they return prepare-conflict or restart.
+ */
+ positioned = F_ISSET(cursor, WT_CURSTD_KEY_INT);
cbt = (WT_CURSOR_BTREE *)cursor;
CURSOR_REMOVE_API_CALL(cursor, session, cbt->btree);
WT_ERR(__cursor_checkkey(cursor));
time_start = __wt_clock(session);
- WT_ERR(__wt_btcur_remove(cbt));
+ WT_ERR(__wt_btcur_remove(cbt, positioned));
time_stop = __wt_clock(session);
__wt_stat_usecs_hist_incr_opwrite(session,
WT_CLOCKDIFF_US(time_stop, time_start));
+ /* If we've lost an initial position, we must fail. */
+ if (positioned && !F_ISSET(cursor, WT_CURSTD_KEY_INT))
+ WT_ERR(WT_ROLLBACK);
+
/*
* Remove with a search-key is fire-and-forget, no position and no key.
* Remove starting from a position maintains the position and a key,
diff --git a/src/third_party/wiredtiger/src/include/extern.h b/src/third_party/wiredtiger/src/include/extern.h
index 05a2a2f3f52..9a614dc2c19 100644
--- a/src/third_party/wiredtiger/src/include/extern.h
+++ b/src/third_party/wiredtiger/src/include/extern.h
@@ -107,7 +107,7 @@ extern int __wt_btcur_search(WT_CURSOR_BTREE *cbt) WT_GCC_FUNC_DECL_ATTRIBUTE((w
extern int __wt_btcur_search_near(WT_CURSOR_BTREE *cbt, int *exactp) WT_GCC_FUNC_DECL_ATTRIBUTE((warn_unused_result));
extern int __wt_btcur_insert(WT_CURSOR_BTREE *cbt) WT_GCC_FUNC_DECL_ATTRIBUTE((warn_unused_result));
extern int __wt_btcur_insert_check(WT_CURSOR_BTREE *cbt) WT_GCC_FUNC_DECL_ATTRIBUTE((warn_unused_result));
-extern int __wt_btcur_remove(WT_CURSOR_BTREE *cbt) WT_GCC_FUNC_DECL_ATTRIBUTE((warn_unused_result));
+extern int __wt_btcur_remove(WT_CURSOR_BTREE *cbt, bool positioned) WT_GCC_FUNC_DECL_ATTRIBUTE((warn_unused_result));
extern int __wt_btcur_modify(WT_CURSOR_BTREE *cbt, WT_MODIFY *entries, int nentries) WT_GCC_FUNC_DECL_ATTRIBUTE((warn_unused_result));
extern int __wt_btcur_reserve(WT_CURSOR_BTREE *cbt) WT_GCC_FUNC_DECL_ATTRIBUTE((warn_unused_result));
extern int __wt_btcur_update(WT_CURSOR_BTREE *cbt) WT_GCC_FUNC_DECL_ATTRIBUTE((warn_unused_result));
@@ -213,7 +213,7 @@ extern bool __wt_las_page_skip_locked(WT_SESSION_IMPL *session, WT_REF *ref) WT_
extern bool __wt_las_page_skip(WT_SESSION_IMPL *session, WT_REF *ref) WT_GCC_FUNC_DECL_ATTRIBUTE((warn_unused_result));
extern int __wt_las_insert_block(WT_CURSOR *cursor, WT_BTREE *btree, WT_PAGE *page, WT_MULTI *multi, WT_ITEM *key) WT_GCC_FUNC_DECL_ATTRIBUTE((warn_unused_result));
extern int __wt_las_cursor_position(WT_CURSOR *cursor, uint64_t pageid) WT_GCC_FUNC_DECL_ATTRIBUTE((warn_unused_result));
-extern int __wt_las_remove_block(WT_SESSION_IMPL *session, uint64_t pageid, bool lock_wait) WT_GCC_FUNC_DECL_ATTRIBUTE((warn_unused_result));
+extern int __wt_las_remove_block(WT_SESSION_IMPL *session, uint64_t pageid) WT_GCC_FUNC_DECL_ATTRIBUTE((warn_unused_result));
extern int __wt_las_save_dropped(WT_SESSION_IMPL *session) WT_GCC_FUNC_DECL_ATTRIBUTE((warn_unused_result));
extern int __wt_las_sweep(WT_SESSION_IMPL *session) WT_GCC_FUNC_DECL_ATTRIBUTE((warn_unused_result));
extern uint32_t __wt_checksum_sw(const void *chunk, size_t len);
diff --git a/src/third_party/wiredtiger/src/include/misc.h b/src/third_party/wiredtiger/src/include/misc.h
index 8c6af3ca14c..9e2108a6be2 100644
--- a/src/third_party/wiredtiger/src/include/misc.h
+++ b/src/third_party/wiredtiger/src/include/misc.h
@@ -390,10 +390,3 @@ union __wt_rand_state {
session, buf, (buf)->size + __len + 1)); \
} \
} while (0)
-
-/*
- * HAVE_LONG_RUNNING_PREPARE
- * To enable functionality of evicting prepared transactions using
- * cache overflow mechanism.
- */
-#undef HAVE_LONG_RUNNING_PREPARE
diff --git a/src/third_party/wiredtiger/src/include/txn.i b/src/third_party/wiredtiger/src/include/txn.i
index 0b7e9ae1aa2..81646af5217 100644
--- a/src/third_party/wiredtiger/src/include/txn.i
+++ b/src/third_party/wiredtiger/src/include/txn.i
@@ -249,7 +249,6 @@ __wt_txn_op_set_key(WT_SESSION_IMPL *session, const WT_ITEM *key)
WT_IS_METADATA(op->btree->dhandle))
return (0);
-#ifdef HAVE_LONG_RUNNING_PREPARE
WT_ASSERT(session, op->type == WT_TXN_OP_BASIC_ROW ||
op->type == WT_TXN_OP_INMEM_ROW);
@@ -263,10 +262,6 @@ __wt_txn_op_set_key(WT_SESSION_IMPL *session, const WT_ITEM *key)
* prepared.
*/
return (__wt_buf_set(session, &op->u.op_row.key, key->data, key->size));
-#else
- WT_UNUSED(key);
- return (0);
-#endif
}
/*
@@ -359,17 +354,25 @@ __wt_txn_resolve_prepared_op(
* case.
*/
WT_ASSERT(session, upd != NULL || txn->multi_update_count != 0);
- if (upd == NULL)
+
+ /*
+ * We track the update count only for commit, but not for rollback, as
+ * our tracking is based on transaction id, and in case of rollback, we
+ * set it to aborted.
+ */
+ if (upd == NULL && commit)
--txn->multi_update_count;
#endif
- op->u.op_upd = upd;
WT_STAT_CONN_INCR(session, txn_prepared_updates_resolved);
for (; upd != NULL; upd = upd->next) {
if (upd->txnid != txn->id)
continue;
+ if (op->u.op_upd == NULL)
+ op->u.op_upd = upd;
+
if (!commit) {
upd->txnid = WT_TXN_ABORTED;
continue;
diff --git a/src/third_party/wiredtiger/src/reconcile/rec_write.c b/src/third_party/wiredtiger/src/reconcile/rec_write.c
index 2b70db8443f..afb97d115fc 100644
--- a/src/third_party/wiredtiger/src/reconcile/rec_write.c
+++ b/src/third_party/wiredtiger/src/reconcile/rec_write.c
@@ -1352,21 +1352,6 @@ __rec_txn_read(WT_SESSION_IMPL *session, WT_RECONCILE *r,
!__txn_visible_id(session, txnid))
uncommitted = r->update_uncommitted = true;
- /*
- * TODO:
- * The following portion of code under #ifdef is there
- * to temporarily disable lookaside eviction of the
- * prepared updates. Once we have all the pieces put
- * together to enable the feature, remove this temporary
- * code.
- */
-#ifndef HAVE_LONG_RUNNING_PREPARE
- if (prepared) {
- prepared = false;
- uncommitted = r->update_uncommitted = true;
- }
-#endif
-
if (prepared || uncommitted)
continue;
}
@@ -6286,8 +6271,7 @@ __rec_las_wrapup_err(WT_SESSION_IMPL *session, WT_RECONCILE *r)
for (multi = r->multi, i = 0; i < r->multi_next; ++multi, ++i)
if (multi->supd != NULL &&
(las_pageid = multi->page_las.las_pageid) != 0)
- WT_TRET(
- __wt_las_remove_block(session, las_pageid, true));
+ WT_TRET(__wt_las_remove_block(session, las_pageid));
return (ret);
}
diff --git a/src/third_party/wiredtiger/src/txn/txn.c b/src/third_party/wiredtiger/src/txn/txn.c
index 1add958e226..6f8c0969f40 100644
--- a/src/third_party/wiredtiger/src/txn/txn.c
+++ b/src/third_party/wiredtiger/src/txn/txn.c
@@ -828,11 +828,7 @@ __wt_txn_commit(WT_SESSION_IMPL *session, const char *cfg[])
* Need to resolve indirect references of transaction
* operation, in case of prepared transaction.
*/
-#ifdef HAVE_LONG_RUNNING_PREPARE
if (!prepare) {
-#else
- if (1) {
-#endif
/*
* Switch reserved operations to abort to
* simplify obsolete update list truncation.
@@ -1033,9 +1029,7 @@ __wt_txn_prepare(WT_SESSION_IMPL *session, const char *cfg[])
__wt_timestamp_set(&upd->timestamp, &ts);
WT_PUBLISH(upd->prepare_state, WT_PREPARE_INPROGRESS);
-#ifdef HAVE_LONG_RUNNING_PREPARE
op->u.op_upd = NULL;
-#endif
WT_STAT_CONN_INCR(session, txn_prepared_updates_count);
break;
case WT_TXN_OP_REF_DELETE:
@@ -1121,14 +1115,10 @@ __wt_txn_rollback(WT_SESSION_IMPL *session, const char *cfg[])
* Need to resolve indirect references of transaction
* operation, in case of prepared transaction.
*/
-#ifdef HAVE_LONG_RUNNING_PREPARE
if (F_ISSET(txn, WT_TXN_PREPARE))
WT_RET(__wt_txn_resolve_prepared_op(
session, op, false));
else {
-#else
- {
-#endif
WT_ASSERT(session, upd->txnid == txn->id ||
upd->txnid == WT_TXN_ABORTED);
upd->txnid = WT_TXN_ABORTED;
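The txn.c changes above route prepared transactions through __wt_txn_resolve_prepared_op() on both commit and rollback, now that the HAVE_LONG_RUNNING_PREPARE guards are gone. A minimal application-level sketch of the rollback path using the public Python API; the table name and prepare timestamp are illustrative.

from wiredtiger import wiredtiger_open

conn = wiredtiger_open("WT_HOME", "create")
session = conn.open_session()
session.create("table:prep", "key_format=S,value_format=S")
cursor = session.open_cursor("table:prep")

session.begin_transaction()
cursor["key1"] = "value1"
session.prepare_transaction("prepare_timestamp=2a")

# Rolling back a prepared transaction resolves each of its updates: the
# update chain is walked and matching updates are marked WT_TXN_ABORTED.
session.rollback_transaction()
conn.close()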
diff --git a/src/third_party/wiredtiger/test/csuite/timestamp_abort/main.c b/src/third_party/wiredtiger/test/csuite/timestamp_abort/main.c
index f5f34fe3505..ae292ffa5b4 100644
--- a/src/third_party/wiredtiger/test/csuite/timestamp_abort/main.c
+++ b/src/third_party/wiredtiger/test/csuite/timestamp_abort/main.c
@@ -67,7 +67,6 @@ static char home[1024]; /* Program working dir */
#define PREPARE_FREQ 5
#define PREPARE_YIELD (PREPARE_FREQ * 10)
#define RECORDS_FILE "records-%" PRIu32
-#define STABLE_PERIOD 100
#define SESSION_MAX MAX_TH + 3 /* Include program worker threads */
static const char * table_pfx = "table";
@@ -79,7 +78,6 @@ static const char * const ckpt_file = "checkpoint_done";
static bool compat, inmem, use_ts;
static volatile uint64_t global_ts = 1;
-static volatile uint64_t th_ts[MAX_TH];
#define ENV_CONFIG_COMPAT ",compatibility=(release=\"2.9\")"
#define ENV_CONFIG_DEF \
@@ -103,6 +101,9 @@ typedef struct {
uint32_t info;
} THREAD_DATA;
+/* Lock for transactional ops that set or query a timestamp. */
+static pthread_rwlock_t ts_lock;
+
static void handler(int)
WT_GCC_FUNC_DECL_ATTRIBUTE((noreturn));
static void usage(void)
@@ -122,41 +123,27 @@ usage(void)
static WT_THREAD_RET
thread_ts_run(void *arg)
{
+ WT_DECL_RET;
WT_SESSION *session;
THREAD_DATA *td;
- uint64_t i, last_ts, oldest_ts, this_ts;
- char tscfg[64];
+ char tscfg[64], ts_buf[WT_TIMESTAMP_SIZE];
td = (THREAD_DATA *)arg;
- last_ts = 0;
testutil_check(td->conn->open_session(td->conn, NULL, NULL, &session));
- /*
- * Every N records we will record our stable timestamp into the stable
- * table. That will define our threshold where we expect to find records
- * after recovery.
- */
+	/* Update the oldest timestamp every millisecond. */
for (;;) {
- oldest_ts = UINT64_MAX;
/*
- * For the timestamp thread, the info field contains the number
- * of worker threads.
+		 * We get the last committed timestamp periodically in order to
+		 * update the oldest timestamp, which requires locking out
+		 * transactional ops that set or query a timestamp.
*/
- for (i = 0; i < td->info; ++i) {
- /*
- * We need to let all threads get started, so if we find
- * any thread still with a zero timestamp we go to
- * sleep.
- */
- this_ts = th_ts[i];
- if (this_ts == 0)
- goto ts_wait;
- else if (this_ts < oldest_ts)
- oldest_ts = this_ts;
- }
-
- if (oldest_ts != UINT64_MAX &&
- oldest_ts - last_ts > STABLE_PERIOD) {
+ testutil_check(pthread_rwlock_wrlock(&ts_lock));
+ ret = td->conn->query_timestamp(
+ td->conn, ts_buf, "get=all_committed");
+ testutil_check(pthread_rwlock_unlock(&ts_lock));
+ testutil_assert(ret == 0 || ret == WT_NOTFOUND);
+ if (ret == 0) {
/*
* Set both the oldest and stable timestamp so that we
* don't need to maintain read availability at older
@@ -164,14 +151,12 @@ thread_ts_run(void *arg)
*/
testutil_check(__wt_snprintf(
tscfg, sizeof(tscfg),
- "oldest_timestamp=%" PRIx64
- ",stable_timestamp=%" PRIx64,
- oldest_ts, oldest_ts));
+ "oldest_timestamp=%s,stable_timestamp=%s",
+ ts_buf, ts_buf));
testutil_check(
td->conn->set_timestamp(td->conn, tscfg));
- last_ts = oldest_ts;
- } else
-ts_wait: __wt_sleep(0, 1000);
+ }
+ __wt_sleep(0, 1000);
}
/* NOTREACHED */
}
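
The rewritten thread_ts_run above replaces the per-thread timestamp scan with a query of the
all_committed timestamp taken under the write lock. A minimal sketch of that pattern follows;
it is not part of the patch, and assumes "conn" is an open WT_CONNECTION and "ts_lock" is the
rwlock initialized in main(), with illustrative buffer sizes:

    /*
     * Sketch: advance oldest and stable to the newest all-committed timestamp.
     */
    static int
    advance_oldest_and_stable(WT_CONNECTION *conn)
    {
    	char ts_buf[64], tscfg[128];
    	int ret;

    	/* Lock out transactional ops that set or query a timestamp. */
    	testutil_check(pthread_rwlock_wrlock(&ts_lock));
    	ret = conn->query_timestamp(conn, ts_buf, "get=all_committed");
    	testutil_check(pthread_rwlock_unlock(&ts_lock));

    	if (ret == WT_NOTFOUND)		/* Nothing committed yet. */
    		return (0);
    	if (ret != 0)
    		return (ret);

    	/* Move oldest and stable together so old history can be discarded. */
    	testutil_check(__wt_snprintf(tscfg, sizeof(tscfg),
    	    "oldest_timestamp=%s,stable_timestamp=%s", ts_buf, ts_buf));
    	return (conn->set_timestamp(conn, tscfg));
    }
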
@@ -242,9 +227,9 @@ thread_run(void *arg)
WT_CURSOR *cur_coll, *cur_local, *cur_oplog;
WT_ITEM data;
WT_RAND_STATE rnd;
- WT_SESSION *oplog_session, *session;
+ WT_SESSION *prepared_session, *session;
THREAD_DATA *td;
- uint64_t i, stable_ts;
+ uint64_t i, active_ts;
char cbuf[MAX_VAL], lbuf[MAX_VAL], obuf[MAX_VAL];
char kname[64], tscfg[64], uri[128];
bool use_prep;
@@ -255,6 +240,7 @@ thread_run(void *arg)
memset(obuf, 0, sizeof(obuf));
memset(kname, 0, sizeof(kname));
+ prepared_session = NULL;
td = (THREAD_DATA *)arg;
/*
* Set up the separate file for checking.
@@ -277,64 +263,73 @@ thread_run(void *arg)
use_prep = (use_ts && td->info % 10 == 0) ? true : false;
/*
- * We may have two sessions so that the oplog session can have its own
- * transaction in parallel with the collection session for threads
- * that are going to be using prepared transactions. We need this
- * because prepared transactions cannot have any operations that modify
- * a table that is logged. But we also want to test mixed logged and
- * not-logged transactions.
+	 * For the prepared case we have two sessions so that the oplog session
+	 * can have its own transaction in parallel with the collection session.
+	 * We need this because prepared transactions cannot include any
+	 * operations that modify a logged table. But we also want to test mixed
+	 * logged and not-logged transactions.
*/
testutil_check(td->conn->open_session(td->conn, NULL, NULL, &session));
+ if (use_prep)
+ testutil_check(td->conn->open_session(
+ td->conn, NULL, NULL, &prepared_session));
/*
* Open a cursor to each table.
*/
testutil_check(__wt_snprintf(
uri, sizeof(uri), "%s:%s", table_pfx, uri_collection));
- testutil_check(session->open_cursor(session,
- uri, NULL, NULL, &cur_coll));
+ if (use_prep)
+ testutil_check(prepared_session->open_cursor(prepared_session,
+ uri, NULL, NULL, &cur_coll));
+ else
+ testutil_check(session->open_cursor(session,
+ uri, NULL, NULL, &cur_coll));
testutil_check(__wt_snprintf(
uri, sizeof(uri), "%s:%s", table_pfx, uri_local));
- testutil_check(session->open_cursor(session,
- uri, NULL, NULL, &cur_local));
+ if (use_prep)
+ testutil_check(prepared_session->open_cursor(prepared_session,
+ uri, NULL, NULL, &cur_local));
+ else
+ testutil_check(session->open_cursor(session,
+ uri, NULL, NULL, &cur_local));
testutil_check(__wt_snprintf(
uri, sizeof(uri), "%s:%s", table_pfx, uri_oplog));
- oplog_session = NULL;
- if (use_prep) {
- testutil_check(td->conn->open_session(
- td->conn, NULL, NULL, &oplog_session));
- testutil_check(session->open_cursor(oplog_session,
- uri, NULL, NULL, &cur_oplog));
- } else
- testutil_check(session->open_cursor(session,
- uri, NULL, NULL, &cur_oplog));
+ testutil_check(session->open_cursor(session,
+ uri, NULL, NULL, &cur_oplog));
/*
* Write our portion of the key space until we're killed.
*/
printf("Thread %" PRIu32 " starts at %" PRIu64 "\n",
td->info, td->start);
- stable_ts = 0;
+ active_ts = 0;
for (i = td->start;; ++i) {
- if (use_ts)
- stable_ts = __wt_atomic_addv64(&global_ts, 1);
testutil_check(__wt_snprintf(
kname, sizeof(kname), "%" PRIu64, i));
testutil_check(session->begin_transaction(session, NULL));
if (use_prep)
- testutil_check(oplog_session->begin_transaction(
- oplog_session, NULL));
- /*
- * If not using prepared transactions set the timestamp now
- * before performing the operation. If we are using prepared
- * transactions, it must be set after the prepare.
- */
- if (use_ts && !use_prep) {
- testutil_check(__wt_snprintf(tscfg, sizeof(tscfg),
- "commit_timestamp=%" PRIx64, stable_ts));
- testutil_check(
- session->timestamp_transaction(session, tscfg));
+ testutil_check(prepared_session->begin_transaction(
+ prepared_session, NULL));
+
+ if (use_ts) {
+ testutil_check(pthread_rwlock_rdlock(&ts_lock));
+ active_ts = __wt_atomic_addv64(&global_ts, 1);
+ testutil_check(__wt_snprintf(tscfg,
+ sizeof(tscfg), "commit_timestamp=%" PRIx64,
+ active_ts));
+			/*
+			 * Set the transaction's timestamp now, before
+			 * performing the operation. If we are using prepared
+			 * transactions, this sets the timestamp for the oplog
+			 * session only; the collection session reuses the same
+			 * timestamp when it prepares and commits.
+			 */
+ testutil_check(session->timestamp_transaction(
+ session, tscfg));
+ testutil_check(pthread_rwlock_unlock(&ts_lock));
}
+
cur_coll->set_key(cur_coll, kname);
cur_local->set_key(cur_local, kname);
cur_oplog->set_key(cur_oplog, kname);
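
To make the session/cursor split above easier to follow, here is a condensed sketch of how the
cursors end up distributed when use_prep is set. The table URIs are placeholders standing in for
the uri_collection/uri_local/uri_oplog names built in the test, and error handling is reduced to
testutil_check:

    /*
     * Sketch: the collection and local cursors belong to prepared_session,
     * the oplog cursor to the default session, so the prepared transaction
     * never touches the logged oplog table.
     */
    WT_SESSION *session, *prepared_session;
    WT_CURSOR *cur_coll, *cur_local, *cur_oplog;

    testutil_check(conn->open_session(conn, NULL, NULL, &session));
    testutil_check(conn->open_session(conn, NULL, NULL, &prepared_session));

    testutil_check(prepared_session->open_cursor(
        prepared_session, "table:collection", NULL, NULL, &cur_coll));
    testutil_check(prepared_session->open_cursor(
        prepared_session, "table:local", NULL, NULL, &cur_local));
    testutil_check(session->open_cursor(
        session, "table:oplog", NULL, NULL, &cur_oplog));
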
@@ -344,13 +339,13 @@ thread_run(void *arg)
*/
testutil_check(__wt_snprintf(cbuf, sizeof(cbuf),
"COLL: thread:%" PRIu64 " ts:%" PRIu64 " key: %" PRIu64,
- td->info, stable_ts, i));
+ td->info, active_ts, i));
testutil_check(__wt_snprintf(lbuf, sizeof(lbuf),
"LOCAL: thread:%" PRIu64 " ts:%" PRIu64 " key: %" PRIu64,
- td->info, stable_ts, i));
+ td->info, active_ts, i));
testutil_check(__wt_snprintf(obuf, sizeof(obuf),
"OPLOG: thread:%" PRIu64 " ts:%" PRIu64 " key: %" PRIu64,
- td->info, stable_ts, i));
+ td->info, active_ts, i));
data.size = __wt_random(&rnd) % MAX_VAL;
data.data = cbuf;
cur_coll->set_value(cur_coll, &data);
@@ -359,55 +354,31 @@ thread_run(void *arg)
data.data = obuf;
cur_oplog->set_value(cur_oplog, &data);
testutil_check(cur_oplog->insert(cur_oplog));
- if (use_ts) {
+ if (use_prep) {
/*
* Run with prepare every once in a while. And also
* yield after prepare sometimes too. This is only done
- * on the regular session.
+ * on the collection session.
*/
- if (use_prep && i % PREPARE_FREQ == 0) {
- testutil_check(__wt_snprintf(
- tscfg, sizeof(tscfg),
- "prepare_timestamp=%" PRIx64, stable_ts));
- testutil_check(session->prepare_transaction(
- session, tscfg));
+ if (i % PREPARE_FREQ == 0) {
+ testutil_check(__wt_snprintf(tscfg,
+ sizeof(tscfg), "prepare_timestamp=%"
+ PRIx64, active_ts));
+ testutil_check(
+ prepared_session->prepare_transaction(
+ prepared_session, tscfg));
if (i % PREPARE_YIELD == 0)
__wt_yield();
}
- /*
- * If we did not set the timestamp above via
- * timestamp_transaction send it now on commit.
- */
- if (use_ts && !use_prep)
- testutil_check(
- session->commit_transaction(session, NULL));
- else {
- testutil_check(
- __wt_snprintf(tscfg, sizeof(tscfg),
- "commit_timestamp=%" PRIx64, stable_ts));
- testutil_check(
- session->commit_transaction(session,
- tscfg));
- }
- if (use_prep)
- testutil_check(
- oplog_session->commit_transaction(
- oplog_session, tscfg));
- /*
- * Update the thread's last-committed timestamp.
- * Don't let the compiler re-order this statement,
- * if we were to race with the timestamp thread, it
- * might see our thread update before the commit.
- */
- WT_PUBLISH(th_ts[td->info], stable_ts);
- } else {
testutil_check(
- session->commit_transaction(session, NULL));
- if (use_prep)
- testutil_check(
- oplog_session->commit_transaction(
- oplog_session, NULL));
+ __wt_snprintf(tscfg, sizeof(tscfg),
+ "commit_timestamp=%" PRIx64, active_ts));
+ testutil_check(
+ prepared_session->commit_transaction(
+ prepared_session, tscfg));
}
+ testutil_check(
+ session->commit_transaction(session, NULL));
/*
* Insert into the local table outside the timestamp txn.
*/
@@ -420,7 +391,7 @@ thread_run(void *arg)
* Save the timestamp and key separately for checking later.
*/
if (fprintf(fp,
- "%" PRIu64 " %" PRIu64 "\n", stable_ts, i) < 0)
+ "%" PRIu64 " %" PRIu64 "\n", active_ts, i) < 0)
testutil_die(EIO, "fprintf");
}
/* NOTREACHED */
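
Putting the pieces of the loop above together, one prepared iteration follows the flow sketched
below. This is a condensation of the diff, not a replacement for it: it reuses the variables from
the previous sketch, omits the local-table insert, and drops the PREPARE_FREQ/PREPARE_YIELD gating
that the test applies before preparing:

    uint64_t active_ts;
    char tscfg[64];

    testutil_check(session->begin_transaction(session, NULL));
    testutil_check(prepared_session->begin_transaction(prepared_session, NULL));

    /* The logged (oplog) session takes its commit timestamp up front. */
    testutil_check(pthread_rwlock_rdlock(&ts_lock));
    active_ts = __wt_atomic_addv64(&global_ts, 1);
    testutil_check(__wt_snprintf(tscfg, sizeof(tscfg),
        "commit_timestamp=%" PRIx64, active_ts));
    testutil_check(session->timestamp_transaction(session, tscfg));
    testutil_check(pthread_rwlock_unlock(&ts_lock));

    /* ... perform the inserts on cur_coll and cur_oplog ... */

    /* Prepare, then commit the not-logged session at the same timestamp. */
    testutil_check(__wt_snprintf(tscfg, sizeof(tscfg),
        "prepare_timestamp=%" PRIx64, active_ts));
    testutil_check(prepared_session->prepare_transaction(prepared_session, tscfg));
    testutil_check(__wt_snprintf(tscfg, sizeof(tscfg),
        "commit_timestamp=%" PRIx64, active_ts));
    testutil_check(prepared_session->commit_transaction(prepared_session, tscfg));

    /* The oplog session already has its timestamp; commit without config. */
    testutil_check(session->commit_transaction(session, NULL));
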
@@ -657,6 +628,8 @@ main(int argc, char *argv[])
usage();
testutil_work_dir_from_path(home, sizeof(home), working_dir);
+ testutil_check(pthread_rwlock_init(&ts_lock, NULL));
+
/*
* If the user wants to verify they need to tell us how many threads
* there were so we can find the old record files.
@@ -955,6 +928,7 @@ main(int argc, char *argv[])
absent_oplog, count);
fatal = true;
}
+ testutil_check(pthread_rwlock_destroy(&ts_lock));
if (fatal)
return (EXIT_FAILURE);
printf("%" PRIu64 " records verified\n", count);
diff --git a/src/third_party/wiredtiger/test/suite/test_prepare_lookaside01.py b/src/third_party/wiredtiger/test/suite/test_prepare_lookaside01.py
index ed905cdadcd..57dc4c7a116 100644
--- a/src/third_party/wiredtiger/test/suite/test_prepare_lookaside01.py
+++ b/src/third_party/wiredtiger/test/suite/test_prepare_lookaside01.py
@@ -127,9 +127,7 @@ class test_prepare_lookaside01(wttest.WiredTigerTestCase):
# Check if lookaside is working properly with prepare transactions.
# We put prepared updates in multiple sessions so that we do not hang
# because of cache being full with uncommitted updates.
- # TODO: Increase the nsessions below to start testing lookaside eviction
- # of prepared updates.
- nsessions = 1
+ nsessions = 3
nkeys = 4000
self.prepare_updates(uri, ds, nrows, nsessions, nkeys)