summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--src/mongo/db/s/migration_chunk_cloner_source_legacy.cpp33
-rw-r--r--src/mongo/db/s/migration_chunk_cloner_source_legacy.h9
-rw-r--r--src/mongo/db/s/migration_chunk_cloner_source_legacy_commands.cpp14
-rw-r--r--src/mongo/db/s/session_catalog_migration_source.cpp50
-rw-r--r--src/mongo/db/s/session_catalog_migration_source.h35
-rw-r--r--src/mongo/db/s/session_catalog_migration_source_test.cpp80
6 files changed, 164 insertions, 57 deletions
diff --git a/src/mongo/db/s/migration_chunk_cloner_source_legacy.cpp b/src/mongo/db/s/migration_chunk_cloner_source_legacy.cpp
index e3438030741..41daf5be6b7 100644
--- a/src/mongo/db/s/migration_chunk_cloner_source_legacy.cpp
+++ b/src/mongo/db/s/migration_chunk_cloner_source_legacy.cpp
@@ -204,6 +204,8 @@ Status MigrationChunkClonerSourceLegacy::startClone(OperationContext* opCtx) {
invariant(_state == kNew);
invariant(!opCtx->lockState()->isLocked());
+ _sessionCatalogSource.init(opCtx);
+
// Load the ids of the currently available documents
auto storeCurrentLocsStatus = _storeCurrentLocs(opCtx);
if (!storeCurrentLocsStatus.isOK()) {
@@ -729,18 +731,31 @@ void MigrationChunkClonerSourceLegacy::_xfer(OperationContext* opCtx,
arr.done();
}
-void MigrationChunkClonerSourceLegacy::nextSessionMigrationBatch(OperationContext* opCtx,
- BSONArrayBuilder* arrBuilder) {
+repl::OpTime MigrationChunkClonerSourceLegacy::nextSessionMigrationBatch(
+ OperationContext* opCtx, BSONArrayBuilder* arrBuilder) {
+ repl::OpTime opTimeToWait;
+ auto seenOpTimeTerm = repl::OpTime::kUninitializedTerm;
+
while (_sessionCatalogSource.hasMoreOplog()) {
- auto oplog = _sessionCatalogSource.getLastFetchedOplog();
+ auto result = _sessionCatalogSource.getLastFetchedOplog();
- if (!oplog) {
+ if (!result.oplog) {
// Last fetched turned out empty, try to see if there are more
_sessionCatalogSource.fetchNextOplog(opCtx);
continue;
}
- auto oplogDoc = oplog->toBSON();
+ auto newOpTime = result.oplog->getOpTime();
+ if (seenOpTimeTerm == repl::OpTime::kUninitializedTerm) {
+ seenOpTimeTerm = newOpTime.getTerm();
+ } else {
+ uassert(40650,
+ str::stream() << "detected change of term from " << seenOpTimeTerm << " to "
+ << newOpTime.getTerm(),
+ seenOpTimeTerm == newOpTime.getTerm());
+ }
+
+ auto oplogDoc = result.oplog->toBSON();
// Use the builder size instead of accumulating the document sizes directly so that we
// take into consideration the overhead of BSONArray indices.
@@ -751,7 +766,15 @@ void MigrationChunkClonerSourceLegacy::nextSessionMigrationBatch(OperationContex
arrBuilder->append(oplogDoc);
_sessionCatalogSource.fetchNextOplog(opCtx);
+
+ if (result.shouldWaitForMajority) {
+ if (opTimeToWait < newOpTime) {
+ opTimeToWait = newOpTime;
+ }
+ }
}
+
+ return opTimeToWait;
}
} // namespace mongo
diff --git a/src/mongo/db/s/migration_chunk_cloner_source_legacy.h b/src/mongo/db/s/migration_chunk_cloner_source_legacy.h
index 5bd20b0907d..5bf180bcea7 100644
--- a/src/mongo/db/s/migration_chunk_cloner_source_legacy.h
+++ b/src/mongo/db/s/migration_chunk_cloner_source_legacy.h
@@ -35,6 +35,7 @@
#include "mongo/client/connection_string.h"
#include "mongo/db/namespace_string.h"
#include "mongo/db/query/plan_executor.h"
+#include "mongo/db/repl/optime.h"
#include "mongo/db/s/migration_chunk_cloner_source.h"
#include "mongo/db/s/migration_session_id.h"
#include "mongo/db/s/session_catalog_migration_source.h"
@@ -130,7 +131,13 @@ public:
*/
Status nextModsBatch(OperationContext* opCtx, Database* db, BSONObjBuilder* builder);
- void nextSessionMigrationBatch(OperationContext* opCtx, BSONArrayBuilder* arrBuilder);
+ /**
+ * Appends to the buffer oplogs that contain session information for this migration.
+ * If this function returns a valid OpTime, this means that the oplog appended are
+ * not guaranteed to be majority committed and the caller has to use wait for the
+ * returned opTime to be majority committed.
+ */
+ repl::OpTime nextSessionMigrationBatch(OperationContext* opCtx, BSONArrayBuilder* arrBuilder);
private:
friend class DeleteNotificationStage;
diff --git a/src/mongo/db/s/migration_chunk_cloner_source_legacy_commands.cpp b/src/mongo/db/s/migration_chunk_cloner_source_legacy_commands.cpp
index d42c6bdaa99..a6f0f5c7f18 100644
--- a/src/mongo/db/s/migration_chunk_cloner_source_legacy_commands.cpp
+++ b/src/mongo/db/s/migration_chunk_cloner_source_legacy_commands.cpp
@@ -41,6 +41,7 @@
#include "mongo/db/s/migration_chunk_cloner_source_legacy.h"
#include "mongo/db/s/migration_source_manager.h"
#include "mongo/db/s/sharding_state.h"
+#include "mongo/db/write_concern.h"
/**
* This file contains commands, which are specific to the legacy chunk cloner source.
@@ -259,8 +260,17 @@ public:
BSONArrayBuilder arrBuilder;
- AutoGetActiveCloner autoCloner(opCtx, migrationSessionId);
- autoCloner.getCloner()->nextSessionMigrationBatch(opCtx, &arrBuilder);
+ repl::OpTime opTime;
+
+ {
+ AutoGetActiveCloner autoCloner(opCtx, migrationSessionId);
+ opTime = autoCloner.getCloner()->nextSessionMigrationBatch(opCtx, &arrBuilder);
+ }
+
+ WriteConcernResult wcResult;
+ WriteConcernOptions majorityWC(
+ WriteConcernOptions::kMajority, WriteConcernOptions::SyncMode::UNSET, 0);
+ uassertStatusOK(waitForWriteConcern(opCtx, opTime, majorityWC, &wcResult));
result.appendArray("oplog", arrBuilder.arr());
return true;
diff --git a/src/mongo/db/s/session_catalog_migration_source.cpp b/src/mongo/db/s/session_catalog_migration_source.cpp
index ac835502952..021eb013c73 100644
--- a/src/mongo/db/s/session_catalog_migration_source.cpp
+++ b/src/mongo/db/s/session_catalog_migration_source.cpp
@@ -30,11 +30,16 @@
#include "mongo/db/s/session_catalog_migration_source.h"
+#include "mongo/db/concurrency/write_conflict_exception.h"
+#include "mongo/db/db_raii.h"
#include "mongo/db/dbdirectclient.h"
#include "mongo/db/namespace_string.h"
+#include "mongo/db/repl/repl_client_info.h"
#include "mongo/db/session_txn_record_gen.h"
#include "mongo/db/transaction_history_iterator.h"
+#include "mongo/db/write_concern.h"
#include "mongo/stdx/memory.h"
+#include "mongo/util/assert_util.h"
#include "mongo/util/mongoutils/str.h"
namespace mongo {
@@ -69,17 +74,17 @@ bool SessionCatalogMigrationSource::hasMoreOplog() {
return _hasMoreOplogFromSessionCatalog() || _hasNewWrites();
}
-boost::optional<repl::OplogEntry> SessionCatalogMigrationSource::getLastFetchedOplog() {
+SessionCatalogMigrationSource::OplogResult SessionCatalogMigrationSource::getLastFetchedOplog() {
{
stdx::lock_guard<stdx::mutex> _lk(_sessionCloneMutex);
if (_lastFetchedOplog) {
- return _lastFetchedOplog;
+ return OplogResult(_lastFetchedOplog, false);
}
}
{
stdx::lock_guard<stdx::mutex> _lk(_newOplogMutex);
- return _lastFetchedNewWriteOplog;
+ return OplogResult(_lastFetchedNewWriteOplog, true);
}
}
@@ -135,6 +140,7 @@ repl::OplogEntry SessionCatalogMigrationSource::_getLastFetchedOplogFromSessionC
bool SessionCatalogMigrationSource::_fetchNextOplogFromSessionCatalog(OperationContext* opCtx) {
stdx::unique_lock<stdx::mutex> lk(_sessionCloneMutex);
+ invariant(_alreadyInitialized);
if (!_lastFetchedOplogBuffer.empty()) {
_lastFetchedOplog = _lastFetchedOplogBuffer.back();
_lastFetchedOplogBuffer.pop_back();
@@ -147,8 +153,6 @@ bool SessionCatalogMigrationSource::_fetchNextOplogFromSessionCatalog(OperationC
return true;
}
- _initIfNotYet(lk, opCtx);
-
while (!_sessionLastWriteOpTimes.empty()) {
auto lowestOpTimeIter = _sessionLastWriteOpTimes.begin();
auto nextOpTime = *lowestOpTimeIter;
@@ -180,6 +184,7 @@ bool SessionCatalogMigrationSource::_fetchNextNewWriteOplog(OperationContext* op
{
stdx::lock_guard<stdx::mutex> lk(_newOplogMutex);
+ invariant(_alreadyInitialized);
if (_newWriteOpTimeList.empty()) {
_lastFetchedNewWriteOplog.reset();
return false;
@@ -211,21 +216,46 @@ void SessionCatalogMigrationSource::notifyNewWriteOpTime(repl::OpTime opTime) {
_newWriteOpTimeList.push_back(opTime);
}
-void SessionCatalogMigrationSource::_initIfNotYet(WithLock, OperationContext* opCtx) {
- if (_alreadyInitialized) {
- return;
- }
+void SessionCatalogMigrationSource::init(OperationContext* opCtx) {
+ invariant(!_alreadyInitialized);
DBDirectClient client(opCtx);
auto cursor = client.query(NamespaceString::kSessionTransactionsTableNamespace.ns(), {});
+ std::set<repl::OpTime> opTimes;
while (cursor->more()) {
auto nextSession = SessionTxnRecord::parse(
IDLParserErrorContext("Session migration cloning"), cursor->next());
- _sessionLastWriteOpTimes.insert(nextSession.getLastWriteOpTime());
+ auto opTime = nextSession.getLastWriteOpTime();
+ if (!opTime.isNull()) {
+ opTimes.insert(nextSession.getLastWriteOpTime());
+ }
+ }
+
+ {
+ auto message = BSON("sessionMigrateCloneStart" << _ns.ns());
+ AutoGetCollection autoColl(opCtx, NamespaceString::kRsOplogNamespace, MODE_IX);
+ writeConflictRetry(
+ opCtx,
+ "session migration initialization majority commit barrier",
+ NamespaceString::kRsOplogNamespace.ns(),
+ [&] {
+ WriteUnitOfWork wuow(opCtx);
+ opCtx->getClient()->getServiceContext()->getOpObserver()->onInternalOpMessage(
+ opCtx, _ns, {}, {}, message);
+ wuow.commit();
+ });
}
+ auto opTimeToWait = repl::ReplClientInfo::forClient(opCtx->getClient()).getLastOp();
+ WriteConcernResult result;
+ WriteConcernOptions majority(
+ WriteConcernOptions::kMajority, WriteConcernOptions::SyncMode::UNSET, 0);
+ uassertStatusOK(waitForWriteConcern(opCtx, opTimeToWait, majority, &result));
+
+ stdx::lock_guard<stdx::mutex> lk(_sessionCloneMutex);
_alreadyInitialized = true;
+ _sessionLastWriteOpTimes.swap(opTimes);
}
} // namespace mongo
diff --git a/src/mongo/db/s/session_catalog_migration_source.h b/src/mongo/db/s/session_catalog_migration_source.h
index c1e6729aca3..0133281df07 100644
--- a/src/mongo/db/s/session_catalog_migration_source.h
+++ b/src/mongo/db/s/session_catalog_migration_source.h
@@ -48,13 +48,44 @@ class ServiceContext;
/**
* Provides facilities for extracting oplog entries of writes in a particular namespace that needs
* to be migrated.
+ *
+ * This also ensures that oplog returned are majority committed. This is achieved by calling
+ * waitForWriteConcern. However, waitForWriteConcern does not support waiting for opTimes of
+ * previous terms. To get around this, the waitForWriteConcern is performed in two phases:
+ *
+ * During init() call phase:
+ * 1. Scan the entire config.transactions and extract all the lastWriteOpTime.
+ * 2. Insert a no-op oplog entry and wait for it to be majority committed.
+ * 3. At this point any writes before should be majority committed (including all the oplog
+ * entries that the collected lastWriteOpTime points to). If the particular oplog with the
+ * opTime cannot be found: it either means that the oplog was truncated or rolled back.
+ *
+ * New writes/xfer mods phase oplog entries:
+ * In this case, caller is responsible for calling waitForWriteConcern. If getLastFetchedOplog
+ * returns shouldWaitForMajority == true, it should wait for the highest opTime it has got from
+ * getLastFetchedOplog. It should also error if it detects a change of term within a batch since
+ * it would be wrong to wait for the highest opTime in this case.
*/
class SessionCatalogMigrationSource {
MONGO_DISALLOW_COPYING(SessionCatalogMigrationSource);
public:
+ struct OplogResult {
+ OplogResult(boost::optional<repl::OplogEntry> _oplog, bool _shouldWaitForMajority)
+ : oplog(std::move(_oplog)), shouldWaitForMajority(_shouldWaitForMajority) {}
+
+ // The oplog fetched.
+ boost::optional<repl::OplogEntry> oplog;
+
+ // If this is set to true, oplog returned is not confirmed to be majority committed,
+ // so the caller has to explicitly wait for it to be committed to majority.
+ bool shouldWaitForMajority = false;
+ };
+
explicit SessionCatalogMigrationSource(NamespaceString ns);
+ void init(OperationContext* opCtx);
+
/**
* Returns true if there are more oplog entries to fetch at this moment. Note that new writes
* can still continue to come in after this has returned false, so it can become true again.
@@ -72,7 +103,7 @@ public:
* Returns the oplog document that was last fetched by the fetchNextOplog call.
* Returns an empty object if there are no oplog to fetch.
*/
- boost::optional<repl::OplogEntry> getLastFetchedOplog();
+ OplogResult getLastFetchedOplog();
/**
* Remembers the oplog timestamp of a new write that just occurred.
@@ -83,8 +114,6 @@ private:
///////////////////////////////////////////////////////////////////////////
// Methods for extracting the oplog entries from session information.
- void _initIfNotYet(WithLock, OperationContext* opCtx);
-
/**
* If this returns false, it just means that there are no more oplog entry in the buffer that
* needs to be moved over. However, there can still be new incoming operations that can add
diff --git a/src/mongo/db/s/session_catalog_migration_source_test.cpp b/src/mongo/db/s/session_catalog_migration_source_test.cpp
index f7051904a9a..c1e707c0c9f 100644
--- a/src/mongo/db/s/session_catalog_migration_source_test.cpp
+++ b/src/mongo/db/s/session_catalog_migration_source_test.cpp
@@ -47,6 +47,7 @@ class SessionCatalogMigrationSourceTest : public MockReplCoordServerFixture {};
TEST_F(SessionCatalogMigrationSourceTest, NoSessionsToTransferShouldNotHaveOplog) {
const NamespaceString kNs("a.b");
SessionCatalogMigrationSource migrationSource(kNs);
+ migrationSource.init(opCtx());
ASSERT_FALSE(migrationSource.fetchNextOplog(opCtx()));
ASSERT_FALSE(migrationSource.hasMoreOplog());
}
@@ -73,23 +74,24 @@ TEST_F(SessionCatalogMigrationSourceTest, OneSessionWithTwoWrites) {
client.insert(NamespaceString::kSessionTransactionsTableNamespace.ns(), sessionRecord.toBSON());
SessionCatalogMigrationSource migrationSource(kNs);
+ migrationSource.init(opCtx());
ASSERT_TRUE(migrationSource.fetchNextOplog(opCtx()));
{
ASSERT_TRUE(migrationSource.hasMoreOplog());
- auto nextOplog = migrationSource.getLastFetchedOplog();
- ASSERT_TRUE(nextOplog);
+ auto nextOplogResult = migrationSource.getLastFetchedOplog();
+ ASSERT_FALSE(nextOplogResult.shouldWaitForMajority);
// Cannot compare directly because of SERVER-31356
- ASSERT_BSONOBJ_EQ(entry2.toBSON(), nextOplog->toBSON());
+ ASSERT_BSONOBJ_EQ(entry2.toBSON(), nextOplogResult.oplog->toBSON());
ASSERT_TRUE(migrationSource.fetchNextOplog(opCtx()));
}
{
ASSERT_TRUE(migrationSource.hasMoreOplog());
- auto nextOplog = migrationSource.getLastFetchedOplog();
- ASSERT_TRUE(nextOplog);
+ auto nextOplogResult = migrationSource.getLastFetchedOplog();
+ ASSERT_FALSE(nextOplogResult.shouldWaitForMajority);
// Cannot compare directly because of SERVER-31356
- ASSERT_BSONOBJ_EQ(entry1.toBSON(), nextOplog->toBSON());
+ ASSERT_BSONOBJ_EQ(entry1.toBSON(), nextOplogResult.oplog->toBSON());
}
ASSERT_FALSE(migrationSource.fetchNextOplog(opCtx()));
@@ -138,24 +140,25 @@ TEST_F(SessionCatalogMigrationSourceTest, TwoSessionWithTwoWrites) {
insertOplogEntry(entry2b);
SessionCatalogMigrationSource migrationSource(kNs);
+ migrationSource.init(opCtx());
ASSERT_TRUE(migrationSource.fetchNextOplog(opCtx()));
auto checkNextBatch = [this, &migrationSource](const repl::OplogEntry& firstExpectedOplog,
const repl::OplogEntry& secondExpectedOplog) {
{
ASSERT_TRUE(migrationSource.hasMoreOplog());
- auto nextOplog = migrationSource.getLastFetchedOplog();
- ASSERT_TRUE(nextOplog);
+ auto nextOplogResult = migrationSource.getLastFetchedOplog();
+ ASSERT_FALSE(nextOplogResult.shouldWaitForMajority);
// Cannot compare directly because of SERVER-31356
- ASSERT_BSONOBJ_EQ(firstExpectedOplog.toBSON(), nextOplog->toBSON());
+ ASSERT_BSONOBJ_EQ(firstExpectedOplog.toBSON(), nextOplogResult.oplog->toBSON());
ASSERT_TRUE(migrationSource.fetchNextOplog(opCtx()));
}
{
ASSERT_TRUE(migrationSource.hasMoreOplog());
- auto nextOplog = migrationSource.getLastFetchedOplog();
- ASSERT_TRUE(nextOplog);
- ASSERT_BSONOBJ_EQ(secondExpectedOplog.toBSON(), nextOplog->toBSON());
+ auto nextOplogResult = migrationSource.getLastFetchedOplog();
+ ASSERT_FALSE(nextOplogResult.shouldWaitForMajority);
+ ASSERT_BSONOBJ_EQ(secondExpectedOplog.toBSON(), nextOplogResult.oplog->toBSON());
}
};
@@ -210,16 +213,17 @@ TEST_F(SessionCatalogMigrationSourceTest, OneSessionWithFindAndModifyPreImageAnd
client.insert(NamespaceString::kSessionTransactionsTableNamespace.ns(), sessionRecord.toBSON());
SessionCatalogMigrationSource migrationSource(kNs);
+ migrationSource.init(opCtx());
ASSERT_TRUE(migrationSource.fetchNextOplog(opCtx()));
auto expectedSequece = {entry3, entry4, entry1, entry2};
for (auto oplog : expectedSequece) {
ASSERT_TRUE(migrationSource.hasMoreOplog());
- auto nextOplog = migrationSource.getLastFetchedOplog();
- ASSERT_TRUE(nextOplog);
+ auto nextOplogResult = migrationSource.getLastFetchedOplog();
+ ASSERT_FALSE(nextOplogResult.shouldWaitForMajority);
// Cannot compare directly because of SERVER-31356
- ASSERT_BSONOBJ_EQ(oplog.toBSON(), nextOplog->toBSON());
+ ASSERT_BSONOBJ_EQ(oplog.toBSON(), nextOplogResult.oplog->toBSON());
migrationSource.fetchNextOplog(opCtx());
}
@@ -261,13 +265,14 @@ TEST_F(SessionCatalogMigrationSourceTest, OplogWithOtherNsShouldBeIgnored) {
sessionRecord2.toBSON());
SessionCatalogMigrationSource migrationSource(kNs);
+ migrationSource.init(opCtx());
ASSERT_TRUE(migrationSource.fetchNextOplog(opCtx()));
ASSERT_TRUE(migrationSource.hasMoreOplog());
- auto nextOplog = migrationSource.getLastFetchedOplog();
- ASSERT_TRUE(nextOplog);
+ auto nextOplogResult = migrationSource.getLastFetchedOplog();
+ ASSERT_FALSE(nextOplogResult.shouldWaitForMajority);
// Cannot compare directly because of SERVER-31356
- ASSERT_BSONOBJ_EQ(entry1.toBSON(), nextOplog->toBSON());
+ ASSERT_BSONOBJ_EQ(entry1.toBSON(), nextOplogResult.oplog->toBSON());
ASSERT_FALSE(migrationSource.fetchNextOplog(opCtx()));
ASSERT_FALSE(migrationSource.hasMoreOplog());
@@ -301,6 +306,7 @@ TEST_F(SessionCatalogMigrationSourceTest, SessionDumpWithMultipleNewWrites) {
insertOplogEntry(entry3);
SessionCatalogMigrationSource migrationSource(kNs);
+ migrationSource.init(opCtx());
ASSERT_TRUE(migrationSource.fetchNextOplog(opCtx()));
migrationSource.notifyNewWriteOpTime(entry2.getOpTime());
@@ -308,26 +314,26 @@ TEST_F(SessionCatalogMigrationSourceTest, SessionDumpWithMultipleNewWrites) {
{
ASSERT_TRUE(migrationSource.hasMoreOplog());
- auto nextOplog = migrationSource.getLastFetchedOplog();
- ASSERT_TRUE(nextOplog);
+ auto nextOplogResult = migrationSource.getLastFetchedOplog();
+ ASSERT_FALSE(nextOplogResult.shouldWaitForMajority);
// Cannot compare directly because of SERVER-31356
- ASSERT_BSONOBJ_EQ(entry1.toBSON(), nextOplog->toBSON());
+ ASSERT_BSONOBJ_EQ(entry1.toBSON(), nextOplogResult.oplog->toBSON());
ASSERT_TRUE(migrationSource.fetchNextOplog(opCtx()));
}
{
ASSERT_TRUE(migrationSource.hasMoreOplog());
- auto nextOplog = migrationSource.getLastFetchedOplog();
- ASSERT_TRUE(nextOplog);
- ASSERT_BSONOBJ_EQ(entry2.toBSON(), nextOplog->toBSON());
+ auto nextOplogResult = migrationSource.getLastFetchedOplog();
+ ASSERT_TRUE(nextOplogResult.shouldWaitForMajority);
+ ASSERT_BSONOBJ_EQ(entry2.toBSON(), nextOplogResult.oplog->toBSON());
ASSERT_TRUE(migrationSource.fetchNextOplog(opCtx()));
}
{
ASSERT_TRUE(migrationSource.hasMoreOplog());
- auto nextOplog = migrationSource.getLastFetchedOplog();
- ASSERT_TRUE(nextOplog);
- ASSERT_BSONOBJ_EQ(entry3.toBSON(), nextOplog->toBSON());
+ auto nextOplogResult = migrationSource.getLastFetchedOplog();
+ ASSERT_TRUE(nextOplogResult.shouldWaitForMajority);
+ ASSERT_BSONOBJ_EQ(entry3.toBSON(), nextOplogResult.oplog->toBSON());
}
ASSERT_FALSE(migrationSource.fetchNextOplog(opCtx()));
@@ -338,6 +344,7 @@ TEST_F(SessionCatalogMigrationSourceTest, ShouldAssertIfOplogCannotBeFound) {
const NamespaceString kNs("a.b");
SessionCatalogMigrationSource migrationSource(kNs);
+ migrationSource.init(opCtx());
ASSERT_FALSE(migrationSource.fetchNextOplog(opCtx()));
migrationSource.notifyNewWriteOpTime(repl::OpTime(Timestamp(100, 3), 1));
@@ -349,6 +356,7 @@ TEST_F(SessionCatalogMigrationSourceTest, ShouldBeAbleInsertNewWritesAfterBuffer
const NamespaceString kNs("a.b");
SessionCatalogMigrationSource migrationSource(kNs);
+ migrationSource.init(opCtx());
ASSERT_FALSE(migrationSource.fetchNextOplog(opCtx()));
{
@@ -364,10 +372,10 @@ TEST_F(SessionCatalogMigrationSourceTest, ShouldBeAbleInsertNewWritesAfterBuffer
ASSERT_TRUE(migrationSource.hasMoreOplog());
ASSERT_TRUE(migrationSource.fetchNextOplog(opCtx()));
- auto nextOplog = migrationSource.getLastFetchedOplog();
- ASSERT_TRUE(nextOplog);
+ auto nextOplogResult = migrationSource.getLastFetchedOplog();
+ ASSERT_TRUE(nextOplogResult.shouldWaitForMajority);
// Cannot compare directly because of SERVER-31356
- ASSERT_BSONOBJ_EQ(entry.toBSON(), nextOplog->toBSON());
+ ASSERT_BSONOBJ_EQ(entry.toBSON(), nextOplogResult.oplog->toBSON());
ASSERT_FALSE(migrationSource.fetchNextOplog(opCtx()));
ASSERT_FALSE(migrationSource.hasMoreOplog());
@@ -383,9 +391,9 @@ TEST_F(SessionCatalogMigrationSourceTest, ShouldBeAbleInsertNewWritesAfterBuffer
ASSERT_TRUE(migrationSource.hasMoreOplog());
ASSERT_TRUE(migrationSource.fetchNextOplog(opCtx()));
- auto nextOplog = migrationSource.getLastFetchedOplog();
- ASSERT_TRUE(nextOplog);
- ASSERT_BSONOBJ_EQ(entry.toBSON(), nextOplog->toBSON());
+ auto nextOplogResult = migrationSource.getLastFetchedOplog();
+ ASSERT_TRUE(nextOplogResult.shouldWaitForMajority);
+ ASSERT_BSONOBJ_EQ(entry.toBSON(), nextOplogResult.oplog->toBSON());
ASSERT_FALSE(migrationSource.fetchNextOplog(opCtx()));
ASSERT_FALSE(migrationSource.hasMoreOplog());
@@ -401,9 +409,9 @@ TEST_F(SessionCatalogMigrationSourceTest, ShouldBeAbleInsertNewWritesAfterBuffer
ASSERT_TRUE(migrationSource.hasMoreOplog());
ASSERT_TRUE(migrationSource.fetchNextOplog(opCtx()));
- auto nextOplog = migrationSource.getLastFetchedOplog();
- ASSERT_TRUE(nextOplog);
- ASSERT_BSONOBJ_EQ(entry.toBSON(), nextOplog->toBSON());
+ auto nextOplogResult = migrationSource.getLastFetchedOplog();
+ ASSERT_TRUE(nextOplogResult.shouldWaitForMajority);
+ ASSERT_BSONOBJ_EQ(entry.toBSON(), nextOplogResult.oplog->toBSON());
ASSERT_FALSE(migrationSource.fetchNextOplog(opCtx()));
ASSERT_FALSE(migrationSource.hasMoreOplog());