diff options
Diffstat (limited to 'src/mongo/db/repl/tenant_migration_recipient_service_test.cpp')
-rw-r--r-- | src/mongo/db/repl/tenant_migration_recipient_service_test.cpp | 455 |
1 files changed, 436 insertions, 19 deletions
diff --git a/src/mongo/db/repl/tenant_migration_recipient_service_test.cpp b/src/mongo/db/repl/tenant_migration_recipient_service_test.cpp index ae59a422d3b..f8d39c19b8c 100644 --- a/src/mongo/db/repl/tenant_migration_recipient_service_test.cpp +++ b/src/mongo/db/repl/tenant_migration_recipient_service_test.cpp @@ -133,11 +133,12 @@ class TenantMigrationRecipientServiceTest : public ServiceContextMongoDTest { public: class stopFailPointEnableBlock : public FailPointEnableBlock { public: - explicit stopFailPointEnableBlock(StringData failPointName) + explicit stopFailPointEnableBlock(StringData failPointName, + std::int32_t error = stopFailPointErrorCode) : FailPointEnableBlock(failPointName, BSON("action" << "stop" - << "stopErrorCode" << stopFailPointErrorCode)) {} + << "stopErrorCode" << error)) {} }; void setUp() override { @@ -150,6 +151,11 @@ public: WaitForMajorityService::get(getServiceContext()).setUp(getServiceContext()); + // Automatically mark the state doc garbage collectable after data sync completion. + globalFailPointRegistry() + .find("autoRecipientForgetMigration") + ->setMode(FailPoint::alwaysOn); + { auto opCtx = cc().makeOperationContext(); auto replCoord = std::make_unique<ReplicationCoordinatorMock>(serviceContext); @@ -313,7 +319,8 @@ TEST_F(TenantMigrationRecipientServiceTest, BasicTenantMigrationRecipientService ASSERT_EQ(migrationUUID, instance->getMigrationUUID()); // Wait for task completion. - ASSERT_EQ(stopFailPointErrorCode, instance->getCompletionFuture().getNoThrow().code()); + ASSERT_EQ(stopFailPointErrorCode, instance->getDataSyncCompletionFuture().getNoThrow().code()); + ASSERT_OK(instance->getCompletionFuture().getNoThrow()); } @@ -336,8 +343,11 @@ TEST_F(TenantMigrationRecipientServiceTest, InstanceReportsErrorOnFailureWhilePe ASSERT_EQ(migrationUUID, instance->getMigrationUUID()); // Should be able to see the instance task failure error. 
- auto status = instance->getCompletionFuture().getNoThrow(); + auto status = instance->getDataSyncCompletionFuture().getNoThrow(); ASSERT_EQ(ErrorCodes::NotWritablePrimary, status.code()); + // Should also fail to mark the state doc garbage collectable if we have failed to persist the + // state doc in the first place. + ASSERT_EQ(ErrorCodes::NotWritablePrimary, instance->getCompletionFuture().getNoThrow()); } TEST_F(TenantMigrationRecipientServiceTest, TenantMigrationRecipientConnection_Primary) { @@ -383,7 +393,8 @@ TEST_F(TenantMigrationRecipientServiceTest, TenantMigrationRecipientConnection_P taskFp->setMode(FailPoint::off); // Wait for task completion. - ASSERT_EQ(stopFailPointErrorCode, instance->getCompletionFuture().getNoThrow().code()); + ASSERT_EQ(stopFailPointErrorCode, instance->getDataSyncCompletionFuture().getNoThrow().code()); + ASSERT_OK(instance->getCompletionFuture().getNoThrow()); } TEST_F(TenantMigrationRecipientServiceTest, TenantMigrationRecipientConnection_Secondary) { @@ -429,7 +440,8 @@ TEST_F(TenantMigrationRecipientServiceTest, TenantMigrationRecipientConnection_S taskFp->setMode(FailPoint::off); // Wait for task completion. - ASSERT_EQ(stopFailPointErrorCode, instance->getCompletionFuture().getNoThrow().code()); + ASSERT_EQ(stopFailPointErrorCode, instance->getDataSyncCompletionFuture().getNoThrow().code()); + ASSERT_OK(instance->getCompletionFuture().getNoThrow()); } TEST_F(TenantMigrationRecipientServiceTest, TenantMigrationRecipientConnection_PrimaryFails) { @@ -488,7 +500,8 @@ TEST_F(TenantMigrationRecipientServiceTest, TenantMigrationRecipientConnection_P // Wait for task completion failure. 
ASSERT_EQUALS(ErrorCodes::FailedToSatisfyReadPreference, - instance->getCompletionFuture().getNoThrow().code()); + instance->getDataSyncCompletionFuture().getNoThrow().code()); + ASSERT_OK(instance->getCompletionFuture().getNoThrow()); } TEST_F(TenantMigrationRecipientServiceTest, TenantMigrationRecipientConnection_PrimaryFailsOver) { @@ -537,7 +550,8 @@ TEST_F(TenantMigrationRecipientServiceTest, TenantMigrationRecipientConnection_P taskFp->setMode(FailPoint::off); // Wait for task completion. - ASSERT_EQ(stopFailPointErrorCode, instance->getCompletionFuture().getNoThrow().code()); + ASSERT_EQ(stopFailPointErrorCode, instance->getDataSyncCompletionFuture().getNoThrow().code()); + ASSERT_OK(instance->getCompletionFuture().getNoThrow()); } TEST_F(TenantMigrationRecipientServiceTest, TenantMigrationRecipientConnection_BadConnectString) { @@ -601,7 +615,8 @@ TEST_F(TenantMigrationRecipientServiceTest, TenantMigrationRecipientGetStartOpTi ASSERT(instance.get()); // Wait for task completion. - ASSERT_EQ(stopFailPointErrorCode, instance->getCompletionFuture().getNoThrow().code()); + ASSERT_EQ(stopFailPointErrorCode, instance->getDataSyncCompletionFuture().getNoThrow().code()); + ASSERT_OK(instance->getCompletionFuture().getNoThrow()); ASSERT_EQ(topOfOplogOpTime, getStateDoc(instance.get()).getStartFetchingDonorOpTime()); ASSERT_EQ(topOfOplogOpTime, getStateDoc(instance.get()).getStartApplyingDonorOpTime()); @@ -640,7 +655,8 @@ TEST_F(TenantMigrationRecipientServiceTest, pauseFailPoint->setMode(FailPoint::off, 0); // Wait for task completion. 
- ASSERT_EQ(stopFailPointErrorCode, instance->getCompletionFuture().getNoThrow().code()); + ASSERT_EQ(stopFailPointErrorCode, instance->getDataSyncCompletionFuture().getNoThrow().code()); + ASSERT_OK(instance->getCompletionFuture().getNoThrow()); ASSERT_EQ(topOfOplogOpTime, getStateDoc(instance.get()).getStartFetchingDonorOpTime()); ASSERT_EQ(newTopOfOplogOpTime, getStateDoc(instance.get()).getStartApplyingDonorOpTime()); @@ -676,7 +692,8 @@ TEST_F(TenantMigrationRecipientServiceTest, TenantMigrationRecipientGetStartOpTi ASSERT(instance.get()); // Wait for task completion. - ASSERT_EQ(stopFailPointErrorCode, instance->getCompletionFuture().getNoThrow().code()); + ASSERT_EQ(stopFailPointErrorCode, instance->getDataSyncCompletionFuture().getNoThrow().code()); + ASSERT_OK(instance->getCompletionFuture().getNoThrow()); ASSERT_EQ(txnStartOpTime, getStateDoc(instance.get()).getStartFetchingDonorOpTime()); ASSERT_EQ(topOfOplogOpTime, getStateDoc(instance.get()).getStartApplyingDonorOpTime()); @@ -722,7 +739,8 @@ TEST_F(TenantMigrationRecipientServiceTest, pauseFailPoint->setMode(FailPoint::off, 0); // Wait for task completion. - ASSERT_EQ(stopFailPointErrorCode, instance->getCompletionFuture().getNoThrow().code()); + ASSERT_EQ(stopFailPointErrorCode, instance->getDataSyncCompletionFuture().getNoThrow().code()); + ASSERT_OK(instance->getCompletionFuture().getNoThrow()); ASSERT_EQ(txnStartOpTime, getStateDoc(instance.get()).getStartFetchingDonorOpTime()); ASSERT_EQ(newTopOfOplogOpTime, getStateDoc(instance.get()).getStartApplyingDonorOpTime()); @@ -750,7 +768,8 @@ TEST_F(TenantMigrationRecipientServiceTest, ASSERT(instance.get()); // Wait for task completion. - ASSERT_NOT_OK(instance->getCompletionFuture().getNoThrow()); + ASSERT_NOT_OK(instance->getDataSyncCompletionFuture().getNoThrow()); + ASSERT_OK(instance->getCompletionFuture().getNoThrow()); // Even though we failed, the memory state should still match the on-disk state. 
checkStateDocPersisted(opCtx.get(), instance.get()); @@ -796,7 +815,8 @@ TEST_F(TenantMigrationRecipientServiceTest, TenantMigrationRecipientStartOplogFe taskFp->setMode(FailPoint::off); // Wait for task completion. - ASSERT_EQ(stopFailPointErrorCode, instance->getCompletionFuture().getNoThrow().code()); + ASSERT_EQ(stopFailPointErrorCode, instance->getDataSyncCompletionFuture().getNoThrow().code()); + ASSERT_OK(instance->getCompletionFuture().getNoThrow()); } TEST_F(TenantMigrationRecipientServiceTest, TenantMigrationRecipientStartsCloner) { @@ -860,7 +880,8 @@ TEST_F(TenantMigrationRecipientServiceTest, TenantMigrationRecipientStartsCloner taskFp->setMode(FailPoint::off); // Wait for task completion. - ASSERT_EQ(stopFailPointErrorCode, instance->getCompletionFuture().getNoThrow().code()); + ASSERT_EQ(stopFailPointErrorCode, instance->getDataSyncCompletionFuture().getNoThrow().code()); + ASSERT_OK(instance->getCompletionFuture().getNoThrow()); } TEST_F(TenantMigrationRecipientServiceTest, OplogFetcherFailsDuringOplogApplication) { @@ -908,8 +929,9 @@ TEST_F(TenantMigrationRecipientServiceTest, OplogFetcherFailsDuringOplogApplicat oplogFetcher->shutdownWith({ErrorCodes::Error(4881203), "Injected error"}); // Wait for task completion failure. - auto status = instance->getCompletionFuture().getNoThrow(); + auto status = instance->getDataSyncCompletionFuture().getNoThrow(); ASSERT_EQ(4881203, status.code()); + ASSERT_OK(instance->getCompletionFuture().getNoThrow()); } TEST_F(TenantMigrationRecipientServiceTest, OplogApplierFails) { @@ -964,7 +986,8 @@ TEST_F(TenantMigrationRecipientServiceTest, OplogApplierFails) { oplogFetcher->receiveBatch(1LL, {oplogEntry.toBSON()}, injectedEntryOpTime.getTimestamp()); // Wait for task completion failure. 
- ASSERT_NOT_OK(instance->getCompletionFuture().getNoThrow()); + ASSERT_NOT_OK(instance->getDataSyncCompletionFuture().getNoThrow()); + ASSERT_OK(instance->getCompletionFuture().getNoThrow()); } TEST_F(TenantMigrationRecipientServiceTest, StoppingApplierAllowsCompletion) { @@ -1012,7 +1035,8 @@ TEST_F(TenantMigrationRecipientServiceTest, StoppingApplierAllowsCompletion) { // Wait for task completion. Since we're using a test function to cancel the applier, // the actual result is not critical. - ASSERT_NOT_OK(instance->getCompletionFuture().getNoThrow()); + ASSERT_NOT_OK(instance->getDataSyncCompletionFuture().getNoThrow()); + ASSERT_OK(instance->getCompletionFuture().getNoThrow()); } TEST_F(TenantMigrationRecipientServiceTest, TenantMigrationRecipientAddResumeTokenNoopsToBuffer) { @@ -1115,7 +1139,400 @@ TEST_F(TenantMigrationRecipientServiceTest, TenantMigrationRecipientAddResumeTok oplogFetcherFP->setMode(FailPoint::off); // Wait for task completion. - ASSERT_EQ(stopFailPointErrorCode, instance->getCompletionFuture().getNoThrow().code()); + ASSERT_EQ(stopFailPointErrorCode, instance->getDataSyncCompletionFuture().getNoThrow().code()); + ASSERT_OK(instance->getCompletionFuture().getNoThrow()); +} + +TEST_F(TenantMigrationRecipientServiceTest, RecipientForgetMigration_BeforeRun) { + const UUID migrationUUID = UUID::gen(); + MockReplicaSet replSet("donorSet", 3, true /* hasPrimary */, true /* dollarPrefixHosts */); + TenantMigrationRecipientDocument initialStateDocument( + migrationUUID, + replSet.getConnectionString(), + "tenantA", + ReadPreferenceSetting(ReadPreference::PrimaryOnly)); + + auto fp = globalFailPointRegistry().find("pauseBeforeRunTenantMigrationRecipientInstance"); + fp->setMode(FailPoint::alwaysOn); + + auto opCtx = makeOperationContext(); + auto instance = repl::TenantMigrationRecipientService::Instance::getOrCreate( + opCtx.get(), _service, initialStateDocument.toBSON()); + + // The task is interrupted before it starts the chain. 
+ instance->interrupt({ErrorCodes::InterruptedDueToReplStateChange, "Test stepdown"}); + + // Test that receiving recipientForgetMigration command after that should result in the same + // error. + ASSERT_THROWS_CODE(instance->onReceiveRecipientForgetMigration(opCtx.get()), + AssertionException, + ErrorCodes::InterruptedDueToReplStateChange); + + fp->setMode(FailPoint::off); + + // We should fail to mark the state doc garbage collectable. + ASSERT_EQ(instance->getCompletionFuture().getNoThrow(), + ErrorCodes::InterruptedDueToReplStateChange); +} + +TEST_F(TenantMigrationRecipientServiceTest, RecipientForgetMigration_FailToInitializeStateDoc) { + stopFailPointEnableBlock fp("failWhilePersistingTenantMigrationRecipientInstanceStateDoc"); + + const UUID migrationUUID = UUID::gen(); + MockReplicaSet replSet("donorSet", 3, true /* hasPrimary */, true /* dollarPrefixHosts */); + TenantMigrationRecipientDocument initialStateDocument( + migrationUUID, + replSet.getConnectionString(), + "tenantA", + ReadPreferenceSetting(ReadPreference::PrimaryOnly)); + + auto opCtx = makeOperationContext(); + auto instance = repl::TenantMigrationRecipientService::Instance::getOrCreate( + opCtx.get(), _service, initialStateDocument.toBSON()); + + ASSERT_THROWS_CODE(instance->onReceiveRecipientForgetMigration(opCtx.get()), + AssertionException, + ErrorCodes::NotWritablePrimary); + // We should fail to mark the state doc garbage collectable if we have failed to initialize and + // persist the state doc in the first place. + ASSERT_EQ(instance->getCompletionFuture().getNoThrow(), ErrorCodes::NotWritablePrimary); +} + +TEST_F(TenantMigrationRecipientServiceTest, RecipientForgetMigration_WaitUntilStateDocInitialized) { + // The test fixture forgets the migration automatically; disable the failpoint for this test so + // the migration continues to wait for the recipientForgetMigration command after persisting the + // state doc. 
+ auto autoForgetFp = globalFailPointRegistry().find("autoRecipientForgetMigration"); + autoForgetFp->setMode(FailPoint::off); + + const UUID migrationUUID = UUID::gen(); + MockReplicaSet replSet("donorSet", 3, true /* hasPrimary */, true /* dollarPrefixHosts */); + TenantMigrationRecipientDocument initialStateDocument( + migrationUUID, + replSet.getConnectionString(), + "tenantA", + ReadPreferenceSetting(ReadPreference::PrimaryOnly)); + + auto fp = globalFailPointRegistry().find("pauseAfterRunTenantMigrationRecipientInstance"); + auto initialTimesEntered = fp->setMode(FailPoint::alwaysOn); + + auto opCtx = makeOperationContext(); + auto instance = repl::TenantMigrationRecipientService::Instance::getOrCreate( + opCtx.get(), _service, initialStateDocument.toBSON()); + + fp->waitForTimesEntered(initialTimesEntered + 1); + + // Test that onReceiveRecipientForgetMigration waits until the state doc is initialized. + opCtx->setDeadlineAfterNowBy(Seconds(2), opCtx->getTimeoutError()); + ASSERT_THROWS_CODE(instance->onReceiveRecipientForgetMigration(opCtx.get()), + AssertionException, + opCtx->getTimeoutError()); + + // Unblock the task chain. + fp->setMode(FailPoint::off); + + // Make a new opCtx as the old one has expired due to timeout errors. + opCtx.reset(); + opCtx = makeOperationContext(); + + // The onReceiveRecipientForgetMigration should eventually go through. 
+ instance->onReceiveRecipientForgetMigration(opCtx.get()); + ASSERT_EQ(instance->getDataSyncCompletionFuture().getNoThrow(), + ErrorCodes::TenantMigrationForgotten); + ASSERT_OK(instance->getCompletionFuture().getNoThrow()); + + const auto doc = getStateDoc(instance.get()); + LOGV2(4881411, + "Test migration complete", + "preStateDoc"_attr = initialStateDocument.toBSON(), + "postStateDoc"_attr = doc.toBSON()); + ASSERT_EQ(doc.getDonorConnectionString(), replSet.getConnectionString()); + ASSERT_EQ(doc.getTenantId(), "tenantA"); + ASSERT_TRUE(doc.getReadPreference().equals(ReadPreferenceSetting(ReadPreference::PrimaryOnly))); + ASSERT_TRUE(doc.getState() == TenantMigrationRecipientStateEnum::kDone); + ASSERT_TRUE(doc.getExpireAt() != boost::none); + ASSERT_TRUE(doc.getExpireAt().get() > opCtx->getServiceContext()->getFastClockSource()->now()); + ASSERT_TRUE(doc.getStartApplyingDonorOpTime() == boost::none); + ASSERT_TRUE(doc.getStartFetchingDonorOpTime() == boost::none); + ASSERT_TRUE(doc.getDataConsistentStopDonorOpTime() == boost::none); + ASSERT_TRUE(doc.getCloneFinishedRecipientOpTime() == boost::none); + checkStateDocPersisted(opCtx.get(), instance.get()); +} + +TEST_F(TenantMigrationRecipientServiceTest, RecipientForgetMigration_AfterStartOpTimes) { + auto fp = + globalFailPointRegistry().find("fpAfterRetrievingStartOpTimesMigrationRecipientInstance"); + auto initialTimesEntered = fp->setMode(FailPoint::alwaysOn, + 0, + BSON("action" + << "hang")); + + + const UUID migrationUUID = UUID::gen(); + const OpTime topOfOplogOpTime(Timestamp(5, 1), 1); + + MockReplicaSet replSet("donorSet", 3, true /* hasPrimary */, true /* dollarPrefixHosts */); + insertTopOfOplog(&replSet, topOfOplogOpTime); + + TenantMigrationRecipientDocument initialStateDocument( + migrationUUID, + replSet.getConnectionString(), + "tenantA", + ReadPreferenceSetting(ReadPreference::PrimaryOnly)); + + // Create and start the instance. 
+ auto opCtx = makeOperationContext(); + auto instance = TenantMigrationRecipientService::Instance::getOrCreate( + opCtx.get(), _service, initialStateDocument.toBSON()); + ASSERT(instance.get()); + + fp->waitForTimesEntered(initialTimesEntered + 1); + instance->onReceiveRecipientForgetMigration(opCtx.get()); + + // Skip the cloners in this test, so we provide an empty list of databases. + MockRemoteDBServer* const _donorServer = + mongo::MockConnRegistry::get()->getMockRemoteDBServer(replSet.getPrimary()); + _donorServer->setCommandReply("listDatabases", makeListDatabasesResponse({})); + _donorServer->setCommandReply("find", makeFindResponse()); + + fp->setMode(FailPoint::off); + ASSERT_EQ(instance->getDataSyncCompletionFuture().getNoThrow(), + ErrorCodes::TenantMigrationForgotten); + ASSERT_OK(instance->getCompletionFuture().getNoThrow()); + + const auto doc = getStateDoc(instance.get()); + LOGV2(4881412, + "Test migration complete", + "preStateDoc"_attr = initialStateDocument.toBSON(), + "postStateDoc"_attr = doc.toBSON()); + ASSERT_EQ(doc.getDonorConnectionString(), replSet.getConnectionString()); + ASSERT_EQ(doc.getTenantId(), "tenantA"); + ASSERT_TRUE(doc.getReadPreference().equals(ReadPreferenceSetting(ReadPreference::PrimaryOnly))); + ASSERT_TRUE(doc.getState() == TenantMigrationRecipientStateEnum::kDone); + ASSERT_TRUE(doc.getExpireAt() != boost::none); + ASSERT_TRUE(doc.getExpireAt().get() > opCtx->getServiceContext()->getFastClockSource()->now()); + checkStateDocPersisted(opCtx.get(), instance.get()); +} + +TEST_F(TenantMigrationRecipientServiceTest, RecipientForgetMigration_AfterConsistent) { + // The test fixture forgets the migration automatically, disable the failpoint for this test so + // the migration continues to wait for the recipientForgetMigration command after reaching data + // consistent state. 
+ auto autoForgetFp = globalFailPointRegistry().find("autoRecipientForgetMigration"); + autoForgetFp->setMode(FailPoint::off); + + auto dataConsistentFp = + globalFailPointRegistry().find("fpAfterDataConsistentMigrationRecipientInstance"); + auto initialTimesEntered = dataConsistentFp->setMode(FailPoint::alwaysOn, + 0, + BSON("action" + << "hang")); + + const UUID migrationUUID = UUID::gen(); + const OpTime topOfOplogOpTime(Timestamp(5, 1), 1); + + MockReplicaSet replSet("donorSet", 3, true /* hasPrimary */, true /* dollarPrefixHosts */); + insertTopOfOplog(&replSet, topOfOplogOpTime); + + TenantMigrationRecipientDocument initialStateDocument( + migrationUUID, + replSet.getConnectionString(), + "tenantA", + ReadPreferenceSetting(ReadPreference::PrimaryOnly)); + + // Setting these causes us to skip cloning. + initialStateDocument.setCloneFinishedRecipientOpTime(topOfOplogOpTime); + initialStateDocument.setDataConsistentStopDonorOpTime(topOfOplogOpTime); + + auto opCtx = makeOperationContext(); + std::shared_ptr<TenantMigrationRecipientService::Instance> instance; + { + FailPointEnableBlock fp("pauseBeforeRunTenantMigrationRecipientInstance"); + // Create and start the instance. 
+ instance = TenantMigrationRecipientService::Instance::getOrCreate( + opCtx.get(), _service, initialStateDocument.toBSON()); + ASSERT(instance.get()); + instance->setCreateOplogFetcherFn_forTest(std::make_unique<CreateOplogFetcherMockFn>()); + } + dataConsistentFp->waitForTimesEntered(initialTimesEntered + 1); + + { + const auto doc = getStateDoc(instance.get()); + LOGV2(4881413, + "Test migration after consistent", + "preStateDoc"_attr = initialStateDocument.toBSON(), + "postStateDoc"_attr = doc.toBSON()); + ASSERT_EQ(doc.getDonorConnectionString(), replSet.getConnectionString()); + ASSERT_EQ(doc.getTenantId(), "tenantA"); + ASSERT_TRUE( + doc.getReadPreference().equals(ReadPreferenceSetting(ReadPreference::PrimaryOnly))); + ASSERT_TRUE(doc.getState() == TenantMigrationRecipientStateEnum::kConsistent); + ASSERT_TRUE(doc.getExpireAt() == boost::none); + checkStateDocPersisted(opCtx.get(), instance.get()); + } + + instance->onReceiveRecipientForgetMigration(opCtx.get()); + + // Test receiving duplicate recipientForgetMigration requests. + instance->onReceiveRecipientForgetMigration(opCtx.get()); + + // Continue after the data becomes consistent. + dataConsistentFp->setMode(FailPoint::off); + + // The data sync should have completed. 
+ ASSERT_EQ(instance->getDataSyncCompletionFuture().getNoThrow(), + ErrorCodes::TenantMigrationForgotten); + + ASSERT_OK(instance->getCompletionFuture().getNoThrow()); + + { + const auto doc = getStateDoc(instance.get()); + LOGV2(4881414, + "Test migration complete", + "preStateDoc"_attr = initialStateDocument.toBSON(), + "postStateDoc"_attr = doc.toBSON()); + ASSERT_EQ(doc.getDonorConnectionString(), replSet.getConnectionString()); + ASSERT_EQ(doc.getTenantId(), "tenantA"); + ASSERT_TRUE( + doc.getReadPreference().equals(ReadPreferenceSetting(ReadPreference::PrimaryOnly))); + ASSERT_TRUE(doc.getState() == TenantMigrationRecipientStateEnum::kDone); + ASSERT_TRUE(doc.getExpireAt() != boost::none); + ASSERT_TRUE(doc.getExpireAt().get() > + opCtx->getServiceContext()->getFastClockSource()->now()); + checkStateDocPersisted(opCtx.get(), instance.get()); + } +} + +TEST_F(TenantMigrationRecipientServiceTest, RecipientForgetMigration_AfterFail) { + // The test fixture forgets the migration automatically, disable the failpoint for this test so + // the migration continues to wait for the recipientForgetMigration command after getting an + // error from the migration. + auto autoForgetFp = globalFailPointRegistry().find("autoRecipientForgetMigration"); + autoForgetFp->setMode(FailPoint::off); + + stopFailPointEnableBlock fp("fpAfterCollectionClonerDone"); + const UUID migrationUUID = UUID::gen(); + const OpTime topOfOplogOpTime(Timestamp(5, 1), 1); + + MockReplicaSet replSet("donorSet", 3, true /* hasPrimary */, true /* dollarPrefixHosts */); + insertTopOfOplog(&replSet, topOfOplogOpTime); + + TenantMigrationRecipientDocument initialStateDocument( + migrationUUID, + replSet.getConnectionString(), + "tenantA", + ReadPreferenceSetting(ReadPreference::PrimaryOnly)); + + // Setting these causes us to skip cloning. 
+ initialStateDocument.setCloneFinishedRecipientOpTime(topOfOplogOpTime); + initialStateDocument.setDataConsistentStopDonorOpTime(topOfOplogOpTime); + + auto opCtx = makeOperationContext(); + std::shared_ptr<TenantMigrationRecipientService::Instance> instance; + { + FailPointEnableBlock fp("pauseBeforeRunTenantMigrationRecipientInstance"); + // Create and start the instance. + instance = TenantMigrationRecipientService::Instance::getOrCreate( + opCtx.get(), _service, initialStateDocument.toBSON()); + ASSERT(instance.get()); + instance->setCreateOplogFetcherFn_forTest(std::make_unique<CreateOplogFetcherMockFn>()); + } + + ASSERT_THROWS_CODE(instance->waitUntilMigrationReachesConsistentState(opCtx.get()), + AssertionException, + stopFailPointErrorCode); + + { + const auto doc = getStateDoc(instance.get()); + LOGV2(4881415, + "Test migration after collection cloner done", + "preStateDoc"_attr = initialStateDocument.toBSON(), + "postStateDoc"_attr = doc.toBSON()); + ASSERT_EQ(doc.getDonorConnectionString(), replSet.getConnectionString()); + ASSERT_EQ(doc.getTenantId(), "tenantA"); + ASSERT_TRUE( + doc.getReadPreference().equals(ReadPreferenceSetting(ReadPreference::PrimaryOnly))); + ASSERT_TRUE(doc.getState() == TenantMigrationRecipientStateEnum::kStarted); + ASSERT_TRUE(doc.getExpireAt() == boost::none); + checkStateDocPersisted(opCtx.get(), instance.get()); + } + + // The data sync should have completed. + ASSERT_EQ(stopFailPointErrorCode, instance->getDataSyncCompletionFuture().getNoThrow().code()); + + // The instance should still be running and waiting for the recipientForgetMigration command. 
+ instance->onReceiveRecipientForgetMigration(opCtx.get()); + ASSERT_OK(instance->getCompletionFuture().getNoThrow()); + + { + const auto doc = getStateDoc(instance.get()); + LOGV2(4881416, + "Test migration complete", + "preStateDoc"_attr = initialStateDocument.toBSON(), + "postStateDoc"_attr = doc.toBSON()); + ASSERT_EQ(doc.getDonorConnectionString(), replSet.getConnectionString()); + ASSERT_EQ(doc.getTenantId(), "tenantA"); + ASSERT_TRUE( + doc.getReadPreference().equals(ReadPreferenceSetting(ReadPreference::PrimaryOnly))); + ASSERT_TRUE(doc.getState() == TenantMigrationRecipientStateEnum::kDone); + ASSERT_TRUE(doc.getExpireAt() != boost::none); + ASSERT_TRUE(doc.getExpireAt().get() > + opCtx->getServiceContext()->getFastClockSource()->now()); + checkStateDocPersisted(opCtx.get(), instance.get()); + } +} + +TEST_F(TenantMigrationRecipientServiceTest, RecipientForgetMigration_FailToMarkGarbageCollectable) { + // The test fixture forgets the migration automatically, disable the failpoint for this test so + // the migration continues to wait for the recipientForgetMigration command after getting an + // error from the migration. + auto autoForgetFp = globalFailPointRegistry().find("autoRecipientForgetMigration"); + autoForgetFp->setMode(FailPoint::off); + + stopFailPointEnableBlock fp("fpAfterPersistingTenantMigrationRecipientInstanceStateDoc"); + const UUID migrationUUID = UUID::gen(); + + MockReplicaSet replSet("donorSet", 3, true /* hasPrimary */, true /* dollarPrefixHosts */); + TenantMigrationRecipientDocument initialStateDocument( + migrationUUID, + replSet.getConnectionString(), + "tenantA", + ReadPreferenceSetting(ReadPreference::PrimaryOnly)); + + // Create and start the instance. 
+ auto opCtx = makeOperationContext(); + auto instance = TenantMigrationRecipientService::Instance::getOrCreate( + opCtx.get(), _service, initialStateDocument.toBSON()); + ASSERT(instance.get()); + ASSERT_EQ(migrationUUID, instance->getMigrationUUID()); + + // The data sync should have completed. + ASSERT_EQ(stopFailPointErrorCode, instance->getDataSyncCompletionFuture().getNoThrow().code()); + + // Fail marking the state doc garbage collectable with a different error code, simulating a + // stepDown. + stopFailPointEnableBlock fpFailForget("fpAfterReceivingRecipientForgetMigration", + ErrorCodes::NotWritablePrimary); + + // The instance should still be running and waiting for the recipientForgetMigration command. + instance->onReceiveRecipientForgetMigration(opCtx.get()); + // Check that it fails to mark the state doc garbage collectable. + ASSERT_EQ(ErrorCodes::NotWritablePrimary, instance->getCompletionFuture().getNoThrow().code()); + + { + const auto doc = getStateDoc(instance.get()); + LOGV2(4881417, + "Test migration complete", + "preStateDoc"_attr = initialStateDocument.toBSON(), + "postStateDoc"_attr = doc.toBSON()); + ASSERT_EQ(doc.getDonorConnectionString(), replSet.getConnectionString()); + ASSERT_EQ(doc.getTenantId(), "tenantA"); + ASSERT_TRUE( + doc.getReadPreference().equals(ReadPreferenceSetting(ReadPreference::PrimaryOnly))); + ASSERT_TRUE(doc.getState() == TenantMigrationRecipientStateEnum::kStarted); + ASSERT_TRUE(doc.getExpireAt() == boost::none); + checkStateDocPersisted(opCtx.get(), instance.get()); + } } } // namespace repl } // namespace mongo |