author     Suganthi Mani <suganthi.mani@mongodb.com>          2022-04-26 12:03:36 +0000
committer  Evergreen Agent <no-reply@evergreen.mongodb.com>   2022-04-26 12:36:10 +0000
commit     61087a02978226541e7f17f51a8755b680e26364 (patch)
tree       7c3713b2c5a0b8517c1858695650f084e55d4967 /src
parent     35f03b7ef5739264c9de4b9a6d3e38bdc19b5c17 (diff)
download   mongo-61087a02978226541e7f17f51a8755b680e26364.tar.gz
SERVER-65300 Refactor tenant migration recipient state machinery code.
Diffstat (limited to 'src')
-rw-r--r--  src/mongo/db/repl/tenant_all_database_cloner.cpp                 2
-rw-r--r--  src/mongo/db/repl/tenant_all_database_cloner_test.cpp            8
-rw-r--r--  src/mongo/db/repl/tenant_collection_cloner.cpp                   5
-rw-r--r--  src/mongo/db/repl/tenant_collection_cloner_test.cpp             12
-rw-r--r--  src/mongo/db/repl/tenant_database_cloner.cpp                     2
-rw-r--r--  src/mongo/db/repl/tenant_database_cloner_test.cpp                8
-rw-r--r--  src/mongo/db/repl/tenant_migration_recipient_service.cpp       687
-rw-r--r--  src/mongo/db/repl/tenant_migration_recipient_service.h          70
-rw-r--r--  src/mongo/db/repl/tenant_migration_recipient_service_test.cpp   19
-rw-r--r--  src/mongo/db/repl/tenant_migration_shared_data.h                 18
-rw-r--r--  src/mongo/db/repl/tenant_oplog_applier.cpp                       18
-rw-r--r--  src/mongo/db/repl/tenant_oplog_applier.h                         12
12 files changed, 452 insertions, 409 deletions
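
The central change in this patch is replacing the boolean `resuming` flag in TenantMigrationSharedData with a three-valued ResumePhase enum, derived from which state-document fields were already persisted (see _setup() in the recipient service diff below). A minimal, self-contained sketch of that mapping follows; StateDocFields and derivePhase are hypothetical names used only for illustration, not identifiers from the patch.

#include <optional>

// Hypothetical stand-in for the two recipient state-document fields consulted in _setup().
struct StateDocFields {
    std::optional<long long> startFetchingDonorOpTime;
    std::optional<long long> cloneFinishedRecipientOpTime;
};

enum class ResumePhase { kNone, kDataSync, kOplogCatchup };

// Sketch of the phase derivation: a finished clone means only oplog catch-up remains,
// persisted fetch optimes mean data sync is resumed, otherwise it is a fresh attempt.
ResumePhase derivePhase(const StateDocFields& doc) {
    if (doc.cloneFinishedRecipientOpTime) {
        return ResumePhase::kOplogCatchup;
    }
    if (doc.startFetchingDonorOpTime) {
        return ResumePhase::kDataSync;
    }
    return ResumePhase::kNone;
}
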
diff --git a/src/mongo/db/repl/tenant_all_database_cloner.cpp b/src/mongo/db/repl/tenant_all_database_cloner.cpp
index 1b633cf7ecf..97f89ab2fe7 100644
--- a/src/mongo/db/repl/tenant_all_database_cloner.cpp
+++ b/src/mongo/db/repl/tenant_all_database_cloner.cpp
@@ -175,7 +175,7 @@ BaseCloner::AfterStageBehavior TenantAllDatabaseCloner::listExistingDatabasesSta
}
}
- if (!getSharedData()->isResuming()) {
+ if (getSharedData()->getResumePhase() == ResumePhase::kNone) {
uassert(ErrorCodes::NamespaceExists,
str::stream() << "Tenant '" << _tenantId
<< "': databases already exist prior to data sync",
diff --git a/src/mongo/db/repl/tenant_all_database_cloner_test.cpp b/src/mongo/db/repl/tenant_all_database_cloner_test.cpp
index 1326a042403..22765d9eb37 100644
--- a/src/mongo/db/repl/tenant_all_database_cloner_test.cpp
+++ b/src/mongo/db/repl/tenant_all_database_cloner_test.cpp
@@ -402,7 +402,7 @@ TEST_F(TenantAllDatabaseClonerTest, ResumingFromLastClonedDb) {
_mockServer->setCommandReply("find", createFindResponse());
_mockServer->setCommandReply("dbStats", fromjson("{ok:1, dataSize: 30}"));
- TenantMigrationSharedData resumingSharedData(&_clock, _migrationId, /*resuming=*/true);
+ TenantMigrationSharedData resumingSharedData(&_clock, _migrationId, ResumePhase::kDataSync);
auto cloner = makeAllDatabaseCloner(&resumingSharedData);
cloner->setStopAfterStage_forTest("initializeStatsStage");
@@ -445,7 +445,7 @@ TEST_F(TenantAllDatabaseClonerTest, LastClonedDbDeleted_AllGreater) {
_mockServer->setCommandReply("find", createFindResponse());
_mockServer->setCommandReply("dbStats", fromjson("{ok:1, dataSize: 30}"));
- TenantMigrationSharedData resumingSharedData(&_clock, _migrationId, /*resuming=*/true);
+ TenantMigrationSharedData resumingSharedData(&_clock, _migrationId, ResumePhase::kDataSync);
auto cloner = makeAllDatabaseCloner(&resumingSharedData);
cloner->setStopAfterStage_forTest("initializeStatsStage");
@@ -502,7 +502,7 @@ TEST_F(TenantAllDatabaseClonerTest, LastClonedDbDeleted_SomeGreater) {
_mockServer->setCommandReply("find", createFindResponse());
_mockServer->setCommandReply("dbStats", fromjson("{ok:1, dataSize: 30}"));
- TenantMigrationSharedData resumingSharedData(&_clock, _migrationId, /*resuming=*/true);
+ TenantMigrationSharedData resumingSharedData(&_clock, _migrationId, ResumePhase::kDataSync);
auto cloner = makeAllDatabaseCloner(&resumingSharedData);
cloner->setStopAfterStage_forTest("initializeStatsStage");
@@ -570,7 +570,7 @@ TEST_F(TenantAllDatabaseClonerTest, LastClonedDbDeleted_AllLess) {
_mockServer->setCommandReply("find", createFindResponse());
_mockServer->setCommandReply("dbStats", fromjson("{ok:1, dataSize: 30}"));
- TenantMigrationSharedData resumingSharedData(&_clock, _migrationId, /*resuming=*/true);
+ TenantMigrationSharedData resumingSharedData(&_clock, _migrationId, ResumePhase::kDataSync);
auto cloner = makeAllDatabaseCloner(&resumingSharedData);
cloner->setStopAfterStage_forTest("initializeStatsStage");
diff --git a/src/mongo/db/repl/tenant_collection_cloner.cpp b/src/mongo/db/repl/tenant_collection_cloner.cpp
index d74b0760cd4..cd6ea25a650 100644
--- a/src/mongo/db/repl/tenant_collection_cloner.cpp
+++ b/src/mongo/db/repl/tenant_collection_cloner.cpp
@@ -324,7 +324,7 @@ BaseCloner::AfterStageBehavior TenantCollectionCloner::createCollectionStage() {
uassert(ErrorCodes::NamespaceExists,
str::stream() << "Tenant '" << _tenantId << "': collection '" << collection->ns()
<< "' already exists prior to data sync",
- getSharedData()->isResuming());
+ getSharedData()->getResumePhase() == ResumePhase::kDataSync);
_existingNss = collection->ns();
LOGV2(5342502,
@@ -393,7 +393,8 @@ BaseCloner::AfterStageBehavior TenantCollectionCloner::createCollectionStage() {
_collectionOptions,
!_idIndexSpec.isEmpty() /* createIdIndex */,
_idIndexSpec);
- if (status == ErrorCodes::NamespaceExists && getSharedData()->isResuming()) {
+ if (status == ErrorCodes::NamespaceExists &&
+ getSharedData()->getResumePhase() == ResumePhase::kDataSync) {
// If we are resuming from a recipient failover we can get ErrorCodes::NamespaceExists
// due to following conditions:
//
diff --git a/src/mongo/db/repl/tenant_collection_cloner_test.cpp b/src/mongo/db/repl/tenant_collection_cloner_test.cpp
index a7ae564e6e8..978a5efe234 100644
--- a/src/mongo/db/repl/tenant_collection_cloner_test.cpp
+++ b/src/mongo/db/repl/tenant_collection_cloner_test.cpp
@@ -890,7 +890,7 @@ TEST_F(TenantCollectionClonerTest, QueryPlanKilledThenNamespaceNotFoundSubsequen
}
TEST_F(TenantCollectionClonerTest, ResumeFromEmptyCollectionMissingAllSecondaryIndexes) {
- TenantMigrationSharedData resumingSharedData(&_clock, _migrationId, /*resuming=*/true);
+ TenantMigrationSharedData resumingSharedData(&_clock, _migrationId, ResumePhase::kDataSync);
auto cloner = makeCollectionCloner(CollectionOptions(), &resumingSharedData);
// Simulate that the collection already exists with no data and no secondary index.
@@ -922,7 +922,7 @@ TEST_F(TenantCollectionClonerTest, ResumeFromEmptyCollectionMissingAllSecondaryI
}
TEST_F(TenantCollectionClonerTest, ResumeFromEmptyCollectionMissingSomeSecondaryIndexes) {
- TenantMigrationSharedData resumingSharedData(&_clock, _migrationId, /*resuming=*/true);
+ TenantMigrationSharedData resumingSharedData(&_clock, _migrationId, ResumePhase::kDataSync);
auto cloner = makeCollectionCloner(CollectionOptions(), &resumingSharedData);
// Simulate that the collection already exists with no data and some secondary indexes.
@@ -956,7 +956,7 @@ TEST_F(TenantCollectionClonerTest, ResumeFromEmptyCollectionMissingSomeSecondary
}
TEST_F(TenantCollectionClonerTest, ResumeFromEmptyCollectionMissingNoSecondaryIndexes) {
- TenantMigrationSharedData resumingSharedData(&_clock, _migrationId, /*resuming=*/true);
+ TenantMigrationSharedData resumingSharedData(&_clock, _migrationId, ResumePhase::kDataSync);
auto cloner = makeCollectionCloner(CollectionOptions(), &resumingSharedData);
// Simulate that the collection already exists with no data and all matching secondary indexes.
@@ -982,7 +982,7 @@ TEST_F(TenantCollectionClonerTest, ResumeFromEmptyCollectionMissingNoSecondaryIn
}
TEST_F(TenantCollectionClonerTest, ResumeFromNonEmptyCollection) {
- TenantMigrationSharedData resumingSharedData(&_clock, _migrationId, /*resuming=*/true);
+ TenantMigrationSharedData resumingSharedData(&_clock, _migrationId, ResumePhase::kDataSync);
auto cloner = makeCollectionCloner(CollectionOptions(), &resumingSharedData);
// Simulate that the collection already exists with some data.
@@ -1015,7 +1015,7 @@ TEST_F(TenantCollectionClonerTest, ResumeFromNonEmptyCollection) {
}
TEST_F(TenantCollectionClonerTest, ResumeFromRecreatedCollection) {
- TenantMigrationSharedData resumingSharedData(&_clock, _migrationId, /*resuming=*/true);
+ TenantMigrationSharedData resumingSharedData(&_clock, _migrationId, ResumePhase::kDataSync);
auto cloner = makeCollectionCloner(CollectionOptions(), &resumingSharedData);
// Simulate that the namespace already exists under a different uuid.
@@ -1045,7 +1045,7 @@ TEST_F(TenantCollectionClonerTest, ResumeFromRecreatedCollection) {
}
TEST_F(TenantCollectionClonerTest, ResumeFromRenamedCollection) {
- TenantMigrationSharedData resumingSharedData(&_clock, _migrationId, /*resuming=*/true);
+ TenantMigrationSharedData resumingSharedData(&_clock, _migrationId, ResumePhase::kDataSync);
auto cloner = makeCollectionCloner(CollectionOptions(), &resumingSharedData);
// Simulate that the collection already exists under a different name with no index and no data.
diff --git a/src/mongo/db/repl/tenant_database_cloner.cpp b/src/mongo/db/repl/tenant_database_cloner.cpp
index 0cde8bfa265..7885c67eaa5 100644
--- a/src/mongo/db/repl/tenant_database_cloner.cpp
+++ b/src/mongo/db/repl/tenant_database_cloner.cpp
@@ -238,7 +238,7 @@ BaseCloner::AfterStageBehavior TenantDatabaseCloner::listExistingCollectionsStag
}
}
- if (!getSharedData()->isResuming()) {
+ if (getSharedData()->getResumePhase() == ResumePhase::kNone) {
uassert(ErrorCodes::NamespaceExists,
str::stream() << "Tenant '" << _tenantId
<< "': collections already exist prior to data sync",
diff --git a/src/mongo/db/repl/tenant_database_cloner_test.cpp b/src/mongo/db/repl/tenant_database_cloner_test.cpp
index 01540fe3d57..4b089101465 100644
--- a/src/mongo/db/repl/tenant_database_cloner_test.cpp
+++ b/src/mongo/db/repl/tenant_database_cloner_test.cpp
@@ -694,7 +694,7 @@ TEST_F(TenantDatabaseClonerTest, ResumingFromLastClonedCollection) {
sizeOfOneCollection = swSize.getValue();
}
- TenantMigrationSharedData resumingSharedData(&_clock, _migrationId, /*resuming=*/true);
+ TenantMigrationSharedData resumingSharedData(&_clock, _migrationId, ResumePhase::kDataSync);
auto cloner = makeDatabaseCloner(&resumingSharedData);
cloner->setStopAfterStage_forTest("listExistingCollections");
@@ -758,7 +758,7 @@ TEST_F(TenantDatabaseClonerTest, LastClonedCollectionDeleted_AllGreater) {
sizeANss = swSize.getValue();
}
- TenantMigrationSharedData resumingSharedData(&_clock, _migrationId, /*resuming=*/true);
+ TenantMigrationSharedData resumingSharedData(&_clock, _migrationId, ResumePhase::kDataSync);
auto cloner = makeDatabaseCloner(&resumingSharedData);
cloner->setStopAfterStage_forTest("listExistingCollections");
@@ -837,7 +837,7 @@ TEST_F(TenantDatabaseClonerTest, LastClonedCollectionDeleted_SomeGreater) {
ANssBNssSize += swSizeBNss.getValue();
}
- TenantMigrationSharedData resumingSharedData(&_clock, _migrationId, /*resuming=*/true);
+ TenantMigrationSharedData resumingSharedData(&_clock, _migrationId, ResumePhase::kDataSync);
auto cloner = makeDatabaseCloner(&resumingSharedData);
cloner->setStopAfterStage_forTest("listExistingCollections");
@@ -927,7 +927,7 @@ TEST_F(TenantDatabaseClonerTest, LastClonedCollectionDeleted_AllLess) {
sizeOfAllColls += swSizeCNss.getValue();
}
- TenantMigrationSharedData resumingSharedData(&_clock, _migrationId, /*resuming=*/true);
+ TenantMigrationSharedData resumingSharedData(&_clock, _migrationId, ResumePhase::kDataSync);
auto cloner = makeDatabaseCloner(&resumingSharedData);
cloner->setStopAfterStage_forTest("listExistingCollections");
diff --git a/src/mongo/db/repl/tenant_migration_recipient_service.cpp b/src/mongo/db/repl/tenant_migration_recipient_service.cpp
index 3f32411c555..32666685c4b 100644
--- a/src/mongo/db/repl/tenant_migration_recipient_service.cpp
+++ b/src/mongo/db/repl/tenant_migration_recipient_service.cpp
@@ -141,7 +141,6 @@ MONGO_FAIL_POINT_DEFINE(pauseAfterRunTenantMigrationRecipientInstance);
MONGO_FAIL_POINT_DEFINE(skipTenantMigrationRecipientAuth);
MONGO_FAIL_POINT_DEFINE(skipComparingRecipientAndDonorFCV);
MONGO_FAIL_POINT_DEFINE(autoRecipientForgetMigration);
-MONGO_FAIL_POINT_DEFINE(pauseAfterCreatingOplogBuffer);
MONGO_FAIL_POINT_DEFINE(skipFetchingCommittedTransactions);
MONGO_FAIL_POINT_DEFINE(skipFetchingRetryableWritesEntriesBeforeStartOpTime);
@@ -160,7 +159,7 @@ MONGO_FAIL_POINT_DEFINE(fpAfterFetchingRetryableWritesEntriesBeforeStartOpTime);
MONGO_FAIL_POINT_DEFINE(fpAfterStartingOplogFetcherMigrationRecipientInstance);
MONGO_FAIL_POINT_DEFINE(setTenantMigrationRecipientInstanceHostTimeout);
MONGO_FAIL_POINT_DEFINE(pauseAfterRetrievingLastTxnMigrationRecipientInstance);
-MONGO_FAIL_POINT_DEFINE(fpBeforeMarkingCollectionClonerDone);
+MONGO_FAIL_POINT_DEFINE(fpBeforeMarkingCloneSuccess);
MONGO_FAIL_POINT_DEFINE(fpBeforeFetchingCommittedTransactions);
MONGO_FAIL_POINT_DEFINE(fpAfterFetchingCommittedTransactions);
MONGO_FAIL_POINT_DEFINE(fpAfterStartingOplogApplierMigrationRecipientInstance);
@@ -515,8 +514,14 @@ TenantMigrationRecipientService::Instance::waitUntilMigrationReachesReturnAfterR
// In the event of a donor failover, it is possible that a new donor has stepped up and
// initiated this 'recipientSyncData' cmd. Make sure the recipient is not in the middle of
// restarting the oplog applier to retry the future chain.
- opCtx->waitForConditionOrInterrupt(
- _restartOplogApplierCondVar, lk, [&] { return !_isRestartingOplogApplier; });
+ //
+ // For the shard merge protocol, we start the tenant oplog applier only after the recipient
+ // informs the donor that the data is in a consistent state. So the recipient might receive a
+ // recipientSyncData cmd with `returnAfterReachingDonorTimestamp` from the donor before it
+ // has started the tenant oplog applier.
+ opCtx->waitForConditionOrInterrupt(_oplogApplierReadyCondVar, lk, [&] {
+ return _oplogApplierReady || _dataSyncCompletionPromise.getFuture().isReady();
+ });
if (_dataSyncCompletionPromise.getFuture().isReady()) {
// When the data sync is done, we reset _tenantOplogApplier, so just throw the data sync
// completion future result.
@@ -649,8 +654,7 @@ OpTime TenantMigrationRecipientService::Instance::_getDonorMajorityOpTime(
return majorityOpTime;
}
-SemiFuture<TenantMigrationRecipientService::Instance::ConnectionPair>
-TenantMigrationRecipientService::Instance::_createAndConnectClients() {
+SemiFuture<void> TenantMigrationRecipientService::Instance::_createAndConnectClients() {
LOGV2_DEBUG(4880401,
1,
"Recipient migration service connecting clients",
@@ -763,12 +767,19 @@ TenantMigrationRecipientService::Instance::_createAndConnectClients() {
applicationName += "_oplogFetcher";
auto oplogFetcherClient = _connectAndAuth(serverAddress, applicationName);
return ConnectionPair(std::move(client), std::move(oplogFetcherClient));
+ })
+ .then([this, self = shared_from_this()](ConnectionPair connectionPair) {
+ stdx::lock_guard lk(_mutex);
+ if (_taskState.isInterrupted()) {
+ uassertStatusOK(_taskState.getInterruptStatus());
+ }
+
+ _client = std::move(connectionPair.first);
+ _oplogFetcherClient = std::move(connectionPair.second);
});
})
.until([this, self = shared_from_this(), kDelayedMajorityOpTimeErrorCode](
- const StatusWith<ConnectionPair>& statusWith) {
- auto status = statusWith.getStatus();
-
+ const Status& status) {
if (status.isOK()) {
return true;
}
@@ -1105,12 +1116,13 @@ ExecutorFuture<void> TenantMigrationRecipientService::Instance::_getDonorFilenam
});
}
-void TenantMigrationRecipientService::Instance::_getStartOpTimesFromDonor(WithLock lk) {
+SemiFuture<void> TenantMigrationRecipientService::Instance::_getStartOpTimesFromDonor() {
+ stdx::lock_guard lk(_mutex);
// Get the last oplog entry at the read concern majority optime in the remote oplog. It
// does not matter which tenant it is for.
- if (_sharedData->isResuming()) {
+ if (_sharedData->getResumePhase() != ResumePhase::kNone) {
// We are resuming a migration.
- return;
+ return SemiFuture<void>::makeReady();
}
auto isShardMerge = _stateDoc.getProtocol() == MigrationProtocolEnum::kShardMerge;
@@ -1180,6 +1192,8 @@ void TenantMigrationRecipientService::Instance::_getStartOpTimesFromDonor(WithLo
// between startFetchingDonorOpTime and startApplyingDonorOpTime.
_stateDoc.setStartFetchingDonorOpTime(
std::min(startFetchingDonorOpTime, *_stateDoc.getStartApplyingDonorOpTime()));
+
+ return _updateStateDocForMajority(lk);
}
AggregateCommandRequest
@@ -1328,6 +1342,11 @@ void TenantMigrationRecipientService::Instance::_processCommittedTransactionEntr
SemiFuture<void>
TenantMigrationRecipientService::Instance::_fetchCommittedTransactionsBeforeStartOpTime() {
+ {
+ auto opCtx = cc().makeOperationContext();
+ _stopOrHangOnFailPoint(&fpBeforeFetchingCommittedTransactions, opCtx.get());
+ }
+
if (MONGO_unlikely(skipFetchingCommittedTransactions.shouldFail())) { // Test-only.
return SemiFuture<void>::makeReady();
}
@@ -1378,8 +1397,8 @@ TenantMigrationRecipientService::Instance::_fetchCommittedTransactionsBeforeStar
.semi();
}
-void TenantMigrationRecipientService::Instance::_createOplogBuffer() {
- auto opCtx = cc().makeOperationContext();
+void TenantMigrationRecipientService::Instance::_createOplogBuffer(WithLock,
+ OperationContext* opCtx) {
OplogBufferCollection::Options options;
options.peekCacheSize = static_cast<size_t>(tenantMigrationOplogBufferPeekCacheSize);
options.dropCollectionAtStartup = false;
@@ -1388,35 +1407,16 @@ void TenantMigrationRecipientService::Instance::_createOplogBuffer() {
auto oplogBufferNS = getOplogBufferNs(getMigrationUUID());
if (!_donorOplogBuffer) {
- // Create the oplog buffer outside the mutex to avoid deadlock on a concurrent stepdown.
+
auto bufferCollection = std::make_unique<OplogBufferCollection>(
- StorageInterface::get(opCtx.get()), oplogBufferNS, options);
- stdx::lock_guard lk(_mutex);
+ StorageInterface::get(opCtx), oplogBufferNS, options);
_donorOplogBuffer = std::move(bufferCollection);
}
-
- {
- stdx::lock_guard lk(_mutex);
- invariant(_stateDoc.getStartFetchingDonorOpTime());
- }
-
- {
- // Ensure we are primary when trying to startup and create the oplog buffer collection.
- auto coordinator = repl::ReplicationCoordinator::get(opCtx.get());
- Lock::GlobalLock globalLock(opCtx.get(), MODE_IX);
- if (!coordinator->canAcceptWritesForDatabase(opCtx.get(), oplogBufferNS.db())) {
- uassertStatusOK(
- Status(ErrorCodes::NotWritablePrimary,
- "Recipient node is not primary, cannot create oplog buffer collection."));
- }
- _donorOplogBuffer->startup(opCtx.get());
- }
-
- pauseAfterCreatingOplogBuffer.pauseWhileSet();
}
SemiFuture<void>
TenantMigrationRecipientService::Instance::_fetchRetryableWritesOplogBeforeStartOpTime() {
+ _stopOrHangOnFailPoint(&fpAfterRetrievingStartOpTimesMigrationRecipientInstance);
if (MONGO_unlikely(
skipFetchingRetryableWritesEntriesBeforeStartOpTime.shouldFail())) { // Test-only.
return SemiFuture<void>::makeReady();
@@ -1575,6 +1575,8 @@ TenantMigrationRecipientService::Instance::_fetchRetryableWritesOplogBeforeStart
}
void TenantMigrationRecipientService::Instance::_startOplogFetcher() {
+ _stopOrHangOnFailPoint(&fpAfterFetchingRetryableWritesEntriesBeforeStartOpTime);
+
auto opCtx = cc().makeOperationContext();
OpTime startFetchOpTime;
auto resumingFromOplogBuffer = false;
@@ -1586,7 +1588,7 @@ void TenantMigrationRecipientService::Instance::_startOplogFetcher() {
startFetchOpTime = *_stateDoc.getStartFetchingDonorOpTime();
}
- if (_sharedData->isResuming()) {
+ if (_sharedData->getResumePhase() != ResumePhase::kNone) {
// If the oplog buffer already contains fetched documents, we must be resuming a
// migration.
if (auto topOfOplogBuffer = _donorOplogBuffer->lastObjectPushed(opCtx.get())) {
@@ -1614,7 +1616,7 @@ void TenantMigrationRecipientService::Instance::_startOplogFetcher() {
"sync source selection");
}
- stdx::lock_guard lk(_mutex);
+ stdx::unique_lock lk(_mutex);
OplogFetcher::Config oplogFetcherConfig(
startFetchOpTime,
_oplogFetcherClient->getServerHostAndPort(),
@@ -1650,6 +1652,9 @@ void TenantMigrationRecipientService::Instance::_startOplogFetcher() {
std::move(oplogFetcherConfig));
_donorOplogFetcher->setConnection(std::move(_oplogFetcherClient));
uassertStatusOK(_donorOplogFetcher->startup());
+
+ lk.unlock();
+ _stopOrHangOnFailPoint(&fpAfterStartingOplogFetcherMigrationRecipientInstance);
}
Status TenantMigrationRecipientService::Instance::_enqueueDocuments(
@@ -1772,17 +1777,8 @@ void TenantMigrationRecipientService::Instance::_stopOrHangOnFailPoint(FailPoint
}
}
-bool TenantMigrationRecipientService::Instance::_isCloneCompletedMarkerSet(WithLock) const {
- return _stateDoc.getCloneFinishedRecipientOpTime().has_value();
-}
-
-OpTime TenantMigrationRecipientService::Instance::_getOplogResumeApplyingDonorOptime() const {
- auto cloneFinishedRecipientOpTime = [this, self = shared_from_this()] {
- stdx::lock_guard lk(_mutex);
- auto opt = _stateDoc.getCloneFinishedRecipientOpTime();
- invariant(opt.has_value());
- return *opt;
- }();
+OpTime TenantMigrationRecipientService::Instance::_getOplogResumeApplyingDonorOptime(
+ const OpTime& cloneFinishedRecipientOpTime) const {
auto opCtx = cc().makeOperationContext();
OplogInterfaceLocal oplog(opCtx.get());
auto oplogIter = oplog.makeIterator();
@@ -1820,7 +1816,7 @@ OpTime TenantMigrationRecipientService::Instance::_getOplogResumeApplyingDonorOp
Future<void> TenantMigrationRecipientService::Instance::_startTenantAllDatabaseCloner(WithLock lk) {
// If the state is data consistent, do not start the cloner.
- if (_isCloneCompletedMarkerSet(lk)) {
+ if (_sharedData->getResumePhase() == ResumePhase::kOplogCatchup) {
return {Future<void>::makeReady()};
}
@@ -1908,39 +1904,42 @@ void TenantMigrationRecipientService::Instance::_advanceStableTimestampToStartAp
}
SemiFuture<void> TenantMigrationRecipientService::Instance::_onCloneSuccess() {
+ _stopOrHangOnFailPoint(&fpBeforeMarkingCloneSuccess);
stdx::lock_guard lk(_mutex);
// PrimaryOnlyService::onStepUp() before starting instance makes sure that the state doc
// is majority committed, so we can also skip waiting for it to be majority replicated.
- if (_isCloneCompletedMarkerSet(lk)) {
+ if (_sharedData->getResumePhase() == ResumePhase::kOplogCatchup) {
return SemiFuture<void>::makeReady();
}
- {
+ if (_protocol == MigrationProtocolEnum::kMultitenantMigrations) {
stdx::lock_guard<TenantMigrationSharedData> sharedDatalk(*_sharedData);
auto lastVisibleMajorityCommittedDonorOpTime =
_sharedData->getLastVisibleOpTime(sharedDatalk);
invariant(!lastVisibleMajorityCommittedDonorOpTime.isNull());
_stateDoc.setDataConsistentStopDonorOpTime(lastVisibleMajorityCommittedDonorOpTime);
}
+
_stateDoc.setCloneFinishedRecipientOpTime(
repl::ReplicationCoordinator::get(cc().getServiceContext())->getMyLastAppliedOpTime());
+ return _updateStateDocForMajority(lk);
+}
- return ExecutorFuture(**_scopedExecutor)
- .then([this, self = shared_from_this(), stateDoc = _stateDoc] {
- auto opCtx = cc().makeOperationContext();
-
- _stopOrHangOnFailPoint(&fpBeforeMarkingCollectionClonerDone, opCtx.get());
- uassertStatusOK(
- tenantMigrationRecipientEntryHelpers::updateStateDoc(opCtx.get(), stateDoc));
+SemiFuture<TenantOplogApplier::OpTimePair>
+TenantMigrationRecipientService::Instance::_waitForDataToBecomeConsistent() {
+ stdx::lock_guard lk(_mutex);
+ // PrimaryOnlyService::onStepUp() before starting instance makes sure that the state doc
+ // is majority committed, so we can also skip waiting for it to be majority replicated.
+ if (_stateDoc.getState() == TenantMigrationRecipientStateEnum::kConsistent) {
+ return SemiFuture<TenantOplogApplier::OpTimePair>::makeReady(
+ TenantOplogApplier::OpTimePair());
+ }
- auto writeOpTime = repl::ReplClientInfo::forClient(opCtx->getClient()).getLastOp();
- return WaitForMajorityService::get(opCtx->getServiceContext())
- .waitUntilMajority(writeOpTime, CancellationToken::uncancelable());
- })
- .semi();
+ return _tenantOplogApplier->getNotificationForOpTime(
+ _stateDoc.getDataConsistentStopDonorOpTime().get());
}
-SemiFuture<void> TenantMigrationRecipientService::Instance::_getDataConsistentFuture() {
+SemiFuture<void> TenantMigrationRecipientService::Instance::_persistConsistentState() {
stdx::lock_guard lk(_mutex);
// PrimaryOnlyService::onStepUp() before starting instance makes sure that the state doc
// is majority committed, so we can also skip waiting for it to be majority replicated.
@@ -1948,24 +1947,41 @@ SemiFuture<void> TenantMigrationRecipientService::Instance::_getDataConsistentFu
return SemiFuture<void>::makeReady();
}
- return _tenantOplogApplier
- ->getNotificationForOpTime(_stateDoc.getDataConsistentStopDonorOpTime().get())
+ // Persist that the tenant migration instance has reached a
+ // consistent state.
+ _stateDoc.setState(TenantMigrationRecipientStateEnum::kConsistent);
+ return _updateStateDocForMajority(lk);
+}
+
+SemiFuture<void> TenantMigrationRecipientService::Instance::_enterConsistentState() {
+ return _persistConsistentState()
.thenRunOn(**_scopedExecutor)
- .then(
- [this, self = shared_from_this()](TenantOplogApplier::OpTimePair donorRecipientOpTime) {
- stdx::lock_guard lk(_mutex);
- // Persist the state that tenant migration instance has reached
- // consistent state.
- _stateDoc.setState(TenantMigrationRecipientStateEnum::kConsistent);
- return _stateDoc;
- })
- .then([this, self = shared_from_this()](TenantMigrationRecipientDocument stateDoc) {
- auto opCtx = cc().makeOperationContext();
- uassertStatusOK(
- tenantMigrationRecipientEntryHelpers::updateStateDoc(opCtx.get(), stateDoc));
- return WaitForMajorityService::get(opCtx->getServiceContext())
- .waitUntilMajority(repl::ReplClientInfo::forClient(opCtx->getClient()).getLastOp(),
- CancellationToken::uncancelable());
+ .then([this, self = shared_from_this()]() {
+ _stopOrHangOnFailPoint(&fpBeforeFulfillingDataConsistentPromise);
+ stdx::lock_guard lk(_mutex);
+
+ auto donorConsistentOpTime = [&]() {
+ switch (_protocol) {
+ case MigrationProtocolEnum::kMultitenantMigrations:
+ return _stateDoc.getDataConsistentStopDonorOpTime();
+ case MigrationProtocolEnum::kShardMerge:
+ return _stateDoc.getStartApplyingDonorOpTime();
+ default:
+ MONGO_UNREACHABLE;
+ }
+ return boost::optional<repl::OpTime>();
+ }();
+ invariant(donorConsistentOpTime && !donorConsistentOpTime->isNull());
+
+ LOGV2_DEBUG(4881101,
+ 1,
+ "Tenant migration recipient instance is in consistent state",
+ "migrationId"_attr = getMigrationUUID(),
+ "tenantId"_attr = getTenantId(),
+ "donorConsistentOpTime"_attr = *donorConsistentOpTime);
+ if (!_dataConsistentPromise.getFuture().isReady()) {
+ _dataConsistentPromise.emplaceValue(*donorConsistentOpTime);
+ }
})
.semi();
}
@@ -2208,9 +2224,6 @@ void TenantMigrationRecipientService::Instance::_cleanupOnDataSyncCompletion(Sta
std::unique_ptr<ThreadPool> savedWriterPool;
{
stdx::lock_guard lk(_mutex);
- _isRestartingOplogApplier = false;
- _restartOplogApplierCondVar.notify_all();
-
_cancelRemainingWork(lk);
shutdownTarget(lk, _donorOplogFetcher);
@@ -2223,6 +2236,9 @@ void TenantMigrationRecipientService::Instance::_cleanupOnDataSyncCompletion(Sta
setPromiseErrorifNotReady(lk, _dataConsistentPromise, status);
setPromiseErrorifNotReady(lk, _dataSyncCompletionPromise, status);
+ _oplogApplierReady = false;
+ _oplogApplierReadyCondVar.notify_all();
+
// Save them to join() with it outside of _mutex.
using std::swap;
swap(savedDonorOplogFetcher, _donorOplogFetcher);
@@ -2288,6 +2304,35 @@ void TenantMigrationRecipientService::Instance::_fetchAndStoreDonorClusterTimeKe
tenant_migration_util::storeExternalClusterTimeKeyDocs(std::move(keyDocs));
}
+SemiFuture<void>
+TenantMigrationRecipientService::Instance::_checkIfFcvHasChangedSinceLastAttempt() {
+ stdx::lock_guard lk(_mutex);
+
+ // Record the FCV at the start of a migration and check for changes in every
+ // subsequent attempt. Fail if there is any mismatch in FCV or
+ // upgrade/downgrade state. (Generic FCV reference): This FCV check should
+ // exist across LTS binary versions.
+ auto currentFCV = serverGlobalParams.featureCompatibility.getVersion();
+ auto startingFCV = _stateDoc.getRecipientPrimaryStartingFCV();
+
+ if (!startingFCV) {
+ _stateDoc.setRecipientPrimaryStartingFCV(currentFCV);
+ return _updateStateDocForMajority(lk);
+ }
+
+ if (startingFCV != currentFCV) {
+ LOGV2_ERROR(5356200,
+ "FCV may not change during migration",
+ "tenantId"_attr = getTenantId(),
+ "migrationId"_attr = getMigrationUUID(),
+ "startingFCV"_attr = startingFCV,
+ "currentFCV"_attr = currentFCV);
+ uasserted(5356201, "Detected FCV change from last migration attempt.");
+ }
+
+ return SemiFuture<void>::makeReady();
+}
+
void TenantMigrationRecipientService::Instance::_compareRecipientAndDonorFCV() const {
if (skipComparingRecipientAndDonorFCV.shouldFail()) { // Test-only.
return;
@@ -2347,6 +2392,207 @@ bool TenantMigrationRecipientService::Instance::_checkifProtocolRemainsFCVCompat
return true;
}
+void TenantMigrationRecipientService::Instance::_startOplogApplier() {
+ _stopOrHangOnFailPoint(&fpAfterFetchingCommittedTransactions);
+
+ stdx::unique_lock lk(_mutex);
+ const auto& cloneFinishedRecipientOpTime = _stateDoc.getCloneFinishedRecipientOpTime();
+ invariant(cloneFinishedRecipientOpTime);
+
+ OpTime resumeOpTime;
+ if (_sharedData->getResumePhase() == ResumePhase::kOplogCatchup) {
+ lk.unlock();
+ // We avoid holding the mutex while scanning the local oplog which
+ // acquires the RSTL in IX mode. This is to allow us to be interruptable
+ // via a concurrent stepDown which acquires the RSTL in X mode.
+ resumeOpTime = _getOplogResumeApplyingDonorOptime(*cloneFinishedRecipientOpTime);
+ lk.lock();
+ }
+
+ // Throwing an error when the cloner is canceled externally via interrupt()
+ // makes the instance skip the remaining tasks (i.e., starting the oplog
+ // applier) in the sync process. This step is necessary to prevent a race
+ // between interrupt() and starting the oplog applier in failover
+ // scenarios where we don't start the cloner because the tenant data is
+ // already in a consistent state.
+ if (_taskState.isInterrupted()) {
+ uassertStatusOK(_taskState.getInterruptStatus());
+ }
+
+ const auto& startApplyingDonorOpTime = _stateDoc.getStartApplyingDonorOpTime();
+ invariant(startApplyingDonorOpTime);
+
+ _tenantOplogApplier = std::make_shared<TenantOplogApplier>(
+ _migrationUuid,
+ _tenantId,
+ (!resumeOpTime.isNull()) ? std::max(resumeOpTime, *startApplyingDonorOpTime)
+ : *startApplyingDonorOpTime,
+ _donorOplogBuffer.get(),
+ **_scopedExecutor,
+ _writerPool.get(),
+ resumeOpTime.getTimestamp());
+ _tenantOplogApplier->setCloneFinishedRecipientOpTime(*cloneFinishedRecipientOpTime);
+
+ LOGV2_DEBUG(4881202,
+ 1,
+ "Recipient migration service starting oplog applier",
+ "tenantId"_attr = getTenantId(),
+ "migrationId"_attr = getMigrationUUID(),
+ "startApplyingAfterDonorOpTime"_attr =
+ _tenantOplogApplier->getStartApplyingAfterOpTime(),
+ "resumeBatchingTs"_attr = _tenantOplogApplier->getResumeBatchingTs());
+
+ uassertStatusOK(_tenantOplogApplier->startup());
+ _oplogApplierReady = true;
+ _oplogApplierReadyCondVar.notify_all();
+
+ lk.unlock();
+ _stopOrHangOnFailPoint(&fpAfterStartingOplogApplierMigrationRecipientInstance);
+}
+
+void TenantMigrationRecipientService::Instance::_setup() {
+ auto opCtx = cc().makeOperationContext();
+ {
+ stdx::lock_guard lk(_mutex);
+ // Do not set the internal states if the migration is already interrupted.
+ if (_taskState.isInterrupted()) {
+ uassertStatusOK(_taskState.getInterruptStatus());
+ }
+
+ // Reuse the _writerPool for retry of the future chain.
+ if (!_writerPool) {
+ _writerPool = makeTenantMigrationWriterPool();
+ }
+
+ ResumePhase resumePhase = [&] {
+ if (_stateDoc.getCloneFinishedRecipientOpTime()) {
+ invariant(_stateDoc.getStartFetchingDonorOpTime());
+ return ResumePhase::kOplogCatchup;
+ }
+ if (_stateDoc.getStartFetchingDonorOpTime()) {
+ return ResumePhase::kDataSync;
+ }
+ return ResumePhase::kNone;
+ }();
+
+ _sharedData = std::make_unique<TenantMigrationSharedData>(
+ getGlobalServiceContext()->getFastClockSource(), getMigrationUUID(), resumePhase);
+
+ _createOplogBuffer(lk, opCtx.get());
+ }
+
+ // Start the oplog buffer outside the mutex to avoid deadlock on a concurrent stepdown.
+ try {
+ _donorOplogBuffer->startup(opCtx.get());
+ } catch (DBException& ex) {
+ ex.addContext("Failed to create oplog buffer collection.");
+ throw;
+ }
+}
+
+SemiFuture<TenantOplogApplier::OpTimePair>
+TenantMigrationRecipientService::Instance::_waitForOplogApplierToStop() {
+ _stopOrHangOnFailPoint(&fpAfterDataConsistentMigrationRecipientInstance);
+
+ stdx::lock_guard lk(_mutex);
+ // wait for oplog applier to complete/stop.
+ // The oplog applier does not exit normally; it must be shut down externally,
+ // e.g. by recipientForgetMigration.
+ return _tenantOplogApplier->getNotificationForOpTime(OpTime::max());
+}
+
+SemiFuture<TenantOplogApplier::OpTimePair>
+TenantMigrationRecipientService::Instance::_migrateUsingMTMProtocol(
+ const CancellationToken& token) {
+ return ExecutorFuture(**_scopedExecutor)
+ .then([this, self = shared_from_this()] { return _getStartOpTimesFromDonor(); })
+ .then([this, self = shared_from_this()] {
+ return _fetchRetryableWritesOplogBeforeStartOpTime();
+ })
+ .then([this, self = shared_from_this()] { _startOplogFetcher(); })
+ .then([this, self = shared_from_this()] {
+ stdx::lock_guard lk(_mutex);
+ return _startTenantAllDatabaseCloner(lk);
+ })
+ .then([this, self = shared_from_this()] { return _onCloneSuccess(); })
+ .then([this, self = shared_from_this()] {
+ return _fetchCommittedTransactionsBeforeStartOpTime();
+ })
+ .then([this, self = shared_from_this()] { return _startOplogApplier(); })
+ .then([this, self = shared_from_this()] {
+ _stopOrHangOnFailPoint(&fpAfterStartingOplogApplierMigrationRecipientInstance);
+ return _waitForDataToBecomeConsistent();
+ })
+ .then(
+ [this, self = shared_from_this()](TenantOplogApplier::OpTimePair donorRecipientOpTime) {
+ return _enterConsistentState();
+ })
+ .then([this, self = shared_from_this()] { return _waitForOplogApplierToStop(); })
+ .semi();
+}
+
+SemiFuture<TenantOplogApplier::OpTimePair>
+TenantMigrationRecipientService::Instance::_migrateUsingShardMergeProtocol(
+ const CancellationToken& token) {
+ return ExecutorFuture(**_scopedExecutor)
+ .then([this, self = shared_from_this(), token] {
+ return AsyncTry([this, self = shared_from_this(), token] {
+ return _getDonorFilenames(token);
+ })
+ .until([](Status status) {
+ if (status.code() == ErrorCodes::BackupCursorOpenConflictWithCheckpoint) {
+ LOGV2_DEBUG(6113008,
+ 1,
+ "Retrying backup cursor creation after error",
+ "status"_attr = status);
+ // A checkpoint took place while opening a backup cursor. We
+ // should retry and *not* cancel migration.
+ return false;
+ }
+
+ return true;
+ })
+ .on(**_scopedExecutor, token);
+ })
+ .then([this, self = shared_from_this(), token] {
+ LOGV2_DEBUG(6113200,
+ 1,
+ "Starting periodic 'getMore' requests to keep "
+ "backup cursor alive.");
+ stdx::lock_guard lk(_mutex);
+ _backupCursorKeepAliveCancellation = CancellationSource(token);
+ _backupCursorKeepAliveFuture =
+ shard_merge_utils::keepBackupCursorAlive(_backupCursorKeepAliveCancellation,
+ **_scopedExecutor,
+ _client->getServerHostAndPort(),
+ _donorFilenameBackupCursorId,
+ _donorFilenameBackupCursorNamespaceString);
+ })
+ .then([this, self = shared_from_this()] {
+ stdx::lock_guard lk(_mutex);
+ _stateDoc.setState(TenantMigrationRecipientStateEnum::kLearnedFilenames);
+ return _updateStateDocForMajority(lk);
+ })
+ .then([this, self = shared_from_this()] { return _getStartOpTimesFromDonor(); })
+ .then([this, self = shared_from_this()] {
+ return _fetchRetryableWritesOplogBeforeStartOpTime();
+ })
+ .then([this, self = shared_from_this()] { _startOplogFetcher(); })
+ .then([this, self = shared_from_this()] {
+ LOGV2_INFO(6113402, "Waiting for all nodes to call recipientVoteImportedFiles");
+ return _importedFilesPromise.getFuture().semi();
+ })
+ .then([this, self = shared_from_this()] { return _killBackupCursor(); })
+ .then([this, self = shared_from_this()] { return _onCloneSuccess(); })
+ .then([this, self = shared_from_this()] { return _enterConsistentState(); })
+ .then([this, self = shared_from_this()] {
+ return _fetchCommittedTransactionsBeforeStartOpTime();
+ })
+ .then([this, self = shared_from_this()] { return _startOplogApplier(); })
+ .then([this, self = shared_from_this()] { return _waitForOplogApplierToStop(); })
+ .semi();
+}
+
SemiFuture<void> TenantMigrationRecipientService::Instance::run(
std::shared_ptr<executor::ScopedTaskExecutor> executor,
const CancellationToken& token) noexcept {
@@ -2474,7 +2720,7 @@ SemiFuture<void> TenantMigrationRecipientService::Instance::run(
stdx::lock_guard<Latch> lg(_mutex);
uassert(ErrorCodes::TenantMigrationForgotten,
str::stream() << "Migration " << getMigrationUUID()
- << " already marked for garbage collect",
+ << " already marked for garbage collection",
_stateDoc.getState() !=
TenantMigrationRecipientStateEnum::kDone &&
!_stateDoc.getExpireAt());
@@ -2489,277 +2735,32 @@ SemiFuture<void> TenantMigrationRecipientService::Instance::run(
&fpAfterPersistingTenantMigrationRecipientInstanceStateDoc);
return _createAndConnectClients();
})
- .then([this, self = shared_from_this()](ConnectionPair ConnectionPair) {
- stdx::lock_guard lk(_mutex);
- if (_taskState.isInterrupted()) {
- uassertStatusOK(_taskState.getInterruptStatus());
- }
-
- // interrupt() called after this code block will interrupt the cloner and
- // fetcher.
- _client = std::move(ConnectionPair.first);
- _oplogFetcherClient = std::move(ConnectionPair.second);
-
- if (!_writerPool) {
- // Create the writer pool and shared data.
- _writerPool = makeTenantMigrationWriterPool();
- }
- _sharedData = std::make_unique<TenantMigrationSharedData>(
- getGlobalServiceContext()->getFastClockSource(),
- getMigrationUUID(),
- _stateDoc.getStartFetchingDonorOpTime().has_value());
- })
.then([this, self = shared_from_this(), token] {
_stopOrHangOnFailPoint(&fpBeforeFetchingDonorClusterTimeKeys);
_fetchAndStoreDonorClusterTimeKeyDocs(token);
})
.then([this, self = shared_from_this()] {
_stopOrHangOnFailPoint(&fpAfterConnectingTenantMigrationRecipientInstance);
- stdx::lock_guard lk(_mutex);
-
- // Record the FCV at the start of a migration and check for changes in every
- // subsequent attempt. Fail if there is any mismatch in FCV or
- // upgrade/downgrade state. (Generic FCV reference): This FCV check should
- // exist across LTS binary versions.
- auto currentFCV = serverGlobalParams.featureCompatibility.getVersion();
- auto startingFCV = _stateDoc.getRecipientPrimaryStartingFCV();
-
- if (!startingFCV) {
- _stateDoc.setRecipientPrimaryStartingFCV(currentFCV);
- return _updateStateDocForMajority(lk);
- }
-
- if (startingFCV != currentFCV) {
- LOGV2_ERROR(5356200,
- "FCV may not change during migration",
- "tenantId"_attr = getTenantId(),
- "migrationId"_attr = getMigrationUUID(),
- "startingFCV"_attr = startingFCV,
- "currentFCV"_attr = currentFCV);
- uasserted(5356201, "Detected FCV change from last migration attempt.");
- }
-
- return SemiFuture<void>::makeReady();
+ return _checkIfFcvHasChangedSinceLastAttempt();
})
.then([this, self = shared_from_this()] {
_stopOrHangOnFailPoint(&fpAfterRecordingRecipientPrimaryStartingFCV);
_compareRecipientAndDonorFCV();
})
- .then([this, self = shared_from_this(), token] {
+ .then([this, self = shared_from_this()] {
_stopOrHangOnFailPoint(&fpAfterComparingRecipientAndDonorFCV);
- if (_stateDoc.getProtocol() != MigrationProtocolEnum::kShardMerge) {
- return SemiFuture<void>::makeReady().thenRunOn(**_scopedExecutor);
- }
-
- return AsyncTry([this, self = shared_from_this(), token] {
- return _getDonorFilenames(token);
- })
- .until([](Status status) {
- if (status.code() ==
- ErrorCodes::BackupCursorOpenConflictWithCheckpoint) {
- LOGV2_DEBUG(6113008,
- 1,
- "Retrying backup cursor creation after error",
- "status"_attr = status);
- // A checkpoint took place while opening a backup cursor. We
- // should retry and *not* cancel migration.
- return false;
- }
-
- return true;
- })
- .on(**_scopedExecutor, token);
+ // Sets up internal state to begin migration.
+ _setup();
})
.then([this, self = shared_from_this(), token] {
- if (_stateDoc.getProtocol() != MigrationProtocolEnum::kShardMerge) {
- return;
- }
-
- LOGV2_DEBUG(6113200,
- 1,
- "Starting periodic 'getMore' requests to keep "
- "backup cursor alive.");
- stdx::lock_guard lk(_mutex);
- _backupCursorKeepAliveCancellation = CancellationSource(token);
- _backupCursorKeepAliveFuture = shard_merge_utils::keepBackupCursorAlive(
- _backupCursorKeepAliveCancellation,
- **_scopedExecutor,
- _client->getServerHostAndPort(),
- _donorFilenameBackupCursorId,
- _donorFilenameBackupCursorNamespaceString);
- })
- .then([this, self = shared_from_this()] {
- stdx::lock_guard lk(_mutex);
- _getStartOpTimesFromDonor(lk);
- return _updateStateDocForMajority(lk);
- })
- .then([this, self = shared_from_this()] {
- _stopOrHangOnFailPoint(
- &fpAfterRetrievingStartOpTimesMigrationRecipientInstance);
- _createOplogBuffer();
- return _fetchRetryableWritesOplogBeforeStartOpTime();
- })
- .then([this, self = shared_from_this()] {
- _stopOrHangOnFailPoint(
- &fpAfterFetchingRetryableWritesEntriesBeforeStartOpTime);
- _startOplogFetcher();
- })
- .then([this, self = shared_from_this()] {
- _stopOrHangOnFailPoint(
- &fpAfterStartingOplogFetcherMigrationRecipientInstance);
-
- stdx::unique_lock lk(_mutex);
-
- // Create the oplog applier but do not start it yet.
- invariant(_stateDoc.getStartApplyingDonorOpTime());
-
- OpTime beginApplyingAfterOpTime;
- Timestamp resumeBatchingTs;
- if (_isCloneCompletedMarkerSet(lk)) {
- lk.unlock();
- // We avoid holding the mutex while scanning the local oplog which
- // acquires the RSTL in IX mode. This is to allow us to be interruptable
- // via a concurrent stepDown which acquires the RSTL in X mode.
- const auto resumeOpTime = _getOplogResumeApplyingDonorOptime();
- if (!resumeOpTime.isNull()) {
- // It's possible we've applied retryable writes no-op oplog entries
- // with donor opTimes earlier than 'startApplyingDonorOpTime'. In
- // this case, we resume batching from a timestamp earlier than the
- // 'beginApplyingAfterOpTime'.
- resumeBatchingTs = resumeOpTime.getTimestamp();
- }
-
- lk.lock();
-
- // We are retrying from failure. Find the point at which we should resume
- // oplog batching and oplog application.
- beginApplyingAfterOpTime =
- std::max(resumeOpTime, *_stateDoc.getStartApplyingDonorOpTime());
- LOGV2_DEBUG(5394601,
- 1,
- "Resuming oplog application from previous tenant "
- "migration attempt",
- "startApplyingDonorOpTime"_attr = beginApplyingAfterOpTime,
- "resumeBatchingOpTime"_attr = resumeOpTime);
- } else {
- beginApplyingAfterOpTime = *_stateDoc.getStartApplyingDonorOpTime();
- }
-
- {
- // Throwing error when cloner is canceled externally via interrupt(),
- // makes the instance to skip the remaining task (i.e., starting oplog
- // applier) in the sync process. This step is necessary to prevent race
- // between interrupt() and starting oplog applier for the failover
- // scenarios where we don't start the cloner if the tenant data is
- // already in consistent state.
- stdx::lock_guard<TenantMigrationSharedData> sharedDatalk(*_sharedData);
- uassertStatusOK(_sharedData->getStatus(sharedDatalk));
- }
-
- LOGV2_DEBUG(4881202,
- 1,
- "Recipient migration service creating oplog applier",
- "tenantId"_attr = getTenantId(),
- "migrationId"_attr = getMigrationUUID(),
- "startApplyingDonorOpTime"_attr = beginApplyingAfterOpTime);
- _tenantOplogApplier =
- std::make_shared<TenantOplogApplier>(_migrationUuid,
- _tenantId,
- beginApplyingAfterOpTime,
- _donorOplogBuffer.get(),
- **_scopedExecutor,
- _writerPool.get(),
- resumeBatchingTs);
- })
- .then([this, self = shared_from_this()] {
- if (_stateDoc.getProtocol() == MigrationProtocolEnum::kShardMerge) {
- return Future<void>::makeReady();
+ switch (_protocol) {
+ case MigrationProtocolEnum::kMultitenantMigrations:
+ return _migrateUsingMTMProtocol(token);
+ case MigrationProtocolEnum::kShardMerge:
+ return _migrateUsingShardMergeProtocol(token);
+ default:
+ MONGO_UNREACHABLE;
}
- stdx::lock_guard lk(_mutex);
- return _startTenantAllDatabaseCloner(lk);
- })
- .then([this, self = shared_from_this()] {
- if (_stateDoc.getProtocol() == MigrationProtocolEnum::kShardMerge) {
- return SemiFuture<void>::makeReady();
- }
- return _onCloneSuccess();
- })
- .then([this, self = shared_from_this()] {
- if (_stateDoc.getProtocol() != MigrationProtocolEnum::kShardMerge) {
- return SemiFuture<void>::makeReady();
- }
-
- stdx::lock_guard lk(_mutex);
- _stateDoc.setDataConsistentStopDonorOpTime(
- _stateDoc.getStartApplyingDonorOpTime());
- _stateDoc.setState(TenantMigrationRecipientStateEnum::kLearnedFilenames);
- return _updateStateDocForMajority(lk);
- })
- .then([this, self = shared_from_this()] {
- if (_stateDoc.getProtocol() != MigrationProtocolEnum::kShardMerge) {
- return SemiFuture<void>::makeReady();
- }
-
- LOGV2_INFO(6113402,
- "Waiting for all nodes to call recipientVoteImportedFiles");
- return _importedFilesPromise.getFuture().semi();
- })
- .then([this, self = shared_from_this()] { return _killBackupCursor(); })
- .then([this, self = shared_from_this()] {
- {
- auto opCtx = cc().makeOperationContext();
- _stopOrHangOnFailPoint(&fpBeforeFetchingCommittedTransactions,
- opCtx.get());
- }
- return _fetchCommittedTransactionsBeforeStartOpTime();
- })
- .then([this, self = shared_from_this()] {
- _stopOrHangOnFailPoint(&fpAfterFetchingCommittedTransactions);
- LOGV2_DEBUG(4881200,
- 1,
- "Recipient migration service starting oplog applier",
- "tenantId"_attr = getTenantId(),
- "migrationId"_attr = getMigrationUUID());
- {
- stdx::lock_guard lk(_mutex);
- auto cloneFinishedRecipientOpTime =
- _stateDoc.getProtocol() == MigrationProtocolEnum::kShardMerge
- ? repl::ReplicationCoordinator::get(cc().getServiceContext())
- ->getMyLastAppliedOpTime()
- : *_stateDoc.getCloneFinishedRecipientOpTime();
- _tenantOplogApplier->setCloneFinishedRecipientOpTime(
- cloneFinishedRecipientOpTime);
- uassertStatusOK(_tenantOplogApplier->startup());
- _isRestartingOplogApplier = false;
- _restartOplogApplierCondVar.notify_all();
- }
- _stopOrHangOnFailPoint(
- &fpAfterStartingOplogApplierMigrationRecipientInstance);
- return _getDataConsistentFuture();
- })
- .then([this, self = shared_from_this()] {
- _stopOrHangOnFailPoint(&fpBeforeFulfillingDataConsistentPromise);
- stdx::lock_guard lk(_mutex);
- LOGV2_DEBUG(4881101,
- 1,
- "Tenant migration recipient instance is in consistent state",
- "migrationId"_attr = getMigrationUUID(),
- "tenantId"_attr = getTenantId(),
- "donorConsistentOpTime"_attr =
- _stateDoc.getDataConsistentStopDonorOpTime());
-
- if (!_dataConsistentPromise.getFuture().isReady()) {
- _dataConsistentPromise.emplaceValue(
- _stateDoc.getDataConsistentStopDonorOpTime().get());
- }
- })
- .then([this, self = shared_from_this()] {
- _stopOrHangOnFailPoint(&fpAfterDataConsistentMigrationRecipientInstance);
- stdx::lock_guard lk(_mutex);
- // wait for oplog applier to complete/stop.
- // The oplog applier does not exit normally; it must be shut down externally,
- // e.g. by recipientForgetMigration.
- return _tenantOplogApplier->getNotificationForOpTime(OpTime::max());
});
})
.until([this, self = shared_from_this()](
@@ -2792,7 +2793,7 @@ SemiFuture<void> TenantMigrationRecipientService::Instance::run(
if (!_taskState.isRunning()) {
_taskState.setState(TaskState::kRunning);
}
- _isRestartingOplogApplier = true;
+ _oplogApplierReady = false;
// Clean up the async components before retrying the future chain.
std::unique_ptr<OplogFetcher> savedDonorOplogFetcher;
std::shared_ptr<TenantOplogApplier> savedTenantOplogApplier;
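
The waitUntilMigrationReachesReturnAfterReachingTimestamp change above swaps the old "not restarting" predicate for a wait on the new _oplogApplierReadyCondVar. A toy sketch of that wait pattern, using plain standard-library primitives instead of MongoDB's latch and interruptible-wait helpers (the names here are illustrative, not from the patch):

#include <condition_variable>
#include <mutex>

// Toy model: callers block until the oplog applier is ready OR data sync has
// already completed (possibly with an error), mirroring the new predicate.
struct RecipientWaitState {
    std::mutex mtx;
    std::condition_variable oplogApplierReadyCv;
    bool oplogApplierReady = false;
    bool dataSyncCompleted = false;

    void markApplierReady() {
        std::lock_guard<std::mutex> lk(mtx);
        oplogApplierReady = true;
        oplogApplierReadyCv.notify_all();
    }

    void waitForApplierOrCompletion() {
        std::unique_lock<std::mutex> lk(mtx);
        oplogApplierReadyCv.wait(lk, [&] { return oplogApplierReady || dataSyncCompleted; });
    }
};
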
diff --git a/src/mongo/db/repl/tenant_migration_recipient_service.h b/src/mongo/db/repl/tenant_migration_recipient_service.h
index 6972379144f..3378ffac337 100644
--- a/src/mongo/db/repl/tenant_migration_recipient_service.h
+++ b/src/mongo/db/repl/tenant_migration_recipient_service.h
@@ -357,7 +357,7 @@ public:
* Creates and connects both the oplog fetcher client and the client used for other
* operations.
*/
- SemiFuture<ConnectionPair> _createAndConnectClients();
+ SemiFuture<void> _createAndConnectClients();
/**
* Fetches all key documents from the donor's admin.system.keys collection, stores them in
@@ -376,9 +376,9 @@ public:
ExecutorFuture<void> _killBackupCursor();
/**
- * Retrieves the start optimes from the donor and updates the in-memory state accordingly.
+ * Retrieves the start optimes from the donor and updates the on-disk state accordingly.
*/
- void _getStartOpTimesFromDonor(WithLock lk);
+ SemiFuture<void> _getStartOpTimesFromDonor();
/**
* Pushes documents from oplog fetcher to oplog buffer.
@@ -394,7 +394,7 @@ public:
* Creates the oplog buffer that will be populated by donor oplog entries from the retryable
* writes fetching stage and oplog fetching stage.
*/
- void _createOplogBuffer();
+ void _createOplogBuffer(WithLock, OperationContext* opCtx);
/**
* Runs an aggregation that gets the entire oplog chain for every retryable write entry in
@@ -439,16 +439,11 @@ public:
BSONObj _getOplogFetcherFilter() const;
/*
- * Indicates that the recipient has completed the tenant cloning phase.
- */
- bool _isCloneCompletedMarkerSet(WithLock) const;
-
- /*
* Traverse backwards through the oplog to find the optime which tenant oplog application
* should resume from. The oplog applier should resume applying entries that have a greater
* optime than the returned value.
*/
- OpTime _getOplogResumeApplyingDonorOptime() const;
+ OpTime _getOplogResumeApplyingDonorOptime(const OpTime& cloneFinishedRecipientOpTime) const;
/*
* Starts the tenant cloner.
@@ -457,6 +452,16 @@ public:
Future<void> _startTenantAllDatabaseCloner(WithLock lk);
/*
+ * Starts the tenant oplog applier.
+ */
+ void _startOplogApplier();
+
+ /*
+ * Waits for tenant oplog applier to stop.
+ */
+ SemiFuture<TenantOplogApplier::OpTimePair> _waitForOplogApplierToStop();
+
+ /*
* Advances the stableTimestamp to be >= startApplyingDonorOpTime by:
* 1. Advancing the clusterTime to startApplyingDonorOpTime
* 2. Writing a no-op oplog entry with ts > startApplyingDonorOpTime
@@ -466,7 +471,7 @@ public:
const CancellationToken& token);
/*
- * Gets called when the cloner completes cloning data successfully.
+ * Gets called when the logical/file cloner completes cloning data successfully.
* And, it is responsible to populate the 'dataConsistentStopDonorOpTime'
* and 'cloneFinishedRecipientOpTime' fields in the state doc.
*/
@@ -479,6 +484,22 @@ public:
SemiFuture<void> _getDataConsistentFuture();
/*
+ * Wait for the data cloned via logical cloner to be consistent.
+ */
+ SemiFuture<TenantOplogApplier::OpTimePair> _waitForDataToBecomeConsistent();
+
+ /*
+ * Transitions the instance state to 'kConsistent'.
+ */
+ SemiFuture<void> _enterConsistentState();
+
+ /*
+ * Persists the instance state doc and waits for it to be majority replicated.
+ * Throws a user assertion on failure.
+ */
+ SemiFuture<void> _persistConsistentState();
+
+ /*
* Cancels the tenant migration recipient instance task work.
*/
void _cancelRemainingWork(WithLock lk);
@@ -514,19 +535,35 @@ public:
/*
* Returns the majority OpTime on the donor node that 'client' is connected to.
*/
-
OpTime _getDonorMajorityOpTime(std::unique_ptr<mongo::DBClientConnection>& client);
+
+ /*
+ * Detects recipient FCV changes during migration.
+ */
+ SemiFuture<void> _checkIfFcvHasChangedSinceLastAttempt();
+
/**
* Enforces that the donor and recipient share the same featureCompatibilityVersion.
*/
void _compareRecipientAndDonorFCV() const;
- /**
+ /*
* Increments either 'totalSuccessfulMigrationsReceived' or 'totalFailedMigrationsReceived'
* in TenantMigrationStatistics by examining status and promises.
*/
void _setMigrationStatsOnCompletion(Status completionStatus) const;
+ /*
+ * Sets up internal state to begin migration.
+ */
+ void _setup();
+
+ SemiFuture<TenantOplogApplier::OpTimePair> _migrateUsingMTMProtocol(
+ const CancellationToken& token);
+
+ SemiFuture<TenantOplogApplier::OpTimePair> _migrateUsingShardMergeProtocol(
+ const CancellationToken& token);
+
mutable Mutex _mutex = MONGO_MAKE_LATCH("TenantMigrationRecipientService::_mutex");
// All member variables are labeled with one of the following codes indicating the
@@ -616,9 +653,10 @@ public:
// Waiters are notified when 'tenantOplogApplier' is valid on restart.
stdx::condition_variable _restartOplogApplierCondVar; // (M)
- // Indicates that the oplog applier is being cleaned up due to restart of the future chain.
- // This is set to true when the oplog applier is started up again.
- bool _isRestartingOplogApplier = false; // (M)
+ // Waiters are notified when 'tenantOplogApplier' is ready to use.
+ stdx::condition_variable _oplogApplierReadyCondVar; // (M)
+ // Indicates whether 'tenantOplogApplier' is ready to use or not.
+ bool _oplogApplierReady = false; // (M)
};
private:
diff --git a/src/mongo/db/repl/tenant_migration_recipient_service_test.cpp b/src/mongo/db/repl/tenant_migration_recipient_service_test.cpp
index 589b3e63cd0..02ba6104543 100644
--- a/src/mongo/db/repl/tenant_migration_recipient_service_test.cpp
+++ b/src/mongo/db/repl/tenant_migration_recipient_service_test.cpp
@@ -1725,7 +1725,7 @@ TEST_F(TenantMigrationRecipientServiceTest, OplogFetcherResumesFromTopOfOplogBuf
// Hang after creating the oplog buffer collection but before starting the oplog fetcher.
const auto hangBeforeFetcherFp =
- globalFailPointRegistry().find("pauseAfterCreatingOplogBuffer");
+ globalFailPointRegistry().find("fpAfterRetrievingStartOpTimesMigrationRecipientInstance");
auto initialTimesEntered = hangBeforeFetcherFp->setMode(FailPoint::alwaysOn,
0,
BSON("action"
@@ -1839,7 +1839,7 @@ TEST_F(TenantMigrationRecipientServiceTest, OplogFetcherNoDocInBufferToResumeFro
// Hang after creating the oplog buffer collection but before starting the oplog fetcher.
const auto hangBeforeFetcherFp =
- globalFailPointRegistry().find("pauseAfterCreatingOplogBuffer");
+ globalFailPointRegistry().find("fpAfterRetrievingStartOpTimesMigrationRecipientInstance");
auto initialTimesEntered = hangBeforeFetcherFp->setMode(FailPoint::alwaysOn,
0,
BSON("action"
@@ -2040,8 +2040,8 @@ TEST_F(TenantMigrationRecipientServiceTest, OplogApplierResumesFromLastNoOpOplog
// The oplog applier should have started batching and applying at the donor opTime equal to
// 'resumeOpTime'.
const auto oplogApplier = getTenantOplogApplier(instance.get());
- ASSERT_EQUALS(resumeOpTime, oplogApplier->getBeginApplyingOpTime_forTest());
- ASSERT_EQUALS(resumeOpTime.getTimestamp(), oplogApplier->getResumeBatchingTs_forTest());
+ ASSERT_EQUALS(resumeOpTime, oplogApplier->getStartApplyingAfterOpTime());
+ ASSERT_EQUALS(resumeOpTime.getTimestamp(), oplogApplier->getResumeBatchingTs());
// Stop the oplog applier.
instance->stopOplogApplier_forTest();
@@ -2213,10 +2213,9 @@ TEST_F(TenantMigrationRecipientServiceTest,
const auto oplogApplier = getTenantOplogApplier(instance.get());
// Resume batching from the first migration no-op oplog entry. In this test, this is before
// the 'startApplyingDonorOpTime'.
- ASSERT_EQUALS(beforeStartApplyingOpTime.getTimestamp(),
- oplogApplier->getResumeBatchingTs_forTest());
+ ASSERT_EQUALS(beforeStartApplyingOpTime.getTimestamp(), oplogApplier->getResumeBatchingTs());
// The oplog applier starts applying from the donor opTime equal to 'beginApplyingOpTime'.
- ASSERT_EQUALS(startApplyingOpTime, oplogApplier->getBeginApplyingOpTime_forTest());
+ ASSERT_EQUALS(startApplyingOpTime, oplogApplier->getStartApplyingAfterOpTime());
// Stop the oplog applier.
instance->stopOplogApplier_forTest();
@@ -2360,9 +2359,9 @@ TEST_F(TenantMigrationRecipientServiceTest, OplogApplierResumesFromStartDonorApp
const auto oplogApplier = getTenantOplogApplier(instance.get());
// There is no oplog entry to resume batching from, so we treat it as if we are resuming
// oplog application from the start. The 'resumeBatchingTs' will be a null timestamp.
- ASSERT_EQUALS(Timestamp(), oplogApplier->getResumeBatchingTs_forTest());
+ ASSERT_EQUALS(Timestamp(), oplogApplier->getResumeBatchingTs());
// The oplog applier starts applying from the donor opTime equal to 'beginApplyingOpTime'.
- ASSERT_EQUALS(startApplyingOpTime, oplogApplier->getBeginApplyingOpTime_forTest());
+ ASSERT_EQUALS(startApplyingOpTime, oplogApplier->getStartApplyingAfterOpTime());
// Stop the oplog applier.
instance->stopOplogApplier_forTest();
@@ -2401,7 +2400,7 @@ TEST_F(TenantMigrationRecipientServiceTest,
// Hang after creating the oplog buffer collection but before starting the oplog fetcher.
const auto hangBeforeFetcherFp =
- globalFailPointRegistry().find("pauseAfterCreatingOplogBuffer");
+ globalFailPointRegistry().find("fpAfterRetrievingStartOpTimesMigrationRecipientInstance");
auto initialTimesEntered = hangBeforeFetcherFp->setMode(FailPoint::alwaysOn,
0,
BSON("action"
diff --git a/src/mongo/db/repl/tenant_migration_shared_data.h b/src/mongo/db/repl/tenant_migration_shared_data.h
index 49c06c7ee7b..32ac179b13b 100644
--- a/src/mongo/db/repl/tenant_migration_shared_data.h
+++ b/src/mongo/db/repl/tenant_migration_shared_data.h
@@ -34,12 +34,15 @@
namespace mongo {
namespace repl {
+
+enum class ResumePhase { kNone, kDataSync, kOplogCatchup };
+
class TenantMigrationSharedData final : public ReplSyncSharedData {
public:
TenantMigrationSharedData(ClockSource* clock, const UUID& migrationId)
- : ReplSyncSharedData(clock), _migrationId(migrationId), _resuming(false) {}
- TenantMigrationSharedData(ClockSource* clock, const UUID& migrationId, bool resuming)
- : ReplSyncSharedData(clock), _migrationId(migrationId), _resuming(resuming) {}
+ : ReplSyncSharedData(clock), _migrationId(migrationId), _resumePhase(ResumePhase::kNone) {}
+ TenantMigrationSharedData(ClockSource* clock, const UUID& migrationId, ResumePhase resumePhase)
+ : ReplSyncSharedData(clock), _migrationId(migrationId), _resumePhase(resumePhase) {}
void setLastVisibleOpTime(WithLock, OpTime opTime);
@@ -49,8 +52,8 @@ public:
return _migrationId;
}
- bool isResuming() const {
- return _resuming;
+ ResumePhase getResumePhase() const {
+ return _resumePhase;
}
private:
@@ -61,8 +64,9 @@ private:
// Id of the current tenant migration.
const UUID _migrationId;
- // Indicate whether the tenant migration is resuming from a failover.
- const bool _resuming;
+ // Indicates the phase from which the tenant migration is resuming after a recipient/donor
+ // failover.
+ const ResumePhase _resumePhase;
};
} // namespace repl
} // namespace mongo
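
For reference, the call sites in the cloner and recipient-service diffs above interpret the three phases as follows; the describeResumePhase helper below is purely illustrative and does not exist in the patch.

#include <string>

enum class ResumePhase { kNone, kDataSync, kOplogCatchup };

// Illustrative summary of how the cloners and the recipient service treat each phase.
std::string describeResumePhase(ResumePhase phase) {
    switch (phase) {
        case ResumePhase::kNone:
            return "fresh attempt: preexisting tenant databases/collections are an error";
        case ResumePhase::kDataSync:
            return "resuming after a failover during cloning: existing data is tolerated";
        case ResumePhase::kOplogCatchup:
            return "clone already finished: skip the cloner and resume oplog application";
    }
    return {};
}
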
diff --git a/src/mongo/db/repl/tenant_oplog_applier.cpp b/src/mongo/db/repl/tenant_oplog_applier.cpp
index 8a76189a52c..d92bd245187 100644
--- a/src/mongo/db/repl/tenant_oplog_applier.cpp
+++ b/src/mongo/db/repl/tenant_oplog_applier.cpp
@@ -65,7 +65,7 @@ MONGO_FAIL_POINT_DEFINE(fpBeforeTenantOplogApplyingBatch);
TenantOplogApplier::TenantOplogApplier(const UUID& migrationUuid,
const std::string& tenantId,
- OpTime applyFromOpTime,
+ OpTime startApplyingAfterOpTime,
RandomAccessOplogBuffer* oplogBuffer,
std::shared_ptr<executor::TaskExecutor> executor,
ThreadPool* writerPool,
@@ -73,7 +73,7 @@ TenantOplogApplier::TenantOplogApplier(const UUID& migrationUuid,
: AbstractAsyncComponent(executor.get(), std::string("TenantOplogApplier_") + tenantId),
_migrationUuid(migrationUuid),
_tenantId(tenantId),
- _beginApplyingAfterOpTime(applyFromOpTime),
+ _startApplyingAfterOpTime(startApplyingAfterOpTime),
_oplogBuffer(oplogBuffer),
_executor(std::move(executor)),
_writerPool(writerPool),
@@ -93,7 +93,7 @@ SemiFuture<TenantOplogApplier::OpTimePair> TenantOplogApplier::getNotificationFo
}
// If this optime has already passed, just return a ready future.
if (_lastAppliedOpTimesUpToLastBatch.donorOpTime >= donorOpTime ||
- _beginApplyingAfterOpTime >= donorOpTime) {
+ _startApplyingAfterOpTime >= donorOpTime) {
return SemiFuture<OpTimePair>::makeReady(_lastAppliedOpTimesUpToLastBatch);
}
@@ -103,11 +103,11 @@ SemiFuture<TenantOplogApplier::OpTimePair> TenantOplogApplier::getNotificationFo
return iter->second.getFuture().semi();
}
-OpTime TenantOplogApplier::getBeginApplyingOpTime_forTest() const {
- return _beginApplyingAfterOpTime;
+OpTime TenantOplogApplier::getStartApplyingAfterOpTime() const {
+ return _startApplyingAfterOpTime;
}
-Timestamp TenantOplogApplier::getResumeBatchingTs_forTest() const {
+Timestamp TenantOplogApplier::getResumeBatchingTs() const {
return _resumeBatchingTs;
}
@@ -121,7 +121,7 @@ void TenantOplogApplier::setCloneFinishedRecipientOpTime(OpTime cloneFinishedRec
Status TenantOplogApplier::_doStartup_inlock() noexcept {
_oplogBatcher = std::make_shared<TenantOplogBatcher>(
- _tenantId, _oplogBuffer, _executor, _resumeBatchingTs, _beginApplyingAfterOpTime);
+ _tenantId, _oplogBuffer, _executor, _resumeBatchingTs, _startApplyingAfterOpTime);
auto status = _oplogBatcher->startup();
if (!status.isOK())
return status;
@@ -906,9 +906,9 @@ std::vector<std::vector<const OplogEntry*>> TenantOplogApplier::_fillWriterVecto
CachedCollectionProperties collPropertiesCache;
for (auto&& op : batch->ops) {
- // If the operation's optime is before or the same as the beginApplyingAfterOpTime we don't
+ // If the operation's optime is before or the same as the startApplyingAfterOpTime we don't
// want to apply it, so don't include it in writerVectors.
- if (op.entry.getOpTime() <= _beginApplyingAfterOpTime)
+ if (op.entry.getOpTime() <= _startApplyingAfterOpTime)
continue;
uassert(4886006,
"Tenant oplog application does not support prepared transactions.",
diff --git a/src/mongo/db/repl/tenant_oplog_applier.h b/src/mongo/db/repl/tenant_oplog_applier.h
index 58c4533b0d6..2ee03341c43 100644
--- a/src/mongo/db/repl/tenant_oplog_applier.h
+++ b/src/mongo/db/repl/tenant_oplog_applier.h
@@ -73,7 +73,7 @@ public:
TenantOplogApplier(const UUID& migrationUuid,
const std::string& tenantId,
- OpTime applyFromOpTime,
+ OpTime startApplyingAfterOpTime,
RandomAccessOplogBuffer* oplogBuffer,
std::shared_ptr<executor::TaskExecutor> executor,
ThreadPool* writerPool,
@@ -101,14 +101,14 @@ public:
void setCloneFinishedRecipientOpTime(OpTime cloneFinishedRecipientOpTime);
/**
- * Returns the optime the applier will start applying from. Used for testing.
+ * Returns the optime the applier will start applying from.
*/
- OpTime getBeginApplyingOpTime_forTest() const;
+ OpTime getStartApplyingAfterOpTime() const;
/**
- * Returns the timestamp the applier will resume batching from. Used for testing.
+ * Returns the timestamp the applier will resume batching from.
*/
- Timestamp getResumeBatchingTs_forTest() const;
+ Timestamp getResumeBatchingTs() const;
private:
Status _doStartup_inlock() noexcept final;
@@ -160,7 +160,7 @@ private:
std::shared_ptr<TenantOplogBatcher> _oplogBatcher; // (R)
const UUID _migrationUuid; // (R)
const std::string _tenantId; // (R)
- const OpTime _beginApplyingAfterOpTime; // (R)
+ const OpTime _startApplyingAfterOpTime; // (R)
RandomAccessOplogBuffer* _oplogBuffer; // (R)
std::shared_ptr<executor::TaskExecutor> _executor; // (R)
// All no-op entries written by this tenant migration should have OpTime greater than this
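
The tenant_oplog_applier diffs rename _beginApplyingAfterOpTime to _startApplyingAfterOpTime and promote the test-only accessors to regular getters; the filtering rule in _fillWriterVectors is unchanged. A self-contained toy version of that rule follows; OpTime here is a simplified stand-in for repl::OpTime, not the real class.

#include <vector>

struct OpTime {
    long long ts = 0;
    bool operator<=(const OpTime& other) const { return ts <= other.ts; }
};

// Entries at or before startApplyingAfterOpTime are skipped; everything later is applied.
std::vector<OpTime> opsToApply(const std::vector<OpTime>& batch,
                               const OpTime& startApplyingAfterOpTime) {
    std::vector<OpTime> out;
    for (const auto& op : batch) {
        if (op <= startApplyingAfterOpTime) {
            continue;  // already covered by the clone or a previous attempt
        }
        out.push_back(op);
    }
    return out;
}
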