diff options
author | Kaloian Manassiev <kaloian.manassiev@mongodb.com> | 2017-11-17 16:39:51 -0500 |
---|---|---|
committer | Kaloian Manassiev <kaloian.manassiev@mongodb.com> | 2017-11-20 15:01:11 -0500 |
commit | c98068d7c836e2d36a862189733d903b24b02d9a (patch) | |
tree | 6d072ccacc3853e47a848e1fdf57d40196964bda /src/mongo/db/s | |
parent | 697db2c561f006cb9e1be9312e72c5072dd12530 (diff) | |
download | mongo-c98068d7c836e2d36a862189733d903b24b02d9a.tar.gz |
SERVER-32027 Fix unit-tests which rely on having a valid wallclock time
Diffstat (limited to 'src/mongo/db/s')
4 files changed, 65 insertions, 83 deletions
diff --git a/src/mongo/db/s/migration_chunk_cloner_source_legacy.cpp b/src/mongo/db/s/migration_chunk_cloner_source_legacy.cpp index 05436465040..b12c13b2a05 100644 --- a/src/mongo/db/s/migration_chunk_cloner_source_legacy.cpp +++ b/src/mongo/db/s/migration_chunk_cloner_source_legacy.cpp @@ -207,8 +207,8 @@ Status MigrationChunkClonerSourceLegacy::startClone(OperationContext* opCtx) { auto const replCoord = repl::ReplicationCoordinator::get(opCtx); if (replCoord->getReplicationMode() == repl::ReplicationCoordinator::modeReplSet) { - _sessionCatalogSource = stdx::make_unique<SessionCatalogMigrationSource>(_args.getNss()); - _sessionCatalogSource->init(opCtx); + _sessionCatalogSource = + stdx::make_unique<SessionCatalogMigrationSource>(opCtx, _args.getNss()); // Prime up the session migration source if there are oplog entries to migrate. _sessionCatalogSource->fetchNextOplog(opCtx); diff --git a/src/mongo/db/s/session_catalog_migration_source.cpp b/src/mongo/db/s/session_catalog_migration_source.cpp index 5328f0ac30e..9ca38970dc7 100644 --- a/src/mongo/db/s/session_catalog_migration_source.cpp +++ b/src/mongo/db/s/session_catalog_migration_source.cpp @@ -110,10 +110,52 @@ repl::OplogEntry makeSentinelOplogEntry(OperationSessionInfo sessionInfo) { kIncompleteHistoryStmtId); // statement id } -} // unnamed namespace +} // namespace -SessionCatalogMigrationSource::SessionCatalogMigrationSource(NamespaceString ns) - : _ns(std::move(ns)) {} +SessionCatalogMigrationSource::SessionCatalogMigrationSource(OperationContext* opCtx, + NamespaceString ns) + : _ns(std::move(ns)), + _rollbackIdAtInit( + uassertStatusOK(repl::ReplicationProcess::get(opCtx)->getRollbackID(opCtx))) { + // Sort is not needed for correctness. This is just for making it easier to write deterministic + // tests. + Query query; + query.sort(BSON("_id" << 1)); + + DBDirectClient client(opCtx); + auto cursor = client.query(NamespaceString::kSessionTransactionsTableNamespace.ns(), query); + + while (cursor->more()) { + auto nextSession = SessionTxnRecord::parse( + IDLParserErrorContext("Session migration cloning"), cursor->next()); + if (!nextSession.getLastWriteOpTime().isNull()) { + _sessionOplogIterators.push_back( + stdx::make_unique<SessionOplogIterator>(std::move(nextSession), _rollbackIdAtInit)); + } + } + + { + AutoGetCollection autoColl(opCtx, NamespaceString::kRsOplogNamespace, MODE_IX); + writeConflictRetry( + opCtx, + "session migration initialization majority commit barrier", + NamespaceString::kRsOplogNamespace.ns(), + [&] { + const auto message = BSON("sessionMigrateCloneStart" << _ns.ns()); + + WriteUnitOfWork wuow(opCtx); + opCtx->getClient()->getServiceContext()->getOpObserver()->onInternalOpMessage( + opCtx, _ns, {}, {}, message); + wuow.commit(); + }); + } + + auto opTimeToWait = repl::ReplClientInfo::forClient(opCtx->getClient()).getLastOp(); + WriteConcernResult result; + WriteConcernOptions majority( + WriteConcernOptions::kMajority, WriteConcernOptions::SyncMode::UNSET, 0); + uassertStatusOK(waitForWriteConcern(opCtx, opTimeToWait, majority, &result)); +} bool SessionCatalogMigrationSource::hasMoreOplog() { return _hasMoreOplogFromSessionCatalog() || _hasNewWrites(); @@ -188,7 +230,6 @@ repl::OplogEntry SessionCatalogMigrationSource::_getLastFetchedOplogFromSessionC bool SessionCatalogMigrationSource::_fetchNextOplogFromSessionCatalog(OperationContext* opCtx) { stdx::unique_lock<stdx::mutex> lk(_sessionCloneMutex); - invariant(_alreadyInitialized); if (!_lastFetchedOplogBuffer.empty()) { _lastFetchedOplog = _lastFetchedOplogBuffer.back(); _lastFetchedOplogBuffer.pop_back(); @@ -230,7 +271,6 @@ bool SessionCatalogMigrationSource::_fetchNextNewWriteOplog(OperationContext* op { stdx::lock_guard<stdx::mutex> lk(_newOplogMutex); - invariant(_alreadyInitialized); if (_newWriteOpTimeList.empty()) { _lastFetchedNewWriteOplog.reset(); return false; @@ -262,54 +302,6 @@ void SessionCatalogMigrationSource::notifyNewWriteOpTime(repl::OpTime opTime) { _newWriteOpTimeList.push_back(opTime); } -void SessionCatalogMigrationSource::init(OperationContext* opCtx) { - invariant(!_alreadyInitialized); - - _rollbackIdAtInit = uassertStatusOK(repl::ReplicationProcess::get(opCtx)->getRollbackID(opCtx)); - - DBDirectClient client(opCtx); - Query query; - // Sort is not needed for correctness. This is just for making it easier to write deterministic - // tests. - query.sort(BSON("_id" << 1)); - auto cursor = client.query(NamespaceString::kSessionTransactionsTableNamespace.ns(), query); - - std::vector<std::unique_ptr<SessionOplogIterator>> sessionOplogIterators; - while (cursor->more()) { - auto nextSession = SessionTxnRecord::parse( - IDLParserErrorContext("Session migration cloning"), cursor->next()); - if (!nextSession.getLastWriteOpTime().isNull()) { - sessionOplogIterators.push_back( - stdx::make_unique<SessionOplogIterator>(std::move(nextSession), _rollbackIdAtInit)); - } - } - - { - auto message = BSON("sessionMigrateCloneStart" << _ns.ns()); - AutoGetCollection autoColl(opCtx, NamespaceString::kRsOplogNamespace, MODE_IX); - writeConflictRetry( - opCtx, - "session migration initialization majority commit barrier", - NamespaceString::kRsOplogNamespace.ns(), - [&] { - WriteUnitOfWork wuow(opCtx); - opCtx->getClient()->getServiceContext()->getOpObserver()->onInternalOpMessage( - opCtx, _ns, {}, {}, message); - wuow.commit(); - }); - } - - auto opTimeToWait = repl::ReplClientInfo::forClient(opCtx->getClient()).getLastOp(); - WriteConcernResult result; - WriteConcernOptions majority( - WriteConcernOptions::kMajority, WriteConcernOptions::SyncMode::UNSET, 0); - uassertStatusOK(waitForWriteConcern(opCtx, opTimeToWait, majority, &result)); - - stdx::lock_guard<stdx::mutex> lk(_sessionCloneMutex); - _alreadyInitialized = true; - _sessionOplogIterators.swap(sessionOplogIterators); -} - SessionCatalogMigrationSource::SessionOplogIterator::SessionOplogIterator( SessionTxnRecord txnRecord, int expectedRollbackId) : _record(std::move(txnRecord)), _initialRollbackId(expectedRollbackId) { diff --git a/src/mongo/db/s/session_catalog_migration_source.h b/src/mongo/db/s/session_catalog_migration_source.h index 2e5faf393c2..75d7a0073f1 100644 --- a/src/mongo/db/s/session_catalog_migration_source.h +++ b/src/mongo/db/s/session_catalog_migration_source.h @@ -83,9 +83,7 @@ public: bool shouldWaitForMajority = false; }; - explicit SessionCatalogMigrationSource(NamespaceString ns); - - void init(OperationContext* opCtx); + SessionCatalogMigrationSource(OperationContext* opCtx, NamespaceString ns); /** * Returns true if there are more oplog entries to fetch at this moment. Note that new writes @@ -190,14 +188,16 @@ private: */ repl::OplogEntry _getLastFetchedNewWriteOplog(); + // Namespace for which the migration is happening const NamespaceString _ns; - // Protects _alreadyInitialized, _sessionCatalogCursor, _sessionOplogIterators - // _currentOplogIterator, _lastFetchedOplogBuffer, _lastFetchedOplog - stdx::mutex _sessionCloneMutex; - bool _alreadyInitialized = false; + // The rollback id just before migration started. This value is needed so that step-down + // followed by step-up situations can be discovered. + const int _rollbackIdAtInit; - int _rollbackIdAtInit = 0; + // Protects _sessionCatalogCursor, _sessionOplogIterators, _currentOplogIterator, + // _lastFetchedOplogBuffer, _lastFetchedOplog + stdx::mutex _sessionCloneMutex; // List of remaining session records that needs to be cloned. std::vector<std::unique_ptr<SessionOplogIterator>> _sessionOplogIterators; diff --git a/src/mongo/db/s/session_catalog_migration_source_test.cpp b/src/mongo/db/s/session_catalog_migration_source_test.cpp index 4b8c7ad3bdf..74f87bbba25 100644 --- a/src/mongo/db/s/session_catalog_migration_source_test.cpp +++ b/src/mongo/db/s/session_catalog_migration_source_test.cpp @@ -101,8 +101,7 @@ repl::OplogEntry makeOplogEntry(repl::OpTime opTime, } TEST_F(SessionCatalogMigrationSourceTest, NoSessionsToTransferShouldNotHaveOplog) { - SessionCatalogMigrationSource migrationSource(kNs); - migrationSource.init(opCtx()); + SessionCatalogMigrationSource migrationSource(opCtx(), kNs); ASSERT_FALSE(migrationSource.fetchNextOplog(opCtx())); ASSERT_FALSE(migrationSource.hasMoreOplog()); } @@ -137,8 +136,7 @@ TEST_F(SessionCatalogMigrationSourceTest, OneSessionWithTwoWrites) { DBDirectClient client(opCtx()); client.insert(NamespaceString::kSessionTransactionsTableNamespace.ns(), sessionRecord.toBSON()); - SessionCatalogMigrationSource migrationSource(kNs); - migrationSource.init(opCtx()); + SessionCatalogMigrationSource migrationSource(opCtx(), kNs); ASSERT_TRUE(migrationSource.fetchNextOplog(opCtx())); { @@ -223,8 +221,7 @@ TEST_F(SessionCatalogMigrationSourceTest, TwoSessionWithTwoWrites) { insertOplogEntry(entry1b); insertOplogEntry(entry2b); - SessionCatalogMigrationSource migrationSource(kNs); - migrationSource.init(opCtx()); + SessionCatalogMigrationSource migrationSource(opCtx(), kNs); ASSERT_TRUE(migrationSource.fetchNextOplog(opCtx())); auto checkNextBatch = [this, &migrationSource](const repl::OplogEntry& firstExpectedOplog, @@ -320,8 +317,7 @@ TEST_F(SessionCatalogMigrationSourceTest, OneSessionWithFindAndModifyPreImageAnd DBDirectClient client(opCtx()); client.insert(NamespaceString::kSessionTransactionsTableNamespace.ns(), sessionRecord.toBSON()); - SessionCatalogMigrationSource migrationSource(kNs); - migrationSource.init(opCtx()); + SessionCatalogMigrationSource migrationSource(opCtx(), kNs); ASSERT_TRUE(migrationSource.fetchNextOplog(opCtx())); auto expectedSequece = {entry3, entry4, entry1, entry2}; @@ -382,8 +378,7 @@ TEST_F(SessionCatalogMigrationSourceTest, OplogWithOtherNsShouldBeIgnored) { client.insert(NamespaceString::kSessionTransactionsTableNamespace.ns(), sessionRecord2.toBSON()); - SessionCatalogMigrationSource migrationSource(kNs); - migrationSource.init(opCtx()); + SessionCatalogMigrationSource migrationSource(opCtx(), kNs); ASSERT_TRUE(migrationSource.fetchNextOplog(opCtx())); ASSERT_TRUE(migrationSource.hasMoreOplog()); @@ -438,8 +433,7 @@ TEST_F(SessionCatalogMigrationSourceTest, SessionDumpWithMultipleNewWrites) { repl::OpTime(Timestamp(0, 0), 0)); // optime of previous write within same transaction insertOplogEntry(entry3); - SessionCatalogMigrationSource migrationSource(kNs); - migrationSource.init(opCtx()); + SessionCatalogMigrationSource migrationSource(opCtx(), kNs); ASSERT_TRUE(migrationSource.fetchNextOplog(opCtx())); migrationSource.notifyNewWriteOpTime(entry2.getOpTime()); @@ -474,8 +468,7 @@ TEST_F(SessionCatalogMigrationSourceTest, SessionDumpWithMultipleNewWrites) { } TEST_F(SessionCatalogMigrationSourceTest, ShouldAssertIfOplogCannotBeFound) { - SessionCatalogMigrationSource migrationSource(kNs); - migrationSource.init(opCtx()); + SessionCatalogMigrationSource migrationSource(opCtx(), kNs); ASSERT_FALSE(migrationSource.fetchNextOplog(opCtx())); migrationSource.notifyNewWriteOpTime(repl::OpTime(Timestamp(100, 3), 1)); @@ -484,8 +477,7 @@ TEST_F(SessionCatalogMigrationSourceTest, ShouldAssertIfOplogCannotBeFound) { } TEST_F(SessionCatalogMigrationSourceTest, ShouldBeAbleInsertNewWritesAfterBufferWasDepleted) { - SessionCatalogMigrationSource migrationSource(kNs); - migrationSource.init(opCtx()); + SessionCatalogMigrationSource migrationSource(opCtx(), kNs); ASSERT_FALSE(migrationSource.fetchNextOplog(opCtx())); { @@ -581,8 +573,7 @@ TEST_F(SessionCatalogMigrationSourceTest, ReturnsDeadEndSentinelForIncompleteHis DBDirectClient client(opCtx()); client.insert(NamespaceString::kSessionTransactionsTableNamespace.ns(), sessionRecord.toBSON()); - SessionCatalogMigrationSource migrationSource(kNs); - migrationSource.init(opCtx()); + SessionCatalogMigrationSource migrationSource(opCtx(), kNs); ASSERT_TRUE(migrationSource.fetchNextOplog(opCtx())); { @@ -638,8 +629,7 @@ TEST_F(SessionCatalogMigrationSourceTest, ShouldAssertWhenRollbackDetected) { DBDirectClient client(opCtx()); client.insert(NamespaceString::kSessionTransactionsTableNamespace.ns(), sessionRecord.toBSON()); - SessionCatalogMigrationSource migrationSource(kNs); - migrationSource.init(opCtx()); + SessionCatalogMigrationSource migrationSource(opCtx(), kNs); ASSERT_TRUE(migrationSource.fetchNextOplog(opCtx())); { |