diff options
author | Randolph Tan <randolph@10gen.com> | 2017-10-03 16:44:56 -0400 |
---|---|---|
committer | Randolph Tan <randolph@10gen.com> | 2017-10-07 20:09:35 -0400 |
commit | 155db509a8e376211095cec062f7aa5be1b7707e (patch) | |
tree | 9ef6f0130061db25fad109ab275c982eab79af5a /src | |
parent | 3d42a318626e059b2669356bb5d39056b8fa1b76 (diff) | |
download | mongo-155db509a8e376211095cec062f7aa5be1b7707e.tar.gz |
SERVER-31030 Make sure that _getNextSessionMods will only contain oplog entries that are majority committed
Diffstat (limited to 'src')
6 files changed, 164 insertions, 57 deletions
diff --git a/src/mongo/db/s/migration_chunk_cloner_source_legacy.cpp b/src/mongo/db/s/migration_chunk_cloner_source_legacy.cpp index e3438030741..41daf5be6b7 100644 --- a/src/mongo/db/s/migration_chunk_cloner_source_legacy.cpp +++ b/src/mongo/db/s/migration_chunk_cloner_source_legacy.cpp @@ -204,6 +204,8 @@ Status MigrationChunkClonerSourceLegacy::startClone(OperationContext* opCtx) { invariant(_state == kNew); invariant(!opCtx->lockState()->isLocked()); + _sessionCatalogSource.init(opCtx); + // Load the ids of the currently available documents auto storeCurrentLocsStatus = _storeCurrentLocs(opCtx); if (!storeCurrentLocsStatus.isOK()) { @@ -729,18 +731,31 @@ void MigrationChunkClonerSourceLegacy::_xfer(OperationContext* opCtx, arr.done(); } -void MigrationChunkClonerSourceLegacy::nextSessionMigrationBatch(OperationContext* opCtx, - BSONArrayBuilder* arrBuilder) { +repl::OpTime MigrationChunkClonerSourceLegacy::nextSessionMigrationBatch( + OperationContext* opCtx, BSONArrayBuilder* arrBuilder) { + repl::OpTime opTimeToWait; + auto seenOpTimeTerm = repl::OpTime::kUninitializedTerm; + while (_sessionCatalogSource.hasMoreOplog()) { - auto oplog = _sessionCatalogSource.getLastFetchedOplog(); + auto result = _sessionCatalogSource.getLastFetchedOplog(); - if (!oplog) { + if (!result.oplog) { // Last fetched turned out empty, try to see if there are more _sessionCatalogSource.fetchNextOplog(opCtx); continue; } - auto oplogDoc = oplog->toBSON(); + auto newOpTime = result.oplog->getOpTime(); + if (seenOpTimeTerm == repl::OpTime::kUninitializedTerm) { + seenOpTimeTerm = newOpTime.getTerm(); + } else { + uassert(40650, + str::stream() << "detected change of term from " << seenOpTimeTerm << " to " + << newOpTime.getTerm(), + seenOpTimeTerm == newOpTime.getTerm()); + } + + auto oplogDoc = result.oplog->toBSON(); // Use the builder size instead of accumulating the document sizes directly so that we // take into consideration the overhead of BSONArray indices. @@ -751,7 +766,15 @@ void MigrationChunkClonerSourceLegacy::nextSessionMigrationBatch(OperationContex arrBuilder->append(oplogDoc); _sessionCatalogSource.fetchNextOplog(opCtx); + + if (result.shouldWaitForMajority) { + if (opTimeToWait < newOpTime) { + opTimeToWait = newOpTime; + } + } } + + return opTimeToWait; } } // namespace mongo diff --git a/src/mongo/db/s/migration_chunk_cloner_source_legacy.h b/src/mongo/db/s/migration_chunk_cloner_source_legacy.h index 5bd20b0907d..5bf180bcea7 100644 --- a/src/mongo/db/s/migration_chunk_cloner_source_legacy.h +++ b/src/mongo/db/s/migration_chunk_cloner_source_legacy.h @@ -35,6 +35,7 @@ #include "mongo/client/connection_string.h" #include "mongo/db/namespace_string.h" #include "mongo/db/query/plan_executor.h" +#include "mongo/db/repl/optime.h" #include "mongo/db/s/migration_chunk_cloner_source.h" #include "mongo/db/s/migration_session_id.h" #include "mongo/db/s/session_catalog_migration_source.h" @@ -130,7 +131,13 @@ public: */ Status nextModsBatch(OperationContext* opCtx, Database* db, BSONObjBuilder* builder); - void nextSessionMigrationBatch(OperationContext* opCtx, BSONArrayBuilder* arrBuilder); + /** + * Appends to the buffer oplogs that contain session information for this migration. + * If this function returns a valid OpTime, this means that the oplog appended are + * not guaranteed to be majority committed and the caller has to use wait for the + * returned opTime to be majority committed. + */ + repl::OpTime nextSessionMigrationBatch(OperationContext* opCtx, BSONArrayBuilder* arrBuilder); private: friend class DeleteNotificationStage; diff --git a/src/mongo/db/s/migration_chunk_cloner_source_legacy_commands.cpp b/src/mongo/db/s/migration_chunk_cloner_source_legacy_commands.cpp index d42c6bdaa99..a6f0f5c7f18 100644 --- a/src/mongo/db/s/migration_chunk_cloner_source_legacy_commands.cpp +++ b/src/mongo/db/s/migration_chunk_cloner_source_legacy_commands.cpp @@ -41,6 +41,7 @@ #include "mongo/db/s/migration_chunk_cloner_source_legacy.h" #include "mongo/db/s/migration_source_manager.h" #include "mongo/db/s/sharding_state.h" +#include "mongo/db/write_concern.h" /** * This file contains commands, which are specific to the legacy chunk cloner source. @@ -259,8 +260,17 @@ public: BSONArrayBuilder arrBuilder; - AutoGetActiveCloner autoCloner(opCtx, migrationSessionId); - autoCloner.getCloner()->nextSessionMigrationBatch(opCtx, &arrBuilder); + repl::OpTime opTime; + + { + AutoGetActiveCloner autoCloner(opCtx, migrationSessionId); + opTime = autoCloner.getCloner()->nextSessionMigrationBatch(opCtx, &arrBuilder); + } + + WriteConcernResult wcResult; + WriteConcernOptions majorityWC( + WriteConcernOptions::kMajority, WriteConcernOptions::SyncMode::UNSET, 0); + uassertStatusOK(waitForWriteConcern(opCtx, opTime, majorityWC, &wcResult)); result.appendArray("oplog", arrBuilder.arr()); return true; diff --git a/src/mongo/db/s/session_catalog_migration_source.cpp b/src/mongo/db/s/session_catalog_migration_source.cpp index ac835502952..021eb013c73 100644 --- a/src/mongo/db/s/session_catalog_migration_source.cpp +++ b/src/mongo/db/s/session_catalog_migration_source.cpp @@ -30,11 +30,16 @@ #include "mongo/db/s/session_catalog_migration_source.h" +#include "mongo/db/concurrency/write_conflict_exception.h" +#include "mongo/db/db_raii.h" #include "mongo/db/dbdirectclient.h" #include "mongo/db/namespace_string.h" +#include "mongo/db/repl/repl_client_info.h" #include "mongo/db/session_txn_record_gen.h" #include "mongo/db/transaction_history_iterator.h" +#include "mongo/db/write_concern.h" #include "mongo/stdx/memory.h" +#include "mongo/util/assert_util.h" #include "mongo/util/mongoutils/str.h" namespace mongo { @@ -69,17 +74,17 @@ bool SessionCatalogMigrationSource::hasMoreOplog() { return _hasMoreOplogFromSessionCatalog() || _hasNewWrites(); } -boost::optional<repl::OplogEntry> SessionCatalogMigrationSource::getLastFetchedOplog() { +SessionCatalogMigrationSource::OplogResult SessionCatalogMigrationSource::getLastFetchedOplog() { { stdx::lock_guard<stdx::mutex> _lk(_sessionCloneMutex); if (_lastFetchedOplog) { - return _lastFetchedOplog; + return OplogResult(_lastFetchedOplog, false); } } { stdx::lock_guard<stdx::mutex> _lk(_newOplogMutex); - return _lastFetchedNewWriteOplog; + return OplogResult(_lastFetchedNewWriteOplog, true); } } @@ -135,6 +140,7 @@ repl::OplogEntry SessionCatalogMigrationSource::_getLastFetchedOplogFromSessionC bool SessionCatalogMigrationSource::_fetchNextOplogFromSessionCatalog(OperationContext* opCtx) { stdx::unique_lock<stdx::mutex> lk(_sessionCloneMutex); + invariant(_alreadyInitialized); if (!_lastFetchedOplogBuffer.empty()) { _lastFetchedOplog = _lastFetchedOplogBuffer.back(); _lastFetchedOplogBuffer.pop_back(); @@ -147,8 +153,6 @@ bool SessionCatalogMigrationSource::_fetchNextOplogFromSessionCatalog(OperationC return true; } - _initIfNotYet(lk, opCtx); - while (!_sessionLastWriteOpTimes.empty()) { auto lowestOpTimeIter = _sessionLastWriteOpTimes.begin(); auto nextOpTime = *lowestOpTimeIter; @@ -180,6 +184,7 @@ bool SessionCatalogMigrationSource::_fetchNextNewWriteOplog(OperationContext* op { stdx::lock_guard<stdx::mutex> lk(_newOplogMutex); + invariant(_alreadyInitialized); if (_newWriteOpTimeList.empty()) { _lastFetchedNewWriteOplog.reset(); return false; @@ -211,21 +216,46 @@ void SessionCatalogMigrationSource::notifyNewWriteOpTime(repl::OpTime opTime) { _newWriteOpTimeList.push_back(opTime); } -void SessionCatalogMigrationSource::_initIfNotYet(WithLock, OperationContext* opCtx) { - if (_alreadyInitialized) { - return; - } +void SessionCatalogMigrationSource::init(OperationContext* opCtx) { + invariant(!_alreadyInitialized); DBDirectClient client(opCtx); auto cursor = client.query(NamespaceString::kSessionTransactionsTableNamespace.ns(), {}); + std::set<repl::OpTime> opTimes; while (cursor->more()) { auto nextSession = SessionTxnRecord::parse( IDLParserErrorContext("Session migration cloning"), cursor->next()); - _sessionLastWriteOpTimes.insert(nextSession.getLastWriteOpTime()); + auto opTime = nextSession.getLastWriteOpTime(); + if (!opTime.isNull()) { + opTimes.insert(nextSession.getLastWriteOpTime()); + } + } + + { + auto message = BSON("sessionMigrateCloneStart" << _ns.ns()); + AutoGetCollection autoColl(opCtx, NamespaceString::kRsOplogNamespace, MODE_IX); + writeConflictRetry( + opCtx, + "session migration initialization majority commit barrier", + NamespaceString::kRsOplogNamespace.ns(), + [&] { + WriteUnitOfWork wuow(opCtx); + opCtx->getClient()->getServiceContext()->getOpObserver()->onInternalOpMessage( + opCtx, _ns, {}, {}, message); + wuow.commit(); + }); } + auto opTimeToWait = repl::ReplClientInfo::forClient(opCtx->getClient()).getLastOp(); + WriteConcernResult result; + WriteConcernOptions majority( + WriteConcernOptions::kMajority, WriteConcernOptions::SyncMode::UNSET, 0); + uassertStatusOK(waitForWriteConcern(opCtx, opTimeToWait, majority, &result)); + + stdx::lock_guard<stdx::mutex> lk(_sessionCloneMutex); _alreadyInitialized = true; + _sessionLastWriteOpTimes.swap(opTimes); } } // namespace mongo diff --git a/src/mongo/db/s/session_catalog_migration_source.h b/src/mongo/db/s/session_catalog_migration_source.h index c1e6729aca3..0133281df07 100644 --- a/src/mongo/db/s/session_catalog_migration_source.h +++ b/src/mongo/db/s/session_catalog_migration_source.h @@ -48,13 +48,44 @@ class ServiceContext; /** * Provides facilities for extracting oplog entries of writes in a particular namespace that needs * to be migrated. + * + * This also ensures that oplog returned are majority committed. This is achieved by calling + * waitForWriteConcern. However, waitForWriteConcern does not support waiting for opTimes of + * previous terms. To get around this, the waitForWriteConcern is performed in two phases: + * + * During init() call phase: + * 1. Scan the entire config.transactions and extract all the lastWriteOpTime. + * 2. Insert a no-op oplog entry and wait for it to be majority committed. + * 3. At this point any writes before should be majority committed (including all the oplog + * entries that the collected lastWriteOpTime points to). If the particular oplog with the + * opTime cannot be found: it either means that the oplog was truncated or rolled back. + * + * New writes/xfer mods phase oplog entries: + * In this case, caller is responsible for calling waitForWriteConcern. If getLastFetchedOplog + * returns shouldWaitForMajority == true, it should wait for the highest opTime it has got from + * getLastFetchedOplog. It should also error if it detects a change of term within a batch since + * it would be wrong to wait for the highest opTime in this case. */ class SessionCatalogMigrationSource { MONGO_DISALLOW_COPYING(SessionCatalogMigrationSource); public: + struct OplogResult { + OplogResult(boost::optional<repl::OplogEntry> _oplog, bool _shouldWaitForMajority) + : oplog(std::move(_oplog)), shouldWaitForMajority(_shouldWaitForMajority) {} + + // The oplog fetched. + boost::optional<repl::OplogEntry> oplog; + + // If this is set to true, oplog returned is not confirmed to be majority committed, + // so the caller has to explicitly wait for it to be committed to majority. + bool shouldWaitForMajority = false; + }; + explicit SessionCatalogMigrationSource(NamespaceString ns); + void init(OperationContext* opCtx); + /** * Returns true if there are more oplog entries to fetch at this moment. Note that new writes * can still continue to come in after this has returned false, so it can become true again. @@ -72,7 +103,7 @@ public: * Returns the oplog document that was last fetched by the fetchNextOplog call. * Returns an empty object if there are no oplog to fetch. */ - boost::optional<repl::OplogEntry> getLastFetchedOplog(); + OplogResult getLastFetchedOplog(); /** * Remembers the oplog timestamp of a new write that just occurred. @@ -83,8 +114,6 @@ private: /////////////////////////////////////////////////////////////////////////// // Methods for extracting the oplog entries from session information. - void _initIfNotYet(WithLock, OperationContext* opCtx); - /** * If this returns false, it just means that there are no more oplog entry in the buffer that * needs to be moved over. However, there can still be new incoming operations that can add diff --git a/src/mongo/db/s/session_catalog_migration_source_test.cpp b/src/mongo/db/s/session_catalog_migration_source_test.cpp index f7051904a9a..c1e707c0c9f 100644 --- a/src/mongo/db/s/session_catalog_migration_source_test.cpp +++ b/src/mongo/db/s/session_catalog_migration_source_test.cpp @@ -47,6 +47,7 @@ class SessionCatalogMigrationSourceTest : public MockReplCoordServerFixture {}; TEST_F(SessionCatalogMigrationSourceTest, NoSessionsToTransferShouldNotHaveOplog) { const NamespaceString kNs("a.b"); SessionCatalogMigrationSource migrationSource(kNs); + migrationSource.init(opCtx()); ASSERT_FALSE(migrationSource.fetchNextOplog(opCtx())); ASSERT_FALSE(migrationSource.hasMoreOplog()); } @@ -73,23 +74,24 @@ TEST_F(SessionCatalogMigrationSourceTest, OneSessionWithTwoWrites) { client.insert(NamespaceString::kSessionTransactionsTableNamespace.ns(), sessionRecord.toBSON()); SessionCatalogMigrationSource migrationSource(kNs); + migrationSource.init(opCtx()); ASSERT_TRUE(migrationSource.fetchNextOplog(opCtx())); { ASSERT_TRUE(migrationSource.hasMoreOplog()); - auto nextOplog = migrationSource.getLastFetchedOplog(); - ASSERT_TRUE(nextOplog); + auto nextOplogResult = migrationSource.getLastFetchedOplog(); + ASSERT_FALSE(nextOplogResult.shouldWaitForMajority); // Cannot compare directly because of SERVER-31356 - ASSERT_BSONOBJ_EQ(entry2.toBSON(), nextOplog->toBSON()); + ASSERT_BSONOBJ_EQ(entry2.toBSON(), nextOplogResult.oplog->toBSON()); ASSERT_TRUE(migrationSource.fetchNextOplog(opCtx())); } { ASSERT_TRUE(migrationSource.hasMoreOplog()); - auto nextOplog = migrationSource.getLastFetchedOplog(); - ASSERT_TRUE(nextOplog); + auto nextOplogResult = migrationSource.getLastFetchedOplog(); + ASSERT_FALSE(nextOplogResult.shouldWaitForMajority); // Cannot compare directly because of SERVER-31356 - ASSERT_BSONOBJ_EQ(entry1.toBSON(), nextOplog->toBSON()); + ASSERT_BSONOBJ_EQ(entry1.toBSON(), nextOplogResult.oplog->toBSON()); } ASSERT_FALSE(migrationSource.fetchNextOplog(opCtx())); @@ -138,24 +140,25 @@ TEST_F(SessionCatalogMigrationSourceTest, TwoSessionWithTwoWrites) { insertOplogEntry(entry2b); SessionCatalogMigrationSource migrationSource(kNs); + migrationSource.init(opCtx()); ASSERT_TRUE(migrationSource.fetchNextOplog(opCtx())); auto checkNextBatch = [this, &migrationSource](const repl::OplogEntry& firstExpectedOplog, const repl::OplogEntry& secondExpectedOplog) { { ASSERT_TRUE(migrationSource.hasMoreOplog()); - auto nextOplog = migrationSource.getLastFetchedOplog(); - ASSERT_TRUE(nextOplog); + auto nextOplogResult = migrationSource.getLastFetchedOplog(); + ASSERT_FALSE(nextOplogResult.shouldWaitForMajority); // Cannot compare directly because of SERVER-31356 - ASSERT_BSONOBJ_EQ(firstExpectedOplog.toBSON(), nextOplog->toBSON()); + ASSERT_BSONOBJ_EQ(firstExpectedOplog.toBSON(), nextOplogResult.oplog->toBSON()); ASSERT_TRUE(migrationSource.fetchNextOplog(opCtx())); } { ASSERT_TRUE(migrationSource.hasMoreOplog()); - auto nextOplog = migrationSource.getLastFetchedOplog(); - ASSERT_TRUE(nextOplog); - ASSERT_BSONOBJ_EQ(secondExpectedOplog.toBSON(), nextOplog->toBSON()); + auto nextOplogResult = migrationSource.getLastFetchedOplog(); + ASSERT_FALSE(nextOplogResult.shouldWaitForMajority); + ASSERT_BSONOBJ_EQ(secondExpectedOplog.toBSON(), nextOplogResult.oplog->toBSON()); } }; @@ -210,16 +213,17 @@ TEST_F(SessionCatalogMigrationSourceTest, OneSessionWithFindAndModifyPreImageAnd client.insert(NamespaceString::kSessionTransactionsTableNamespace.ns(), sessionRecord.toBSON()); SessionCatalogMigrationSource migrationSource(kNs); + migrationSource.init(opCtx()); ASSERT_TRUE(migrationSource.fetchNextOplog(opCtx())); auto expectedSequece = {entry3, entry4, entry1, entry2}; for (auto oplog : expectedSequece) { ASSERT_TRUE(migrationSource.hasMoreOplog()); - auto nextOplog = migrationSource.getLastFetchedOplog(); - ASSERT_TRUE(nextOplog); + auto nextOplogResult = migrationSource.getLastFetchedOplog(); + ASSERT_FALSE(nextOplogResult.shouldWaitForMajority); // Cannot compare directly because of SERVER-31356 - ASSERT_BSONOBJ_EQ(oplog.toBSON(), nextOplog->toBSON()); + ASSERT_BSONOBJ_EQ(oplog.toBSON(), nextOplogResult.oplog->toBSON()); migrationSource.fetchNextOplog(opCtx()); } @@ -261,13 +265,14 @@ TEST_F(SessionCatalogMigrationSourceTest, OplogWithOtherNsShouldBeIgnored) { sessionRecord2.toBSON()); SessionCatalogMigrationSource migrationSource(kNs); + migrationSource.init(opCtx()); ASSERT_TRUE(migrationSource.fetchNextOplog(opCtx())); ASSERT_TRUE(migrationSource.hasMoreOplog()); - auto nextOplog = migrationSource.getLastFetchedOplog(); - ASSERT_TRUE(nextOplog); + auto nextOplogResult = migrationSource.getLastFetchedOplog(); + ASSERT_FALSE(nextOplogResult.shouldWaitForMajority); // Cannot compare directly because of SERVER-31356 - ASSERT_BSONOBJ_EQ(entry1.toBSON(), nextOplog->toBSON()); + ASSERT_BSONOBJ_EQ(entry1.toBSON(), nextOplogResult.oplog->toBSON()); ASSERT_FALSE(migrationSource.fetchNextOplog(opCtx())); ASSERT_FALSE(migrationSource.hasMoreOplog()); @@ -301,6 +306,7 @@ TEST_F(SessionCatalogMigrationSourceTest, SessionDumpWithMultipleNewWrites) { insertOplogEntry(entry3); SessionCatalogMigrationSource migrationSource(kNs); + migrationSource.init(opCtx()); ASSERT_TRUE(migrationSource.fetchNextOplog(opCtx())); migrationSource.notifyNewWriteOpTime(entry2.getOpTime()); @@ -308,26 +314,26 @@ TEST_F(SessionCatalogMigrationSourceTest, SessionDumpWithMultipleNewWrites) { { ASSERT_TRUE(migrationSource.hasMoreOplog()); - auto nextOplog = migrationSource.getLastFetchedOplog(); - ASSERT_TRUE(nextOplog); + auto nextOplogResult = migrationSource.getLastFetchedOplog(); + ASSERT_FALSE(nextOplogResult.shouldWaitForMajority); // Cannot compare directly because of SERVER-31356 - ASSERT_BSONOBJ_EQ(entry1.toBSON(), nextOplog->toBSON()); + ASSERT_BSONOBJ_EQ(entry1.toBSON(), nextOplogResult.oplog->toBSON()); ASSERT_TRUE(migrationSource.fetchNextOplog(opCtx())); } { ASSERT_TRUE(migrationSource.hasMoreOplog()); - auto nextOplog = migrationSource.getLastFetchedOplog(); - ASSERT_TRUE(nextOplog); - ASSERT_BSONOBJ_EQ(entry2.toBSON(), nextOplog->toBSON()); + auto nextOplogResult = migrationSource.getLastFetchedOplog(); + ASSERT_TRUE(nextOplogResult.shouldWaitForMajority); + ASSERT_BSONOBJ_EQ(entry2.toBSON(), nextOplogResult.oplog->toBSON()); ASSERT_TRUE(migrationSource.fetchNextOplog(opCtx())); } { ASSERT_TRUE(migrationSource.hasMoreOplog()); - auto nextOplog = migrationSource.getLastFetchedOplog(); - ASSERT_TRUE(nextOplog); - ASSERT_BSONOBJ_EQ(entry3.toBSON(), nextOplog->toBSON()); + auto nextOplogResult = migrationSource.getLastFetchedOplog(); + ASSERT_TRUE(nextOplogResult.shouldWaitForMajority); + ASSERT_BSONOBJ_EQ(entry3.toBSON(), nextOplogResult.oplog->toBSON()); } ASSERT_FALSE(migrationSource.fetchNextOplog(opCtx())); @@ -338,6 +344,7 @@ TEST_F(SessionCatalogMigrationSourceTest, ShouldAssertIfOplogCannotBeFound) { const NamespaceString kNs("a.b"); SessionCatalogMigrationSource migrationSource(kNs); + migrationSource.init(opCtx()); ASSERT_FALSE(migrationSource.fetchNextOplog(opCtx())); migrationSource.notifyNewWriteOpTime(repl::OpTime(Timestamp(100, 3), 1)); @@ -349,6 +356,7 @@ TEST_F(SessionCatalogMigrationSourceTest, ShouldBeAbleInsertNewWritesAfterBuffer const NamespaceString kNs("a.b"); SessionCatalogMigrationSource migrationSource(kNs); + migrationSource.init(opCtx()); ASSERT_FALSE(migrationSource.fetchNextOplog(opCtx())); { @@ -364,10 +372,10 @@ TEST_F(SessionCatalogMigrationSourceTest, ShouldBeAbleInsertNewWritesAfterBuffer ASSERT_TRUE(migrationSource.hasMoreOplog()); ASSERT_TRUE(migrationSource.fetchNextOplog(opCtx())); - auto nextOplog = migrationSource.getLastFetchedOplog(); - ASSERT_TRUE(nextOplog); + auto nextOplogResult = migrationSource.getLastFetchedOplog(); + ASSERT_TRUE(nextOplogResult.shouldWaitForMajority); // Cannot compare directly because of SERVER-31356 - ASSERT_BSONOBJ_EQ(entry.toBSON(), nextOplog->toBSON()); + ASSERT_BSONOBJ_EQ(entry.toBSON(), nextOplogResult.oplog->toBSON()); ASSERT_FALSE(migrationSource.fetchNextOplog(opCtx())); ASSERT_FALSE(migrationSource.hasMoreOplog()); @@ -383,9 +391,9 @@ TEST_F(SessionCatalogMigrationSourceTest, ShouldBeAbleInsertNewWritesAfterBuffer ASSERT_TRUE(migrationSource.hasMoreOplog()); ASSERT_TRUE(migrationSource.fetchNextOplog(opCtx())); - auto nextOplog = migrationSource.getLastFetchedOplog(); - ASSERT_TRUE(nextOplog); - ASSERT_BSONOBJ_EQ(entry.toBSON(), nextOplog->toBSON()); + auto nextOplogResult = migrationSource.getLastFetchedOplog(); + ASSERT_TRUE(nextOplogResult.shouldWaitForMajority); + ASSERT_BSONOBJ_EQ(entry.toBSON(), nextOplogResult.oplog->toBSON()); ASSERT_FALSE(migrationSource.fetchNextOplog(opCtx())); ASSERT_FALSE(migrationSource.hasMoreOplog()); @@ -401,9 +409,9 @@ TEST_F(SessionCatalogMigrationSourceTest, ShouldBeAbleInsertNewWritesAfterBuffer ASSERT_TRUE(migrationSource.hasMoreOplog()); ASSERT_TRUE(migrationSource.fetchNextOplog(opCtx())); - auto nextOplog = migrationSource.getLastFetchedOplog(); - ASSERT_TRUE(nextOplog); - ASSERT_BSONOBJ_EQ(entry.toBSON(), nextOplog->toBSON()); + auto nextOplogResult = migrationSource.getLastFetchedOplog(); + ASSERT_TRUE(nextOplogResult.shouldWaitForMajority); + ASSERT_BSONOBJ_EQ(entry.toBSON(), nextOplogResult.oplog->toBSON()); ASSERT_FALSE(migrationSource.fetchNextOplog(opCtx())); ASSERT_FALSE(migrationSource.hasMoreOplog()); |