Diffstat (limited to 'src/mongo/db/repl/tenant_migration_recipient_service.cpp')
-rw-r--r--  src/mongo/db/repl/tenant_migration_recipient_service.cpp  |  443
1 file changed, 262 insertions, 181 deletions
diff --git a/src/mongo/db/repl/tenant_migration_recipient_service.cpp b/src/mongo/db/repl/tenant_migration_recipient_service.cpp
index 297ee928c6f..ab648a0526d 100644
--- a/src/mongo/db/repl/tenant_migration_recipient_service.cpp
+++ b/src/mongo/db/repl/tenant_migration_recipient_service.cpp
@@ -481,7 +481,6 @@ TenantMigrationRecipientService::Instance::_createAndConnectClients() {
}
const auto kDelayedMajorityOpTimeErrorCode = 5272000;
-
return AsyncTry([this,
self = shared_from_this(),
getHostCancelSource,
@@ -490,7 +489,6 @@ TenantMigrationRecipientService::Instance::_createAndConnectClients() {
// Get all donor hosts that we have excluded.
const auto& excludedHosts = _getExcludedDonorHosts(lk);
-
return _donorReplicaSetMonitor
->getHostOrRefresh(_readPreference, excludedHosts, getHostCancelSource.token())
.thenRunOn(**_scopedExecutor)
@@ -784,14 +782,18 @@ void TenantMigrationRecipientService::Instance::_startOplogFetcher() {
options.dropCollectionAtStartup = false;
options.dropCollectionAtShutdown = false;
options.useTemporaryCollection = false;
- // Create the oplog buffer outside the mutex to avoid deadlock on a concurrent stepdown.
+
auto oplogBufferNS = getOplogBufferNs(getMigrationUUID());
- auto bufferCollection = std::make_unique<OplogBufferCollection>(
- StorageInterface::get(opCtx.get()), oplogBufferNS, options);
+ if (!_donorOplogBuffer) {
+ // Create the oplog buffer outside the mutex to avoid deadlock on a concurrent stepdown.
+ auto bufferCollection = std::make_unique<OplogBufferCollection>(
+ StorageInterface::get(opCtx.get()), oplogBufferNS, options);
+ stdx::lock_guard lk(_mutex);
+ _donorOplogBuffer = std::move(bufferCollection);
+ }
+
stdx::unique_lock lk(_mutex);
invariant(_stateDoc.getStartFetchingDonorOpTime());
- _donorOplogBuffer = std::move(bufferCollection);
-
{
// Ensure we are primary when trying to startup and create the oplog buffer collection.
auto coordinator = repl::ReplicationCoordinator::get(opCtx.get());
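// A minimal, self-contained sketch (plain C++, illustrative names only; not
// MongoDB types) of the pattern the hunk above introduces: construct the
// buffer outside the mutex so a concurrent stepdown that needs the same mutex
// cannot deadlock against the slow construction, then publish it under the
// mutex only if an earlier attempt has not already done so.
#include <memory>
#include <mutex>

struct Buffer {
    explicit Buffer(int options) : opts(options) {}  // stands in for the costly setup
    int opts;
};

class Instance {
public:
    void ensureBufferExists(int options) {
        {
            // Fast path: a previous attempt already created the buffer.
            std::lock_guard<std::mutex> lk(_mutex);
            if (_buffer)
                return;
        }
        // Build the resource without holding the mutex.
        auto candidate = std::make_unique<Buffer>(options);

        // Publish under the mutex, keeping whichever buffer got there first.
        std::lock_guard<std::mutex> lk(_mutex);
        if (!_buffer)
            _buffer = std::move(candidate);
    }

private:
    std::mutex _mutex;
    std::unique_ptr<Buffer> _buffer;
};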
@@ -926,6 +928,7 @@ void TenantMigrationRecipientService::Instance::_oplogFetcherCallback(Status opl
"error"_attr = oplogFetcherStatus);
_interrupt(oplogFetcherStatus, /*skipWaitingForForgetMigration=*/false);
}
+ _oplogFetcherStatus = oplogFetcherStatus;
}
void TenantMigrationRecipientService::Instance::_stopOrHangOnFailPoint(FailPoint* fp) {
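// Small sketch of the bookkeeping added above: the fetcher callback records
// its terminal status in an optional member so later code (the retry decision
// further down) can see why the fetcher stopped, and the retry path clears it
// before the next attempt. Plain C++ with illustrative names; not the MongoDB
// Status type.
#include <optional>
#include <string>

class FetcherBookkeeping {
public:
    void onFetcherFinished(std::string status) {
        _fetcherStatus = std::move(status);  // remember why the fetcher stopped
    }
    void resetForRetry() {
        _fetcherStatus.reset();  // a new attempt starts with a clean slate
    }
    const std::optional<std::string>& fetcherStatus() const { return _fetcherStatus; }

private:
    std::optional<std::string> _fetcherStatus;
};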
@@ -1250,7 +1253,8 @@ void TenantMigrationRecipientService::Instance::_interrupt(Status status,
}
}
- _taskState.setState(TaskState::kInterrupted, status);
+ _taskState.setState(
+ TaskState::kInterrupted, status, skipWaitingForForgetMigration /* isExternalInterrupt */);
}
void TenantMigrationRecipientService::Instance::interrupt(Status status) {
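// Sketch of a task state that remembers whether an interrupt came from an
// external request (as the hunk above starts doing via the new setState
// argument), so retry logic can distinguish a retriable internal error from
// "explicitly told to stop". Plain C++, illustrative names only; this is not
// the MongoDB TaskState implementation.
#include <optional>
#include <string>
#include <utility>

class TaskState {
public:
    enum State { kNotStarted, kRunning, kInterrupted, kDone };

    void setState(State state,
                  std::optional<std::string> interruptReason = std::nullopt,
                  bool isExternalInterrupt = false) {
        _state = state;
        _interruptReason = std::move(interruptReason);
        _isExternalInterrupt = isExternalInterrupt;
    }

    void clearInterruptStatus() {
        _interruptReason.reset();
        _isExternalInterrupt = false;
    }

    bool isRunning() const { return _state == kRunning; }
    bool isInterrupted() const { return _state == kInterrupted; }
    bool isExternalInterrupt() const { return _isExternalInterrupt; }

private:
    State _state = kNotStarted;
    std::optional<std::string> _interruptReason;
    bool _isExternalInterrupt = false;
};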
@@ -1352,7 +1356,6 @@ SemiFuture<void> TenantMigrationRecipientService::Instance::_updateStateDocForMa
void TenantMigrationRecipientService::Instance::_fetchAndStoreDonorClusterTimeKeyDocs(
const CancelationToken& token) {
std::vector<ExternalKeysCollectionDocument> keyDocs;
-
auto cursor =
_client->query(NamespaceString::kKeysCollectionNamespace,
Query().readPref(_readPreference.pref, _readPreference.tags.getTagBSON()));
@@ -1411,179 +1414,257 @@ SemiFuture<void> TenantMigrationRecipientService::Instance::run(
"readPreference"_attr = _readPreference);
pauseBeforeRunTenantMigrationRecipientInstance.pauseWhileSet();
+ // The 'AsyncTry' is run on the cleanup executor as we rely on the 'CancelationToken' to signal
+ // when work should be canceled rather than letting the scoped executor be destroyed on
+ // shutdown/stepdown.
+ return AsyncTry([this, self = shared_from_this(), executor, token] {
+ return ExecutorFuture(**executor)
+ .then([this, self = shared_from_this()] {
+ stdx::lock_guard lk(_mutex);
+ // Instance task can be started only once for the current term on a primary.
+ invariant(!_taskState.isDone());
+ // If the task state is interrupted, then don't start the task.
+ if (_taskState.isInterrupted()) {
+ uassertStatusOK(_taskState.getInterruptStatus());
+ }
- return ExecutorFuture(**executor)
- .then([this, self = shared_from_this()] {
- stdx::lock_guard lk(_mutex);
- // Instance task can be started only once for the current term on a primary.
- invariant(!_taskState.isDone());
- // If the task state is interrupted, then don't start the task.
- if (_taskState.isInterrupted()) {
- uassertStatusOK(_taskState.getInterruptStatus());
- }
-
- _taskState.setState(TaskState::kRunning);
-
- pauseAfterRunTenantMigrationRecipientInstance.pauseWhileSet();
-
- return _initializeStateDoc(lk);
- })
- .then([this, self = shared_from_this()] {
- _stateDocPersistedPromise.emplaceValue();
- uassert(ErrorCodes::TenantMigrationForgotten,
- str::stream() << "Migration " << getMigrationUUID()
- << " already marked for garbage collect",
- !_stateDoc.getExpireAt());
- _stopOrHangOnFailPoint(&fpAfterPersistingTenantMigrationRecipientInstanceStateDoc);
- return _createAndConnectClients();
- })
- .then([this, self = shared_from_this()](ConnectionPair ConnectionPair) {
- stdx::lock_guard lk(_mutex);
- if (_taskState.isInterrupted()) {
- uassertStatusOK(_taskState.getInterruptStatus());
- }
-
- // interrupt() called after this code block will interrupt the cloner and fetcher.
- _client = std::move(ConnectionPair.first);
- _oplogFetcherClient = std::move(ConnectionPair.second);
+ // The task state will already have been set to 'kRunning' if we restarted
+ // the future chain on donor failover.
+ if (!_taskState.isRunning()) {
+ _taskState.setState(TaskState::kRunning);
+ }
+ pauseAfterRunTenantMigrationRecipientInstance.pauseWhileSet();
+ return _initializeStateDoc(lk);
+ })
+ .then([this, self = shared_from_this()] {
+ if (_stateDocPersistedPromise.getFuture().isReady()) {
+ // This is a retry of the future chain.
+ auto opCtx = cc().makeOperationContext();
+ TenantMigrationRecipientDocument stateDoc;
+ {
+ stdx::lock_guard lk(_mutex);
+ _stateDoc.setNumRestartsDueToDonorConnectionFailure(
+ _stateDoc.getNumRestartsDueToDonorConnectionFailure() + 1);
+ stateDoc = _stateDoc;
+ }
+ uassertStatusOK(tenantMigrationRecipientEntryHelpers::updateStateDoc(
+ opCtx.get(), stateDoc));
+ } else {
+ // Avoid fulfilling the promise twice on restart of the future chain.
+ _stateDocPersistedPromise.emplaceValue();
+ }
+ uassert(ErrorCodes::TenantMigrationForgotten,
+ str::stream() << "Migration " << getMigrationUUID()
+ << " already marked for garbage collect",
+ !_stateDoc.getExpireAt());
+ _stopOrHangOnFailPoint(
+ &fpAfterPersistingTenantMigrationRecipientInstanceStateDoc);
+ return _createAndConnectClients();
+ })
+ .then([this, self = shared_from_this()](ConnectionPair ConnectionPair) {
+ stdx::lock_guard lk(_mutex);
+ if (_taskState.isInterrupted()) {
+ uassertStatusOK(_taskState.getInterruptStatus());
+ }
- // Create the writer pool and shared data.
- _writerPool = makeTenantMigrationWriterPool();
- _sharedData = std::make_unique<TenantMigrationSharedData>(
- getGlobalServiceContext()->getFastClockSource(),
- getMigrationUUID(),
- _stateDoc.getStartFetchingDonorOpTime().has_value());
- })
- .then([this, self = shared_from_this(), token] {
- _fetchAndStoreDonorClusterTimeKeyDocs(token);
- })
- .then([this, self = shared_from_this()] {
- _stopOrHangOnFailPoint(&fpAfterConnectingTenantMigrationRecipientInstance);
- stdx::lock_guard lk(_mutex);
+ // interrupt() called after this code block will interrupt the cloner and
+ // fetcher.
+ _client = std::move(ConnectionPair.first);
+ _oplogFetcherClient = std::move(ConnectionPair.second);
- // Record the FCV at the start of a migration and check for changes in every subsequent
- // attempt. Fail if there is any mismatch in FCV or upgrade/downgrade state.
- // (Generic FCV reference): This FCV check should exist across LTS binary versions.
- auto currentFCV = serverGlobalParams.featureCompatibility.getVersion();
- auto startingFCV = _stateDoc.getRecipientPrimaryStartingFCV();
+ if (!_writerPool) {
+ // Create the writer pool and shared data.
+ _writerPool = makeTenantMigrationWriterPool();
+ }
+ _sharedData = std::make_unique<TenantMigrationSharedData>(
+ getGlobalServiceContext()->getFastClockSource(),
+ getMigrationUUID(),
+ _stateDoc.getStartFetchingDonorOpTime().has_value());
+ })
+ .then([this, self = shared_from_this(), token] {
+ _fetchAndStoreDonorClusterTimeKeyDocs(token);
+ })
+ .then([this, self = shared_from_this()] {
+ _stopOrHangOnFailPoint(&fpAfterConnectingTenantMigrationRecipientInstance);
+ stdx::lock_guard lk(_mutex);
+
+ // Record the FCV at the start of a migration and check for changes in every
+ // subsequent attempt. Fail if there is any mismatch in FCV or
+ // upgrade/downgrade state. (Generic FCV reference): This FCV check should
+ // exist across LTS binary versions.
+ auto currentFCV = serverGlobalParams.featureCompatibility.getVersion();
+ auto startingFCV = _stateDoc.getRecipientPrimaryStartingFCV();
+
+ if (!startingFCV) {
+ _stateDoc.setRecipientPrimaryStartingFCV(currentFCV);
+ return _updateStateDocForMajority(lk);
+ }
- if (!startingFCV) {
- _stateDoc.setRecipientPrimaryStartingFCV(currentFCV);
- return _updateStateDocForMajority(lk);
- }
+ if (startingFCV != currentFCV) {
+ LOGV2_ERROR(5356200,
+ "FCV may not change during migration",
+ "tenantId"_attr = getTenantId(),
+ "migrationId"_attr = getMigrationUUID(),
+ "startingFCV"_attr = startingFCV,
+ "currentFCV"_attr = currentFCV);
+ uasserted(5356201, "Detected FCV change from last migration attempt.");
+ }
- if (startingFCV != currentFCV) {
- LOGV2_ERROR(5356200,
- "FCV may not change during migration",
- "tenantId"_attr = getTenantId(),
- "migrationId"_attr = getMigrationUUID(),
- "startingFCV"_attr = startingFCV,
- "currentFCV"_attr = currentFCV);
- uasserted(5356201, "Detected FCV change from last migration attempt.");
- }
+ return SemiFuture<void>::makeReady();
+ })
+ .then([this, self = shared_from_this()] {
+ _stopOrHangOnFailPoint(&fpAfterRecordingRecipientPrimaryStartingFCV);
+ _compareRecipientAndDonorFCV();
+ })
+ .then([this, self = shared_from_this()] {
+ _stopOrHangOnFailPoint(&fpAfterComparingRecipientAndDonorFCV);
+ stdx::lock_guard lk(_mutex);
+ _getStartOpTimesFromDonor(lk);
+ return _updateStateDocForMajority(lk);
+ })
+ .then([this, self = shared_from_this()] {
+ _fetchRetryableWritesOplogBeforeStartOpTime();
+ })
+ .then([this, self = shared_from_this()] {
+ _stopOrHangOnFailPoint(
+ &fpAfterRetrievingStartOpTimesMigrationRecipientInstance);
+ _startOplogFetcher();
+ })
+ .then([this, self = shared_from_this()] {
+ _stopOrHangOnFailPoint(
+ &fpAfterStartingOplogFetcherMigrationRecipientInstance);
+
+ stdx::unique_lock lk(_mutex);
- return SemiFuture<void>::makeReady();
- })
- .then([this, self = shared_from_this()] {
- _stopOrHangOnFailPoint(&fpAfterRecordingRecipientPrimaryStartingFCV);
- _compareRecipientAndDonorFCV();
- })
- .then([this, self = shared_from_this()] {
- _stopOrHangOnFailPoint(&fpAfterComparingRecipientAndDonorFCV);
- stdx::lock_guard lk(_mutex);
- _getStartOpTimesFromDonor(lk);
- return _updateStateDocForMajority(lk);
- })
- .then([this, self = shared_from_this()] { _fetchRetryableWritesOplogBeforeStartOpTime(); })
- .then([this, self = shared_from_this()] {
- _stopOrHangOnFailPoint(&fpAfterRetrievingStartOpTimesMigrationRecipientInstance);
- _startOplogFetcher();
- })
- .then([this, self = shared_from_this()] {
- _stopOrHangOnFailPoint(&fpAfterStartingOplogFetcherMigrationRecipientInstance);
+ {
+ // Throwing an error when the cloner is canceled externally via interrupt()
+ // makes the instance skip the remaining tasks (i.e., starting the oplog
+ // applier) in the sync process. This step is necessary to prevent a race
+ // between interrupt() and starting the oplog applier in failover
+ // scenarios where we don't start the cloner because the tenant data is
+ // already in a consistent state.
+ stdx::lock_guard<TenantMigrationSharedData> sharedDatalk(*_sharedData);
+ uassertStatusOK(_sharedData->getStatus(sharedDatalk));
+ }
+ // Create the oplog applier but do not start it yet.
+ invariant(_stateDoc.getStartApplyingDonorOpTime());
+
+ OpTime beginApplyingAfterOpTime;
+ bool isResuming = false;
+ if (_isCloneCompletedMarkerSet(lk)) {
+ const auto startApplyingDonorOpTime =
+ *_stateDoc.getStartApplyingDonorOpTime();
+ const auto cloneFinishedRecipientOptime =
+ *_stateDoc.getCloneFinishedRecipientOpTime();
+ lk.unlock();
+ // We avoid holding the mutex while scanning the local oplog, which
+ // acquires the RSTL in IX mode. This keeps us interruptible by a
+ // concurrent stepDown, which acquires the RSTL in X mode.
+ beginApplyingAfterOpTime = _getOplogResumeApplyingDonorOptime(
+ startApplyingDonorOpTime, cloneFinishedRecipientOptime);
+ isResuming = beginApplyingAfterOpTime > startApplyingDonorOpTime;
+ lk.lock();
+ } else {
+ beginApplyingAfterOpTime = *_stateDoc.getStartApplyingDonorOpTime();
+ }
+ LOGV2_DEBUG(4881202,
+ 1,
+ "Recipient migration service creating oplog applier",
+ "tenantId"_attr = getTenantId(),
+ "migrationId"_attr = getMigrationUUID(),
+ "startApplyingDonorOpTime"_attr = beginApplyingAfterOpTime);
+ _tenantOplogApplier =
+ std::make_shared<TenantOplogApplier>(_migrationUuid,
+ _tenantId,
+ beginApplyingAfterOpTime,
+ _donorOplogBuffer.get(),
+ **_scopedExecutor,
+ _writerPool.get(),
+ isResuming);
+
+ // Start the cloner.
+ auto clonerFuture = _startTenantAllDatabaseCloner(lk);
+
+ // Signal that the data sync has started successfully.
+ if (!_dataSyncStartedPromise.getFuture().isReady()) {
+ _dataSyncStartedPromise.emplaceValue();
+ }
+ return clonerFuture;
+ })
+ .then([this, self = shared_from_this()] { return _onCloneSuccess(); })
+ .then([this, self = shared_from_this()] {
+ _fetchCommittedTransactionsBeforeStartOpTime();
+ })
+ .then([this, self = shared_from_this()] {
+ _stopOrHangOnFailPoint(&fpAfterCollectionClonerDone);
+ LOGV2_DEBUG(4881200,
+ 1,
+ "Recipient migration service starting oplog applier",
+ "tenantId"_attr = getTenantId(),
+ "migrationId"_attr = getMigrationUUID());
+ {
+ stdx::lock_guard lk(_mutex);
+ uassertStatusOK(_tenantOplogApplier->startup());
+ }
+ _stopOrHangOnFailPoint(
+ &fpAfterStartingOplogApplierMigrationRecipientInstance);
+ return _getDataConsistentFuture();
+ })
+ .then([this, self = shared_from_this()] {
+ _stopOrHangOnFailPoint(&fpBeforeFulfillingDataConsistentPromise);
+ stdx::lock_guard lk(_mutex);
+ LOGV2_DEBUG(4881101,
+ 1,
+ "Tenant migration recipient instance is in consistent state",
+ "migrationId"_attr = getMigrationUUID(),
+ "tenantId"_attr = getTenantId(),
+ "donorConsistentOpTime"_attr =
+ _stateDoc.getDataConsistentStopDonorOpTime());
+
+ if (!_dataConsistentPromise.getFuture().isReady()) {
+ _dataConsistentPromise.emplaceValue(
+ _stateDoc.getDataConsistentStopDonorOpTime().get());
+ }
+ });
+ })
+ .until([this, self = shared_from_this()](Status status) {
stdx::unique_lock lk(_mutex);
-
- {
- // Throwing error when cloner is canceled externally via interrupt(), makes the
- // instance to skip the remaining task (i.e., starting oplog applier) in the
- // sync process. This step is necessary to prevent race between interrupt()
- // and starting oplog applier for the failover scenarios where we don't start
- // the cloner if the tenant data is already in consistent state.
- stdx::lock_guard<TenantMigrationSharedData> sharedDatalk(*_sharedData);
- uassertStatusOK(_sharedData->getStatus(sharedDatalk));
+ if (_taskState.isInterrupted()) {
+ status = _taskState.getInterruptStatus();
}
-
- // Create the oplog applier but do not start it yet.
- invariant(_stateDoc.getStartApplyingDonorOpTime());
-
- OpTime beginApplyingAfterOpTime;
- bool isResuming = false;
- if (_isCloneCompletedMarkerSet(lk)) {
- const auto startApplyingDonorOpTime = *_stateDoc.getStartApplyingDonorOpTime();
- const auto cloneFinishedRecipientOptime =
- *_stateDoc.getCloneFinishedRecipientOpTime();
+ if (ErrorCodes::isRetriableError(status) && !_taskState.isExternalInterrupt() &&
+ _stateDocPersistedPromise.getFuture().isReady()) {
+ // Reset the task state and clear the interrupt status.
+ if (!_taskState.isRunning()) {
+ _taskState.setState(TaskState::kRunning);
+ }
+ _taskState.clearInterruptStatus();
+ // Clean up the async components before retrying the future chain.
+ _oplogFetcherStatus = boost::none;
+ std::unique_ptr<OplogFetcher> savedDonorOplogFetcher;
+ std::shared_ptr<TenantOplogApplier> savedTenantOplogApplier;
+
+ _cancelRemainingWork(lk);
+ shutdownTarget(lk, _donorOplogFetcher);
+
+ // Swap the oplog fetcher and applier to join() outside of '_mutex'.
+ using std::swap;
+ swap(savedDonorOplogFetcher, _donorOplogFetcher);
+ swap(savedTenantOplogApplier, _tenantOplogApplier);
lk.unlock();
- // We avoid holding the mutex while scanning the local oplog which acquires the RSTL
- // in IX mode. This is to allow us to be interruptable via a concurrent stepDown
- // which acquires the RSTL in X mode.
- beginApplyingAfterOpTime = _getOplogResumeApplyingDonorOptime(
- startApplyingDonorOpTime, cloneFinishedRecipientOptime);
- isResuming = beginApplyingAfterOpTime > startApplyingDonorOpTime;
- lk.lock();
- } else {
- beginApplyingAfterOpTime = *_stateDoc.getStartApplyingDonorOpTime();
- }
- LOGV2_DEBUG(4881202,
- 1,
- "Recipient migration service creating oplog applier",
- "tenantId"_attr = getTenantId(),
- "migrationId"_attr = getMigrationUUID(),
- "startApplyingDonorOpTime"_attr = beginApplyingAfterOpTime);
- _tenantOplogApplier = std::make_shared<TenantOplogApplier>(_migrationUuid,
- _tenantId,
- beginApplyingAfterOpTime,
- _donorOplogBuffer.get(),
- **_scopedExecutor,
- _writerPool.get(),
- isResuming);
-
- // Start the cloner.
- auto clonerFuture = _startTenantAllDatabaseCloner(lk);
-
- // Signal that the data sync has started successfully.
- _dataSyncStartedPromise.emplaceValue();
- return clonerFuture;
- })
- .then([this, self = shared_from_this()] { return _onCloneSuccess(); })
- .then([this, self = shared_from_this()] { _fetchCommittedTransactionsBeforeStartOpTime(); })
- .then([this, self = shared_from_this()] {
- _stopOrHangOnFailPoint(&fpAfterCollectionClonerDone);
- LOGV2_DEBUG(4881200,
- 1,
- "Recipient migration service starting oplog applier",
- "tenantId"_attr = getTenantId(),
- "migrationId"_attr = getMigrationUUID());
- {
- stdx::lock_guard lk(_mutex);
- uassertStatusOK(_tenantOplogApplier->startup());
- }
- _stopOrHangOnFailPoint(&fpAfterStartingOplogApplierMigrationRecipientInstance);
- return _getDataConsistentFuture();
- })
- .then([this, self = shared_from_this()] {
- _stopOrHangOnFailPoint(&fpBeforeFulfillingDataConsistentPromise);
- stdx::lock_guard lk(_mutex);
- LOGV2_DEBUG(4881101,
- 1,
- "Tenant migration recipient instance is in consistent state",
- "migrationId"_attr = getMigrationUUID(),
- "tenantId"_attr = getTenantId(),
- "donorConsistentOpTime"_attr =
- _stateDoc.getDataConsistentStopDonorOpTime());
- _dataConsistentPromise.emplaceValue(_stateDoc.getDataConsistentStopDonorOpTime().get());
+ // Perform join outside the lock to avoid deadlocks.
+ joinTarget(savedDonorOplogFetcher);
+ joinTarget(savedTenantOplogApplier);
+ return false;
+ }
+ return true;
})
+ .withBackoffBetweenIterations(kExponentialBackoff)
+ .on(_recipientService->getInstanceCleanupExecutor(), token)
+ .semi()
+ .thenRunOn(**_scopedExecutor)
.then([this, self = shared_from_this()] {
_stopOrHangOnFailPoint(&fpAfterDataConsistentMigrationRecipientInstance);
stdx::lock_guard lk(_mutex);
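// Sketch of the retry shape the hunk above builds with AsyncTry/.until()/
// .withBackoffBetweenIterations(), rewritten as a plain C++ loop (all names
// illustrative; not MongoDB APIs). One attempt runs the whole chain; if it
// fails with a retriable error and the failure did not come from an external
// interrupt (and, in the patch, only once the state document is persisted),
// the per-attempt workers are swapped out under the mutex, joined outside it
// to avoid deadlock, and the chain is retried after an exponential backoff.
#include <algorithm>
#include <chrono>
#include <memory>
#include <mutex>
#include <thread>

enum class Outcome { kSuccess, kRetriableError, kExternalInterrupt };

class Migration {
public:
    void run() {
        auto backoff = std::chrono::milliseconds(100);
        while (true) {
            if (runOneAttempt() != Outcome::kRetriableError) {
                return;  // success, or an interrupt we must not retry past
            }

            std::unique_ptr<std::thread> savedFetcher;
            std::unique_ptr<std::thread> savedApplier;
            {
                // Detach the per-attempt workers while holding the mutex so no
                // concurrent caller can observe a half-torn-down attempt.
                std::lock_guard<std::mutex> lk(_mutex);
                savedFetcher.swap(_fetcher);
                savedApplier.swap(_applier);
            }
            // Join outside the mutex: joining while holding it could deadlock
            // if the workers themselves need the mutex in order to exit.
            if (savedFetcher && savedFetcher->joinable()) savedFetcher->join();
            if (savedApplier && savedApplier->joinable()) savedApplier->join();

            std::this_thread::sleep_for(backoff);  // crude exponential backoff
            backoff = std::min(backoff * 2, std::chrono::milliseconds(5000));
        }
    }

private:
    // Stub: pretend the first two attempts hit a retriable connection error.
    Outcome runOneAttempt() {
        return (++_attempts <= 2) ? Outcome::kRetriableError : Outcome::kSuccess;
    }

    std::mutex _mutex;
    std::unique_ptr<std::thread> _fetcher;
    std::unique_ptr<std::thread> _applier;
    int _attempts = 0;
};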
@@ -1595,8 +1676,8 @@ SemiFuture<void> TenantMigrationRecipientService::Instance::run(
.thenRunOn(_recipientService->getInstanceCleanupExecutor())
.onCompletion([this, self = shared_from_this()](
StatusOrStatusWith<TenantOplogApplier::OpTimePair> applierStatus) {
- // On shutDown/stepDown, the _scopedExecutor may have already been shut down. So we need
- // to schedule the clean up work on the parent executor.
+ // On shutDown/stepDown, the _scopedExecutor may have already been shut down. So we
+ // need to schedule the clean up work on the parent executor.
// We don't need the final optime from the oplog applier. The data sync does not
// normally stop by itself on success. It completes only on errors or on external
@@ -1646,8 +1727,8 @@ SemiFuture<void> TenantMigrationRecipientService::Instance::run(
})
.thenRunOn(**_scopedExecutor)
.then([this, self = shared_from_this()] {
- // Schedule on the _scopedExecutor to make sure we are still the primary when waiting
- // for the recipientForgetMigration command.
+ // Schedule on the _scopedExecutor to make sure we are still the primary when
+ // waiting for the recipientForgetMigration command.
return _receivedRecipientForgetMigrationPromise.getFuture();
})
.then([this, self = shared_from_this()] { return _markStateDocAsGarbageCollectable(); })
@@ -1656,22 +1737,22 @@ SemiFuture<void> TenantMigrationRecipientService::Instance::run(
auto opCtx = cc().makeOperationContext();
auto storageInterface = StorageInterface::get(opCtx.get());
- // The oplog buffer collection can be safely dropped at this point. In case it doesn't
- // exist, dropping will be a no-op.
- // It isn't necessary that the drop is majority-committed. A new primary will attempt
- // to drop the collection anyway.
+ // The oplog buffer collection can be safely dropped at this point. In case it
+ // doesn't exist, dropping will be a no-op. It isn't necessary that the drop is
+ // majority-committed. A new primary will attempt to drop the collection anyway.
return storageInterface->dropCollection(opCtx.get(),
getOplogBufferNs(getMigrationUUID()));
})
.thenRunOn(_recipientService->getInstanceCleanupExecutor())
.onCompletion([this, self = shared_from_this()](Status status) {
- // Schedule on the parent executor to mark the completion of the whole chain so this is
- // safe even on shutDown/stepDown.
+ // Schedule on the parent executor to mark the completion of the whole chain so this
+ // is safe even on shutDown/stepDown.
stdx::lock_guard lk(_mutex);
invariant(_dataSyncCompletionPromise.getFuture().isReady());
if (status.isOK()) {
LOGV2(4881401,
- "Migration marked to be garbage collectable due to recipientForgetMigration "
+ "Migration marked to be garbage collectable due to "
+ "recipientForgetMigration "
"command",
"migrationId"_attr = getMigrationUUID(),
"tenantId"_attr = getTenantId(),