-rw-r--r-- buildscripts/resmokeconfig/suites/tenant_migration_jscore_passthrough.yml | 2
-rw-r--r-- jstests/replsets/tenant_migration_donor_state_machine.js | 4
-rw-r--r-- src/mongo/base/error_codes.yml | 2
-rw-r--r-- src/mongo/db/commands/tenant_migration_recipient_cmds.cpp | 21
-rw-r--r-- src/mongo/db/commands/tenant_migration_recipient_cmds.idl | 57
-rw-r--r-- src/mongo/db/repl/SConscript | 1
-rw-r--r-- src/mongo/db/repl/tenant_migration_donor_service.cpp | 27
-rw-r--r-- src/mongo/db/repl/tenant_migration_recipient_service.cpp | 300
-rw-r--r-- src/mongo/db/repl/tenant_migration_recipient_service.h | 54
-rw-r--r-- src/mongo/db/repl/tenant_migration_recipient_service_test.cpp | 455
-rw-r--r-- src/mongo/util/uuid.h | 1
11 files changed, 783 insertions, 141 deletions
diff --git a/buildscripts/resmokeconfig/suites/tenant_migration_jscore_passthrough.yml b/buildscripts/resmokeconfig/suites/tenant_migration_jscore_passthrough.yml index 4913806a87d..7e5cd979b0f 100644 --- a/buildscripts/resmokeconfig/suites/tenant_migration_jscore_passthrough.yml +++ b/buildscripts/resmokeconfig/suites/tenant_migration_jscore_passthrough.yml @@ -121,7 +121,7 @@ executor: mode: alwaysOn data: blockTimeMS: 250 - # TODO SERVER-48814: Remove the failpoint 'returnResponseOkForRecipientSyncDataCmd'. + # TODO SERVER-53312: Remove the failpoint 'returnResponseOkForRecipientSyncDataCmd'. failpoint.returnResponseOkForRecipientSyncDataCmd: mode: alwaysOn # Set the delay before migration state machine is garbage collected to be short to avoid diff --git a/jstests/replsets/tenant_migration_donor_state_machine.js b/jstests/replsets/tenant_migration_donor_state_machine.js index 72a37ecb75b..4f172b7dc1b 100644 --- a/jstests/replsets/tenant_migration_donor_state_machine.js +++ b/jstests/replsets/tenant_migration_donor_state_machine.js @@ -74,9 +74,7 @@ const donorRst = new ReplSetTest({ donorRst.startSet(); donorRst.initiate(); -// TODO SERVER-48814: Remove 'enableRecipientTesting: false'. -const tenantMigrationTest = - new TenantMigrationTest({name: jsTestName(), enableRecipientTesting: false, donorRst}); +const tenantMigrationTest = new TenantMigrationTest({name: jsTestName(), donorRst}); if (!tenantMigrationTest.isFeatureFlagEnabled()) { jsTestLog("Skipping test because the tenant migrations feature flag is disabled"); donorRst.stopSet(); diff --git a/src/mongo/base/error_codes.yml b/src/mongo/base/error_codes.yml index 9772d960b8f..c1d19fb1877 100644 --- a/src/mongo/base/error_codes.yml +++ b/src/mongo/base/error_codes.yml @@ -411,6 +411,8 @@ error_codes: - {code: 333, name: ServiceExecutorInShutdown, categories: [ShutdownError,CancelationError,InternalOnly]} - {code: 334, name: MechanismUnavailable} + - {code: 335, name: TenantMigrationForgotten} + # Error codes 4000-8999 are reserved. # Non-sequential error codes for compatibility only) diff --git a/src/mongo/db/commands/tenant_migration_recipient_cmds.cpp b/src/mongo/db/commands/tenant_migration_recipient_cmds.cpp index 26c47c4328b..01728458942 100644 --- a/src/mongo/db/commands/tenant_migration_recipient_cmds.cpp +++ b/src/mongo/db/commands/tenant_migration_recipient_cmds.cpp @@ -146,6 +146,27 @@ public: "recipientForgetMigration command not enabled", repl::feature_flags::gTenantMigrations.isEnabled( serverGlobalParams.featureCompatibility)); + const auto& cmd = request(); + + auto recipientService = + repl::PrimaryOnlyServiceRegistry::get(opCtx->getServiceContext()) + ->lookupServiceByName(repl::TenantMigrationRecipientService:: + kTenantMigrationRecipientServiceName); + + // We may not have a document if recipientForgetMigration is received before + // recipientSyncData. But even if that's the case, we still need to create an instance + // and persist a state document that's marked garbage collectable (which is done by the + // main chain). + TenantMigrationRecipientDocument stateDoc(cmd.getMigrationId(), + cmd.getDonorConnectionString().toString(), + cmd.getTenantId().toString(), + cmd.getReadPreference()); + auto recipientInstance = repl::TenantMigrationRecipientService::Instance::getOrCreate( + opCtx, recipientService, stateDoc.toBSON()); + + // Instruct the instance run() function to mark this migration garbage collectable. 
+ recipientInstance->onReceiveRecipientForgetMigration(opCtx); + recipientInstance->getCompletionFuture().get(opCtx); } void doCheckAuthorization(OperationContext* opCtx) const {} diff --git a/src/mongo/db/commands/tenant_migration_recipient_cmds.idl b/src/mongo/db/commands/tenant_migration_recipient_cmds.idl index 35160bcf881..0d3d53c7c04 100644 --- a/src/mongo/db/commands/tenant_migration_recipient_cmds.idl +++ b/src/mongo/db/commands/tenant_migration_recipient_cmds.idl @@ -48,34 +48,42 @@ structs: type: optime description: "Majority applied donor optime by the recipient" + MigrationRecipientCommonData: + description: "Command data for tenant migration recipient commands." + strict: true + fields: + migrationId: + description: "Unique identifier for the tenant migration." + type: uuid + donorConnectionString: + description: >- + The URI string that the recipient will utilize to create a connection with the + donor. + type: string + validator: + callback: "validateConnectionString" + tenantId: + description: >- + The prefix from which the migrating database will be matched. The prefixes 'admin', + 'local', 'config', the empty string, are not allowed. + type: string + validator: + callback: "validateDatabasePrefix" + readPreference: + description: >- + The read preference settings that the donor will pass on to the recipient. + type: readPreference + commands: recipientSyncData: description: "Parser for the 'recipientSyncData' command." command_name: recipientSyncData strict: true namespace: ignored + inline_chained_structs: true + chained_structs: + MigrationRecipientCommonData: MigrationRecipientCommonData fields: - migrationId: - description: "Unique identifier for the tenant migration." - type: uuid - donorConnectionString: - description: >- - The URI string that the recipient will utilize to create a connection with the - donor. - type: string - validator: - callback: "validateConnectionString" - tenantId: - description: >- - The prefix from which the migrating database will be matched. The prefixes 'admin', - 'local', 'config', the empty string, are not allowed. - type: string - validator: - callback: "validateDatabasePrefix" - readPreference: - description: >- - The read preference settings that the donor will pass on to the recipient. - type: readPreference returnAfterReachingDonorTimestamp: description: >- If provided, the recipient should return after syncing up to this donor timestamp. @@ -90,7 +98,6 @@ commands: command_name: recipientForgetMigration strict: true namespace: ignored - fields: - migrationId: - description: "Unique identifier for the tenant migration." 
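The recipientForgetMigration handler above is deliberately idempotent: the command may arrive before recipientSyncData, so it get-or-creates the instance, signals the forget, and blocks on the completion future. Below is a minimal, self-contained sketch of that flow using only the standard library; Instance and Registry are illustrative stand-ins for the real PrimaryOnlyService types, and the completion future (resolved by the run() chain in the real code) is collapsed directly into the forget signal for brevity.

```cpp
#include <future>
#include <map>
#include <memory>
#include <mutex>
#include <string>

struct Instance {
    void onReceiveForgetMigration() {
        // Idempotent: duplicate recipientForgetMigration requests are no-ops.
        std::call_once(_forgetOnce, [this] { _forgetSignal.set_value(); });
    }
    std::shared_future<void> completionFuture() { return _completion; }

private:
    std::once_flag _forgetOnce;
    std::promise<void> _forgetSignal;
    std::shared_future<void> _completion = _forgetSignal.get_future().share();
};

class Registry {  // stands in for PrimaryOnlyServiceRegistry + Instance::getOrCreate()
public:
    std::shared_ptr<Instance> getOrCreate(const std::string& migrationId) {
        std::lock_guard<std::mutex> lk(_mutex);
        auto& slot = _instances[migrationId];
        if (!slot)
            slot = std::make_shared<Instance>();  // forget may arrive before syncData
        return slot;
    }

private:
    std::mutex _mutex;
    std::map<std::string, std::shared_ptr<Instance>> _instances;
};

int main() {
    Registry registry;
    auto instance = registry.getOrCreate("migration-uuid");
    instance->onReceiveForgetMigration();  // safe to repeat
    instance->onReceiveForgetMigration();
    instance->completionFuture().get();    // command returns once migration is forgotten
}
```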
- type: uuid + inline_chained_structs: true + chained_structs: + MigrationRecipientCommonData: MigrationRecipientCommonData diff --git a/src/mongo/db/repl/SConscript b/src/mongo/db/repl/SConscript index 3d2a8244b29..6459f563fb7 100644 --- a/src/mongo/db/repl/SConscript +++ b/src/mongo/db/repl/SConscript @@ -1304,6 +1304,7 @@ env.Library( ], LIBDEPS=[ '$BUILD_DIR/mongo/client/read_preference', + '$BUILD_DIR/mongo/util/future_util', 'primary_only_service', 'tenant_migration_recipient_utils', 'wait_for_majority_service', diff --git a/src/mongo/db/repl/tenant_migration_donor_service.cpp b/src/mongo/db/repl/tenant_migration_donor_service.cpp index 4acfbc350a5..b55f7b0f7be 100644 --- a/src/mongo/db/repl/tenant_migration_donor_service.cpp +++ b/src/mongo/db/repl/tenant_migration_donor_service.cpp @@ -460,10 +460,12 @@ ExecutorFuture<void> TenantMigrationDonorService::Instance::_sendRecipientSyncDa BSONObj cmdObj = BSONObj([&]() { auto donorConnString = repl::ReplicationCoordinator::get(opCtx)->getConfig().getConnectionString(); - RecipientSyncData request(_stateDoc.getId(), - donorConnString.toString(), - _stateDoc.getTenantId().toString(), - _stateDoc.getReadPreference()); + RecipientSyncData request; + request.setDbName(NamespaceString::kAdminDb); + request.setMigrationRecipientCommonData({_stateDoc.getId(), + donorConnString.toString(), + _stateDoc.getTenantId().toString(), + _stateDoc.getReadPreference()}); request.setReturnAfterReachingDonorTimestamp(_stateDoc.getBlockTimestamp()); return request.toBSON(BSONObj()); }()); @@ -475,10 +477,19 @@ ExecutorFuture<void> TenantMigrationDonorService::Instance::_sendRecipientForget std::shared_ptr<executor::ScopedTaskExecutor> executor, std::shared_ptr<RemoteCommandTargeter> recipientTargeterRS, const CancelationToken& token) { - return _sendCommandToRecipient(executor, - recipientTargeterRS, - RecipientForgetMigration(_stateDoc.getId()).toBSON(BSONObj()), - token); + + auto opCtxHolder = cc().makeOperationContext(); + auto opCtx = opCtxHolder.get(); + + auto donorConnString = + repl::ReplicationCoordinator::get(opCtx)->getConfig().getConnectionString(); + RecipientForgetMigration request; + request.setDbName(NamespaceString::kAdminDb); + request.setMigrationRecipientCommonData({_stateDoc.getId(), + donorConnString.toString(), + _stateDoc.getTenantId().toString(), + _stateDoc.getReadPreference()}); + return _sendCommandToRecipient(executor, recipientTargeterRS, request.toBSON(BSONObj()), token); } SemiFuture<void> TenantMigrationDonorService::Instance::run( diff --git a/src/mongo/db/repl/tenant_migration_recipient_service.cpp b/src/mongo/db/repl/tenant_migration_recipient_service.cpp index 580d52a916b..30309958a6a 100644 --- a/src/mongo/db/repl/tenant_migration_recipient_service.cpp +++ b/src/mongo/db/repl/tenant_migration_recipient_service.cpp @@ -56,6 +56,7 @@ #include "mongo/db/write_concern_options.h" #include "mongo/logv2/log.h" #include "mongo/rpc/get_status_from_command_result.h" +#include "mongo/util/future_util.h" namespace mongo { namespace repl { @@ -65,6 +66,8 @@ constexpr StringData kOplogBufferPrefix = "repl.migration.oplog_"_sd; // A convenient place to set test-specific parameters. MONGO_FAIL_POINT_DEFINE(pauseBeforeRunTenantMigrationRecipientInstance); +MONGO_FAIL_POINT_DEFINE(pauseAfterRunTenantMigrationRecipientInstance); +MONGO_FAIL_POINT_DEFINE(autoRecipientForgetMigration); // Fails before waiting for the state doc to be majority replicated. 
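The IDL change above factors the four repeated fields into one MigrationRecipientCommonData struct that both commands inline via inline_chained_structs, which is what lets the donor build a complete recipientForgetMigration request. A rough sketch of the resulting shape follows, with plain structs standing in for the IDL-generated classes (the generated code additionally provides BSON serializers and accessors such as setMigrationRecipientCommonData(), as used in the donor hunk above).

```cpp
#include <cstdint>
#include <string>

struct MigrationRecipientCommonData {  // the new shared IDL struct
    std::string migrationId;
    std::string donorConnectionString;
    std::string tenantId;
    std::string readPreference;
};

struct RecipientSyncData {  // inline_chained_structs: common payload + extras
    MigrationRecipientCommonData common;
    std::int64_t returnAfterReachingDonorTimestamp = 0;
};

struct RecipientForgetMigration {         // now carries the same payload, so the
    MigrationRecipientCommonData common;  // recipient can create an instance even
};                                        // without a prior recipientSyncData

int main() {
    MigrationRecipientCommonData common{"uuid", "donorSet/h1:27017", "tenantA", "primary"};
    RecipientSyncData sync{common, 5};
    RecipientForgetMigration forget{common};
    (void)sync;
    (void)forget;
}
```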
MONGO_FAIL_POINT_DEFINE(failWhilePersistingTenantMigrationRecipientInstanceStateDoc); @@ -78,6 +81,7 @@ MONGO_FAIL_POINT_DEFINE(fpAfterCollectionClonerDone); MONGO_FAIL_POINT_DEFINE(fpAfterStartingOplogApplierMigrationRecipientInstance); MONGO_FAIL_POINT_DEFINE(fpAfterDataConsistentMigrationRecipientInstance); MONGO_FAIL_POINT_DEFINE(hangBeforeTaskCompletion); +MONGO_FAIL_POINT_DEFINE(fpAfterReceivingRecipientForgetMigration); namespace { // We never restart just the oplog fetcher. If a failure occurs, we restart the whole state machine @@ -186,7 +190,7 @@ boost::optional<BSONObj> TenantMigrationRecipientService::Instance::reportForCur BSONObjBuilder bob; bob.append("desc", "tenant recipient migration"); - bob.append("migrationCompleted", _completionPromise.getFuture().isReady()); + bob.append("migrationCompleted", _taskCompletionPromise.getFuture().isReady()); bob.append("instanceID", _stateDoc.getId().toBSON()); bob.append("tenantId", _stateDoc.getTenantId()); bob.append("readPreference", _stateDoc.getReadPreference().toInnerBSON()); @@ -229,11 +233,10 @@ OpTime TenantMigrationRecipientService::Instance::waitUntilTimestampIsMajorityCo auto getWaitOpTimeFuture = [&]() { stdx::lock_guard lk(_mutex); - if (_taskState.isDone()) { - // When task state is done, we reset _tenantOplogApplier, so just throw the - // task completion future result. - invariant(getCompletionFuture().isReady()); - getCompletionFuture().get(opCtx); + if (_dataSyncCompletionPromise.getFuture().isReady()) { + // When the data sync is done, we reset _tenantOplogApplier, so just throw the data sync + // completion future result. + _dataSyncCompletionPromise.getFuture().get(); MONGO_UNREACHABLE; } @@ -339,7 +342,7 @@ TenantMigrationRecipientService::Instance::_createAndConnectClients() { return _donorReplicaSetMonitor->getHostOrRefresh(_readPreference, getHostCancelSource.token()) .thenRunOn(**_scopedExecutor) - .then([this](const HostAndPort& serverAddress) { + .then([this, self = shared_from_this()](const HostAndPort& serverAddress) { // Application name is constructed such that it doesn't exceeds // kMaxApplicationNameByteLength (128 bytes). // "TenantMigration_" (16 bytes) + <tenantId> (61 bytes) + "_" (1 byte) + @@ -359,19 +362,20 @@ TenantMigrationRecipientService::Instance::_createAndConnectClients() { auto oplogFetcherClient = _connectAndAuth(serverAddress, applicationName, _authParams); return ConnectionPair(std::move(client), std::move(oplogFetcherClient)); }) - .onError([this](const Status& status) -> SemiFuture<ConnectionPair> { - LOGV2_ERROR(4880404, - "Connecting to donor failed", - "tenantId"_attr = getTenantId(), - "migrationId"_attr = getMigrationUUID(), - "error"_attr = status); - - // Make sure we don't end up with a partially initialized set of connections. - stdx::lock_guard lk(_mutex); - _client = nullptr; - _oplogFetcherClient = nullptr; - return status; - }) + .onError( + [this, self = shared_from_this()](const Status& status) -> SemiFuture<ConnectionPair> { + LOGV2_ERROR(4880404, + "Connecting to donor failed", + "tenantId"_attr = getTenantId(), + "migrationId"_attr = getMigrationUUID(), + "error"_attr = status); + + // Make sure we don't end up with a partially initialized set of connections. 
+ stdx::lock_guard lk(_mutex); + _client = nullptr; + _oplogFetcherClient = nullptr; + return status; + }) .semi(); } @@ -523,12 +527,12 @@ void TenantMigrationRecipientService::Instance::_startOplogFetcher() { ReplicationProcess::kUninitializedRollbackId, false /* requireFresherSyncSource */, _dataReplicatorExternalState.get(), - [this](OplogFetcher::Documents::const_iterator first, - OplogFetcher::Documents::const_iterator last, - const OplogFetcher::DocumentsInfo& info) { + [this, self = shared_from_this()](OplogFetcher::Documents::const_iterator first, + OplogFetcher::Documents::const_iterator last, + const OplogFetcher::DocumentsInfo& info) { return _enqueueDocuments(first, last, info); }, - [this](const Status& s, int rbid) { _oplogFetcherCallback(s); }, + [this, self = shared_from_this()](const Status& s, int rbid) { _oplogFetcherCallback(s); }, tenantMigrationOplogFetcherBatchSize, OplogFetcher::StartingPoint::kEnqueueFirstDoc, _getOplogFetcherFilter(), @@ -597,16 +601,17 @@ void TenantMigrationRecipientService::Instance::_oplogFetcherCallback(Status opl "Recipient migration service oplog fetcher stopped due to stopReplProducer failpoint", "tenantId"_attr = getTenantId(), "migrationId"_attr = getMigrationUUID()); - interrupt({ErrorCodes::Error(4881206), - "Recipient migration service oplog fetcher stopped due to stopReplProducer " - "failpoint"}); + _interrupt({ErrorCodes::Error(4881206), + "Recipient migration service oplog fetcher stopped due to stopReplProducer " + "failpoint"}, + /*skipWaitingForForgetMigration=*/false); } else if (oplogFetcherStatus.code() != ErrorCodes::CallbackCanceled) { LOGV2_ERROR(4881204, "Recipient migration service oplog fetcher failed", "tenantId"_attr = getTenantId(), "migrationId"_attr = getMigrationUUID(), "error"_attr = oplogFetcherStatus); - interrupt(oplogFetcherStatus); + _interrupt(oplogFetcherStatus, /*skipWaitingForForgetMigration=*/false); } } @@ -707,18 +712,19 @@ SemiFuture<void> TenantMigrationRecipientService::Instance::_getDataConsistentFu return _tenantOplogApplier ->getNotificationForOpTime(_stateDoc.getDataConsistentStopDonorOpTime().get()) .thenRunOn(**_scopedExecutor) - .then([this](TenantOplogApplier::OpTimePair donorRecipientOpTime) { - auto opCtx = cc().makeOperationContext(); + .then( + [this, self = shared_from_this()](TenantOplogApplier::OpTimePair donorRecipientOpTime) { + auto opCtx = cc().makeOperationContext(); - stdx::lock_guard lk(_mutex); - // Persist the state that tenant migration instance has reached - // consistent state. - _stateDoc.setState(TenantMigrationRecipientStateEnum::kConsistent); - uassertStatusOK( - tenantMigrationRecipientEntryHelpers::updateStateDoc(opCtx.get(), _stateDoc)); - return WaitForMajorityService::get(opCtx->getServiceContext()) - .waitUntilMajority(repl::ReplClientInfo::forClient(cc()).getLastOp()); - }) + stdx::lock_guard lk(_mutex); + // Persist the state that tenant migration instance has reached + // consistent state. 
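Note that _oplogFetcherCallback now routes failures through the private _interrupt() with skipWaitingForForgetMigration=false: an internal data-sync error stops the sync but leaves the instance waiting for the forget command, while only external interrupts (stepdown/shutdown) abandon that wait. A toy illustration of the flag's two meanings; all names here are illustrative.

```cpp
#include <iostream>
#include <string>

enum class Source { kExternal, kInternal };

void interruptInstance(Source source, const std::string& reason) {
    const bool skipWaitingForForgetMigration = (source == Source::kExternal);
    if (skipWaitingForForgetMigration) {
        // stepdown/shutdown: also fail _receivedRecipientForgetMigrationPromise
        std::cout << reason << ": abandoning wait for recipientForgetMigration\n";
    } else {
        // oplog fetcher/cloner error: stop the sync, keep awaiting the command
        std::cout << reason << ": data sync stopped, still awaiting forget\n";
    }
}

int main() {
    interruptInstance(Source::kExternal, "stepdown");
    interruptInstance(Source::kInternal, "oplog fetcher error");
}
```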
+ _stateDoc.setState(TenantMigrationRecipientStateEnum::kConsistent); + uassertStatusOK( + tenantMigrationRecipientEntryHelpers::updateStateDoc(opCtx.get(), _stateDoc)); + return WaitForMajorityService::get(opCtx->getServiceContext()) + .waitUntilMajority(repl::ReplClientInfo::forClient(cc()).getLastOp()); + }) .semi(); } @@ -752,8 +758,64 @@ void setPromiseErrorifNotReady(WithLock lk, Promise& promise, Status status) { promise.setError(status); } + +template <class Promise> +void setPromiseOkifNotReady(WithLock lk, Promise& promise) { + if (promise.getFuture().isReady()) { + return; + } + + promise.emplaceValue(); +} + } // namespace +SemiFuture<void> +TenantMigrationRecipientService::Instance::_markStateDocumentAsGarbageCollectable() { + _stopOrHangOnFailPoint(&fpAfterReceivingRecipientForgetMigration); + + // Throws if we have failed to persist the state doc at the first place. This can only happen in + // unittests where we enable the autoRecipientForgetMigration failpoint. Otherwise, + // recipientForgetMigration will always wait for the state doc to be persisted first and thus + // this will only be called with _stateDocPersistedPromise resolved OK. + invariant(_stateDocPersistedPromise.getFuture().isReady()); + _stateDocPersistedPromise.getFuture().get(); + + stdx::lock_guard lk(_mutex); + + if (_stateDoc.getExpireAt()) { + // Nothing to do if the state doc already has the expireAt set. + return SemiFuture<void>::makeReady(); + } + + auto uniqueOpCtx = cc().makeOperationContext(); + auto opCtx = uniqueOpCtx.get(); + _stateDoc.setState(TenantMigrationRecipientStateEnum::kDone); + _stateDoc.setExpireAt(opCtx->getServiceContext()->getFastClockSource()->now() + + Milliseconds{repl::tenantMigrationGarbageCollectionDelayMS.load()}); + + auto status = [&]() { + try { + // Update the state doc with the expireAt set. + return tenantMigrationRecipientEntryHelpers::updateStateDoc(opCtx, _stateDoc); + } catch (DBException& ex) { + return ex.toStatus(); + } + }(); + if (!status.isOK()) { + // We assume that we only fail with shutDown/stepDown errors (i.e. for failovers). + // Otherwise, the whole chain would stop running without marking the state doc garbage + // collectable while we are still the primary. + invariant(ErrorCodes::isShutdownError(status) || ErrorCodes::isNotPrimaryError(status)); + uassertStatusOK(status); + } + + auto writeOpTime = repl::ReplClientInfo::forClient(opCtx->getClient()).getLastOp(); + return WaitForMajorityService::get(opCtx->getServiceContext()) + .waitUntilMajority(writeOpTime) + .semi(); +} + void TenantMigrationRecipientService::Instance::_cancelRemainingWork(WithLock lk) { if (_sharedData) { stdx::lock_guard<TenantMigrationSharedData> sharedDatalk(*_sharedData); @@ -777,11 +839,19 @@ void TenantMigrationRecipientService::Instance::_cancelRemainingWork(WithLock lk shutdownTarget(lk, _writerPool); } -void TenantMigrationRecipientService::Instance::interrupt(Status status) { +void TenantMigrationRecipientService::Instance::_interrupt(Status status, + bool skipWaitingForForgetMigration) { invariant(!status.isOK()); stdx::lock_guard lk(_mutex); + if (skipWaitingForForgetMigration) { + // We only get here on receiving the recipientForgetMigration command or on + // stepDown/shutDown. On receiving the recipientForgetMigration, the promise should have + // already been set. + setPromiseErrorifNotReady(lk, _receivedRecipientForgetMigrationPromise, status); + } + if (_taskState.isInterrupted() || _taskState.isDone()) { // nothing to do. 
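_markStateDocumentAsGarbageCollectable() above is the heart of the new teardown: flip the state to kDone, stamp expireAt = now + tenantMigrationGarbageCollectionDelayMS, persist the doc, and wait for majority replication. Here is a compact analog under a hypothetical StateDoc type, with the persistence and majority wait reduced to a comment.

```cpp
#include <chrono>
#include <optional>

enum class State { kStarted, kConsistent, kDone };

struct StateDoc {
    State state = State::kStarted;
    std::optional<std::chrono::system_clock::time_point> expireAt;
};

void markGarbageCollectable(StateDoc& doc, std::chrono::milliseconds gcDelay) {
    if (doc.expireAt)
        return;  // already marked garbage collectable; nothing to do
    doc.state = State::kDone;
    doc.expireAt = std::chrono::system_clock::now() + gcDelay;
    // Real code: updateStateDoc(opCtx, doc), then wait for majority write
    // concern; only shutdown/stepdown errors are tolerated at this point.
}

int main() {
    StateDoc doc;
    markGarbageCollectable(doc, std::chrono::milliseconds{5000});
    markGarbageCollectable(doc, std::chrono::milliseconds{5000});  // idempotent no-op
}
```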
return; @@ -792,15 +862,54 @@ void TenantMigrationRecipientService::Instance::interrupt(Status status) { // If the task is running, then setting promise result will be taken care by the main task // continuation chain. if (_taskState.isNotStarted()) { + invariant(skipWaitingForForgetMigration); + _stateDocPersistedPromise.setError(status); _dataSyncStartedPromise.setError(status); _dataConsistentPromise.setError(status); - _completionPromise.setError(status); + _dataSyncCompletionPromise.setError(status); + + // The interrupt() is called before the instance is scheduled to run. If the state doc has + // already been marked garbage collectable, resolve the completion promise with OK. + if (_stateDoc.getExpireAt()) { + _taskCompletionPromise.emplaceValue(); + } else { + _taskCompletionPromise.setError(status); + } } _taskState.setState(TaskState::kInterrupted, status); } -void TenantMigrationRecipientService::Instance::_cleanupOnTaskCompletion(Status status) { +void TenantMigrationRecipientService::Instance::interrupt(Status status) { + _interrupt(status, /*skipWaitingForForgetMigration=*/true); +} + +void TenantMigrationRecipientService::Instance::onReceiveRecipientForgetMigration( + OperationContext* opCtx) { + LOGV2(4881400, + "Forgetting migration due to recipientForgetMigration command", + "migrationId"_attr = getMigrationUUID(), + "tenantId"_attr = getTenantId()); + + // Wait until the state doc is initialized and persisted. + _stateDocPersistedPromise.getFuture().get(opCtx); + + { + stdx::lock_guard lk(_mutex); + if (_receivedRecipientForgetMigrationPromise.getFuture().isReady()) { + return; + } + _receivedRecipientForgetMigrationPromise.emplaceValue(); + } + + // Interrupt the chain to mark the state doc garbage collectable. + _interrupt(Status(ErrorCodes::TenantMigrationForgotten, + str::stream() << "recipientForgetMigration received for migration " + << getMigrationUUID()), + /*skipWaitingForForgetMigration=*/false); +} + +void TenantMigrationRecipientService::Instance::_cleanupOnDataSyncCompletion(Status status) { auto opCtx = cc().makeOperationContext(); std::unique_ptr<OplogFetcher> savedDonorOplogFetcher; @@ -814,19 +923,11 @@ void TenantMigrationRecipientService::Instance::_cleanupOnTaskCompletion(Status shutdownTarget(lk, _donorOplogFetcher); shutdownTargetWithOpCtx(lk, _donorOplogBuffer, opCtx.get()); - if (status.isOK()) { - // All intermediary promise should have been fulfilled already. - invariant(_dataSyncStartedPromise.getFuture().isReady() && - _dataConsistentPromise.getFuture().isReady()); - _completionPromise.emplaceValue(); - } - invariant(!status.isOK()); + setPromiseErrorifNotReady(lk, _stateDocPersistedPromise, status); setPromiseErrorifNotReady(lk, _dataSyncStartedPromise, status); setPromiseErrorifNotReady(lk, _dataConsistentPromise, status); - setPromiseErrorifNotReady(lk, _completionPromise, status); - - _taskState.setState(TaskState::kDone); + setPromiseErrorifNotReady(lk, _dataSyncCompletionPromise, status); // Save them to join() with it outside of _mutex. using std::swap; @@ -837,7 +938,7 @@ void TenantMigrationRecipientService::Instance::_cleanupOnTaskCompletion(Status // Perform join outside the lock to avoid deadlocks. 
joinTarget(savedDonorOplogFetcher); - joinTarget(savedDonorOplogFetcher); + joinTarget(savedTenantOplogApplier); joinTarget(savedWriterPool); } @@ -870,7 +971,7 @@ SemiFuture<void> TenantMigrationRecipientService::Instance::run( "readPreference"_attr = _readPreference); return ExecutorFuture(**executor) - .then([this] { + .then([this, self = shared_from_this()] { stdx::lock_guard lk(_mutex); // Instance task can be started only once for the current term on a primary. invariant(!_taskState.isDone()); @@ -881,20 +982,27 @@ SemiFuture<void> TenantMigrationRecipientService::Instance::run( _taskState.setState(TaskState::kRunning); + pauseAfterRunTenantMigrationRecipientInstance.pauseWhileSet(); + + uassert(ErrorCodes::TenantMigrationForgotten, + str::stream() << "Migration " << getMigrationUUID() + << " already marked for garbage collect", + !_stateDoc.getExpireAt()); + return _initializeStateDoc(lk); }) - .then([this] { + .then([this, self = shared_from_this()] { + _stateDocPersistedPromise.emplaceValue(); _stopOrHangOnFailPoint(&fpAfterPersistingTenantMigrationRecipientInstanceStateDoc); return _createAndConnectClients(); }) - .then([this](ConnectionPair ConnectionPair) { + .then([this, self = shared_from_this()](ConnectionPair ConnectionPair) { stdx::lock_guard lk(_mutex); if (_taskState.isInterrupted()) { uassertStatusOK(_taskState.getInterruptStatus()); } - // interrupt() called after this code block will interrupt the cloner, oplog applier and - // fetcher. + // interrupt() called after this code block will interrupt the cloner and fetcher. _client = std::move(ConnectionPair.first); _oplogFetcherClient = std::move(ConnectionPair.second); @@ -903,17 +1011,9 @@ SemiFuture<void> TenantMigrationRecipientService::Instance::run( _sharedData = std::make_unique<TenantMigrationSharedData>( getGlobalServiceContext()->getFastClockSource(), getMigrationUUID()); }) - .then([this] { + .then([this, self = shared_from_this()] { _stopOrHangOnFailPoint(&fpAfterConnectingTenantMigrationRecipientInstance); stdx::lock_guard lk(_mutex); - // The instance is marked as garbage collect if the migration is either - // committed or aborted on donor side. So, don't start the recipient task if the - // instance state doc is marked for garbage collect. 
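A recurring mechanical change in this hunk is adding `self = shared_from_this()` to every continuation lambda: the chain can outlive the last external reference to the instance, so each lambda must co-own it. A standalone demonstration of the pattern, with std::thread standing in for the task executor.

```cpp
#include <chrono>
#include <iostream>
#include <memory>
#include <thread>

class Instance : public std::enable_shared_from_this<Instance> {
public:
    void run() {
        // Capturing only `this` would dangle once the last external shared_ptr
        // goes away; `self` pins the lifetime until the lambda finishes.
        std::thread([this, self = shared_from_this()] {
            std::this_thread::sleep_for(std::chrono::milliseconds(50));
            std::cout << "instance still alive: " << _value << "\n";
        }).detach();
    }

private:
    int _value = 42;
};

int main() {
    std::make_shared<Instance>()->run();  // caller's reference dies immediately
    std::this_thread::sleep_for(std::chrono::milliseconds(100));
}
```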
- uassert(ErrorCodes::IllegalOperation, - str::stream() << "Can't start the data sync as the state doc is already marked " - "for garbage collect for migration uuid: " - << getMigrationUUID(), - !_stateDoc.getExpireAt()); _getStartOpTimesFromDonor(lk); auto opCtx = cc().makeOperationContext(); uassertStatusOK( @@ -921,14 +1021,15 @@ SemiFuture<void> TenantMigrationRecipientService::Instance::run( return WaitForMajorityService::get(opCtx->getServiceContext()) .waitUntilMajority(repl::ReplClientInfo::forClient(cc()).getLastOp()); }) - .then([this] { + .then([this, self = shared_from_this()] { _stopOrHangOnFailPoint(&fpAfterRetrievingStartOpTimesMigrationRecipientInstance); _startOplogFetcher(); }) - .then([this] { + .then([this, self = shared_from_this()] { _stopOrHangOnFailPoint(&fpAfterStartingOplogFetcherMigrationRecipientInstance); stdx::lock_guard lk(_mutex); + { // Throwing error when cloner is canceled externally via interrupt(), makes the // instance to skip the remaining task (i.e., starting oplog applier) in the @@ -963,8 +1064,8 @@ SemiFuture<void> TenantMigrationRecipientService::Instance::run( _dataSyncStartedPromise.emplaceValue(); return clonerFuture; }) - .then([this] { return _onCloneSuccess(); }) - .then([this] { + .then([this, self = shared_from_this()] { return _onCloneSuccess(); }) + .then([this, self = shared_from_this()] { _stopOrHangOnFailPoint(&fpAfterCollectionClonerDone); LOGV2_DEBUG(4881200, 1, @@ -978,7 +1079,7 @@ SemiFuture<void> TenantMigrationRecipientService::Instance::run( _stopOrHangOnFailPoint(&fpAfterStartingOplogApplierMigrationRecipientInstance); return _getDataConsistentFuture(); }) - .then([this] { + .then([this, self = shared_from_this()] { stdx::lock_guard lk(_mutex); LOGV2_DEBUG(4881101, 1, @@ -990,7 +1091,7 @@ SemiFuture<void> TenantMigrationRecipientService::Instance::run( _dataConsistentPromise.emplaceValue(_stateDoc.getDataConsistentStopDonorOpTime().get()); }) - .then([this] { + .then([this, self = shared_from_this()] { _stopOrHangOnFailPoint(&fpAfterDataConsistentMigrationRecipientInstance); stdx::lock_guard lk(_mutex); // wait for oplog applier to complete/stop. @@ -999,8 +1100,14 @@ SemiFuture<void> TenantMigrationRecipientService::Instance::run( return _tenantOplogApplier->getNotificationForOpTime(OpTime::max()); }) .thenRunOn(_recipientService->getInstanceCleanupExecutor()) - .onCompletion([this](StatusOrStatusWith<TenantOplogApplier::OpTimePair> applierStatus) { - // We don't need the final optime from the oplog applier. + .onCompletion([this, self = shared_from_this()]( + StatusOrStatusWith<TenantOplogApplier::OpTimePair> applierStatus) { + // On shutDown/stepDown, the _scopedExecutor may have already been shut down. So we need + // to schedule the clean up work on the parent executor. + + // We don't need the final optime from the oplog applier. The data sync does not + // normally stop by itself on success. It completes only on errors or on external + // interruption (e.g. by shutDown/stepDown or by recipientForgetMigration command). Status status = applierStatus.getStatus(); { // If we were interrupted during oplog application, replace oplog application @@ -1035,7 +1142,48 @@ SemiFuture<void> TenantMigrationRecipientService::Instance::run( hangBeforeTaskCompletion.pauseWhileSet(); } - _cleanupOnTaskCompletion(status); + _cleanupOnDataSyncCompletion(status); + + // Handle recipientForgetMigration. 
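The executor hop in the onCompletion above matters: per-instance work runs on _scopedExecutor, which may already be shut down on stepdown, so the cleanup continuation is scheduled on the longer-lived instance-cleanup executor. A toy model of why; Executor here is a trivial stand-in, not MongoDB's executor API.

```cpp
#include <functional>
#include <iostream>
#include <utility>
#include <vector>

struct Executor {
    bool shutDown = false;
    std::vector<std::function<void()>> queue;
    void schedule(std::function<void()> task) {
        if (!shutDown)
            queue.push_back(std::move(task));  // shut-down executors drop work
    }
    void drain() {
        for (auto& task : queue)
            task();
        queue.clear();
    }
};

int main() {
    Executor scoped, parent;
    scoped.shutDown = true;  // simulate a stepdown mid-migration
    scoped.schedule([] { std::cout << "per-instance data sync step\n"; });  // dropped
    parent.schedule([] { std::cout << "cleanup + resolve completion promise\n"; });
    scoped.drain();
    parent.drain();  // cleanup still runs, so no future is left hanging
}
```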
+ stdx::lock_guard lk(_mutex); + if (_stateDoc.getExpireAt() || + MONGO_unlikely(autoRecipientForgetMigration.shouldFail())) { + // Skip waiting for the recipientForgetMigration command. + setPromiseOkifNotReady(lk, _receivedRecipientForgetMigrationPromise); + } + }) + .thenRunOn(**_scopedExecutor) + .then([this, self = shared_from_this()] { + // Schedule on the _scopedExecutor to make sure we are still the primary when waiting + // for the recipientForgetMigration command. + return _receivedRecipientForgetMigrationPromise.getFuture(); + }) + .then( + [this, self = shared_from_this()] { return _markStateDocumentAsGarbageCollectable(); }) + .thenRunOn(_recipientService->getInstanceCleanupExecutor()) + .onCompletion([this, self = shared_from_this()](Status status) { + // Schedule on the parent executor to mark the completion of the whole chain so this is + // safe even on shutDown/stepDown. + stdx::lock_guard lk(_mutex); + invariant(_dataSyncCompletionPromise.getFuture().isReady()); + if (status.isOK()) { + LOGV2(4881401, + "Migration marked to be garbage collectable due to recipientForgetMigration " + "command", + "migrationId"_attr = getMigrationUUID(), + "tenantId"_attr = getTenantId(), + "expireAt"_attr = *_stateDoc.getExpireAt()); + setPromiseOkifNotReady(lk, _taskCompletionPromise); + } else { + // We should only hit here on a stepDown/shutDown. + LOGV2(4881402, + "Migration not marked to be garbage collectable", + "migrationId"_attr = getMigrationUUID(), + "tenantId"_attr = getTenantId(), + "status"_attr = status); + setPromiseErrorifNotReady(lk, _taskCompletionPromise, status); + } + _taskState.setState(TaskState::kDone); }) .semi(); } diff --git a/src/mongo/db/repl/tenant_migration_recipient_service.h b/src/mongo/db/repl/tenant_migration_recipient_service.h index 0036756380e..320eb9b5366 100644 --- a/src/mongo/db/repl/tenant_migration_recipient_service.h +++ b/src/mongo/db/repl/tenant_migration_recipient_service.h @@ -89,11 +89,24 @@ public: void interrupt(Status status) override; /** - * Returns a Future that will be resolved when all work associated with this Instance has + * Interrupts the migration for garbage collection. + */ + void onReceiveRecipientForgetMigration(OperationContext* opCtx); + + /** + * Returns a Future that will be resolved when data sync associated with this Instance has * completed running. */ + SharedSemiFuture<void> getDataSyncCompletionFuture() const { + return _dataSyncCompletionPromise.getFuture(); + } + + /** + * Returns a Future that will be resolved when the work associated with this Instance has + * completed to indicate whether the migration is forgotten successfully. + */ SharedSemiFuture<void> getCompletionFuture() const { - return _completionPromise.getFuture(); + return _taskCompletionPromise.getFuture(); } /** @@ -189,7 +202,7 @@ public: void setState(StateFlag state, boost::optional<Status> interruptStatus = boost::none) { invariant(checkIfValidTransition(state), - str::stream() << "current state :" << toString(_state) + str::stream() << "current state: " << toString(_state) << ", new state: " << toString(state)); _state = state; @@ -245,6 +258,14 @@ public: }; /* + * Helper for interrupt(). + * The _receivedForgetMigrationPromise is resolved when skipWaitingForForgetMigration is + * set (e.g. stepDown/shutDown). And we use skipWaitingForForgetMigration=false for + * interruptions coming from the instance's task chain itself (e.g. _oplogFetcherCallback). 
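The run() chain now resolves two distinct futures: the data-sync future usually ends with an error such as TenantMigrationForgotten once the forget command interrupts the sync, while the task-completion future resolves OK after the state doc is marked garbage collectable. A standard-library sketch of how a caller would observe the pair.

```cpp
#include <future>
#include <iostream>
#include <stdexcept>

int main() {
    std::promise<void> dataSync, task;
    std::shared_future<void> dataSyncDone = dataSync.get_future().share();
    std::shared_future<void> taskDone = task.get_future().share();

    // The forget command interrupts the sync ...
    dataSync.set_exception(
        std::make_exception_ptr(std::runtime_error("TenantMigrationForgotten")));
    // ... and the chain then marks the state doc garbage collectable.
    task.set_value();

    try {
        dataSyncDone.get();
    } catch (const std::exception& e) {
        std::cout << "data sync ended: " << e.what() << "\n";
    }
    taskDone.get();  // recipientForgetMigration returns once this is ready
}
```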
+ */ + void _interrupt(Status status, bool skipWaitingForForgetMigration); + + /* * Transitions the instance state to 'kStarted'. * * Persists the instance state doc and waits for it to be majority replicated. @@ -252,6 +273,14 @@ public: */ SemiFuture<void> _initializeStateDoc(WithLock); + /* + * Transitions the instance state to 'kDone' and sets the expireAt field. + * + * Persists the instance state doc and waits for it to be majority replicated. + * Throws on shutdown / notPrimary errors. + */ + SemiFuture<void> _markStateDocumentAsGarbageCollectable(); + /** * Creates a client, connects it to the donor, and authenticates it if authParams is * non-empty. Throws a user assertion on failure. @@ -285,7 +314,6 @@ public: /** * Starts the tenant oplog fetcher. */ - void _startOplogFetcher(); /** @@ -329,10 +357,10 @@ public: void _cancelRemainingWork(WithLock lk); /* - * Performs some cleanup work on task completion, like, shutting down the components or - * fulfilling any instance promises. + * Performs some cleanup work on sync completion, like, shutting down the components or + * fulfilling any data-sync related instance promises. */ - void _cleanupOnTaskCompletion(Status status); + void _cleanupOnDataSyncCompletion(Status status); /* * Makes the failpoint to stop or hang based on failpoint data "action" field. @@ -386,13 +414,21 @@ public: // Indicates whether the main task future continuation chain state kicked off by run(). TaskState _taskState; // (M) - // Promise that is resolved when the chain of work kicked off by run() has completed. - SharedPromise<void> _completionPromise; // (W) + // Promise that is resolved when the state document is initialized and persisted. + SharedPromise<void> _stateDocPersistedPromise; // (W) // Promise that is resolved Signaled when the instance has started tenant database cloner // and tenant oplog fetcher. SharedPromise<void> _dataSyncStartedPromise; // (W) // Promise that is resolved Signaled when the tenant data sync has reached consistent point. SharedPromise<OpTime> _dataConsistentPromise; // (W) + // Promise that is resolved when the data sync has completed. + SharedPromise<void> _dataSyncCompletionPromise; // (W) + // Promise that is resolved when the recipientForgetMigration command is received or on + // stepDown/shutDown with errors. + SharedPromise<void> _receivedRecipientForgetMigrationPromise; // (W) + // Promise that is resolved when the chain of work kicked off by run() has completed to + // indicate whether the state doc is successfully marked as garbage collectable. 
+ SharedPromise<void> _taskCompletionPromise; // (W) }; private: diff --git a/src/mongo/db/repl/tenant_migration_recipient_service_test.cpp b/src/mongo/db/repl/tenant_migration_recipient_service_test.cpp index ae59a422d3b..f8d39c19b8c 100644 --- a/src/mongo/db/repl/tenant_migration_recipient_service_test.cpp +++ b/src/mongo/db/repl/tenant_migration_recipient_service_test.cpp @@ -133,11 +133,12 @@ class TenantMigrationRecipientServiceTest : public ServiceContextMongoDTest { public: class stopFailPointEnableBlock : public FailPointEnableBlock { public: - explicit stopFailPointEnableBlock(StringData failPointName) + explicit stopFailPointEnableBlock(StringData failPointName, + std::int32_t error = stopFailPointErrorCode) : FailPointEnableBlock(failPointName, BSON("action" << "stop" - << "stopErrorCode" << stopFailPointErrorCode)) {} + << "stopErrorCode" << error)) {} }; void setUp() override { @@ -150,6 +151,11 @@ public: WaitForMajorityService::get(getServiceContext()).setUp(getServiceContext()); + // Automatically mark the state doc garbage collectable after data sync completion. + globalFailPointRegistry() + .find("autoRecipientForgetMigration") + ->setMode(FailPoint::alwaysOn); + { auto opCtx = cc().makeOperationContext(); auto replCoord = std::make_unique<ReplicationCoordinatorMock>(serviceContext); @@ -313,7 +319,8 @@ TEST_F(TenantMigrationRecipientServiceTest, BasicTenantMigrationRecipientService ASSERT_EQ(migrationUUID, instance->getMigrationUUID()); // Wait for task completion. - ASSERT_EQ(stopFailPointErrorCode, instance->getCompletionFuture().getNoThrow().code()); + ASSERT_EQ(stopFailPointErrorCode, instance->getDataSyncCompletionFuture().getNoThrow().code()); + ASSERT_OK(instance->getCompletionFuture().getNoThrow()); } @@ -336,8 +343,11 @@ TEST_F(TenantMigrationRecipientServiceTest, InstanceReportsErrorOnFailureWhilePe ASSERT_EQ(migrationUUID, instance->getMigrationUUID()); // Should be able to see the instance task failure error. - auto status = instance->getCompletionFuture().getNoThrow(); + auto status = instance->getDataSyncCompletionFuture().getNoThrow(); ASSERT_EQ(ErrorCodes::NotWritablePrimary, status.code()); + // Should also fail to mark the state doc garbage collectable if we have failed to persist the + // state doc at the first place. + ASSERT_EQ(ErrorCodes::NotWritablePrimary, instance->getCompletionFuture().getNoThrow()); } TEST_F(TenantMigrationRecipientServiceTest, TenantMigrationRecipientConnection_Primary) { @@ -383,7 +393,8 @@ TEST_F(TenantMigrationRecipientServiceTest, TenantMigrationRecipientConnection_P taskFp->setMode(FailPoint::off); // Wait for task completion. - ASSERT_EQ(stopFailPointErrorCode, instance->getCompletionFuture().getNoThrow().code()); + ASSERT_EQ(stopFailPointErrorCode, instance->getDataSyncCompletionFuture().getNoThrow().code()); + ASSERT_OK(instance->getCompletionFuture().getNoThrow()); } TEST_F(TenantMigrationRecipientServiceTest, TenantMigrationRecipientConnection_Secondary) { @@ -429,7 +440,8 @@ TEST_F(TenantMigrationRecipientServiceTest, TenantMigrationRecipientConnection_S taskFp->setMode(FailPoint::off); // Wait for task completion. 
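The test fixture enables the new autoRecipientForgetMigration failpoint in setUp() so that pre-existing tests, which never send recipientForgetMigration, still drive the chain to completion. A failpoint is essentially a named, atomically toggled branch; a minimal stand-in follows (the real FailPoint additionally supports modes, BSON data payloads, and waitForTimesEntered()).

```cpp
#include <atomic>
#include <iostream>

struct FailPoint {
    std::atomic<bool> enabled{false};
    bool shouldFail() const { return enabled.load(); }
};

FailPoint autoRecipientForgetMigration;  // name taken from the diff

int main() {
    autoRecipientForgetMigration.enabled = true;  // setUp(): FailPoint::alwaysOn
    // At data-sync completion the chain consults it, as run() does above:
    if (autoRecipientForgetMigration.shouldFail())
        std::cout << "skipping wait for recipientForgetMigration\n";
}
```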
- ASSERT_EQ(stopFailPointErrorCode, instance->getCompletionFuture().getNoThrow().code()); + ASSERT_EQ(stopFailPointErrorCode, instance->getDataSyncCompletionFuture().getNoThrow().code()); + ASSERT_OK(instance->getCompletionFuture().getNoThrow()); } TEST_F(TenantMigrationRecipientServiceTest, TenantMigrationRecipientConnection_PrimaryFails) { @@ -488,7 +500,8 @@ TEST_F(TenantMigrationRecipientServiceTest, TenantMigrationRecipientConnection_P // Wait for task completion failure. ASSERT_EQUALS(ErrorCodes::FailedToSatisfyReadPreference, - instance->getCompletionFuture().getNoThrow().code()); + instance->getDataSyncCompletionFuture().getNoThrow().code()); + ASSERT_OK(instance->getCompletionFuture().getNoThrow()); } TEST_F(TenantMigrationRecipientServiceTest, TenantMigrationRecipientConnection_PrimaryFailsOver) { @@ -537,7 +550,8 @@ TEST_F(TenantMigrationRecipientServiceTest, TenantMigrationRecipientConnection_P taskFp->setMode(FailPoint::off); // Wait for task completion. - ASSERT_EQ(stopFailPointErrorCode, instance->getCompletionFuture().getNoThrow().code()); + ASSERT_EQ(stopFailPointErrorCode, instance->getDataSyncCompletionFuture().getNoThrow().code()); + ASSERT_OK(instance->getCompletionFuture().getNoThrow()); } TEST_F(TenantMigrationRecipientServiceTest, TenantMigrationRecipientConnection_BadConnectString) { @@ -601,7 +615,8 @@ TEST_F(TenantMigrationRecipientServiceTest, TenantMigrationRecipientGetStartOpTi ASSERT(instance.get()); // Wait for task completion. - ASSERT_EQ(stopFailPointErrorCode, instance->getCompletionFuture().getNoThrow().code()); + ASSERT_EQ(stopFailPointErrorCode, instance->getDataSyncCompletionFuture().getNoThrow().code()); + ASSERT_OK(instance->getCompletionFuture().getNoThrow()); ASSERT_EQ(topOfOplogOpTime, getStateDoc(instance.get()).getStartFetchingDonorOpTime()); ASSERT_EQ(topOfOplogOpTime, getStateDoc(instance.get()).getStartApplyingDonorOpTime()); @@ -640,7 +655,8 @@ TEST_F(TenantMigrationRecipientServiceTest, pauseFailPoint->setMode(FailPoint::off, 0); // Wait for task completion. - ASSERT_EQ(stopFailPointErrorCode, instance->getCompletionFuture().getNoThrow().code()); + ASSERT_EQ(stopFailPointErrorCode, instance->getDataSyncCompletionFuture().getNoThrow().code()); + ASSERT_OK(instance->getCompletionFuture().getNoThrow()); ASSERT_EQ(topOfOplogOpTime, getStateDoc(instance.get()).getStartFetchingDonorOpTime()); ASSERT_EQ(newTopOfOplogOpTime, getStateDoc(instance.get()).getStartApplyingDonorOpTime()); @@ -676,7 +692,8 @@ TEST_F(TenantMigrationRecipientServiceTest, TenantMigrationRecipientGetStartOpTi ASSERT(instance.get()); // Wait for task completion. - ASSERT_EQ(stopFailPointErrorCode, instance->getCompletionFuture().getNoThrow().code()); + ASSERT_EQ(stopFailPointErrorCode, instance->getDataSyncCompletionFuture().getNoThrow().code()); + ASSERT_OK(instance->getCompletionFuture().getNoThrow()); ASSERT_EQ(txnStartOpTime, getStateDoc(instance.get()).getStartFetchingDonorOpTime()); ASSERT_EQ(topOfOplogOpTime, getStateDoc(instance.get()).getStartApplyingDonorOpTime()); @@ -722,7 +739,8 @@ TEST_F(TenantMigrationRecipientServiceTest, pauseFailPoint->setMode(FailPoint::off, 0); // Wait for task completion. 
- ASSERT_EQ(stopFailPointErrorCode, instance->getCompletionFuture().getNoThrow().code()); + ASSERT_EQ(stopFailPointErrorCode, instance->getDataSyncCompletionFuture().getNoThrow().code()); + ASSERT_OK(instance->getCompletionFuture().getNoThrow()); ASSERT_EQ(txnStartOpTime, getStateDoc(instance.get()).getStartFetchingDonorOpTime()); ASSERT_EQ(newTopOfOplogOpTime, getStateDoc(instance.get()).getStartApplyingDonorOpTime()); @@ -750,7 +768,8 @@ TEST_F(TenantMigrationRecipientServiceTest, ASSERT(instance.get()); // Wait for task completion. - ASSERT_NOT_OK(instance->getCompletionFuture().getNoThrow()); + ASSERT_NOT_OK(instance->getDataSyncCompletionFuture().getNoThrow()); + ASSERT_OK(instance->getCompletionFuture().getNoThrow()); // Even though we failed, the memory state should still match the on-disk state. checkStateDocPersisted(opCtx.get(), instance.get()); @@ -796,7 +815,8 @@ TEST_F(TenantMigrationRecipientServiceTest, TenantMigrationRecipientStartOplogFe taskFp->setMode(FailPoint::off); // Wait for task completion. - ASSERT_EQ(stopFailPointErrorCode, instance->getCompletionFuture().getNoThrow().code()); + ASSERT_EQ(stopFailPointErrorCode, instance->getDataSyncCompletionFuture().getNoThrow().code()); + ASSERT_OK(instance->getCompletionFuture().getNoThrow()); } TEST_F(TenantMigrationRecipientServiceTest, TenantMigrationRecipientStartsCloner) { @@ -860,7 +880,8 @@ TEST_F(TenantMigrationRecipientServiceTest, TenantMigrationRecipientStartsCloner taskFp->setMode(FailPoint::off); // Wait for task completion. - ASSERT_EQ(stopFailPointErrorCode, instance->getCompletionFuture().getNoThrow().code()); + ASSERT_EQ(stopFailPointErrorCode, instance->getDataSyncCompletionFuture().getNoThrow().code()); + ASSERT_OK(instance->getCompletionFuture().getNoThrow()); } TEST_F(TenantMigrationRecipientServiceTest, OplogFetcherFailsDuringOplogApplication) { @@ -908,8 +929,9 @@ TEST_F(TenantMigrationRecipientServiceTest, OplogFetcherFailsDuringOplogApplicat oplogFetcher->shutdownWith({ErrorCodes::Error(4881203), "Injected error"}); // Wait for task completion failure. - auto status = instance->getCompletionFuture().getNoThrow(); + auto status = instance->getDataSyncCompletionFuture().getNoThrow(); ASSERT_EQ(4881203, status.code()); + ASSERT_OK(instance->getCompletionFuture().getNoThrow()); } TEST_F(TenantMigrationRecipientServiceTest, OplogApplierFails) { @@ -964,7 +986,8 @@ TEST_F(TenantMigrationRecipientServiceTest, OplogApplierFails) { oplogFetcher->receiveBatch(1LL, {oplogEntry.toBSON()}, injectedEntryOpTime.getTimestamp()); // Wait for task completion failure. - ASSERT_NOT_OK(instance->getCompletionFuture().getNoThrow()); + ASSERT_NOT_OK(instance->getDataSyncCompletionFuture().getNoThrow()); + ASSERT_OK(instance->getCompletionFuture().getNoThrow()); } TEST_F(TenantMigrationRecipientServiceTest, StoppingApplierAllowsCompletion) { @@ -1012,7 +1035,8 @@ TEST_F(TenantMigrationRecipientServiceTest, StoppingApplierAllowsCompletion) { // Wait for task completion. Since we're using a test function to cancel the applier, // the actual result is not critical. - ASSERT_NOT_OK(instance->getCompletionFuture().getNoThrow()); + ASSERT_NOT_OK(instance->getDataSyncCompletionFuture().getNoThrow()); + ASSERT_OK(instance->getCompletionFuture().getNoThrow()); } TEST_F(TenantMigrationRecipientServiceTest, TenantMigrationRecipientAddResumeTokenNoopsToBuffer) { @@ -1115,7 +1139,400 @@ TEST_F(TenantMigrationRecipientServiceTest, TenantMigrationRecipientAddResumeTok oplogFetcherFP->setMode(FailPoint::off); // Wait for task completion. 
- ASSERT_EQ(stopFailPointErrorCode, instance->getCompletionFuture().getNoThrow().code()); + ASSERT_EQ(stopFailPointErrorCode, instance->getDataSyncCompletionFuture().getNoThrow().code()); + ASSERT_OK(instance->getCompletionFuture().getNoThrow()); +} + +TEST_F(TenantMigrationRecipientServiceTest, RecipientForgetMigration_BeforeRun) { + const UUID migrationUUID = UUID::gen(); + MockReplicaSet replSet("donorSet", 3, true /* hasPrimary */, true /* dollarPrefixHosts */); + TenantMigrationRecipientDocument initialStateDocument( + migrationUUID, + replSet.getConnectionString(), + "tenantA", + ReadPreferenceSetting(ReadPreference::PrimaryOnly)); + + auto fp = globalFailPointRegistry().find("pauseBeforeRunTenantMigrationRecipientInstance"); + fp->setMode(FailPoint::alwaysOn); + + auto opCtx = makeOperationContext(); + auto instance = repl::TenantMigrationRecipientService::Instance::getOrCreate( + opCtx.get(), _service, initialStateDocument.toBSON()); + + // The task is interrupted before it start the chain. + instance->interrupt({ErrorCodes::InterruptedDueToReplStateChange, "Test stepdown"}); + + // Test that receiving recipientForgetMigration command after that should result in the same + // error. + ASSERT_THROWS_CODE(instance->onReceiveRecipientForgetMigration(opCtx.get()), + AssertionException, + ErrorCodes::InterruptedDueToReplStateChange); + + fp->setMode(FailPoint::off); + + // We should fail to mark the state doc garbage collectable. + ASSERT_EQ(instance->getCompletionFuture().getNoThrow(), + ErrorCodes::InterruptedDueToReplStateChange); +} + +TEST_F(TenantMigrationRecipientServiceTest, RecipientForgetMigration_FailToInitializeStateDoc) { + stopFailPointEnableBlock fp("failWhilePersistingTenantMigrationRecipientInstanceStateDoc"); + + const UUID migrationUUID = UUID::gen(); + MockReplicaSet replSet("donorSet", 3, true /* hasPrimary */, true /* dollarPrefixHosts */); + TenantMigrationRecipientDocument initialStateDocument( + migrationUUID, + replSet.getConnectionString(), + "tenantA", + ReadPreferenceSetting(ReadPreference::PrimaryOnly)); + + auto opCtx = makeOperationContext(); + auto instance = repl::TenantMigrationRecipientService::Instance::getOrCreate( + opCtx.get(), _service, initialStateDocument.toBSON()); + + ASSERT_THROWS_CODE(instance->onReceiveRecipientForgetMigration(opCtx.get()), + AssertionException, + ErrorCodes::NotWritablePrimary); + // We should fail to mark the state doc garbage collectable if we have failed to initialize and + // persist the state doc at the first place. + ASSERT_EQ(instance->getCompletionFuture().getNoThrow(), ErrorCodes::NotWritablePrimary); +} + +TEST_F(TenantMigrationRecipientServiceTest, RecipientForgetMigration_WaitUntilStateDocInitialized) { + // The test fixture forgets the migration automatically, disable the failpoint for this test so + // the migration continues to wait for the recipientForgetMigration command after persisting the + // state doc. 
+ auto autoForgetFp = globalFailPointRegistry().find("autoRecipientForgetMigration"); + autoForgetFp->setMode(FailPoint::off); + + const UUID migrationUUID = UUID::gen(); + MockReplicaSet replSet("donorSet", 3, true /* hasPrimary */, true /* dollarPrefixHosts */); + TenantMigrationRecipientDocument initialStateDocument( + migrationUUID, + replSet.getConnectionString(), + "tenantA", + ReadPreferenceSetting(ReadPreference::PrimaryOnly)); + + auto fp = globalFailPointRegistry().find("pauseAfterRunTenantMigrationRecipientInstance"); + auto initialTimesEntered = fp->setMode(FailPoint::alwaysOn); + + auto opCtx = makeOperationContext(); + auto instance = repl::TenantMigrationRecipientService::Instance::getOrCreate( + opCtx.get(), _service, initialStateDocument.toBSON()); + + fp->waitForTimesEntered(initialTimesEntered + 1); + + // Test that onReceiveRecipientForgetMigration waits until the state doc is initialized. + opCtx->setDeadlineAfterNowBy(Seconds(2), opCtx->getTimeoutError()); + ASSERT_THROWS_CODE(instance->onReceiveRecipientForgetMigration(opCtx.get()), + AssertionException, + opCtx->getTimeoutError()); + + // Unblock the task chain. + fp->setMode(FailPoint::off); + + // Make a new opCtx as the old one has expired due to timeout errors. + opCtx.reset(); + opCtx = makeOperationContext(); + + // The onReceiveRecipientForgetMigration should eventually go through. + instance->onReceiveRecipientForgetMigration(opCtx.get()); + ASSERT_EQ(instance->getDataSyncCompletionFuture().getNoThrow(), + ErrorCodes::TenantMigrationForgotten); + ASSERT_OK(instance->getCompletionFuture().getNoThrow()); + + const auto doc = getStateDoc(instance.get()); + LOGV2(4881411, + "Test migration complete", + "preStateDoc"_attr = initialStateDocument.toBSON(), + "postStateDoc"_attr = doc.toBSON()); + ASSERT_EQ(doc.getDonorConnectionString(), replSet.getConnectionString()); + ASSERT_EQ(doc.getTenantId(), "tenantA"); + ASSERT_TRUE(doc.getReadPreference().equals(ReadPreferenceSetting(ReadPreference::PrimaryOnly))); + ASSERT_TRUE(doc.getState() == TenantMigrationRecipientStateEnum::kDone); + ASSERT_TRUE(doc.getExpireAt() != boost::none); + ASSERT_TRUE(doc.getExpireAt().get() > opCtx->getServiceContext()->getFastClockSource()->now()); + ASSERT_TRUE(doc.getStartApplyingDonorOpTime() == boost::none); + ASSERT_TRUE(doc.getStartFetchingDonorOpTime() == boost::none); + ASSERT_TRUE(doc.getDataConsistentStopDonorOpTime() == boost::none); + ASSERT_TRUE(doc.getCloneFinishedRecipientOpTime() == boost::none); + checkStateDocPersisted(opCtx.get(), instance.get()); +} + +TEST_F(TenantMigrationRecipientServiceTest, RecipientForgetMigration_AfterStartOpTimes) { + auto fp = + globalFailPointRegistry().find("fpAfterRetrievingStartOpTimesMigrationRecipientInstance"); + auto initialTimesEntered = fp->setMode(FailPoint::alwaysOn, + 0, + BSON("action" + << "hang")); + + + const UUID migrationUUID = UUID::gen(); + const OpTime topOfOplogOpTime(Timestamp(5, 1), 1); + + MockReplicaSet replSet("donorSet", 3, true /* hasPrimary */, true /* dollarPrefixHosts */); + insertTopOfOplog(&replSet, topOfOplogOpTime); + + TenantMigrationRecipientDocument initialStateDocument( + migrationUUID, + replSet.getConnectionString(), + "tenantA", + ReadPreferenceSetting(ReadPreference::PrimaryOnly)); + + // Create and start the instance. 
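The WaitUntilStateDocInitialized test above bounds onReceiveRecipientForgetMigration() with an operation deadline, so the blocked wait surfaces as a timeout error rather than hanging the test. The same shape expressed with std::future; the promise stands in for _stateDocPersistedPromise.

```cpp
#include <chrono>
#include <future>
#include <iostream>

int main() {
    std::promise<void> stateDocPersisted;
    std::shared_future<void> persisted = stateDocPersisted.get_future().share();

    // Failpoint holds the chain, so the state doc is not yet persisted:
    if (persisted.wait_for(std::chrono::seconds(2)) == std::future_status::timeout)
        std::cout << "timed out waiting for the state doc\n";

    stateDocPersisted.set_value();  // failpoint released; doc persisted
    persisted.get();                // now onReceiveRecipientForgetMigration proceeds
}
```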
+ auto opCtx = makeOperationContext(); + auto instance = TenantMigrationRecipientService::Instance::getOrCreate( + opCtx.get(), _service, initialStateDocument.toBSON()); + ASSERT(instance.get()); + + fp->waitForTimesEntered(initialTimesEntered + 1); + instance->onReceiveRecipientForgetMigration(opCtx.get()); + + // Skip the cloners in this test, so we provide an empty list of databases. + MockRemoteDBServer* const _donorServer = + mongo::MockConnRegistry::get()->getMockRemoteDBServer(replSet.getPrimary()); + _donorServer->setCommandReply("listDatabases", makeListDatabasesResponse({})); + _donorServer->setCommandReply("find", makeFindResponse()); + + fp->setMode(FailPoint::off); + ASSERT_EQ(instance->getDataSyncCompletionFuture().getNoThrow(), + ErrorCodes::TenantMigrationForgotten); + ASSERT_OK(instance->getCompletionFuture().getNoThrow()); + + const auto doc = getStateDoc(instance.get()); + LOGV2(4881412, + "Test migration complete", + "preStateDoc"_attr = initialStateDocument.toBSON(), + "postStateDoc"_attr = doc.toBSON()); + ASSERT_EQ(doc.getDonorConnectionString(), replSet.getConnectionString()); + ASSERT_EQ(doc.getTenantId(), "tenantA"); + ASSERT_TRUE(doc.getReadPreference().equals(ReadPreferenceSetting(ReadPreference::PrimaryOnly))); + ASSERT_TRUE(doc.getState() == TenantMigrationRecipientStateEnum::kDone); + ASSERT_TRUE(doc.getExpireAt() != boost::none); + ASSERT_TRUE(doc.getExpireAt().get() > opCtx->getServiceContext()->getFastClockSource()->now()); + checkStateDocPersisted(opCtx.get(), instance.get()); +} + +TEST_F(TenantMigrationRecipientServiceTest, RecipientForgetMigration_AfterConsistent) { + // The test fixture forgets the migration automatically, disable the failpoint for this test so + // the migration continues to wait for the recipientForgetMigration command after reaching data + // consistent state. + auto autoForgetFp = globalFailPointRegistry().find("autoRecipientForgetMigration"); + autoForgetFp->setMode(FailPoint::off); + + auto dataConsistentFp = + globalFailPointRegistry().find("fpAfterDataConsistentMigrationRecipientInstance"); + auto initialTimesEntered = dataConsistentFp->setMode(FailPoint::alwaysOn, + 0, + BSON("action" + << "hang")); + + const UUID migrationUUID = UUID::gen(); + const OpTime topOfOplogOpTime(Timestamp(5, 1), 1); + + MockReplicaSet replSet("donorSet", 3, true /* hasPrimary */, true /* dollarPrefixHosts */); + insertTopOfOplog(&replSet, topOfOplogOpTime); + + TenantMigrationRecipientDocument initialStateDocument( + migrationUUID, + replSet.getConnectionString(), + "tenantA", + ReadPreferenceSetting(ReadPreference::PrimaryOnly)); + + // Setting these causes us to skip cloning. + initialStateDocument.setCloneFinishedRecipientOpTime(topOfOplogOpTime); + initialStateDocument.setDataConsistentStopDonorOpTime(topOfOplogOpTime); + + auto opCtx = makeOperationContext(); + std::shared_ptr<TenantMigrationRecipientService::Instance> instance; + { + FailPointEnableBlock fp("pauseBeforeRunTenantMigrationRecipientInstance"); + // Create and start the instance. 
+ instance = TenantMigrationRecipientService::Instance::getOrCreate( + opCtx.get(), _service, initialStateDocument.toBSON()); + ASSERT(instance.get()); + instance->setCreateOplogFetcherFn_forTest(std::make_unique<CreateOplogFetcherMockFn>()); + } + dataConsistentFp->waitForTimesEntered(initialTimesEntered + 1); + + { + const auto doc = getStateDoc(instance.get()); + LOGV2(4881413, + "Test migration after consistent", + "preStateDoc"_attr = initialStateDocument.toBSON(), + "postStateDoc"_attr = doc.toBSON()); + ASSERT_EQ(doc.getDonorConnectionString(), replSet.getConnectionString()); + ASSERT_EQ(doc.getTenantId(), "tenantA"); + ASSERT_TRUE( + doc.getReadPreference().equals(ReadPreferenceSetting(ReadPreference::PrimaryOnly))); + ASSERT_TRUE(doc.getState() == TenantMigrationRecipientStateEnum::kConsistent); + ASSERT_TRUE(doc.getExpireAt() == boost::none); + checkStateDocPersisted(opCtx.get(), instance.get()); + } + + instance->onReceiveRecipientForgetMigration(opCtx.get()); + + // Test receiving duplicating recipientForgetMigration requests. + instance->onReceiveRecipientForgetMigration(opCtx.get()); + + // Continue after data being consistent. + dataConsistentFp->setMode(FailPoint::off); + + // The data sync should have completed. + ASSERT_EQ(instance->getDataSyncCompletionFuture().getNoThrow(), + ErrorCodes::TenantMigrationForgotten); + + ASSERT_OK(instance->getCompletionFuture().getNoThrow()); + + { + const auto doc = getStateDoc(instance.get()); + LOGV2(4881414, + "Test migration complete", + "preStateDoc"_attr = initialStateDocument.toBSON(), + "postStateDoc"_attr = doc.toBSON()); + ASSERT_EQ(doc.getDonorConnectionString(), replSet.getConnectionString()); + ASSERT_EQ(doc.getTenantId(), "tenantA"); + ASSERT_TRUE( + doc.getReadPreference().equals(ReadPreferenceSetting(ReadPreference::PrimaryOnly))); + ASSERT_TRUE(doc.getState() == TenantMigrationRecipientStateEnum::kDone); + ASSERT_TRUE(doc.getExpireAt() != boost::none); + ASSERT_TRUE(doc.getExpireAt().get() > + opCtx->getServiceContext()->getFastClockSource()->now()); + checkStateDocPersisted(opCtx.get(), instance.get()); + } +} + +TEST_F(TenantMigrationRecipientServiceTest, RecipientForgetMigration_AfterFail) { + // The test fixture forgets the migration automatically, disable the failpoint for this test so + // the migration continues to wait for the recipientForgetMigration command after getting an + // error from the migration. + auto autoForgetFp = globalFailPointRegistry().find("autoRecipientForgetMigration"); + autoForgetFp->setMode(FailPoint::off); + + stopFailPointEnableBlock fp("fpAfterCollectionClonerDone"); + const UUID migrationUUID = UUID::gen(); + const OpTime topOfOplogOpTime(Timestamp(5, 1), 1); + + MockReplicaSet replSet("donorSet", 3, true /* hasPrimary */, true /* dollarPrefixHosts */); + insertTopOfOplog(&replSet, topOfOplogOpTime); + + TenantMigrationRecipientDocument initialStateDocument( + migrationUUID, + replSet.getConnectionString(), + "tenantA", + ReadPreferenceSetting(ReadPreference::PrimaryOnly)); + + // Setting these causes us to skip cloning. + initialStateDocument.setCloneFinishedRecipientOpTime(topOfOplogOpTime); + initialStateDocument.setDataConsistentStopDonorOpTime(topOfOplogOpTime); + + auto opCtx = makeOperationContext(); + std::shared_ptr<TenantMigrationRecipientService::Instance> instance; + { + FailPointEnableBlock fp("pauseBeforeRunTenantMigrationRecipientInstance"); + // Create and start the instance. 
+ instance = TenantMigrationRecipientService::Instance::getOrCreate( + opCtx.get(), _service, initialStateDocument.toBSON()); + ASSERT(instance.get()); + instance->setCreateOplogFetcherFn_forTest(std::make_unique<CreateOplogFetcherMockFn>()); + } + + ASSERT_THROWS_CODE(instance->waitUntilMigrationReachesConsistentState(opCtx.get()), + AssertionException, + stopFailPointErrorCode); + + { + const auto doc = getStateDoc(instance.get()); + LOGV2(4881415, + "Test migration after collection cloner done", + "preStateDoc"_attr = initialStateDocument.toBSON(), + "postStateDoc"_attr = doc.toBSON()); + ASSERT_EQ(doc.getDonorConnectionString(), replSet.getConnectionString()); + ASSERT_EQ(doc.getTenantId(), "tenantA"); + ASSERT_TRUE( + doc.getReadPreference().equals(ReadPreferenceSetting(ReadPreference::PrimaryOnly))); + ASSERT_TRUE(doc.getState() == TenantMigrationRecipientStateEnum::kStarted); + ASSERT_TRUE(doc.getExpireAt() == boost::none); + checkStateDocPersisted(opCtx.get(), instance.get()); + } + + // The data sync should have completed. + ASSERT_EQ(stopFailPointErrorCode, instance->getDataSyncCompletionFuture().getNoThrow().code()); + + // The instance should still be running and waiting for the recipientForgetMigration command. + instance->onReceiveRecipientForgetMigration(opCtx.get()); + ASSERT_OK(instance->getCompletionFuture().getNoThrow()); + + { + const auto doc = getStateDoc(instance.get()); + LOGV2(4881416, + "Test migration complete", + "preStateDoc"_attr = initialStateDocument.toBSON(), + "postStateDoc"_attr = doc.toBSON()); + ASSERT_EQ(doc.getDonorConnectionString(), replSet.getConnectionString()); + ASSERT_EQ(doc.getTenantId(), "tenantA"); + ASSERT_TRUE( + doc.getReadPreference().equals(ReadPreferenceSetting(ReadPreference::PrimaryOnly))); + ASSERT_TRUE(doc.getState() == TenantMigrationRecipientStateEnum::kDone); + ASSERT_TRUE(doc.getExpireAt() != boost::none); + ASSERT_TRUE(doc.getExpireAt().get() > + opCtx->getServiceContext()->getFastClockSource()->now()); + checkStateDocPersisted(opCtx.get(), instance.get()); + } +} + +TEST_F(TenantMigrationRecipientServiceTest, RecipientForgetMigration_FailToMarkGarbageCollectable) { + // The test fixture forgets the migration automatically, disable the failpoint for this test so + // the migration continues to wait for the recipientForgetMigration command after getting an + // error from the migration. + auto autoForgetFp = globalFailPointRegistry().find("autoRecipientForgetMigration"); + autoForgetFp->setMode(FailPoint::off); + + stopFailPointEnableBlock fp("fpAfterPersistingTenantMigrationRecipientInstanceStateDoc"); + const UUID migrationUUID = UUID::gen(); + + MockReplicaSet replSet("donorSet", 3, true /* hasPrimary */, true /* dollarPrefixHosts */); + TenantMigrationRecipientDocument initialStateDocument( + migrationUUID, + replSet.getConnectionString(), + "tenantA", + ReadPreferenceSetting(ReadPreference::PrimaryOnly)); + + // Create and start the instance. + auto opCtx = makeOperationContext(); + auto instance = TenantMigrationRecipientService::Instance::getOrCreate( + opCtx.get(), _service, initialStateDocument.toBSON()); + ASSERT(instance.get()); + ASSERT_EQ(migrationUUID, instance->getMigrationUUID()); + + // The data sync should have completed. + ASSERT_EQ(stopFailPointErrorCode, instance->getDataSyncCompletionFuture().getNoThrow().code()); + + // Fail marking the state doc garbage collectable with a different error code, simulating a + // stepDown. 
+ stopFailPointEnableBlock fpFailForget("fpAfterReceivingRecipientForgetMigration", + ErrorCodes::NotWritablePrimary); + + // The instance should still be running and waiting for the recipientForgetMigration command. + instance->onReceiveRecipientForgetMigration(opCtx.get()); + // Check that it fails to mark the state doc garbage collectable. + ASSERT_EQ(ErrorCodes::NotWritablePrimary, instance->getCompletionFuture().getNoThrow().code()); + + { + const auto doc = getStateDoc(instance.get()); + LOGV2(4881417, + "Test migration complete", + "preStateDoc"_attr = initialStateDocument.toBSON(), + "postStateDoc"_attr = doc.toBSON()); + ASSERT_EQ(doc.getDonorConnectionString(), replSet.getConnectionString()); + ASSERT_EQ(doc.getTenantId(), "tenantA"); + ASSERT_TRUE( + doc.getReadPreference().equals(ReadPreferenceSetting(ReadPreference::PrimaryOnly))); + ASSERT_TRUE(doc.getState() == TenantMigrationRecipientStateEnum::kStarted); + ASSERT_TRUE(doc.getExpireAt() == boost::none); + checkStateDocPersisted(opCtx.get(), instance.get()); + } } } // namespace repl } // namespace mongo diff --git a/src/mongo/util/uuid.h b/src/mongo/util/uuid.h index 4f1d9c06364..de3e4930a5d 100644 --- a/src/mongo/util/uuid.h +++ b/src/mongo/util/uuid.h @@ -83,6 +83,7 @@ class UUID { friend class LogicalSessionFromClient; friend class MigrationCoordinatorDocument; friend class MigrationDestinationManager; + friend class MigrationRecipientCommonData; friend class RangeDeletionTask; friend class ResolvedKeyId; friend class repl::CollectionInfo; |
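Finally, the uuid.h hunk appends the new IDL struct to UUID's friend list: generated parsers construct UUIDs through a private constructor, so each new IDL type carrying a uuid field must be befriended. A simplified stand-in for that access pattern; both classes are reduced to the essentials.

```cpp
#include <array>
#include <cstdint>

class MigrationRecipientCommonData;

class UUID {
private:
    friend class MigrationRecipientCommonData;  // the new friend from this diff
    explicit UUID(std::array<std::uint8_t, 16> bytes) : _bytes(bytes) {}
    std::array<std::uint8_t, 16> _bytes{};
};

class MigrationRecipientCommonData {
public:
    MigrationRecipientCommonData() : _migrationId(UUID{{}}) {}  // allowed: friend

private:
    UUID _migrationId;
};

int main() {
    MigrationRecipientCommonData data;
    (void)data;
}
```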