From 927f7ecc749db6c83b6a5d34b0397fc004f5517c Mon Sep 17 00:00:00 2001 From: Bynn Lee Date: Fri, 17 Jul 2020 17:37:51 +0000 Subject: SERVER-49547 ephemeralForTest needs to keep track of the available history for storage transactions with open snapshots --- .../noPassthroughWithMongod/huge_multikey_index.js | 4 +- .../ephemeral_for_test_kv_engine.cpp | 77 ++++++- .../ephemeral_for_test_kv_engine.h | 28 ++- .../ephemeral_for_test_kv_engine_test.cpp | 241 +++++++++++++++++++++ .../ephemeral_for_test_recovery_unit.cpp | 28 ++- .../ephemeral_for_test_recovery_unit.h | 7 +- 6 files changed, 366 insertions(+), 19 deletions(-) diff --git a/jstests/noPassthroughWithMongod/huge_multikey_index.js b/jstests/noPassthroughWithMongod/huge_multikey_index.js index fce643eab8a..9c19c0d00a1 100644 --- a/jstests/noPassthroughWithMongod/huge_multikey_index.js +++ b/jstests/noPassthroughWithMongod/huge_multikey_index.js @@ -1,6 +1,8 @@ // https://jira.mongodb.org/browse/SERVER-4534 -// Building an index in the forground on a field with a large array and few documents in +// Building an index in the foreground on a field with a large array and few documents in // the collection used to open too many files and crash the server. 
+// SERVER-49547: Disabled for ephemeralForTest due to excessive memory usage +// @tags: [incompatible_with_eft] t = db.huge_multikey_index; t.drop(); diff --git a/src/mongo/db/storage/ephemeral_for_test/ephemeral_for_test_kv_engine.cpp b/src/mongo/db/storage/ephemeral_for_test/ephemeral_for_test_kv_engine.cpp index 2801e7153f8..a16077f7626 100644 --- a/src/mongo/db/storage/ephemeral_for_test/ephemeral_for_test_kv_engine.cpp +++ b/src/mongo/db/storage/ephemeral_for_test/ephemeral_for_test_kv_engine.cpp @@ -43,10 +43,24 @@ namespace mongo { namespace ephemeral_for_test { +namespace { +static AtomicWord shuttingDown{false}; +} // namespace + +bool KVEngine::instanceExists() { + return shuttingDown.load(); +} KVEngine::KVEngine() - : mongo::KVEngine(), _visibilityManager(std::make_unique()) {} -KVEngine::~KVEngine() {} + : mongo::KVEngine(), _visibilityManager(std::make_unique()) { + _master = std::make_shared(); + _availableHistory[Timestamp(_masterVersion++)] = _master; + shuttingDown.store(false); +} + +KVEngine::~KVEngine() { + shuttingDown.store(true); +} mongo::RecoveryUnit* KVEngine::newRecoveryUnit() { return new RecoveryUnit(this, nullptr); @@ -95,11 +109,15 @@ std::unique_ptr KVEngine::getRecordStore(OperationContext* o bool KVEngine::trySwapMaster(StringStore& newMaster, uint64_t version) { stdx::lock_guard lock(_masterLock); - invariant(!newMaster.hasBranch() && !_master.hasBranch()); + invariant(!newMaster.hasBranch() && !_master->hasBranch()); if (_masterVersion != version) return false; - _master = newMaster; - _masterVersion++; + // TODO SERVER-48314: replace _masterVersion with the Timestamp of the transaction. 
+ Timestamp commitTimestamp(_masterVersion++); + auto newMasterPtr = std::make_shared(newMaster); + _availableHistory[commitTimestamp] = newMasterPtr; + _master = newMasterPtr; + _cleanHistory(lock); return true; } @@ -149,6 +167,55 @@ Status KVEngine::dropIdent(OperationContext* opCtx, mongo::RecoveryUnit* ru, Str return dropStatus; } +void KVEngine::cleanHistory() { + stdx::lock_guard lock(_masterLock); + _cleanHistory(lock); +} + +void KVEngine::_cleanHistory(WithLock) { + for (auto it = _availableHistory.cbegin(); it != _availableHistory.cend();) { + if (it->second.use_count() == 1) { + invariant(it->second.get() != _master.get()); + it = _availableHistory.erase(it); + } else { + break; + } + } + + // Check that pointer to master is not deleted. + invariant(_availableHistory.size() >= 1); +} + +Timestamp KVEngine::getOldestTimestamp() const { + stdx::lock_guard lock(_masterLock); + return _availableHistory.begin()->first; +} + +void KVEngine::setOldestTimestamp(Timestamp newOldestTimestamp, bool force) { + stdx::lock_guard lock(_masterLock); + if (newOldestTimestamp > _availableHistory.rbegin()->first) { + _availableHistory[newOldestTimestamp] = _master; + // TODO SERVER-48314: Remove when _masterVersion is no longer being used to mock commit + // timestamps. + _masterVersion = newOldestTimestamp.asULL(); + } + for (auto it = _availableHistory.cbegin(); it != _availableHistory.cend();) { + if (it->first < newOldestTimestamp) { + it = _availableHistory.erase(it); + } else { + break; + } + } + + // Check that pointer to master is not deleted. 
+ invariant(_availableHistory.size() >= 1); +} + +std::map> KVEngine::getHistory_forTest() { + stdx::lock_guard lock(_masterLock); + return _availableHistory; +} + class EmptyRecordCursor final : public SeekableRecordCursor { public: boost::optional next() final { diff --git a/src/mongo/db/storage/ephemeral_for_test/ephemeral_for_test_kv_engine.h b/src/mongo/db/storage/ephemeral_for_test/ephemeral_for_test_kv_engine.h index e5739d388f8..727c445bbe2 100644 --- a/src/mongo/db/storage/ephemeral_for_test/ephemeral_for_test_kv_engine.h +++ b/src/mongo/db/storage/ephemeral_for_test/ephemeral_for_test_kv_engine.h @@ -146,9 +146,9 @@ public: // Ephemeral for test Specific /** - * Returns a pair of the current version and copy of tree of the master. + * Returns a pair of the current version and a shared_ptr to the master tree. */ - std::pair getMasterInfo() { + std::pair> getMasterInfo() { stdx::lock_guard lock(_masterLock); return std::make_pair(_masterVersion, _master); } @@ -163,6 +163,22 @@ public: return _visibilityManager.get(); } + /** + * History in the map that is older than the oldest timestamp can be removed. Additionally, if + * the tree at the oldest timestamp is no longer in use by any active transactions it can be + * cleaned up, up until the point where there's an active transaction in the map. That point + * also becomes the new oldest timestamp. 
+ */ + void cleanHistory(); + + Timestamp getOldestTimestamp() const override; + + void setOldestTimestamp(Timestamp newOldestTimestamp, bool force) override; + + std::map> getHistory_forTest(); + + static bool instanceExists(); + private: std::shared_ptr _catalogInfo; int _cachePressureForTest = 0; @@ -171,8 +187,14 @@ private: std::unique_ptr _visibilityManager; mutable Mutex _masterLock = MONGO_MAKE_LATCH("KVEngine::_masterLock"); - StringStore _master; + std::shared_ptr _master; uint64_t _masterVersion = 0; + + void _cleanHistory(WithLock); + + // This map contains the different versions of the StringStore's referenced by their commit + // timestamps. + std::map> _availableHistory; }; } // namespace ephemeral_for_test } // namespace mongo diff --git a/src/mongo/db/storage/ephemeral_for_test/ephemeral_for_test_kv_engine_test.cpp b/src/mongo/db/storage/ephemeral_for_test/ephemeral_for_test_kv_engine_test.cpp index c7322bd8951..eab79c3d23f 100644 --- a/src/mongo/db/storage/ephemeral_for_test/ephemeral_for_test_kv_engine_test.cpp +++ b/src/mongo/db/storage/ephemeral_for_test/ephemeral_for_test_kv_engine_test.cpp @@ -34,6 +34,7 @@ #include #include "mongo/base/init.h" +#include "mongo/db/operation_context_noop.h" #include "mongo/db/repl/replication_coordinator_mock.h" #include "mongo/db/service_context.h" #include "mongo/db/service_context_test_fixture.h" @@ -75,5 +76,245 @@ MONGO_INITIALIZER(RegisterEphemeralForTestKVHarnessFactory)(InitializerContext*) return Status::OK(); } +class EphemeralForTestKVEngineTest : public unittest::Test { +public: + EphemeralForTestKVEngineTest() : _helper(), _engine(_helper.getEngine()) {} + +protected: + std::unique_ptr helper; + KVHarnessHelper _helper; + KVEngine* _engine; +}; + +class OperationContextFromKVEngine : public OperationContextNoop { +public: + OperationContextFromKVEngine(KVEngine* engine) + : OperationContextNoop(engine->newRecoveryUnit()) {} +}; + + +TEST_F(EphemeralForTestKVEngineTest, AvailableHistoryUpdate) { + 
NamespaceString nss("a.b"); + std::string ident = "collection-1234"; + std::string record = "abcd"; + CollectionOptions defaultCollectionOptions; + + std::unique_ptr rs; + { + OperationContextFromKVEngine opCtx(_engine); + ASSERT_OK(_engine->createRecordStore(&opCtx, nss.ns(), ident, defaultCollectionOptions)); + rs = _engine->getRecordStore(&opCtx, nss.ns(), ident, defaultCollectionOptions); + ASSERT(rs); + } + + Timestamp lastMaster; + Timestamp currentMaster; + + ASSERT_EQ(1, _engine->getHistory_forTest().size()); + currentMaster = _engine->getHistory_forTest().rbegin()->first; + ASSERT_EQ(_engine->getOldestTimestamp(), currentMaster); + + { + OperationContextFromKVEngine opCtx(_engine); + WriteUnitOfWork uow(&opCtx); + StatusWith res = + rs->insertRecord(&opCtx, record.c_str(), record.length() + 1, Timestamp()); + ASSERT_OK(res.getStatus()); + uow.commit(); + } + + ASSERT_EQ(1, _engine->getHistory_forTest().size()); + lastMaster = currentMaster; + currentMaster = _engine->getHistory_forTest().rbegin()->first; + ASSERT_GT(currentMaster, lastMaster); + ASSERT_EQ(_engine->getOldestTimestamp(), currentMaster); +} + +TEST_F(EphemeralForTestKVEngineTest, PinningOldestTimestampWithReadTransaction) { + NamespaceString nss("a.b"); + std::string ident = "collection-1234"; + std::string record = "abcd"; + CollectionOptions defaultCollectionOptions; + + std::unique_ptr rs; + { + OperationContextFromKVEngine opCtx(_engine); + ASSERT_OK(_engine->createRecordStore(&opCtx, nss.ns(), ident, defaultCollectionOptions)); + rs = _engine->getRecordStore(&opCtx, nss.ns(), ident, defaultCollectionOptions); + ASSERT(rs); + } + + // _availableHistory starts off with master at Timestamp(0, 0). 
+ ASSERT_EQ(1, _engine->getHistory_forTest().size()); + + RecordId loc; + { + OperationContextFromKVEngine opCtx(_engine); + WriteUnitOfWork uow(&opCtx); + StatusWith res = + rs->insertRecord(&opCtx, record.c_str(), record.length() + 1, Timestamp()); + ASSERT_OK(res.getStatus()); + loc = res.getValue(); + uow.commit(); + } + + OperationContextFromKVEngine opCtxRead(_engine); + RecordData rd; + ASSERT(rs->findRecord(&opCtxRead, loc, &rd)); + + { + OperationContextFromKVEngine opCtx(_engine); + WriteUnitOfWork uow(&opCtx); + StatusWith res = + rs->insertRecord(&opCtx, record.c_str(), record.length() + 1, Timestamp()); + ASSERT_OK(res.getStatus()); + uow.commit(); + } + + // Open read transaction prevents deletion of history. + ASSERT_EQ(2, _engine->getHistory_forTest().size()); + ASSERT_GT(_engine->getHistory_forTest().rbegin()->first, _engine->getOldestTimestamp()); +} + +TEST_F(EphemeralForTestKVEngineTest, SettingOldestTimestampClearsHistory) { + NamespaceString nss("a.b"); + std::string ident = "collection-1234"; + std::string record = "abcd"; + CollectionOptions defaultCollectionOptions; + + std::unique_ptr rs; + { + OperationContextFromKVEngine opCtx(_engine); + ASSERT_OK(_engine->createRecordStore(&opCtx, nss.ns(), ident, defaultCollectionOptions)); + rs = _engine->getRecordStore(&opCtx, nss.ns(), ident, defaultCollectionOptions); + ASSERT(rs); + } + + // _availableHistory starts off with master at Timestamp(0, 0). 
+ ASSERT_EQ(1, _engine->getHistory_forTest().size()); + + RecordId loc; + { + OperationContextFromKVEngine opCtx(_engine); + WriteUnitOfWork uow(&opCtx); + StatusWith res = + rs->insertRecord(&opCtx, record.c_str(), record.length() + 1, Timestamp()); + ASSERT_OK(res.getStatus()); + loc = res.getValue(); + uow.commit(); + } + + OperationContextFromKVEngine opCtxRead(_engine); + RecordData rd; + ASSERT(rs->findRecord(&opCtxRead, loc, &rd)); + + { + OperationContextFromKVEngine opCtx(_engine); + WriteUnitOfWork uow(&opCtx); + StatusWith res = + rs->insertRecord(&opCtx, record.c_str(), record.length() + 1, Timestamp()); + ASSERT_OK(res.getStatus()); + uow.commit(); + } + + ASSERT_EQ(2, _engine->getHistory_forTest().size()); + _engine->setOldestTimestamp(_engine->getHistory_forTest().rbegin()->first, false); + ASSERT_EQ(1, _engine->getHistory_forTest().size()); +} + +TEST_F(EphemeralForTestKVEngineTest, SettingOldestTimestampToMax) { + NamespaceString nss("a.b"); + std::string ident = "collection-1234"; + std::string record = "abcd"; + CollectionOptions defaultCollectionOptions; + + std::unique_ptr rs; + { + OperationContextFromKVEngine opCtx(_engine); + ASSERT_OK(_engine->createRecordStore(&opCtx, nss.ns(), ident, defaultCollectionOptions)); + rs = _engine->getRecordStore(&opCtx, nss.ns(), ident, defaultCollectionOptions); + ASSERT(rs); + } + + { + OperationContextFromKVEngine opCtx(_engine); + WriteUnitOfWork uow(&opCtx); + StatusWith res = + rs->insertRecord(&opCtx, record.c_str(), record.length() + 1, Timestamp()); + ASSERT_OK(res.getStatus()); + uow.commit(); + } + + // Check that setting oldest to Timestamp::max() does not clear history. 
+ ASSERT_GTE(_engine->getHistory_forTest().size(), 1); + ASSERT_LT(_engine->getHistory_forTest().rbegin()->first, Timestamp::max()); + _engine->setOldestTimestamp(Timestamp::max(), true); + ASSERT_GTE(_engine->getHistory_forTest().size(), 1); + ASSERT_EQ(Timestamp::max(), _engine->getHistory_forTest().rbegin()->first); +} + +TEST_F(EphemeralForTestKVEngineTest, CleanHistoryWithOpenTransaction) { + NamespaceString nss("a.b"); + std::string ident = "collection-1234"; + std::string record = "abcd"; + CollectionOptions defaultCollectionOptions; + + std::unique_ptr rs; + { + OperationContextFromKVEngine opCtx(_engine); + ASSERT_OK(_engine->createRecordStore(&opCtx, nss.ns(), ident, defaultCollectionOptions)); + rs = _engine->getRecordStore(&opCtx, nss.ns(), ident, defaultCollectionOptions); + ASSERT(rs); + } + + // _availableHistory starts off with master at Timestamp(0, 0). + ASSERT_EQ(1, _engine->getHistory_forTest().size()); + + RecordId loc; + { + OperationContextFromKVEngine opCtx(_engine); + WriteUnitOfWork uow(&opCtx); + StatusWith res = + rs->insertRecord(&opCtx, record.c_str(), record.length() + 1, Timestamp()); + ASSERT_OK(res.getStatus()); + loc = res.getValue(); + uow.commit(); + } + + OperationContextFromKVEngine opCtxRead(_engine); + Timestamp readTime1 = _engine->getHistory_forTest().rbegin()->first; + RecordData rd; + ASSERT(rs->findRecord(&opCtxRead, loc, &rd)); + + { + OperationContextFromKVEngine opCtx(_engine); + WriteUnitOfWork uow(&opCtx); + StatusWith res = + rs->insertRecord(&opCtx, record.c_str(), record.length() + 1, Timestamp()); + ASSERT_OK(res.getStatus()); + uow.commit(); + } + + Timestamp readTime2 = _engine->getHistory_forTest().rbegin()->first; + + { + OperationContextFromKVEngine opCtx(_engine); + WriteUnitOfWork uow(&opCtx); + StatusWith res = + rs->insertRecord(&opCtx, record.c_str(), record.length() + 1, Timestamp()); + ASSERT_OK(res.getStatus()); + uow.commit(); + } + + Timestamp readTime3 = 
_engine->getHistory_forTest().rbegin()->first; + _engine->cleanHistory(); + + // use_count() should be {2, 1, 2} without the copy from getHistory_forTest(). + ASSERT_EQ(3, _engine->getHistory_forTest().size()); + ASSERT_EQ(3, _engine->getHistory_forTest().at(readTime1).use_count()); + ASSERT_EQ(2, _engine->getHistory_forTest().at(readTime2).use_count()); + ASSERT_EQ(3, _engine->getHistory_forTest().at(readTime3).use_count()); +} + } // namespace ephemeral_for_test } // namespace mongo diff --git a/src/mongo/db/storage/ephemeral_for_test/ephemeral_for_test_recovery_unit.cpp b/src/mongo/db/storage/ephemeral_for_test/ephemeral_for_test_recovery_unit.cpp index 2b684221ee8..0131568a01f 100644 --- a/src/mongo/db/storage/ephemeral_for_test/ephemeral_for_test_recovery_unit.cpp +++ b/src/mongo/db/storage/ephemeral_for_test/ephemeral_for_test_recovery_unit.cpp @@ -59,9 +59,10 @@ void RecoveryUnit::doCommitUnitOfWork() { if (_dirty) { invariant(_forked); while (true) { - std::pair masterInfo = _KVEngine->getMasterInfo(); + auto masterInfo = _KVEngine->getMasterInfo(); try { - _workingCopy.merge3(_mergeBase, masterInfo.second); + invariant(_mergeBase); + _workingCopy.merge3(*_mergeBase, *masterInfo.second); } catch (const merge_conflict_exception&) { throw WriteConflictException(); } @@ -78,7 +79,7 @@ void RecoveryUnit::doCommitUnitOfWork() { _dirty = false; } else if (_forked) { if (kDebugBuild) - invariant(_mergeBase == _workingCopy); + invariant(*_mergeBase == _workingCopy); } _setState(State::kCommitting); @@ -109,6 +110,7 @@ void RecoveryUnit::doAbandonSnapshot() { invariant(!_inUnitOfWork(), toString(_getState())); _forked = false; _dirty = false; + _setMergeNull(); } bool RecoveryUnit::forkIfNeeded() { @@ -116,13 +118,13 @@ bool RecoveryUnit::forkIfNeeded() { return false; // Update the copies of the trees when not in a WUOW so cursors can retrieve the latest data. 
+ auto masterInfo = _KVEngine->getMasterInfo(); + _mergeBase = masterInfo.second; + _workingCopy = *masterInfo.second; + invariant(_mergeBase); - std::pair masterInfo = _KVEngine->getMasterInfo(); - StringStore master = masterInfo.second; - - _mergeBase = master; - _workingCopy = master; - + // Call cleanHistory in case _mergeBase was holding a shared_ptr to an older tree. + _KVEngine->cleanHistory(); _forked = true; return true; } @@ -140,11 +142,19 @@ void RecoveryUnit::setOrderedCommit(bool orderedCommit) {} void RecoveryUnit::_abort() { _forked = false; _dirty = false; + _setMergeNull(); _setState(State::kAborting); abortRegisteredChanges(); _setState(State::kInactive); } +void RecoveryUnit::_setMergeNull() { + _mergeBase = nullptr; + if (!KVEngine::instanceExists()) { + _KVEngine->cleanHistory(); + } +} + RecoveryUnit* RecoveryUnit::get(OperationContext* opCtx) { return checked_cast(opCtx->recoveryUnit()); } diff --git a/src/mongo/db/storage/ephemeral_for_test/ephemeral_for_test_recovery_unit.h b/src/mongo/db/storage/ephemeral_for_test/ephemeral_for_test_recovery_unit.h index 327c40e86b8..a70af5f78ab 100644 --- a/src/mongo/db/storage/ephemeral_for_test/ephemeral_for_test_recovery_unit.h +++ b/src/mongo/db/storage/ephemeral_for_test/ephemeral_for_test_recovery_unit.h @@ -108,10 +108,15 @@ private: void _abort(); + void _setMergeNull(); + std::function _waitUntilDurableCallback; // Official master is kept by KVEngine KVEngine* _KVEngine; - StringStore _mergeBase; + // We need _mergeBase to be a shared_ptr to hold references in KVEngine::_availableHistory. + // _mergeBase will be initialized in forkIfNeeded(). + std::shared_ptr _mergeBase; + // We need _workingCopy to be a unique copy, not a shared_ptr. StringStore _workingCopy; bool _forked = false; -- cgit v1.2.1