diff options
Diffstat (limited to 'src')
18 files changed, 608 insertions, 415 deletions
diff --git a/src/third_party/wiredtiger/bench/workgen/workgen.cxx b/src/third_party/wiredtiger/bench/workgen/workgen.cxx index 9ae63682f9c..39aacb89dc8 100644 --- a/src/third_party/wiredtiger/bench/workgen/workgen.cxx +++ b/src/third_party/wiredtiger/bench/workgen/workgen.cxx @@ -257,7 +257,7 @@ int ContextInternal::create_all() { if (_runtime_alloced < _tint_last) { // The array references are 1-based, we'll waste one entry. TableRuntime *new_table_runtime = new TableRuntime[_tint_last + 1]; - for (int i = 0; i < _runtime_alloced; i++) + for (uint32_t i = 0; i < _runtime_alloced; i++) new_table_runtime[i + 1] = _table_runtime[i + 1]; delete _table_runtime; _table_runtime = new_table_runtime; @@ -318,12 +318,13 @@ int Monitor::run() { new_totals.add(tr->_stats, true); Stats interval(new_totals); interval.subtract(prev_totals); - interval.smooth(prev_interval); int interval_secs = options->sample_interval; uint64_t cur_reads = interval.read.ops / interval_secs; uint64_t cur_inserts = interval.insert.ops / interval_secs; uint64_t cur_updates = interval.update.ops / interval_secs; + bool checkpointing = new_totals.checkpoint.ops_in_progress > 0 || + interval.checkpoint.ops > 0; uint64_t totalsec = ts_sec(t - _wrunner._start); (*_out) << time_buf @@ -331,7 +332,7 @@ int Monitor::run() { << "," << cur_reads << "," << cur_inserts << "," << cur_updates - << "," << 'N' // checkpoint in progress + << "," << (checkpointing ? 'Y' : 'N') << "," << interval.read.average_latency() << "," << interval.read.min_latency << "," << interval.read.max_latency @@ -348,13 +349,22 @@ int Monitor::run() { (void)strftime(time_buf, sizeof(time_buf), WORKGEN_TIMESTAMP_JSON, tm); -#define TRACK_JSON(name, t) \ - "\"" << (name) << "\":{" \ - << "\"ops per sec\":" << ((t).ops / interval_secs) \ - << ",\"average latency\":" << (t).average_latency() \ - << ",\"min latency\":" << (t).min_latency \ - << ",\"max latency\":" << (t).max_latency \ - << "}" + // Note: we could allow this to be configurable. + int percentiles[4] = {50, 95, 99, 0}; + +#define TRACK_JSON(f, name, t, percentiles, extra) \ + do { \ + int _i; \ + (f) << "\"" << (name) << "\":{" << extra \ + << "\"ops per sec\":" << ((t).ops / interval_secs) \ + << ",\"average latency\":" << (t).average_latency() \ + << ",\"min latency\":" << (t).min_latency \ + << ",\"max latency\":" << (t).max_latency; \ + for (_i = 0; (percentiles)[_i] != 0; _i++) \ + (f) << ",\"" << (percentiles)[_i] << "% latency\":" \ + << (t).percentile_latency(percentiles[_i]); \ + (f) << "}"; \ + } while(0) (*_json) << "{"; if (first) { @@ -362,11 +372,16 @@ int Monitor::run() { first = false; } (*_json) << "\"localTime\":\"" << time_buf - << "\",\"workgen\":{" - << TRACK_JSON("read", interval.read) << "," - << TRACK_JSON("insert", interval.insert) << "," - << TRACK_JSON("update", interval.update) - << "}}" << std::endl; + << "\",\"workgen\":{"; + TRACK_JSON(*_json, "read", interval.read, percentiles, ""); + (*_json) << ","; + TRACK_JSON(*_json, "insert", interval.insert, percentiles, ""); + (*_json) << ","; + TRACK_JSON(*_json, "update", interval.update, percentiles, ""); + (*_json) << ","; + TRACK_JSON(*_json, "checkpoint", interval.checkpoint, percentiles, + "\"active\":" << (checkpointing ? "1," : "0,")); + (*_json) << "}}" << std::endl; } uint64_t read_max = interval.read.max_latency; @@ -553,7 +568,7 @@ void ThreadRunner::op_create_all(Operation *op, size_t &keysize, tint_t tint; op->create_all(); - if (op->_optype != Operation::OP_NONE) { + if (op->is_table_op()) { op->kv_compute_max(true, false); if (OP_HAS_VALUE(op)) op->kv_compute_max(false, op->_table.options.random_value); @@ -678,7 +693,7 @@ int ThreadRunner::op_run(Operation *op) { &_throttle_limit)); _throttle_ops = 0; } - if (op->_optype != Operation::OP_NONE) + if (op->is_table_op()) ++_throttle_ops; } @@ -691,6 +706,10 @@ int ThreadRunner::op_run(Operation *op) { // (and most likely when the threads are first beginning). Any // WT_NOTFOUND returns are allowed and get their own statistic bumped. switch (op->_optype) { + case Operation::OP_CHECKPOINT: + recno = 0; + track = &_stats.checkpoint; + break; case Operation::OP_INSERT: track = &_stats.insert; if (op->_key._keytype == Key::KEYGEN_APPEND || @@ -700,6 +719,10 @@ int ThreadRunner::op_run(Operation *op) { else recno = op_get_key_recno(op, range, tint); break; + case Operation::OP_NONE: + case Operation::OP_NOOP: + recno = 0; + break; case Operation::OP_REMOVE: track = &_stats.remove; recno = op_get_key_recno(op, range, tint); @@ -712,12 +735,11 @@ int ThreadRunner::op_run(Operation *op) { track = &_stats.update; recno = op_get_key_recno(op, range, tint); break; - case Operation::OP_NONE: + case Operation::OP_SLEEP: recno = 0; break; } - if ((op->_flags & WORKGEN_OP_REOPEN) != 0 && - op->_optype != Operation::OP_NONE) { + if ((op->_internal->_flags & WORKGEN_OP_REOPEN) != 0) { WT_ERR(_session->open_cursor(_session, op->_table._uri.c_str(), NULL, NULL, &cursor)); own_cursor = true; @@ -732,6 +754,10 @@ int ThreadRunner::op_run(Operation *op) { timespec start; if (measure_latency) workgen_epoch(&start); + // Whether or not we are measuring latency, we track how many operations + // are in progress, or that complete. + if (track != NULL) + track->begin(); if (op->_transaction != NULL) { if (_in_transaction) @@ -740,7 +766,7 @@ int ThreadRunner::op_run(Operation *op) { op->_transaction->_begin_config.c_str()); _in_transaction = true; } - if (op->_optype != Operation::OP_NONE) { + if (op->is_table_op()) { op->kv_gen(this, true, 100, recno, _keybuf); cursor->set_key(cursor, _keybuf); if (OP_HAS_VALUE(op)) { @@ -771,15 +797,18 @@ int ThreadRunner::op_run(Operation *op) { ret = 0; // WT_NOTFOUND allowed. } cursor->reset(cursor); - } + } else + WT_ERR(op->_internal->run(this, _session)); + if (measure_latency) { timespec stop; workgen_epoch(&stop); - track->incr_with_latency(ts_us(stop - start)); + track->complete_with_latency(ts_us(stop - start)); } else if (track != NULL) - track->incr(); + track->complete(); if (op->_group != NULL) + VERBOSE(*this, "GROUP operation " << op->_repeatgroup << " times"); for (int count = 0; !_stop && count < op->_repeatgroup; count++) for (std::vector<Operation>::iterator i = op->_group->begin(); i != op->_group->end(); i++) @@ -932,45 +961,51 @@ void Thread::describe(std::ostream &os) const { } Operation::Operation() : - _optype(OP_NONE), _table(), _key(), _value(), _config(), _transaction(NULL), - _group(NULL), _repeatgroup(0), _flags(0), - _keysize(0), _valuesize(0), _keymax(0), _valuemax(0) { + _optype(OP_NONE), _internal(NULL), _table(), _key(), _value(), _config(), + _transaction(NULL), _group(NULL), _repeatgroup(0) { + init_internal(NULL); } Operation::Operation(OpType optype, Table table, Key key, Value value) : - _optype(optype), _table(table), _key(key), _value(value), _config(), - _transaction(NULL), _group(NULL), _repeatgroup(0), _flags(0), - _keysize(0), _valuesize(0), _keymax(0), _valuemax(0) { + _optype(optype), _internal(NULL), _table(table), _key(key), _value(value), + _config(), _transaction(NULL), _group(NULL), _repeatgroup(0) { + init_internal(NULL); size_check(); } Operation::Operation(OpType optype, Table table, Key key) : - _optype(optype), _table(table), _key(key), _value(), _config(), - _transaction(NULL), _group(NULL), _repeatgroup(0), _flags(0), - _keysize(0), _valuesize(0), _keymax(0), _valuemax(0) { + _optype(optype), _internal(NULL), _table(table), _key(key), _value(), + _config(), _transaction(NULL), _group(NULL), _repeatgroup(0) { + init_internal(NULL); size_check(); } Operation::Operation(OpType optype, Table table) : - _optype(optype), _table(table), _key(), _value(), _config(), - _transaction(NULL), _group(NULL), _repeatgroup(0), _flags(0), - _keysize(0), _valuesize(0), _keymax(0), _valuemax(0) { + _optype(optype), _internal(NULL), _table(table), _key(), _value(), + _config(), _transaction(NULL), _group(NULL), _repeatgroup(0) { + init_internal(NULL); size_check(); } Operation::Operation(const Operation &other) : - _optype(other._optype), _table(other._table), _key(other._key), - _value(other._value), _config(other._config), + _optype(other._optype), _internal(NULL), _table(other._table), + _key(other._key), _value(other._value), _config(other._config), _transaction(other._transaction), _group(other._group), - _repeatgroup(other._repeatgroup), _flags(other._flags), - _keysize(other._keysize), _valuesize(other._valuesize), - _keymax(other._keymax), _valuemax(other._valuemax) { + _repeatgroup(other._repeatgroup) { // Creation and destruction of _group and _transaction is managed // by Python. + init_internal(other._internal); +} + +Operation::Operation(OpType optype, const char *config) : + _optype(optype), _internal(NULL), _table(), _key(), _value(), + _config(config), _transaction(NULL), _group(NULL), _repeatgroup(0) { + init_internal(NULL); } Operation::~Operation() { // Creation and destruction of _group, _transaction is managed by Python. + delete _internal; } Operation& Operation::operator=(const Operation &other) { @@ -981,28 +1016,62 @@ Operation& Operation::operator=(const Operation &other) { _transaction = other._transaction; _group = other._group; _repeatgroup = other._repeatgroup; - _keysize = other._keysize; - _valuesize = other._valuesize; - _keymax = other._keymax; - _valuemax = other._valuemax; + delete _internal; + _internal = NULL; + init_internal(other._internal); return (*this); } -void Operation::create_all() { - size_check(); +void Operation::init_internal(OperationInternal *other) { + ASSERT(_internal == NULL); - _flags = 0; - if (!_config.empty()) { - if (_config == "reopen") - _flags |= WORKGEN_OP_REOPEN; + switch (_optype) { + case OP_CHECKPOINT: + if (other == NULL) + _internal = new CheckpointOperationInternal(); + else + _internal = new CheckpointOperationInternal( + *(CheckpointOperationInternal *)other); + break; + case OP_INSERT: + case OP_REMOVE: + case OP_SEARCH: + case OP_UPDATE: + if (other == NULL) + _internal = new TableOperationInternal(); + else + _internal = new TableOperationInternal( + *(TableOperationInternal *)other); + break; + case OP_NONE: + case OP_NOOP: + if (other == NULL) + _internal = new OperationInternal(); + else + _internal = new OperationInternal(*other); + break; + case OP_SLEEP: + if (other == NULL) + _internal = new SleepOperationInternal(); else - THROW("operation has illegal config: \"" << _config << "\""); + _internal = new SleepOperationInternal( + *(SleepOperationInternal *)other); + break; + default: + ASSERT(false); } } +void Operation::create_all() { + size_check(); + + _internal->_flags = 0; + _internal->parse_config(_config); +} + void Operation::describe(std::ostream &os) const { os << "Operation: " << _optype; - if (_optype != OP_NONE) { + if (is_table_op()) { os << ", "; _table.describe(os); os << ", "; _key.describe(os); os << ", "; _value.describe(os); @@ -1029,33 +1098,44 @@ void Operation::describe(std::ostream &os) const { } void Operation::get_static_counts(Stats &stats, int multiplier) { - switch (_optype) { - case OP_NONE: - break; - case OP_INSERT: - stats.insert.ops += multiplier; - break; - case OP_REMOVE: - stats.remove.ops += multiplier; - break; - case OP_SEARCH: - stats.read.ops += multiplier; - break; - case OP_UPDATE: - stats.update.ops += multiplier; - break; - default: - ASSERT(false); - } + if (is_table_op()) + switch (_optype) { + case OP_INSERT: + stats.insert.ops += multiplier; + break; + case OP_REMOVE: + stats.remove.ops += multiplier; + break; + case OP_SEARCH: + stats.read.ops += multiplier; + break; + case OP_UPDATE: + stats.update.ops += multiplier; + break; + default: + ASSERT(false); + } + else if (_optype == OP_CHECKPOINT) + stats.checkpoint.ops += multiplier; + if (_group != NULL) for (std::vector<Operation>::iterator i = _group->begin(); i != _group->end(); i++) i->get_static_counts(stats, multiplier * _repeatgroup); } +bool Operation::is_table_op() const { + return (_optype == OP_INSERT || _optype == OP_REMOVE || + _optype == OP_SEARCH || _optype == OP_UPDATE); +} + void Operation::kv_compute_max(bool iskey, bool has_random) { uint64_t max; int size; + TableOperationInternal *internal; + + ASSERT(is_table_op()); + internal = (TableOperationInternal *)_internal; size = iskey ? _key._size : _value._size; if (size == 0) @@ -1076,31 +1156,40 @@ void Operation::kv_compute_max(bool iskey, bool has_random) { max = 0; if (iskey) { - _keysize = size; - _keymax = max; + internal->_keysize = size; + internal->_keymax = max; } else { - _valuesize = size; - _valuemax = max; + internal->_valuesize = size; + internal->_valuemax = max; } } void Operation::kv_size_buffer(bool iskey, size_t &maxsize) const { + TableOperationInternal *internal; + + ASSERT(is_table_op()); + internal = (TableOperationInternal *)_internal; + if (iskey) { - if ((size_t)_keysize > maxsize) - maxsize = _keysize; + if ((size_t)internal->_keysize > maxsize) + maxsize = internal->_keysize; } else { - if ((size_t)_valuesize > maxsize) - maxsize = _valuesize; + if ((size_t)internal->_valuesize > maxsize) + maxsize = internal->_valuesize; } } void Operation::kv_gen(ThreadRunner *runner, bool iskey, uint64_t compressibility, uint64_t n, char *result) const { - uint64_t max; - int size; + TableOperationInternal *internal; + uint_t max; + uint_t size; - size = iskey ? _keysize : _valuesize; - max = iskey ? _keymax : _valuemax; + ASSERT(is_table_op()); + internal = (TableOperationInternal *)_internal; + + size = iskey ? internal->_keysize : internal->_valuesize; + max = iskey ? internal->_keymax : internal->_valuemax; if (n > max) THROW((iskey ? "Key" : "Value") << " (" << n << ") too large for size (" << size << ")"); @@ -1124,13 +1213,13 @@ void Operation::kv_gen(ThreadRunner *runner, bool iskey, * That means that 75% of the string will be random numbers, and 25 * will be easily compressible zero-fill. */ - uint64_t random_len = size - ((size * compressibility) / 100); + uint_t random_len = size - ((size * compressibility) / 100); /* Never overwrite the record number identifier */ if (random_len > size - 20) random_len = size - 20; - for (int i = 0; i < random_len; ++i) + for (uint64_t i = 0; i < random_len; ++i) /* * TODO: It'd be nice to use workgen_rand here, but this class * is without the context of a runner thread, so it's not easy @@ -1141,22 +1230,60 @@ void Operation::kv_gen(ThreadRunner *runner, bool iskey, } void Operation::size_check() const { - if (_optype != OP_NONE && _key._size == 0 && _table.options.key_size == 0) - THROW("operation requires a key size"); - if (OP_HAS_VALUE(this) && _value._size == 0 && - _table.options.value_size == 0) - THROW("operation requires a value size"); + if (is_table_op()) { + if (_key._size == 0 && _table.options.key_size == 0) + THROW("operation requires a key size"); + if (OP_HAS_VALUE(this) && _value._size == 0 && + _table.options.value_size == 0) + THROW("operation requires a value size"); + } } -Track::Track(bool latency_tracking) : ops(0), latency_ops(0), latency(0), - min_latency(0), max_latency(0), us(NULL), ms(NULL), sec(NULL) { - track_latency(latency_tracking); +int CheckpointOperationInternal::run(ThreadRunner *runner, WT_SESSION *session) +{ + return (session->checkpoint(session, NULL)); +} + +void SleepOperationInternal::parse_config(const std::string &config) +{ + int amount = 0; + const char *configp; + char *endp; + + configp = config.c_str(); + _sleepvalue = strtod(configp, &endp); + if (configp == endp || *endp != '\0' || _sleepvalue < 0.0) + THROW("sleep operation requires a configuration string as " + "a non-negative float, e.g. '1.5'"); +} + +int SleepOperationInternal::run(ThreadRunner *runner, WT_SESSION *session) +{ + (void)session; /* not used */ + sleep(_sleepvalue); + return (0); } -Track::Track(const Track &other) : ops(other.ops), - latency_ops(other.latency_ops), latency(other.latency), - min_latency(other.min_latency), max_latency(other.max_latency), +void TableOperationInternal::parse_config(const std::string &config) +{ + if (!config.empty()) { + if (config == "reopen") + _flags |= WORKGEN_OP_REOPEN; + else + THROW("table operation has illegal config: \"" << config << "\""); + } +} + +Track::Track(bool latency_tracking) : ops_in_progress(0), ops(0), + latency_ops(0), latency(0), bucket_ops(0), min_latency(0), max_latency(0), us(NULL), ms(NULL), sec(NULL) { + track_latency(latency_tracking); +} + +Track::Track(const Track &other) : ops_in_progress(other.ops_in_progress), + ops(other.ops), latency_ops(other.latency_ops), latency(other.latency), + bucket_ops(other.bucket_ops), min_latency(other.min_latency), + max_latency(other.max_latency), us(NULL), ms(NULL), sec(NULL) { if (other.us != NULL) { us = new uint32_t[LATENCY_US_BUCKETS]; ms = new uint32_t[LATENCY_MS_BUCKETS]; @@ -1176,6 +1303,7 @@ Track::~Track() { } void Track::add(Track &other, bool reset) { + ops_in_progress += other.ops_in_progress; ops += other.ops; latency_ops += other.latency_ops; latency += other.latency; @@ -1198,6 +1326,7 @@ void Track::add(Track &other, bool reset) { } void Track::assign(const Track &other) { + ops_in_progress = other.ops_in_progress; ops = other.ops; latency_ops = other.latency_ops; latency = other.latency; @@ -1231,10 +1360,16 @@ uint64_t Track::average_latency() const { return (latency / latency_ops); } +void Track::begin() { + ops_in_progress++; +} + void Track::clear() { + ops_in_progress = 0; ops = 0; latency_ops = 0; latency = 0; + bucket_ops = 0; min_latency = 0; max_latency = 0; if (us != NULL) { @@ -1244,13 +1379,15 @@ void Track::clear() { } } -void Track::incr() { +void Track::complete() { + --ops_in_progress; ops++; } -void Track::incr_with_latency(uint64_t usecs) { +void Track::complete_with_latency(uint64_t usecs) { ASSERT(us != NULL); + --ops_in_progress; ops++; latency_ops++; latency += usecs; @@ -1277,7 +1414,51 @@ void Track::incr_with_latency(uint64_t usecs) { sec[LATENCY_SEC_BUCKETS - 1]++; } +// Return the latency for which the given percent is lower than it. +// E.g. for percent == 95, returns the latency for which 95% of latencies +// are faster (lower), and 5% are slower (higher). +uint64_t Track::percentile_latency(int percent) const { + // Get the total number of operations in the latency buckets. + // We can't reliably use latency_ops, because this struct was + // added up from Track structures that were being copied while + // being updated. + uint64_t total = 0; + for (int i = 0; i < LATENCY_SEC_BUCKETS; i++) + total += sec[i]; + for (int i = 0; i < LATENCY_MS_BUCKETS; i++) + total += ms[i]; + for (int i = 0; i < LATENCY_US_BUCKETS; i++) + total += us[i]; + if (total == 0) + return (0); + + // optimized for percent values over 50, we start counting from above. + uint64_t n = 0; + uint64_t k = (100 - percent) * total / 100; + if (k == 0) + return (0); + for (int i = LATENCY_SEC_BUCKETS - 1; i >= 0; --i) { + n += sec[i]; + if (n >= k) + return (sec_to_us(i)); + } + for (int i = LATENCY_MS_BUCKETS - 1; i >= 0; --i) { + n += ms[i]; + if (n >= k) + return (ms_to_us(i)); + } + for (int i = LATENCY_US_BUCKETS - 1; i >= 0; --i) { + n += us[i]; + if (n >= k) + return (100 * i); + } + // We should have accounted for all the buckets. + ASSERT(false); + return (0); +} + void Track::subtract(const Track &other) { + ops_in_progress -= other.ops_in_progress; ops -= other.ops; latency_ops -= other.latency_ops; latency -= other.latency; @@ -1294,19 +1475,6 @@ void Track::subtract(const Track &other) { } } -// If there are no entries in this Track, take them from -// a previous Track. Used to smooth graphs. We don't worry -// about latency buckets here. -void Track::smooth(const Track &other) { - if (latency_ops == 0) { - ops = other.ops; - latency = other.latency; - latency_ops = other.latency_ops; - min_latency = other.min_latency; - max_latency = other.max_latency; - } -} - void Track::track_latency(bool newval) { if (newval) { if (us == NULL) { @@ -1351,18 +1519,20 @@ void Track::_get_sec(long *result) { memset(result, 0, sizeof(long) * LATENCY_SEC_BUCKETS); } -Stats::Stats(bool latency) : insert(latency), not_found(latency), - read(latency), remove(latency), update(latency), truncate(latency) { +Stats::Stats(bool latency) : checkpoint(latency), insert(latency), + not_found(latency), read(latency), remove(latency), update(latency), + truncate(latency) { } -Stats::Stats(const Stats &other) : insert(other.insert), - not_found(other.not_found), read(other.read), remove(other.remove), - update(other.update), truncate(other.truncate) { +Stats::Stats(const Stats &other) : checkpoint(other.checkpoint), + insert(other.insert), not_found(other.not_found), read(other.read), + remove(other.remove), update(other.update), truncate(other.truncate) { } Stats::~Stats() {} void Stats::add(Stats &other, bool reset) { + checkpoint.add(other.checkpoint, reset); insert.add(other.insert, reset); not_found.add(other.not_found, reset); read.add(other.read, reset); @@ -1372,6 +1542,7 @@ void Stats::add(Stats &other, bool reset) { } void Stats::assign(const Stats &other) { + checkpoint.assign(other.checkpoint); insert.assign(other.insert); not_found.assign(other.not_found); read.assign(other.read); @@ -1381,6 +1552,7 @@ void Stats::assign(const Stats &other) { } void Stats::clear() { + checkpoint.clear(); insert.clear(); not_found.clear(); read.clear(); @@ -1398,10 +1570,12 @@ void Stats::describe(std::ostream &os) const { os << ", updates " << update.ops; os << ", truncates " << truncate.ops; os << ", removes " << remove.ops; + os << ", checkpoints " << checkpoint.ops; } void Stats::final_report(std::ostream &os, timespec &totalsecs) const { uint64_t ops = 0; + ops += checkpoint.ops; ops += read.ops; ops += not_found.ops; ops += insert.ops; @@ -1420,6 +1594,7 @@ void Stats::final_report(std::ostream &os, timespec &totalsecs) const { FINAL_OUTPUT(os, update.ops, update, ops, totalsecs); FINAL_OUTPUT(os, truncate.ops, truncate, ops, totalsecs); FINAL_OUTPUT(os, remove.ops, remove, ops, totalsecs); + FINAL_OUTPUT(os, checkpoint.ops, checkpoint, ops, totalsecs); } void Stats::report(std::ostream &os) const { @@ -1430,19 +1605,12 @@ void Stats::report(std::ostream &os) const { os << ", " << insert.ops << " inserts, "; os << update.ops << " updates, "; os << truncate.ops << " truncates, "; - os << remove.ops << " removes"; -} - -void Stats::smooth(const Stats &other) { - insert.smooth(other.insert); - not_found.smooth(other.not_found); - read.smooth(other.read); - remove.smooth(other.remove); - update.smooth(other.update); - truncate.smooth(other.truncate); + os << remove.ops << " removes, "; + os << checkpoint.ops << " checkpoints"; } void Stats::subtract(const Stats &other) { + checkpoint.subtract(other.checkpoint); insert.subtract(other.insert); not_found.subtract(other.not_found); read.subtract(other.read); @@ -1452,6 +1620,7 @@ void Stats::subtract(const Stats &other) { } void Stats::track_latency(bool latency) { + checkpoint.track_latency(latency); insert.track_latency(latency); not_found.track_latency(latency); read.track_latency(latency); @@ -1507,7 +1676,7 @@ TableInternal::~TableInternal() {} WorkloadOptions::WorkloadOptions() : max_latency(0), report_file("workload.stat"), report_interval(0), run_time(0), - sample_file("sample.json"), sample_interval(0), sample_rate(1), warmup(0), + sample_file("monitor.json"), sample_interval(0), sample_rate(1), warmup(0), _options() { _options.add_int("max_latency", max_latency, "prints warning if any latency measured exceeds this number of " diff --git a/src/third_party/wiredtiger/bench/workgen/workgen.h b/src/third_party/wiredtiger/bench/workgen/workgen.h index 7de03a90f17..cc93409b388 100644 --- a/src/third_party/wiredtiger/bench/workgen/workgen.h +++ b/src/third_party/wiredtiger/bench/workgen/workgen.h @@ -30,6 +30,9 @@ #include <vector> #include <map> +// For convenience: A type exposed to Python that cannot be negative. +typedef unsigned int uint_t; + namespace workgen { struct ContextInternal; @@ -81,9 +84,11 @@ struct Track { // Threads maintain the total thread operation and total latency they've // experienced. - uint64_t ops; // Total operations */ + uint64_t ops_in_progress; // Total operations not completed */ + uint64_t ops; // Total operations completed */ uint64_t latency_ops; // Total ops sampled for latency uint64_t latency; // Total latency */ + uint64_t bucket_ops; // Computed for percentile_latency // Minimum/maximum latency, shared with the monitor thread, that is, the // monitor thread clears it so it's recalculated again for each period. @@ -98,10 +103,11 @@ struct Track { void add(Track&, bool reset = false); void assign(const Track&); uint64_t average_latency() const; + void begin(); void clear(); - void incr(); - void incr_with_latency(uint64_t usecs); - void smooth(const Track&); + void complete(); + void complete_with_latency(uint64_t usecs); + uint64_t percentile_latency(int percent) const; void subtract(const Track&); void track_latency(bool); bool track_latency() const { return (us != NULL); } @@ -120,6 +126,7 @@ private: }; struct Stats { + Track checkpoint; Track insert; Track not_found; Track read; @@ -139,7 +146,6 @@ struct Stats { void final_report(std::ostream &os, timespec &totalsecs) const; void report(std::ostream &os) const; #endif - void smooth(const Stats&); void subtract(const Stats&); void track_latency(bool); bool track_latency() const { return (insert.track_latency()); } @@ -170,11 +176,11 @@ struct Context { // properties are prevented, only existing properties can be set. // struct TableOptions { - int key_size; - int value_size; - uint64_t value_compressibility; + uint_t key_size; + uint_t value_size; + uint_t value_compressibility; bool random_value; - int range; + uint_t range; TableOptions(); TableOptions(const TableOptions &other); @@ -276,8 +282,10 @@ struct Value { struct Operation { enum OpType { - OP_NONE, OP_INSERT, OP_REMOVE, OP_SEARCH, OP_UPDATE }; + OP_CHECKPOINT, OP_INSERT, OP_NONE, OP_NOOP, OP_REMOVE, OP_SEARCH, + OP_SLEEP, OP_UPDATE }; OpType _optype; + OperationInternal *_internal; Table _table; Key _key; @@ -287,28 +295,22 @@ struct Operation { std::vector<Operation> *_group; int _repeatgroup; -#ifndef SWIG -#define WORKGEN_OP_REOPEN 0x0001 // reopen cursor for each op - uint32_t _flags; - - int _keysize; // derived from Key._size and Table.options.key_size - int _valuesize; - uint64_t _keymax; - uint64_t _valuemax; -#endif - Operation(); Operation(OpType optype, Table table, Key key, Value value); Operation(OpType optype, Table table, Key key); Operation(OpType optype, Table table); + // Constructor with string applies to NOOP, SLEEP, CHECKPOINT + Operation(OpType optype, const char *config); Operation(const Operation &other); ~Operation(); void describe(std::ostream &os) const; #ifndef SWIG Operation& operator=(const Operation &other); + void init_internal(OperationInternal *other); void create_all(); void get_static_counts(Stats &stats, int multiplier); + bool is_table_op() const; void kv_compute_max(bool iskey, bool has_random); void kv_gen(ThreadRunner *runner, bool iskey, uint64_t compressibility, uint64_t n, char *result) const; diff --git a/src/third_party/wiredtiger/bench/workgen/workgen_int.h b/src/third_party/wiredtiger/bench/workgen/workgen_int.h index c38f709efa1..dbcde8472b5 100644 --- a/src/third_party/wiredtiger/bench/workgen/workgen_int.h +++ b/src/third_party/wiredtiger/bench/workgen/workgen_int.h @@ -176,6 +176,48 @@ struct ContextInternal { int create_all(); }; +struct OperationInternal { +#define WORKGEN_OP_REOPEN 0x0001 // reopen cursor for each op + uint32_t _flags; + + OperationInternal() : _flags(0) {} + OperationInternal(const OperationInternal &other) : _flags(other._flags) {} + virtual ~OperationInternal() {} + virtual void parse_config(const std::string &config) {} + virtual int run(ThreadRunner *runner, WT_SESSION *session) { + (void)runner; (void)session; return (0); } +}; + +struct CheckpointOperationInternal : OperationInternal { + CheckpointOperationInternal() : OperationInternal() {} + CheckpointOperationInternal(const CheckpointOperationInternal &other) {} + virtual int run(ThreadRunner *runner, WT_SESSION *session); +}; + +struct TableOperationInternal : OperationInternal { + uint_t _keysize; // derived from Key._size and Table.options.key_size + uint_t _valuesize; + uint_t _keymax; + uint_t _valuemax; + + TableOperationInternal() : OperationInternal(), _keysize(0), _valuesize(0), + _keymax(0),_valuemax(0) {} + TableOperationInternal(const TableOperationInternal &other) : + _keysize(other._keysize), _valuesize(other._valuesize), + _keymax(other._keymax), _valuemax(other._valuemax) {} + virtual void parse_config(const std::string &config); +}; + +struct SleepOperationInternal : OperationInternal { + float _sleepvalue; + + SleepOperationInternal() : OperationInternal(), _sleepvalue(0) {} + SleepOperationInternal(const SleepOperationInternal &other) : + _sleepvalue(other._sleepvalue) {} + virtual void parse_config(const std::string &config); + virtual int run(ThreadRunner *runner, WT_SESSION *session); +}; + struct TableInternal { tint_t _tint; uint32_t _context_count; diff --git a/src/third_party/wiredtiger/bench/workgen/wtperf.py b/src/third_party/wiredtiger/bench/workgen/wtperf.py index 9da8c37fd3a..b059b31f8db 100644..100755 --- a/src/third_party/wiredtiger/bench/workgen/wtperf.py +++ b/src/third_party/wiredtiger/bench/workgen/wtperf.py @@ -76,7 +76,8 @@ class Translator: self.error(msg) raise TranslateException(errtype) - supported_opt_list = [ 'close_conn', 'compression', 'compact', + supported_opt_list = [ 'checkpoint_interval', 'checkpoint_threads', + 'close_conn', 'compact', 'compression', 'conn_config', 'create', 'icount', 'key_sz', 'log_like_table', 'pareto', 'populate_ops_per_txn', 'populate_threads', @@ -219,13 +220,14 @@ class Translator: str(factor) + ' to compensate for log_like operations.\n' return (new_throttle, comment) - def parse_threads(self, threads_config): + def parse_threads(self, threads_config, checkpoint_threads): opts = self.options tdecls = '' tlist = self.split_config_parens(threads_config) table_count = self.get_int_opt('table_count', 1) log_like_table = self.get_boolean_opt('log_like_table', False) txn_config = self.get_string_opt('transaction_config', '') + checkpoint_interval = self.get_int_opt('checkpoint_interval', 120) run_ops = self.get_int_opt('run_ops', -1) if log_like_table: tdecls += 'log_name = "table:log"\n' @@ -317,6 +319,18 @@ class Translator: tnames += str(topts.count) + ' * ' tnames += thread_name + ' + ' + if checkpoint_threads != 0: + thread_name = 'checkpoint_thread' + + tdecls += 'ops = Operation(Operation.OP_SLEEP, "' + \ + str(checkpoint_interval) + \ + '") + \\\n Operation(Operation.OP_CHECKPOINT, "")\n' + tdecls += thread_name + ' = Thread(ops)\n' + tdecls += '\n' + if checkpoint_threads > 1: + tnames += str(checkpoint_threads) + ' * ' + tnames += thread_name + ' + ' + tnames = tnames.rstrip(' +') return (tdecls, tnames) @@ -552,8 +566,10 @@ class Translator: s += self.translate_populate() thread_config = self.get_string_opt('threads', '') - if thread_config != '': - (t_create, t_var) = self.parse_threads(thread_config) + checkpoint_threads = self.get_int_opt('checkpoint_threads', 0) + if thread_config != '' or checkpoint_threads != 0: + (t_create, t_var) = self.parse_threads(thread_config, + checkpoint_threads) s += '\n' + t_create if reopen_connection: s += '\n# reopen the connection\n' diff --git a/src/third_party/wiredtiger/bench/wtperf/runners/checkpoint-latency.wtperf b/src/third_party/wiredtiger/bench/wtperf/runners/checkpoint-latency.wtperf new file mode 100644 index 00000000000..80a579cc818 --- /dev/null +++ b/src/third_party/wiredtiger/bench/wtperf/runners/checkpoint-latency.wtperf @@ -0,0 +1,25 @@ +# A stress configuration to create checkpoints while doing a mix of inserts +# and reads. +conn_config="cache_size=1200MB,eviction=(threads_max=8),log=(enabled=false)" +table_config="leaf_page_max=32k,internal_page_max=16k,allocation_size=4k,split_pct=90,type=file" +# Enough data for 10x cache. 200k records sized 60k = 12G +# tables +checkpoint_interval=60 +checkpoint_threads=1 +create=true +close_conn=false +icount=200000 +log_like_table=true +populate_threads=4 +report_interval=1 +# Run for a longer duration to ensure checkpoints are completing. +run_time=1200 +sample_interval=1 +sample_rate=1 +# MongoDB always has multiple tables, and checkpoints behave differently when +# there is more than a single table. +table_count=10 +threads=((count=8,inserts=1,throttle=125),(count=8,reads=1,throttle=500)) +value_sz=60000 +# Wait for the throughput to stabilize +warmup=120 diff --git a/src/third_party/wiredtiger/dist/s_string.ok b/src/third_party/wiredtiger/dist/s_string.ok index ed5f27cdf11..247ad261085 100644 --- a/src/third_party/wiredtiger/dist/s_string.ok +++ b/src/third_party/wiredtiger/dist/s_string.ok @@ -265,6 +265,7 @@ NEEDVALUE NOLL NOLOCK NONINFRINGEMENT +NOOP NOTFOUND NOTREACHED NOVALUE diff --git a/src/third_party/wiredtiger/import.data b/src/third_party/wiredtiger/import.data index 553ff5a7b55..813d52dea11 100644 --- a/src/third_party/wiredtiger/import.data +++ b/src/third_party/wiredtiger/import.data @@ -1,5 +1,5 @@ { - "commit": "e7d742daa2d2500cd94a7061f754a1d0c4aa963c", + "commit": "d235e0e71ef84c3f9d5a870f08feeff9a7c5581e", "github": "wiredtiger/wiredtiger.git", "vendor": "wiredtiger", "branch": "mongodb-4.2" diff --git a/src/third_party/wiredtiger/src/btree/bt_cursor.c b/src/third_party/wiredtiger/src/btree/bt_cursor.c index aa2aa24a7a9..24264ca44a6 100644 --- a/src/third_party/wiredtiger/src/btree/bt_cursor.c +++ b/src/third_party/wiredtiger/src/btree/bt_cursor.c @@ -1014,52 +1014,27 @@ err: if (ret == WT_RESTART) { * Remove a record from the tree. */ int -__wt_btcur_remove(WT_CURSOR_BTREE *cbt) +__wt_btcur_remove(WT_CURSOR_BTREE *cbt, bool positioned) { - enum { NO_POSITION, POSITIONED, SEARCH_POSITION } positioned; WT_BTREE *btree; WT_CURFILE_STATE state; WT_CURSOR *cursor; WT_DECL_RET; WT_SESSION_IMPL *session; uint64_t yield_count, sleep_usecs; - bool iterating, valid; + bool iterating, searched, valid; btree = cbt->btree; cursor = &cbt->iface; session = (WT_SESSION_IMPL *)cursor->session; yield_count = sleep_usecs = 0; iterating = F_ISSET(cbt, WT_CBT_ITERATE_NEXT | WT_CBT_ITERATE_PREV); + searched = false; WT_STAT_CONN_INCR(session, cursor_remove); WT_STAT_DATA_INCR(session, cursor_remove); WT_STAT_DATA_INCRV(session, cursor_remove_bytes, cursor->key.size); - /* - * WT_CURSOR.remove has a unique semantic, the cursor stays positioned - * if it starts positioned, otherwise clear the cursor on completion. - * - * However, if we unpin the page (because the page is in WT_REF_LIMBO or - * it was selected for forcible eviction), and every item on the page is - * deleted, eviction can delete the page and our subsequent search will - * re-instantiate an empty page for us, with no key/value pairs. Cursor - * remove will search that page and return not-found, which is OK unless - * cursor-overwrite is configured (which causes cursor remove to return - * success even if there's no item to delete). In that case, we're - * supposed to return a positioned cursor, but there's nothing to which - * we can position, and we'll fail attempting to point the cursor at the - * key on the page to satisfy the positioned requirement. - * - * Do the best we can: If we start with a positioned cursor, and we let - * go of our pinned page, reset our state to use the search position, - * that is, use a successful search to return to a "positioned" state. - * If we start with a positioned cursor, let go of our pinned page, and - * the search fails, leave the cursor's key set so the cursor appears - * positioned to the application. - */ - positioned = - F_ISSET(cursor, WT_CURSTD_KEY_INT) ? POSITIONED : NO_POSITION; - /* Save the cursor state. */ __cursor_state_save(cursor, &state); @@ -1103,36 +1078,41 @@ __wt_btcur_remove(WT_CURSOR_BTREE *cbt) goto err; } - /* - * The pinned page goes away if we do a search, including as a result of - * a restart. Get a local copy of any pinned key and re-save the cursor - * state: we may retry but eventually fail. - * +retry: /* * Note these steps must be repeatable, we'll continue to take this path * as long as we encounter WT_RESTART. + * + * Any pinned page goes away if we do a search, including as a result of + * a restart. Get a local copy of any pinned key and re-save the cursor + * state: we may retry but eventually fail. */ -retry: if (positioned == POSITIONED) - positioned = SEARCH_POSITION; WT_ERR(__cursor_localkey(cursor)); __cursor_state_save(cursor, &state); + searched = true; WT_ERR(__cursor_func_init(cbt, true)); if (btree->type == BTREE_ROW) { - WT_ERR(__cursor_row_search(session, cbt, NULL, false)); + ret = __cursor_row_search(session, cbt, NULL, false); + if (ret == WT_NOTFOUND) + goto search_notfound; + WT_ERR(ret); /* Check whether an update would conflict. */ WT_ERR(__curfile_update_check(cbt)); if (cbt->compare != 0) - WT_ERR(WT_NOTFOUND); + goto search_notfound; WT_ERR(__wt_cursor_valid(cbt, NULL, &valid)); if (!valid) - WT_ERR(WT_NOTFOUND); + goto search_notfound; ret = __cursor_row_modify(session, cbt, WT_UPDATE_TOMBSTONE); } else { - WT_ERR(__cursor_col_search(session, cbt, NULL)); + ret = __cursor_col_search(session, cbt, NULL); + if (ret == WT_NOTFOUND) + goto search_notfound; + WT_ERR(ret); /* * If we find a matching record, check whether an update would @@ -1147,7 +1127,7 @@ retry: if (positioned == POSITIONED) WT_ERR(__wt_cursor_valid(cbt, NULL, &valid)); if (cbt->compare != 0 || !valid) { if (!__cursor_fix_implicit(btree, cbt)) - WT_ERR(WT_NOTFOUND); + goto search_notfound; /* * Creating a record past the end of the tree in a * fixed-length column-store implicitly fills the @@ -1170,58 +1150,57 @@ err: if (ret == WT_RESTART) { } if (ret == 0) { -done: switch (positioned) { - case NO_POSITION: - /* - * Never positioned and we leave it that way, clear any - * key and reset the cursor. - */ + /* + * If positioned originally, but we had to do a search, acquire + * a position so we can return success. + * + * If not positioned originally, leave it that way, clear any + * key and reset the cursor. + */ + if (positioned) { + if (searched) + WT_TRET(__wt_key_return(session, cbt)); + } else { F_CLR(cursor, WT_CURSTD_KEY_SET); WT_TRET(__cursor_reset(cbt)); - break; - case POSITIONED: - /* - * Positioned and we used the pinned page, leave the key - * alone, whatever it is. - */ - break; - case SEARCH_POSITION: - /* - * Positioned and we did a search anyway, get a key to - * return. - */ - WT_TRET(__wt_key_return(session, cbt)); - break; } - } - if (ret != 0) { - WT_TRET(__cursor_reset(cbt)); - __cursor_state_restore(cursor, &state); + /* + * Check the return status again as we might have encountered an + * error setting the return key or resetting the cursor after an + * otherwise successful remove. + */ + if (ret != 0) { + WT_TRET(__cursor_reset(cbt)); + __cursor_state_restore(cursor, &state); + } + } else { + /* + * If the cursor is configured for overwrite and search returned + * not-found, that is what we want, try to return success. We + * can do that as long as it's not an iterating or positioned + * cursor. (Iterating or positioned cursors would have been + * forced to give up any pinned page, and when the search failed + * we've lost the cursor position. Since no subsequent iteration + * can succeed, we cannot return success.) + */ + if (0) { +search_notfound: ret = WT_NOTFOUND; + if (!iterating && !positioned && + F_ISSET(cursor, WT_CURSTD_OVERWRITE)) + ret = 0; + } /* - * If the record isn't found and the cursor is configured for - * overwrite, that is what we want, try to return success. - * - * We set the return to 0 after testing for success, the clause - * above dealing with the cursor position is only correct if we - * were successful. If search failed after positioned is set to - * SEARCH_POSITION, we cannot return a key. The only action to - * take is to set the cursor to its original key, which we just - * did. - * - * Finally, if an iterating or positioned cursor was forced to - * give up its pinned page and then a search failed, we've - * lost our cursor position. Since no subsequent iteration can - * succeed, we cannot return success. + * Reset the cursor and restore the original cursor key: done + * after clearing the return value in the clause immediately + * above so we don't lose an error value if cursor reset fails. */ - if (ret == WT_NOTFOUND && - F_ISSET(cursor, WT_CURSTD_OVERWRITE) && - !iterating && positioned == NO_POSITION) - ret = 0; + WT_TRET(__cursor_reset(cbt)); + __cursor_state_restore(cursor, &state); } - /* +done: /* * Upper level cursor removes don't expect the cursor value to be set * after a successful remove (and check in diagnostic mode). Error * handling may have converted failure to a success, do a final check. diff --git a/src/third_party/wiredtiger/src/btree/bt_read.c b/src/third_party/wiredtiger/src/btree/bt_read.c index 0d0cf17762c..7229c87df04 100644 --- a/src/third_party/wiredtiger/src/btree/bt_read.c +++ b/src/third_party/wiredtiger/src/btree/bt_read.c @@ -113,7 +113,7 @@ __las_page_instantiate_verbose(WT_SESSION_IMPL *session, uint64_t las_pageid) * Instantiate lookaside update records in a recently read page. */ static int -__las_page_instantiate(WT_SESSION_IMPL *session, WT_REF *ref) +__las_page_instantiate(WT_SESSION_IMPL *session, WT_REF *ref, bool *preparedp) { WT_CACHE *cache; WT_CURSOR *cursor; @@ -166,6 +166,7 @@ __las_page_instantiate(WT_SESSION_IMPL *session, WT_REF *ref) __wt_readlock(session, &cache->las_sweepwalk_lock); WT_PUBLISH(cache->las_reader, false); locked = true; + *preparedp = false; for (ret = __wt_las_cursor_position(cursor, las_pageid); ret == 0; ret = cursor->next(cursor)) { @@ -188,6 +189,8 @@ __las_page_instantiate(WT_SESSION_IMPL *session, WT_REF *ref) total_incr += incr; upd->txnid = las_txnid; upd->prepare_state = prepare_state; + if (prepare_state == WT_PREPARE_INPROGRESS) + *preparedp = true; #ifdef HAVE_TIMESTAMPS WT_ASSERT(session, las_timestamp.size == WT_TIMESTAMP_SIZE); memcpy(&upd->timestamp, las_timestamp.data, las_timestamp.size); @@ -374,8 +377,8 @@ __evict_force_check(WT_SESSION_IMPL *session, WT_REF *ref) * page access. */ static inline int -__page_read_lookaside(WT_SESSION_IMPL *session, - WT_REF *ref, uint32_t previous_state, uint32_t *final_statep) +__page_read_lookaside(WT_SESSION_IMPL *session, WT_REF *ref, + uint32_t previous_state, uint32_t *final_statep, bool *preparedp) { /* * Reading a lookaside ref for the first time, and not requiring the @@ -400,7 +403,7 @@ __page_read_lookaside(WT_SESSION_IMPL *session, cache_read_lookaside_delay_checkpoint); } - WT_RET(__las_page_instantiate(session, ref)); + WT_RET(__las_page_instantiate(session, ref, preparedp)); ref->page_las->eviction_to_lookaside = false; return (0); } @@ -419,7 +422,7 @@ __page_read(WT_SESSION_IMPL *session, WT_REF *ref, uint32_t flags) uint64_t time_start, time_stop; uint32_t page_flags, final_state, new_state, previous_state; const uint8_t *addr; - bool timer; + bool prepared, timer; time_start = time_stop = 0; @@ -517,6 +520,7 @@ __page_read(WT_SESSION_IMPL *session, WT_REF *ref, uint32_t flags) F_ISSET(ref->page->dsk, WT_PAGE_LAS_UPDATE)); skip_read: + prepared = false; switch (previous_state) { case WT_REF_DELETED: /* @@ -526,7 +530,7 @@ skip_read: * then apply the delete. */ if (ref->page_las != NULL) { - WT_ERR(__las_page_instantiate(session, ref)); + WT_ERR(__las_page_instantiate(session, ref, &prepared)); ref->page_las->eviction_to_lookaside = false; } @@ -536,26 +540,26 @@ skip_read: case WT_REF_LIMBO: case WT_REF_LOOKASIDE: WT_ERR(__page_read_lookaside( - session, ref, previous_state, &final_state)); + session, ref, previous_state, &final_state, &prepared)); break; } /* * Once the page is instantiated, we no longer need the history in * lookaside. We leave the lookaside sweep thread to do most cleanup, - * but it can only remove keys that skew newest (if there are entries - * in the lookaside newer than the page, they need to be read back into - * cache or they will be lost). + * but it can only remove committed updates and keys that skew newest + * (if there are entries in the lookaside newer than the page, they need + * to be read back into cache or they will be lost). * - * There is no reason for the lookaside remove should fail, but ignore - * it if for some reason it fails, we've got a valid page. + * Prepared updates can not be removed by the lookaside sweep, remove + * them as we read the page back in memory. * * Don't free WT_REF.page_las, there may be concurrent readers. */ if (final_state == WT_REF_MEM && - ref->page_las != NULL && !ref->page_las->skew_newest) - WT_IGNORE_RET(__wt_las_remove_block( - session, ref->page_las->las_pageid, false)); + ref->page_las != NULL && (prepared || !ref->page_las->skew_newest)) + WT_ERR(__wt_las_remove_block( + session, ref->page_las->las_pageid)); WT_PUBLISH(ref->state, final_state); return (ret); diff --git a/src/third_party/wiredtiger/src/cache/cache_las.c b/src/third_party/wiredtiger/src/cache/cache_las.c index f99bb0bbd9d..3ba2f3b3b06 100644 --- a/src/third_party/wiredtiger/src/cache/cache_las.c +++ b/src/third_party/wiredtiger/src/cache/cache_las.c @@ -859,8 +859,7 @@ __wt_las_cursor_position(WT_CURSOR *cursor, uint64_t pageid) * Remove all records for a given page from the lookaside table. */ int -__wt_las_remove_block( - WT_SESSION_IMPL *session, uint64_t pageid, bool lock_wait) +__wt_las_remove_block(WT_SESSION_IMPL *session, uint64_t pageid) { WT_CONNECTION_IMPL *conn; WT_CURSOR *cursor; @@ -878,8 +877,7 @@ __wt_las_remove_block( */ __wt_las_cursor(session, &cursor, &session_flags); - if ((ret = __las_remove_block( - cursor, pageid, lock_wait, &remove_cnt)) == 0) + if ((ret = __las_remove_block(cursor, pageid, true, &remove_cnt)) == 0) (void)__wt_atomic_add64( &conn->cache->las_remove_count, remove_cnt); diff --git a/src/third_party/wiredtiger/src/cursor/cur_file.c b/src/third_party/wiredtiger/src/cursor/cur_file.c index 80093fceb02..e8922cd86a7 100644 --- a/src/third_party/wiredtiger/src/cursor/cur_file.c +++ b/src/third_party/wiredtiger/src/cursor/cur_file.c @@ -389,17 +389,32 @@ __curfile_remove(WT_CURSOR *cursor) WT_DECL_RET; WT_SESSION_IMPL *session; uint64_t time_start, time_stop; + bool positioned; + + /* + * WT_CURSOR.remove has a unique semantic, the cursor stays positioned + * if it starts positioned, otherwise clear the cursor on completion. + * Track if starting with a positioned cursor and pass that information + * into the underlying Btree remove function so it tries to maintain a + * position in the tree. This is complicated by the loop in this code + * that restarts operations if they return prepare-conflict or restart. + */ + positioned = F_ISSET(cursor, WT_CURSTD_KEY_INT); cbt = (WT_CURSOR_BTREE *)cursor; CURSOR_REMOVE_API_CALL(cursor, session, cbt->btree); WT_ERR(__cursor_checkkey(cursor)); time_start = __wt_clock(session); - WT_ERR(__wt_btcur_remove(cbt)); + WT_ERR(__wt_btcur_remove(cbt, positioned)); time_stop = __wt_clock(session); __wt_stat_usecs_hist_incr_opwrite(session, WT_CLOCKDIFF_US(time_stop, time_start)); + /* If we've lost an initial position, we must fail. */ + if (positioned && !F_ISSET(cursor, WT_CURSTD_KEY_INT)) + WT_ERR(WT_ROLLBACK); + /* * Remove with a search-key is fire-and-forget, no position and no key. * Remove starting from a position maintains the position and a key, diff --git a/src/third_party/wiredtiger/src/include/extern.h b/src/third_party/wiredtiger/src/include/extern.h index 05a2a2f3f52..9a614dc2c19 100644 --- a/src/third_party/wiredtiger/src/include/extern.h +++ b/src/third_party/wiredtiger/src/include/extern.h @@ -107,7 +107,7 @@ extern int __wt_btcur_search(WT_CURSOR_BTREE *cbt) WT_GCC_FUNC_DECL_ATTRIBUTE((w extern int __wt_btcur_search_near(WT_CURSOR_BTREE *cbt, int *exactp) WT_GCC_FUNC_DECL_ATTRIBUTE((warn_unused_result)); extern int __wt_btcur_insert(WT_CURSOR_BTREE *cbt) WT_GCC_FUNC_DECL_ATTRIBUTE((warn_unused_result)); extern int __wt_btcur_insert_check(WT_CURSOR_BTREE *cbt) WT_GCC_FUNC_DECL_ATTRIBUTE((warn_unused_result)); -extern int __wt_btcur_remove(WT_CURSOR_BTREE *cbt) WT_GCC_FUNC_DECL_ATTRIBUTE((warn_unused_result)); +extern int __wt_btcur_remove(WT_CURSOR_BTREE *cbt, bool positioned) WT_GCC_FUNC_DECL_ATTRIBUTE((warn_unused_result)); extern int __wt_btcur_modify(WT_CURSOR_BTREE *cbt, WT_MODIFY *entries, int nentries) WT_GCC_FUNC_DECL_ATTRIBUTE((warn_unused_result)); extern int __wt_btcur_reserve(WT_CURSOR_BTREE *cbt) WT_GCC_FUNC_DECL_ATTRIBUTE((warn_unused_result)); extern int __wt_btcur_update(WT_CURSOR_BTREE *cbt) WT_GCC_FUNC_DECL_ATTRIBUTE((warn_unused_result)); @@ -213,7 +213,7 @@ extern bool __wt_las_page_skip_locked(WT_SESSION_IMPL *session, WT_REF *ref) WT_ extern bool __wt_las_page_skip(WT_SESSION_IMPL *session, WT_REF *ref) WT_GCC_FUNC_DECL_ATTRIBUTE((warn_unused_result)); extern int __wt_las_insert_block(WT_CURSOR *cursor, WT_BTREE *btree, WT_PAGE *page, WT_MULTI *multi, WT_ITEM *key) WT_GCC_FUNC_DECL_ATTRIBUTE((warn_unused_result)); extern int __wt_las_cursor_position(WT_CURSOR *cursor, uint64_t pageid) WT_GCC_FUNC_DECL_ATTRIBUTE((warn_unused_result)); -extern int __wt_las_remove_block(WT_SESSION_IMPL *session, uint64_t pageid, bool lock_wait) WT_GCC_FUNC_DECL_ATTRIBUTE((warn_unused_result)); +extern int __wt_las_remove_block(WT_SESSION_IMPL *session, uint64_t pageid) WT_GCC_FUNC_DECL_ATTRIBUTE((warn_unused_result)); extern int __wt_las_save_dropped(WT_SESSION_IMPL *session) WT_GCC_FUNC_DECL_ATTRIBUTE((warn_unused_result)); extern int __wt_las_sweep(WT_SESSION_IMPL *session) WT_GCC_FUNC_DECL_ATTRIBUTE((warn_unused_result)); extern uint32_t __wt_checksum_sw(const void *chunk, size_t len); diff --git a/src/third_party/wiredtiger/src/include/misc.h b/src/third_party/wiredtiger/src/include/misc.h index 8c6af3ca14c..9e2108a6be2 100644 --- a/src/third_party/wiredtiger/src/include/misc.h +++ b/src/third_party/wiredtiger/src/include/misc.h @@ -390,10 +390,3 @@ union __wt_rand_state { session, buf, (buf)->size + __len + 1)); \ } \ } while (0) - -/* - * HAVE_LONG_RUNNING_PREPARE - * To enable functionality of evicting prepared transactions using - * cache overflow mechanism. - */ -#undef HAVE_LONG_RUNNING_PREPARE diff --git a/src/third_party/wiredtiger/src/include/txn.i b/src/third_party/wiredtiger/src/include/txn.i index 0b7e9ae1aa2..81646af5217 100644 --- a/src/third_party/wiredtiger/src/include/txn.i +++ b/src/third_party/wiredtiger/src/include/txn.i @@ -249,7 +249,6 @@ __wt_txn_op_set_key(WT_SESSION_IMPL *session, const WT_ITEM *key) WT_IS_METADATA(op->btree->dhandle)) return (0); -#ifdef HAVE_LONG_RUNNING_PREPARE WT_ASSERT(session, op->type == WT_TXN_OP_BASIC_ROW || op->type == WT_TXN_OP_INMEM_ROW); @@ -263,10 +262,6 @@ __wt_txn_op_set_key(WT_SESSION_IMPL *session, const WT_ITEM *key) * prepared. */ return (__wt_buf_set(session, &op->u.op_row.key, key->data, key->size)); -#else - WT_UNUSED(key); - return (0); -#endif } /* @@ -359,17 +354,25 @@ __wt_txn_resolve_prepared_op( * case. */ WT_ASSERT(session, upd != NULL || txn->multi_update_count != 0); - if (upd == NULL) + + /* + * We track the update count only for commit, but not for rollback, as + * our tracking is based on transaction id, and in case of rollback, we + * set it to aborted. + */ + if (upd == NULL && commit) --txn->multi_update_count; #endif - op->u.op_upd = upd; WT_STAT_CONN_INCR(session, txn_prepared_updates_resolved); for (; upd != NULL; upd = upd->next) { if (upd->txnid != txn->id) continue; + if (op->u.op_upd == NULL) + op->u.op_upd = upd; + if (!commit) { upd->txnid = WT_TXN_ABORTED; continue; diff --git a/src/third_party/wiredtiger/src/reconcile/rec_write.c b/src/third_party/wiredtiger/src/reconcile/rec_write.c index 2b70db8443f..afb97d115fc 100644 --- a/src/third_party/wiredtiger/src/reconcile/rec_write.c +++ b/src/third_party/wiredtiger/src/reconcile/rec_write.c @@ -1352,21 +1352,6 @@ __rec_txn_read(WT_SESSION_IMPL *session, WT_RECONCILE *r, !__txn_visible_id(session, txnid)) uncommitted = r->update_uncommitted = true; - /* - * TODO: - * The following portion of code under #ifdef is there - * to temporarily disable lookaside eviction of the - * prepared updates. Once we have all the pieces put - * together to enable the feature, remove this temporary - * code. - */ -#ifndef HAVE_LONG_RUNNING_PREPARE - if (prepared) { - prepared = false; - uncommitted = r->update_uncommitted = true; - } -#endif - if (prepared || uncommitted) continue; } @@ -6286,8 +6271,7 @@ __rec_las_wrapup_err(WT_SESSION_IMPL *session, WT_RECONCILE *r) for (multi = r->multi, i = 0; i < r->multi_next; ++multi, ++i) if (multi->supd != NULL && (las_pageid = multi->page_las.las_pageid) != 0) - WT_TRET( - __wt_las_remove_block(session, las_pageid, true)); + WT_TRET(__wt_las_remove_block(session, las_pageid)); return (ret); } diff --git a/src/third_party/wiredtiger/src/txn/txn.c b/src/third_party/wiredtiger/src/txn/txn.c index 1add958e226..6f8c0969f40 100644 --- a/src/third_party/wiredtiger/src/txn/txn.c +++ b/src/third_party/wiredtiger/src/txn/txn.c @@ -828,11 +828,7 @@ __wt_txn_commit(WT_SESSION_IMPL *session, const char *cfg[]) * Need to resolve indirect references of transaction * operation, in case of prepared transaction. */ -#ifdef HAVE_LONG_RUNNING_PREPARE if (!prepare) { -#else - if (1) { -#endif /* * Switch reserved operations to abort to * simplify obsolete update list truncation. @@ -1033,9 +1029,7 @@ __wt_txn_prepare(WT_SESSION_IMPL *session, const char *cfg[]) __wt_timestamp_set(&upd->timestamp, &ts); WT_PUBLISH(upd->prepare_state, WT_PREPARE_INPROGRESS); -#ifdef HAVE_LONG_RUNNING_PREPARE op->u.op_upd = NULL; -#endif WT_STAT_CONN_INCR(session, txn_prepared_updates_count); break; case WT_TXN_OP_REF_DELETE: @@ -1121,14 +1115,10 @@ __wt_txn_rollback(WT_SESSION_IMPL *session, const char *cfg[]) * Need to resolve indirect references of transaction * operation, in case of prepared transaction. */ -#ifdef HAVE_LONG_RUNNING_PREPARE if (F_ISSET(txn, WT_TXN_PREPARE)) WT_RET(__wt_txn_resolve_prepared_op( session, op, false)); else { -#else - { -#endif WT_ASSERT(session, upd->txnid == txn->id || upd->txnid == WT_TXN_ABORTED); upd->txnid = WT_TXN_ABORTED; diff --git a/src/third_party/wiredtiger/test/csuite/timestamp_abort/main.c b/src/third_party/wiredtiger/test/csuite/timestamp_abort/main.c index f5f34fe3505..ae292ffa5b4 100644 --- a/src/third_party/wiredtiger/test/csuite/timestamp_abort/main.c +++ b/src/third_party/wiredtiger/test/csuite/timestamp_abort/main.c @@ -67,7 +67,6 @@ static char home[1024]; /* Program working dir */ #define PREPARE_FREQ 5 #define PREPARE_YIELD (PREPARE_FREQ * 10) #define RECORDS_FILE "records-%" PRIu32 -#define STABLE_PERIOD 100 #define SESSION_MAX MAX_TH + 3 /* Include program worker threads */ static const char * table_pfx = "table"; @@ -79,7 +78,6 @@ static const char * const ckpt_file = "checkpoint_done"; static bool compat, inmem, use_ts; static volatile uint64_t global_ts = 1; -static volatile uint64_t th_ts[MAX_TH]; #define ENV_CONFIG_COMPAT ",compatibility=(release=\"2.9\")" #define ENV_CONFIG_DEF \ @@ -103,6 +101,9 @@ typedef struct { uint32_t info; } THREAD_DATA; +/* Lock for transactional ops that set or query a timestamp. */ +static pthread_rwlock_t ts_lock; + static void handler(int) WT_GCC_FUNC_DECL_ATTRIBUTE((noreturn)); static void usage(void) @@ -122,41 +123,27 @@ usage(void) static WT_THREAD_RET thread_ts_run(void *arg) { + WT_DECL_RET; WT_SESSION *session; THREAD_DATA *td; - uint64_t i, last_ts, oldest_ts, this_ts; - char tscfg[64]; + char tscfg[64], ts_buf[WT_TIMESTAMP_SIZE]; td = (THREAD_DATA *)arg; - last_ts = 0; testutil_check(td->conn->open_session(td->conn, NULL, NULL, &session)); - /* - * Every N records we will record our stable timestamp into the stable - * table. That will define our threshold where we expect to find records - * after recovery. - */ + /* Update the oldest timestamp every 1 millisecond. */ for (;;) { - oldest_ts = UINT64_MAX; /* - * For the timestamp thread, the info field contains the number - * of worker threads. + * We get the last committed timestamp periodically in order to + * update the oldest timestamp, that requires locking out + * transactional ops that set or query a timestamp. */ - for (i = 0; i < td->info; ++i) { - /* - * We need to let all threads get started, so if we find - * any thread still with a zero timestamp we go to - * sleep. - */ - this_ts = th_ts[i]; - if (this_ts == 0) - goto ts_wait; - else if (this_ts < oldest_ts) - oldest_ts = this_ts; - } - - if (oldest_ts != UINT64_MAX && - oldest_ts - last_ts > STABLE_PERIOD) { + testutil_check(pthread_rwlock_wrlock(&ts_lock)); + ret = td->conn->query_timestamp( + td->conn, ts_buf, "get=all_committed"); + testutil_check(pthread_rwlock_unlock(&ts_lock)); + testutil_assert(ret == 0 || ret == WT_NOTFOUND); + if (ret == 0) { /* * Set both the oldest and stable timestamp so that we * don't need to maintain read availability at older @@ -164,14 +151,12 @@ thread_ts_run(void *arg) */ testutil_check(__wt_snprintf( tscfg, sizeof(tscfg), - "oldest_timestamp=%" PRIx64 - ",stable_timestamp=%" PRIx64, - oldest_ts, oldest_ts)); + "oldest_timestamp=%s,stable_timestamp=%s", + ts_buf, ts_buf)); testutil_check( td->conn->set_timestamp(td->conn, tscfg)); - last_ts = oldest_ts; - } else -ts_wait: __wt_sleep(0, 1000); + } + __wt_sleep(0, 1000); } /* NOTREACHED */ } @@ -242,9 +227,9 @@ thread_run(void *arg) WT_CURSOR *cur_coll, *cur_local, *cur_oplog; WT_ITEM data; WT_RAND_STATE rnd; - WT_SESSION *oplog_session, *session; + WT_SESSION *prepared_session, *session; THREAD_DATA *td; - uint64_t i, stable_ts; + uint64_t i, active_ts; char cbuf[MAX_VAL], lbuf[MAX_VAL], obuf[MAX_VAL]; char kname[64], tscfg[64], uri[128]; bool use_prep; @@ -255,6 +240,7 @@ thread_run(void *arg) memset(obuf, 0, sizeof(obuf)); memset(kname, 0, sizeof(kname)); + prepared_session = NULL; td = (THREAD_DATA *)arg; /* * Set up the separate file for checking. @@ -277,64 +263,73 @@ thread_run(void *arg) use_prep = (use_ts && td->info % 10 == 0) ? true : false; /* - * We may have two sessions so that the oplog session can have its own - * transaction in parallel with the collection session for threads - * that are going to be using prepared transactions. We need this - * because prepared transactions cannot have any operations that modify - * a table that is logged. But we also want to test mixed logged and - * not-logged transactions. + * For the prepared case we have two sessions so that the oplog session + * can have its own transaction in parallel with the collection session + * We need this because prepared transactions cannot have any operations + * that modify a table that is logged. But we also want to test mixed + * logged and not-logged transactions. */ testutil_check(td->conn->open_session(td->conn, NULL, NULL, &session)); + if (use_prep) + testutil_check(td->conn->open_session( + td->conn, NULL, NULL, &prepared_session)); /* * Open a cursor to each table. */ testutil_check(__wt_snprintf( uri, sizeof(uri), "%s:%s", table_pfx, uri_collection)); - testutil_check(session->open_cursor(session, - uri, NULL, NULL, &cur_coll)); + if (use_prep) + testutil_check(prepared_session->open_cursor(prepared_session, + uri, NULL, NULL, &cur_coll)); + else + testutil_check(session->open_cursor(session, + uri, NULL, NULL, &cur_coll)); testutil_check(__wt_snprintf( uri, sizeof(uri), "%s:%s", table_pfx, uri_local)); - testutil_check(session->open_cursor(session, - uri, NULL, NULL, &cur_local)); + if (use_prep) + testutil_check(prepared_session->open_cursor(prepared_session, + uri, NULL, NULL, &cur_local)); + else + testutil_check(session->open_cursor(session, + uri, NULL, NULL, &cur_local)); testutil_check(__wt_snprintf( uri, sizeof(uri), "%s:%s", table_pfx, uri_oplog)); - oplog_session = NULL; - if (use_prep) { - testutil_check(td->conn->open_session( - td->conn, NULL, NULL, &oplog_session)); - testutil_check(session->open_cursor(oplog_session, - uri, NULL, NULL, &cur_oplog)); - } else - testutil_check(session->open_cursor(session, - uri, NULL, NULL, &cur_oplog)); + testutil_check(session->open_cursor(session, + uri, NULL, NULL, &cur_oplog)); /* * Write our portion of the key space until we're killed. */ printf("Thread %" PRIu32 " starts at %" PRIu64 "\n", td->info, td->start); - stable_ts = 0; + active_ts = 0; for (i = td->start;; ++i) { - if (use_ts) - stable_ts = __wt_atomic_addv64(&global_ts, 1); testutil_check(__wt_snprintf( kname, sizeof(kname), "%" PRIu64, i)); testutil_check(session->begin_transaction(session, NULL)); if (use_prep) - testutil_check(oplog_session->begin_transaction( - oplog_session, NULL)); - /* - * If not using prepared transactions set the timestamp now - * before performing the operation. If we are using prepared - * transactions, it must be set after the prepare. - */ - if (use_ts && !use_prep) { - testutil_check(__wt_snprintf(tscfg, sizeof(tscfg), - "commit_timestamp=%" PRIx64, stable_ts)); - testutil_check( - session->timestamp_transaction(session, tscfg)); + testutil_check(prepared_session->begin_transaction( + prepared_session, NULL)); + + if (use_ts) { + testutil_check(pthread_rwlock_rdlock(&ts_lock)); + active_ts = __wt_atomic_addv64(&global_ts, 1); + testutil_check(__wt_snprintf(tscfg, + sizeof(tscfg), "commit_timestamp=%" PRIx64, + active_ts)); + /* + * Set the transaction's timestamp now before performing + * the operation. If we are using prepared transactions, + * set the timestamp for the session used for oplog. The + * collection session in that case would continue to use + * this timestamp. + */ + testutil_check(session->timestamp_transaction( + session, tscfg)); + testutil_check(pthread_rwlock_unlock(&ts_lock)); } + cur_coll->set_key(cur_coll, kname); cur_local->set_key(cur_local, kname); cur_oplog->set_key(cur_oplog, kname); @@ -344,13 +339,13 @@ thread_run(void *arg) */ testutil_check(__wt_snprintf(cbuf, sizeof(cbuf), "COLL: thread:%" PRIu64 " ts:%" PRIu64 " key: %" PRIu64, - td->info, stable_ts, i)); + td->info, active_ts, i)); testutil_check(__wt_snprintf(lbuf, sizeof(lbuf), "LOCAL: thread:%" PRIu64 " ts:%" PRIu64 " key: %" PRIu64, - td->info, stable_ts, i)); + td->info, active_ts, i)); testutil_check(__wt_snprintf(obuf, sizeof(obuf), "OPLOG: thread:%" PRIu64 " ts:%" PRIu64 " key: %" PRIu64, - td->info, stable_ts, i)); + td->info, active_ts, i)); data.size = __wt_random(&rnd) % MAX_VAL; data.data = cbuf; cur_coll->set_value(cur_coll, &data); @@ -359,55 +354,31 @@ thread_run(void *arg) data.data = obuf; cur_oplog->set_value(cur_oplog, &data); testutil_check(cur_oplog->insert(cur_oplog)); - if (use_ts) { + if (use_prep) { /* * Run with prepare every once in a while. And also * yield after prepare sometimes too. This is only done - * on the regular session. + * on the collection session. */ - if (use_prep && i % PREPARE_FREQ == 0) { - testutil_check(__wt_snprintf( - tscfg, sizeof(tscfg), - "prepare_timestamp=%" PRIx64, stable_ts)); - testutil_check(session->prepare_transaction( - session, tscfg)); + if (i % PREPARE_FREQ == 0) { + testutil_check(__wt_snprintf(tscfg, + sizeof(tscfg), "prepare_timestamp=%" + PRIx64, active_ts)); + testutil_check( + prepared_session->prepare_transaction( + prepared_session, tscfg)); if (i % PREPARE_YIELD == 0) __wt_yield(); } - /* - * If we did not set the timestamp above via - * timestamp_transaction send it now on commit. - */ - if (use_ts && !use_prep) - testutil_check( - session->commit_transaction(session, NULL)); - else { - testutil_check( - __wt_snprintf(tscfg, sizeof(tscfg), - "commit_timestamp=%" PRIx64, stable_ts)); - testutil_check( - session->commit_transaction(session, - tscfg)); - } - if (use_prep) - testutil_check( - oplog_session->commit_transaction( - oplog_session, tscfg)); - /* - * Update the thread's last-committed timestamp. - * Don't let the compiler re-order this statement, - * if we were to race with the timestamp thread, it - * might see our thread update before the commit. - */ - WT_PUBLISH(th_ts[td->info], stable_ts); - } else { testutil_check( - session->commit_transaction(session, NULL)); - if (use_prep) - testutil_check( - oplog_session->commit_transaction( - oplog_session, NULL)); + __wt_snprintf(tscfg, sizeof(tscfg), + "commit_timestamp=%" PRIx64, active_ts)); + testutil_check( + prepared_session->commit_transaction( + prepared_session, tscfg)); } + testutil_check( + session->commit_transaction(session, NULL)); /* * Insert into the local table outside the timestamp txn. */ @@ -420,7 +391,7 @@ thread_run(void *arg) * Save the timestamp and key separately for checking later. */ if (fprintf(fp, - "%" PRIu64 " %" PRIu64 "\n", stable_ts, i) < 0) + "%" PRIu64 " %" PRIu64 "\n", active_ts, i) < 0) testutil_die(EIO, "fprintf"); } /* NOTREACHED */ @@ -657,6 +628,8 @@ main(int argc, char *argv[]) usage(); testutil_work_dir_from_path(home, sizeof(home), working_dir); + testutil_check(pthread_rwlock_init(&ts_lock, NULL)); + /* * If the user wants to verify they need to tell us how many threads * there were so we can find the old record files. @@ -955,6 +928,7 @@ main(int argc, char *argv[]) absent_oplog, count); fatal = true; } + testutil_check(pthread_rwlock_destroy(&ts_lock)); if (fatal) return (EXIT_FAILURE); printf("%" PRIu64 " records verified\n", count); diff --git a/src/third_party/wiredtiger/test/suite/test_prepare_lookaside01.py b/src/third_party/wiredtiger/test/suite/test_prepare_lookaside01.py index ed905cdadcd..57dc4c7a116 100644 --- a/src/third_party/wiredtiger/test/suite/test_prepare_lookaside01.py +++ b/src/third_party/wiredtiger/test/suite/test_prepare_lookaside01.py @@ -127,9 +127,7 @@ class test_prepare_lookaside01(wttest.WiredTigerTestCase): # Check if lookaside is working properly with prepare transactions. # We put prepared updates in multiple sessions so that we do not hang # because of cache being full with uncommitted updates. - # TODO: Increase the nsessions below to start testing lookaside eviction - # of prepared updates. - nsessions = 1 + nsessions = 3 nkeys = 4000 self.prepare_updates(uri, ds, nrows, nsessions, nkeys) |