summaryrefslogtreecommitdiff
path: root/src/mongo/db/repl/tenant_migration_recipient_service.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'src/mongo/db/repl/tenant_migration_recipient_service.cpp')
-rw-r--r--src/mongo/db/repl/tenant_migration_recipient_service.cpp104
1 files changed, 94 insertions, 10 deletions
diff --git a/src/mongo/db/repl/tenant_migration_recipient_service.cpp b/src/mongo/db/repl/tenant_migration_recipient_service.cpp
index f15073da8e3..9b7842ac673 100644
--- a/src/mongo/db/repl/tenant_migration_recipient_service.cpp
+++ b/src/mongo/db/repl/tenant_migration_recipient_service.cpp
@@ -74,6 +74,8 @@ MONGO_FAIL_POINT_DEFINE(fpAfterStartingOplogFetcherMigrationRecipientInstance);
MONGO_FAIL_POINT_DEFINE(setTenantMigrationRecipientInstanceHostTimeout);
MONGO_FAIL_POINT_DEFINE(pauseAfterRetrievingLastTxnMigrationRecipientInstance);
MONGO_FAIL_POINT_DEFINE(fpAfterCollectionClonerDone);
+MONGO_FAIL_POINT_DEFINE(fpAfterStartingOplogApplierMigrationRecipientInstance);
+MONGO_FAIL_POINT_DEFINE(fpAfterDataConsistentMigrationRecipientInstance);
MONGO_FAIL_POINT_DEFINE(hangBeforeTaskCompletion);
namespace {
@@ -421,12 +423,14 @@ void TenantMigrationRecipientService::Instance::_startOplogFetcher() {
options.peekCacheSize = static_cast<size_t>(tenantMigrationOplogBufferPeekCacheSize);
options.dropCollectionAtStartup = false;
options.dropCollectionAtShutdown = false;
+ options.useTemporaryCollection = false;
NamespaceString oplogBufferNs(NamespaceString::kConfigDb,
kOplogBufferPrefix + getMigrationUUID().toString());
stdx::lock_guard lk(_mutex);
invariant(_stateDoc.getStartFetchingOpTime());
_donorOplogBuffer = std::make_unique<OplogBufferCollection>(
StorageInterface::get(opCtx.get()), oplogBufferNs, options);
+ _donorOplogBuffer->startup(opCtx.get());
_dataReplicatorExternalState = std::make_unique<DataReplicatorExternalStateTenantMigration>();
_donorOplogFetcher = (*_createOplogFetcherFn)(
(**_scopedExecutor).get(),
@@ -477,8 +481,27 @@ Status TenantMigrationRecipientService::Instance::_enqueueDocuments(
}
void TenantMigrationRecipientService::Instance::_oplogFetcherCallback(Status oplogFetcherStatus) {
- // TODO(SERVER-48812): Abort the migration unless the error is CallbackCanceled and
- // the migration has finished.
+ // The oplog fetcher is normally canceled when migration is done; any other error
+ // indicates failure.
+ if (oplogFetcherStatus.isOK()) {
+ // Oplog fetcher status of "OK" means the stopReplProducer failpoint is set. Migration
+ // cannot continue in this state so force a failure.
+ LOGV2_ERROR(
+ 4881205,
+ "Recipient migration service oplog fetcher stopped due to stopReplProducer failpoint",
+ "tenantId"_attr = getTenantId(),
+ "migrationId"_attr = getMigrationUUID());
+ interrupt({ErrorCodes::Error(4881206),
+ "Recipient migration service oplog fetcher stopped due to stopReplProducer "
+ "failpoint"});
+ } else if (oplogFetcherStatus.code() != ErrorCodes::CallbackCanceled) {
+ LOGV2_ERROR(4881204,
+ "Recipient migration service oplog fetcher failed",
+ "tenantId"_attr = getTenantId(),
+ "migrationId"_attr = getMigrationUUID(),
+ "error"_attr = oplogFetcherStatus);
+ interrupt(oplogFetcherStatus);
+ }
}
namespace {
@@ -608,6 +631,15 @@ void TenantMigrationRecipientService::Instance::_shutdownComponents(WithLock lk)
// interrupts running tenant oplog fetcher.
_oplogFetcherClient->shutdownAndDisallowReconnect();
}
+
+ if (_tenantOplogApplier) {
+ _tenantOplogApplier->shutdown();
+ }
+
+ if (_donorOplogBuffer) {
+ auto opCtx = cc().makeOperationContext();
+ _donorOplogBuffer->shutdown(opCtx.get());
+ }
}
void TenantMigrationRecipientService::Instance::interrupt(Status status) {
@@ -637,6 +669,12 @@ void TenantMigrationRecipientService::Instance::_cleanupOnTaskCompletion(Status
stdx::lock_guard lk(_mutex);
_shutdownComponents(lk);
+
+ if (_donorOplogFetcher) {
+ _donorOplogFetcher->shutdown();
+ _donorOplogFetcher->join();
+ }
+
if (_writerPool) {
_writerPool->join();
}
@@ -746,8 +784,22 @@ void TenantMigrationRecipientService::Instance::run(
_stopOrHangOnFailPoint(&fpAfterStartingOplogFetcherMigrationRecipientInstance);
stdx::lock_guard lk(_mutex);
- // Create the oplog applier and do not start now.
- // TODO SERVER-48812: Oplog application in MigrationServiceInstance.
+ // Create the oplog applier but do not start it yet.
+ invariant(_stateDoc.getStartApplyingOpTime());
+ LOGV2_DEBUG(4881202,
+ 1,
+ "Recipient migration service creating oplog applier",
+ "tenantId"_attr = getTenantId(),
+ "migrationId"_attr = getMigrationUUID(),
+ "startApplyingOpTime"_attr = *_stateDoc.getStartApplyingOpTime());
+
+ _tenantOplogApplier =
+ std::make_unique<TenantOplogApplier>(_migrationUuid,
+ _tenantId,
+ *_stateDoc.getStartApplyingOpTime(),
+ _donorOplogBuffer.get(),
+ **_scopedExecutor,
+ _writerPool.get());
// Start the cloner.
auto clonerFuture = _startTenantAllDatabaseCloner(lk);
@@ -759,10 +811,16 @@ void TenantMigrationRecipientService::Instance::run(
.then([this] { return _onCloneSuccess(); })
.then([this] {
_stopOrHangOnFailPoint(&fpAfterCollectionClonerDone);
- // Start the oplog applier.
- // TODO SERVER-48812: Oplog application in MigrationServiceInstance
+ LOGV2_DEBUG(4881200,
+ 1,
+ "Recipient migration service starting oplog applier",
+ "tenantId"_attr = getTenantId(),
+ "migrationId"_attr = getMigrationUUID());
+
+ uassertStatusOK(_tenantOplogApplier->startup());
+ _stopOrHangOnFailPoint(&fpAfterStartingOplogApplierMigrationRecipientInstance);
+ return _getDataConsistentFuture();
})
- .then([this] { return _getDataConsistentFuture(); })
.then([this] {
stdx::lock_guard lk(_mutex);
LOGV2_DEBUG(4881101,
@@ -775,10 +833,36 @@ void TenantMigrationRecipientService::Instance::run(
_dataConsistentPromise.emplaceValue(_stateDoc.getDataConsistentStopOpTime().get());
})
.then([this] {
- // wait for the oplog applier to complete/stop.
- // TODO SERVER-48812: Oplog application in MigrationServiceInstance
+ _stopOrHangOnFailPoint(&fpAfterDataConsistentMigrationRecipientInstance);
+ // wait for oplog applier to complete/stop.
+ // The oplog applier does not exit normally; it must be shut down externally,
+ // e.g. by recipientForgetMigration.
+ return _tenantOplogApplier->getNotificationForOpTime(OpTime::max());
})
- .getAsync([this](Status status) {
+ .getAsync([this](StatusOrStatusWith<TenantOplogApplier::OpTimePair> applierStatus) {
+ // We don't need the final optime from the oplog applier.
+ Status status = applierStatus.getStatus();
+ {
+ // If we were interrupted during oplog application, replace oplog application
+ // status with error state.
+ stdx::lock_guard lk(_mutex);
+ if ((status.isOK() || ErrorCodes::isCancelationError(status)) &&
+ _taskState.isInterrupted()) {
+ // We get an "OK" result when the stopReplProducer failpoint is set. This also
+ // cancels the migration. We will have already logged this in
+ // _oplogFetcherCallback()
+ if (!status.isOK()) {
+ LOGV2(4881207,
+ "Migration completed with both error and interrupt",
+ "tenantId"_attr = getTenantId(),
+ "migrationId"_attr = getMigrationUUID(),
+ "completionStatus"_attr = status,
+ "interruptStatus"_attr = _taskState.getInterruptStatus());
+ }
+ status = _taskState.getInterruptStatus();
+ }
+ }
+
LOGV2(4878501,
"Tenant migration recipient instance: Data sync completed.",
"tenantId"_attr = getTenantId(),