diff options
Diffstat (limited to 'src/mongo/db/repl/tenant_migration_recipient_service.cpp')
-rw-r--r-- | src/mongo/db/repl/tenant_migration_recipient_service.cpp | 104 |
1 files changed, 94 insertions, 10 deletions
diff --git a/src/mongo/db/repl/tenant_migration_recipient_service.cpp b/src/mongo/db/repl/tenant_migration_recipient_service.cpp index f15073da8e3..9b7842ac673 100644 --- a/src/mongo/db/repl/tenant_migration_recipient_service.cpp +++ b/src/mongo/db/repl/tenant_migration_recipient_service.cpp @@ -74,6 +74,8 @@ MONGO_FAIL_POINT_DEFINE(fpAfterStartingOplogFetcherMigrationRecipientInstance); MONGO_FAIL_POINT_DEFINE(setTenantMigrationRecipientInstanceHostTimeout); MONGO_FAIL_POINT_DEFINE(pauseAfterRetrievingLastTxnMigrationRecipientInstance); MONGO_FAIL_POINT_DEFINE(fpAfterCollectionClonerDone); +MONGO_FAIL_POINT_DEFINE(fpAfterStartingOplogApplierMigrationRecipientInstance); +MONGO_FAIL_POINT_DEFINE(fpAfterDataConsistentMigrationRecipientInstance); MONGO_FAIL_POINT_DEFINE(hangBeforeTaskCompletion); namespace { @@ -421,12 +423,14 @@ void TenantMigrationRecipientService::Instance::_startOplogFetcher() { options.peekCacheSize = static_cast<size_t>(tenantMigrationOplogBufferPeekCacheSize); options.dropCollectionAtStartup = false; options.dropCollectionAtShutdown = false; + options.useTemporaryCollection = false; NamespaceString oplogBufferNs(NamespaceString::kConfigDb, kOplogBufferPrefix + getMigrationUUID().toString()); stdx::lock_guard lk(_mutex); invariant(_stateDoc.getStartFetchingOpTime()); _donorOplogBuffer = std::make_unique<OplogBufferCollection>( StorageInterface::get(opCtx.get()), oplogBufferNs, options); + _donorOplogBuffer->startup(opCtx.get()); _dataReplicatorExternalState = std::make_unique<DataReplicatorExternalStateTenantMigration>(); _donorOplogFetcher = (*_createOplogFetcherFn)( (**_scopedExecutor).get(), @@ -477,8 +481,27 @@ Status TenantMigrationRecipientService::Instance::_enqueueDocuments( } void TenantMigrationRecipientService::Instance::_oplogFetcherCallback(Status oplogFetcherStatus) { - // TODO(SERVER-48812): Abort the migration unless the error is CallbackCanceled and - // the migration has finished. + // The oplog fetcher is normally canceled when migration is done; any other error + // indicates failure. + if (oplogFetcherStatus.isOK()) { + // Oplog fetcher status of "OK" means the stopReplProducer failpoint is set. Migration + // cannot continue in this state so force a failure. + LOGV2_ERROR( + 4881205, + "Recipient migration service oplog fetcher stopped due to stopReplProducer failpoint", + "tenantId"_attr = getTenantId(), + "migrationId"_attr = getMigrationUUID()); + interrupt({ErrorCodes::Error(4881206), + "Recipient migration service oplog fetcher stopped due to stopReplProducer " + "failpoint"}); + } else if (oplogFetcherStatus.code() != ErrorCodes::CallbackCanceled) { + LOGV2_ERROR(4881204, + "Recipient migration service oplog fetcher failed", + "tenantId"_attr = getTenantId(), + "migrationId"_attr = getMigrationUUID(), + "error"_attr = oplogFetcherStatus); + interrupt(oplogFetcherStatus); + } } namespace { @@ -608,6 +631,15 @@ void TenantMigrationRecipientService::Instance::_shutdownComponents(WithLock lk) // interrupts running tenant oplog fetcher. _oplogFetcherClient->shutdownAndDisallowReconnect(); } + + if (_tenantOplogApplier) { + _tenantOplogApplier->shutdown(); + } + + if (_donorOplogBuffer) { + auto opCtx = cc().makeOperationContext(); + _donorOplogBuffer->shutdown(opCtx.get()); + } } void TenantMigrationRecipientService::Instance::interrupt(Status status) { @@ -637,6 +669,12 @@ void TenantMigrationRecipientService::Instance::_cleanupOnTaskCompletion(Status stdx::lock_guard lk(_mutex); _shutdownComponents(lk); + + if (_donorOplogFetcher) { + _donorOplogFetcher->shutdown(); + _donorOplogFetcher->join(); + } + if (_writerPool) { _writerPool->join(); } @@ -746,8 +784,22 @@ void TenantMigrationRecipientService::Instance::run( _stopOrHangOnFailPoint(&fpAfterStartingOplogFetcherMigrationRecipientInstance); stdx::lock_guard lk(_mutex); - // Create the oplog applier and do not start now. - // TODO SERVER-48812: Oplog application in MigrationServiceInstance. + // Create the oplog applier but do not start it yet. + invariant(_stateDoc.getStartApplyingOpTime()); + LOGV2_DEBUG(4881202, + 1, + "Recipient migration service creating oplog applier", + "tenantId"_attr = getTenantId(), + "migrationId"_attr = getMigrationUUID(), + "startApplyingOpTime"_attr = *_stateDoc.getStartApplyingOpTime()); + + _tenantOplogApplier = + std::make_unique<TenantOplogApplier>(_migrationUuid, + _tenantId, + *_stateDoc.getStartApplyingOpTime(), + _donorOplogBuffer.get(), + **_scopedExecutor, + _writerPool.get()); // Start the cloner. auto clonerFuture = _startTenantAllDatabaseCloner(lk); @@ -759,10 +811,16 @@ void TenantMigrationRecipientService::Instance::run( .then([this] { return _onCloneSuccess(); }) .then([this] { _stopOrHangOnFailPoint(&fpAfterCollectionClonerDone); - // Start the oplog applier. - // TODO SERVER-48812: Oplog application in MigrationServiceInstance + LOGV2_DEBUG(4881200, + 1, + "Recipient migration service starting oplog applier", + "tenantId"_attr = getTenantId(), + "migrationId"_attr = getMigrationUUID()); + + uassertStatusOK(_tenantOplogApplier->startup()); + _stopOrHangOnFailPoint(&fpAfterStartingOplogApplierMigrationRecipientInstance); + return _getDataConsistentFuture(); }) - .then([this] { return _getDataConsistentFuture(); }) .then([this] { stdx::lock_guard lk(_mutex); LOGV2_DEBUG(4881101, @@ -775,10 +833,36 @@ void TenantMigrationRecipientService::Instance::run( _dataConsistentPromise.emplaceValue(_stateDoc.getDataConsistentStopOpTime().get()); }) .then([this] { - // wait for the oplog applier to complete/stop. - // TODO SERVER-48812: Oplog application in MigrationServiceInstance + _stopOrHangOnFailPoint(&fpAfterDataConsistentMigrationRecipientInstance); + // wait for oplog applier to complete/stop. + // The oplog applier does not exit normally; it must be shut down externally, + // e.g. by recipientForgetMigration. + return _tenantOplogApplier->getNotificationForOpTime(OpTime::max()); }) - .getAsync([this](Status status) { + .getAsync([this](StatusOrStatusWith<TenantOplogApplier::OpTimePair> applierStatus) { + // We don't need the final optime from the oplog applier. + Status status = applierStatus.getStatus(); + { + // If we were interrupted during oplog application, replace oplog application + // status with error state. + stdx::lock_guard lk(_mutex); + if ((status.isOK() || ErrorCodes::isCancelationError(status)) && + _taskState.isInterrupted()) { + // We get an "OK" result when the stopReplProducer failpoint is set. This also + // cancels the migration. We will have already logged this in + // _oplogFetcherCallback() + if (!status.isOK()) { + LOGV2(4881207, + "Migration completed with both error and interrupt", + "tenantId"_attr = getTenantId(), + "migrationId"_attr = getMigrationUUID(), + "completionStatus"_attr = status, + "interruptStatus"_attr = _taskState.getInterruptStatus()); + } + status = _taskState.getInterruptStatus(); + } + } + LOGV2(4878501, "Tenant migration recipient instance: Data sync completed.", "tenantId"_attr = getTenantId(), |