Diffstat (limited to 'src/mongo/db/repl/tenant_migration_recipient_service_test.cpp')
 src/mongo/db/repl/tenant_migration_recipient_service_test.cpp | 455 +++++++++++++++---
 1 file changed, 436 insertions(+), 19 deletions(-)
diff --git a/src/mongo/db/repl/tenant_migration_recipient_service_test.cpp b/src/mongo/db/repl/tenant_migration_recipient_service_test.cpp
index ae59a422d3b..f8d39c19b8c 100644
--- a/src/mongo/db/repl/tenant_migration_recipient_service_test.cpp
+++ b/src/mongo/db/repl/tenant_migration_recipient_service_test.cpp
@@ -133,11 +133,12 @@ class TenantMigrationRecipientServiceTest : public ServiceContextMongoDTest {
public:
class stopFailPointEnableBlock : public FailPointEnableBlock {
public:
- explicit stopFailPointEnableBlock(StringData failPointName)
+ explicit stopFailPointEnableBlock(StringData failPointName,
+ std::int32_t error = stopFailPointErrorCode)
: FailPointEnableBlock(failPointName,
BSON("action"
<< "stop"
- << "stopErrorCode" << stopFailPointErrorCode)) {}
+ << "stopErrorCode" << error)) {}
};
void setUp() override {
@@ -150,6 +151,11 @@ public:
WaitForMajorityService::get(getServiceContext()).setUp(getServiceContext());
+ // Automatically mark the state doc garbage collectable after data sync completion.
+ globalFailPointRegistry()
+ .find("autoRecipientForgetMigration")
+ ->setMode(FailPoint::alwaysOn);
+
{
auto opCtx = cc().makeOperationContext();
auto replCoord = std::make_unique<ReplicationCoordinatorMock>(serviceContext);
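
The constructor change above lets a stopFailPointEnableBlock stop the task chain with an arbitrary error code instead of the default stopFailPointErrorCode. A usage sketch, mirroring the last test in this patch:

    // Stop the chain at this failpoint with NotWritablePrimary rather than the
    // default stopFailPointErrorCode, e.g. to simulate a stepdown while marking
    // the state doc garbage collectable.
    stopFailPointEnableBlock fpFailForget("fpAfterReceivingRecipientForgetMigration",
                                          ErrorCodes::NotWritablePrimary);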
@@ -313,7 +319,8 @@ TEST_F(TenantMigrationRecipientServiceTest, BasicTenantMigrationRecipientService
ASSERT_EQ(migrationUUID, instance->getMigrationUUID());
// Wait for task completion.
- ASSERT_EQ(stopFailPointErrorCode, instance->getCompletionFuture().getNoThrow().code());
+ ASSERT_EQ(stopFailPointErrorCode, instance->getDataSyncCompletionFuture().getNoThrow().code());
+ ASSERT_OK(instance->getCompletionFuture().getNoThrow());
}
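
From here on the patch consistently splits the old single assertion in two: data sync errors surface on getDataSyncCompletionFuture(), while getCompletionFuture() reports the outcome of marking the state doc garbage collectable. A minimal sketch of the task-chain shape these assertions imply (helper and member names here are hypothetical, not the real service code):

    // Hypothetical chain: the data sync finishes (possibly with an error), the
    // instance keeps running until recipientForgetMigration arrives, and only
    // then is the overall completion future fulfilled.
    return _startDataSync(executor)
        .onCompletion([this](Status dataSyncStatus) {
            _dataSyncCompletionPromise.setFrom(dataSyncStatus);  // getDataSyncCompletionFuture()
            return _waitForRecipientForgetMigration();
        })
        .then([this] { return _markStateDocGarbageCollectable(); })
        .onCompletion([this](Status status) {
            _taskCompletionPromise.setFrom(status);              // getCompletionFuture()
        });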
@@ -336,8 +343,11 @@ TEST_F(TenantMigrationRecipientServiceTest, InstanceReportsErrorOnFailureWhilePe
ASSERT_EQ(migrationUUID, instance->getMigrationUUID());
// Should be able to see the instance task failure error.
- auto status = instance->getCompletionFuture().getNoThrow();
+ auto status = instance->getDataSyncCompletionFuture().getNoThrow();
ASSERT_EQ(ErrorCodes::NotWritablePrimary, status.code());
+ // Should also fail to mark the state doc garbage collectable if we failed to persist the
+ // state doc in the first place.
+ ASSERT_EQ(ErrorCodes::NotWritablePrimary, instance->getCompletionFuture().getNoThrow());
}
TEST_F(TenantMigrationRecipientServiceTest, TenantMigrationRecipientConnection_Primary) {
@@ -383,7 +393,8 @@ TEST_F(TenantMigrationRecipientServiceTest, TenantMigrationRecipientConnection_P
taskFp->setMode(FailPoint::off);
// Wait for task completion.
- ASSERT_EQ(stopFailPointErrorCode, instance->getCompletionFuture().getNoThrow().code());
+ ASSERT_EQ(stopFailPointErrorCode, instance->getDataSyncCompletionFuture().getNoThrow().code());
+ ASSERT_OK(instance->getCompletionFuture().getNoThrow());
}
TEST_F(TenantMigrationRecipientServiceTest, TenantMigrationRecipientConnection_Secondary) {
@@ -429,7 +440,8 @@ TEST_F(TenantMigrationRecipientServiceTest, TenantMigrationRecipientConnection_S
taskFp->setMode(FailPoint::off);
// Wait for task completion.
- ASSERT_EQ(stopFailPointErrorCode, instance->getCompletionFuture().getNoThrow().code());
+ ASSERT_EQ(stopFailPointErrorCode, instance->getDataSyncCompletionFuture().getNoThrow().code());
+ ASSERT_OK(instance->getCompletionFuture().getNoThrow());
}
TEST_F(TenantMigrationRecipientServiceTest, TenantMigrationRecipientConnection_PrimaryFails) {
@@ -488,7 +500,8 @@ TEST_F(TenantMigrationRecipientServiceTest, TenantMigrationRecipientConnection_P
// Wait for task completion failure.
ASSERT_EQUALS(ErrorCodes::FailedToSatisfyReadPreference,
- instance->getCompletionFuture().getNoThrow().code());
+ instance->getDataSyncCompletionFuture().getNoThrow().code());
+ ASSERT_OK(instance->getCompletionFuture().getNoThrow());
}
TEST_F(TenantMigrationRecipientServiceTest, TenantMigrationRecipientConnection_PrimaryFailsOver) {
@@ -537,7 +550,8 @@ TEST_F(TenantMigrationRecipientServiceTest, TenantMigrationRecipientConnection_P
taskFp->setMode(FailPoint::off);
// Wait for task completion.
- ASSERT_EQ(stopFailPointErrorCode, instance->getCompletionFuture().getNoThrow().code());
+ ASSERT_EQ(stopFailPointErrorCode, instance->getDataSyncCompletionFuture().getNoThrow().code());
+ ASSERT_OK(instance->getCompletionFuture().getNoThrow());
}
TEST_F(TenantMigrationRecipientServiceTest, TenantMigrationRecipientConnection_BadConnectString) {
@@ -601,7 +615,8 @@ TEST_F(TenantMigrationRecipientServiceTest, TenantMigrationRecipientGetStartOpTi
ASSERT(instance.get());
// Wait for task completion.
- ASSERT_EQ(stopFailPointErrorCode, instance->getCompletionFuture().getNoThrow().code());
+ ASSERT_EQ(stopFailPointErrorCode, instance->getDataSyncCompletionFuture().getNoThrow().code());
+ ASSERT_OK(instance->getCompletionFuture().getNoThrow());
ASSERT_EQ(topOfOplogOpTime, getStateDoc(instance.get()).getStartFetchingDonorOpTime());
ASSERT_EQ(topOfOplogOpTime, getStateDoc(instance.get()).getStartApplyingDonorOpTime());
@@ -640,7 +655,8 @@ TEST_F(TenantMigrationRecipientServiceTest,
pauseFailPoint->setMode(FailPoint::off, 0);
// Wait for task completion.
- ASSERT_EQ(stopFailPointErrorCode, instance->getCompletionFuture().getNoThrow().code());
+ ASSERT_EQ(stopFailPointErrorCode, instance->getDataSyncCompletionFuture().getNoThrow().code());
+ ASSERT_OK(instance->getCompletionFuture().getNoThrow());
ASSERT_EQ(topOfOplogOpTime, getStateDoc(instance.get()).getStartFetchingDonorOpTime());
ASSERT_EQ(newTopOfOplogOpTime, getStateDoc(instance.get()).getStartApplyingDonorOpTime());
@@ -676,7 +692,8 @@ TEST_F(TenantMigrationRecipientServiceTest, TenantMigrationRecipientGetStartOpTi
ASSERT(instance.get());
// Wait for task completion.
- ASSERT_EQ(stopFailPointErrorCode, instance->getCompletionFuture().getNoThrow().code());
+ ASSERT_EQ(stopFailPointErrorCode, instance->getDataSyncCompletionFuture().getNoThrow().code());
+ ASSERT_OK(instance->getCompletionFuture().getNoThrow());
ASSERT_EQ(txnStartOpTime, getStateDoc(instance.get()).getStartFetchingDonorOpTime());
ASSERT_EQ(topOfOplogOpTime, getStateDoc(instance.get()).getStartApplyingDonorOpTime());
@@ -722,7 +739,8 @@ TEST_F(TenantMigrationRecipientServiceTest,
pauseFailPoint->setMode(FailPoint::off, 0);
// Wait for task completion.
- ASSERT_EQ(stopFailPointErrorCode, instance->getCompletionFuture().getNoThrow().code());
+ ASSERT_EQ(stopFailPointErrorCode, instance->getDataSyncCompletionFuture().getNoThrow().code());
+ ASSERT_OK(instance->getCompletionFuture().getNoThrow());
ASSERT_EQ(txnStartOpTime, getStateDoc(instance.get()).getStartFetchingDonorOpTime());
ASSERT_EQ(newTopOfOplogOpTime, getStateDoc(instance.get()).getStartApplyingDonorOpTime());
@@ -750,7 +768,8 @@ TEST_F(TenantMigrationRecipientServiceTest,
ASSERT(instance.get());
// Wait for task completion.
- ASSERT_NOT_OK(instance->getCompletionFuture().getNoThrow());
+ ASSERT_NOT_OK(instance->getDataSyncCompletionFuture().getNoThrow());
+ ASSERT_OK(instance->getCompletionFuture().getNoThrow());
// Even though we failed, the memory state should still match the on-disk state.
checkStateDocPersisted(opCtx.get(), instance.get());
@@ -796,7 +815,8 @@ TEST_F(TenantMigrationRecipientServiceTest, TenantMigrationRecipientStartOplogFe
taskFp->setMode(FailPoint::off);
// Wait for task completion.
- ASSERT_EQ(stopFailPointErrorCode, instance->getCompletionFuture().getNoThrow().code());
+ ASSERT_EQ(stopFailPointErrorCode, instance->getDataSyncCompletionFuture().getNoThrow().code());
+ ASSERT_OK(instance->getCompletionFuture().getNoThrow());
}
TEST_F(TenantMigrationRecipientServiceTest, TenantMigrationRecipientStartsCloner) {
@@ -860,7 +880,8 @@ TEST_F(TenantMigrationRecipientServiceTest, TenantMigrationRecipientStartsCloner
taskFp->setMode(FailPoint::off);
// Wait for task completion.
- ASSERT_EQ(stopFailPointErrorCode, instance->getCompletionFuture().getNoThrow().code());
+ ASSERT_EQ(stopFailPointErrorCode, instance->getDataSyncCompletionFuture().getNoThrow().code());
+ ASSERT_OK(instance->getCompletionFuture().getNoThrow());
}
TEST_F(TenantMigrationRecipientServiceTest, OplogFetcherFailsDuringOplogApplication) {
@@ -908,8 +929,9 @@ TEST_F(TenantMigrationRecipientServiceTest, OplogFetcherFailsDuringOplogApplicat
oplogFetcher->shutdownWith({ErrorCodes::Error(4881203), "Injected error"});
// Wait for task completion failure.
- auto status = instance->getCompletionFuture().getNoThrow();
+ auto status = instance->getDataSyncCompletionFuture().getNoThrow();
ASSERT_EQ(4881203, status.code());
+ ASSERT_OK(instance->getCompletionFuture().getNoThrow());
}
TEST_F(TenantMigrationRecipientServiceTest, OplogApplierFails) {
@@ -964,7 +986,8 @@ TEST_F(TenantMigrationRecipientServiceTest, OplogApplierFails) {
oplogFetcher->receiveBatch(1LL, {oplogEntry.toBSON()}, injectedEntryOpTime.getTimestamp());
// Wait for task completion failure.
- ASSERT_NOT_OK(instance->getCompletionFuture().getNoThrow());
+ ASSERT_NOT_OK(instance->getDataSyncCompletionFuture().getNoThrow());
+ ASSERT_OK(instance->getCompletionFuture().getNoThrow());
}
TEST_F(TenantMigrationRecipientServiceTest, StoppingApplierAllowsCompletion) {
@@ -1012,7 +1035,8 @@ TEST_F(TenantMigrationRecipientServiceTest, StoppingApplierAllowsCompletion) {
// Wait for task completion. Since we're using a test function to cancel the applier,
// the actual result is not critical.
- ASSERT_NOT_OK(instance->getCompletionFuture().getNoThrow());
+ ASSERT_NOT_OK(instance->getDataSyncCompletionFuture().getNoThrow());
+ ASSERT_OK(instance->getCompletionFuture().getNoThrow());
}
TEST_F(TenantMigrationRecipientServiceTest, TenantMigrationRecipientAddResumeTokenNoopsToBuffer) {
@@ -1115,7 +1139,400 @@ TEST_F(TenantMigrationRecipientServiceTest, TenantMigrationRecipientAddResumeTok
oplogFetcherFP->setMode(FailPoint::off);
// Wait for task completion.
- ASSERT_EQ(stopFailPointErrorCode, instance->getCompletionFuture().getNoThrow().code());
+ ASSERT_EQ(stopFailPointErrorCode, instance->getDataSyncCompletionFuture().getNoThrow().code());
+ ASSERT_OK(instance->getCompletionFuture().getNoThrow());
+}
+
+TEST_F(TenantMigrationRecipientServiceTest, RecipientForgetMigration_BeforeRun) {
+ const UUID migrationUUID = UUID::gen();
+ MockReplicaSet replSet("donorSet", 3, true /* hasPrimary */, true /* dollarPrefixHosts */);
+ TenantMigrationRecipientDocument initialStateDocument(
+ migrationUUID,
+ replSet.getConnectionString(),
+ "tenantA",
+ ReadPreferenceSetting(ReadPreference::PrimaryOnly));
+
+ auto fp = globalFailPointRegistry().find("pauseBeforeRunTenantMigrationRecipientInstance");
+ fp->setMode(FailPoint::alwaysOn);
+
+ auto opCtx = makeOperationContext();
+ auto instance = repl::TenantMigrationRecipientService::Instance::getOrCreate(
+ opCtx.get(), _service, initialStateDocument.toBSON());
+
+ // The task is interrupted before it starts the chain.
+ instance->interrupt({ErrorCodes::InterruptedDueToReplStateChange, "Test stepdown"});
+
+ // Receiving the recipientForgetMigration command after the interrupt should result in the
+ // same error.
+ ASSERT_THROWS_CODE(instance->onReceiveRecipientForgetMigration(opCtx.get()),
+ AssertionException,
+ ErrorCodes::InterruptedDueToReplStateChange);
+
+ fp->setMode(FailPoint::off);
+
+ // We should fail to mark the state doc garbage collectable.
+ ASSERT_EQ(instance->getCompletionFuture().getNoThrow(),
+ ErrorCodes::InterruptedDueToReplStateChange);
+}
+
+TEST_F(TenantMigrationRecipientServiceTest, RecipientForgetMigration_FailToInitializeStateDoc) {
+ stopFailPointEnableBlock fp("failWhilePersistingTenantMigrationRecipientInstanceStateDoc");
+
+ const UUID migrationUUID = UUID::gen();
+ MockReplicaSet replSet("donorSet", 3, true /* hasPrimary */, true /* dollarPrefixHosts */);
+ TenantMigrationRecipientDocument initialStateDocument(
+ migrationUUID,
+ replSet.getConnectionString(),
+ "tenantA",
+ ReadPreferenceSetting(ReadPreference::PrimaryOnly));
+
+ auto opCtx = makeOperationContext();
+ auto instance = repl::TenantMigrationRecipientService::Instance::getOrCreate(
+ opCtx.get(), _service, initialStateDocument.toBSON());
+
+ ASSERT_THROWS_CODE(instance->onReceiveRecipientForgetMigration(opCtx.get()),
+ AssertionException,
+ ErrorCodes::NotWritablePrimary);
+ // We should fail to mark the state doc garbage collectable if we failed to initialize and
+ // persist the state doc in the first place.
+ ASSERT_EQ(instance->getCompletionFuture().getNoThrow(), ErrorCodes::NotWritablePrimary);
+}
+
+TEST_F(TenantMigrationRecipientServiceTest, RecipientForgetMigration_WaitUntilStateDocInitialized) {
+ // The test fixture forgets the migration automatically; disable the failpoint for this test
+ // so that the migration keeps waiting for the recipientForgetMigration command after
+ // persisting the state doc.
+ auto autoForgetFp = globalFailPointRegistry().find("autoRecipientForgetMigration");
+ autoForgetFp->setMode(FailPoint::off);
+
+ const UUID migrationUUID = UUID::gen();
+ MockReplicaSet replSet("donorSet", 3, true /* hasPrimary */, true /* dollarPrefixHosts */);
+ TenantMigrationRecipientDocument initialStateDocument(
+ migrationUUID,
+ replSet.getConnectionString(),
+ "tenantA",
+ ReadPreferenceSetting(ReadPreference::PrimaryOnly));
+
+ auto fp = globalFailPointRegistry().find("pauseAfterRunTenantMigrationRecipientInstance");
+ auto initialTimesEntered = fp->setMode(FailPoint::alwaysOn);
+
+ auto opCtx = makeOperationContext();
+ auto instance = repl::TenantMigrationRecipientService::Instance::getOrCreate(
+ opCtx.get(), _service, initialStateDocument.toBSON());
+
+ fp->waitForTimesEntered(initialTimesEntered + 1);
+
+ // Test that onReceiveRecipientForgetMigration waits until the state doc is initialized.
+ opCtx->setDeadlineAfterNowBy(Seconds(2), opCtx->getTimeoutError());
+ ASSERT_THROWS_CODE(instance->onReceiveRecipientForgetMigration(opCtx.get()),
+ AssertionException,
+ opCtx->getTimeoutError());
+
+ // Unblock the task chain.
+ fp->setMode(FailPoint::off);
+
+ // Make a new opCtx, as the old one has exceeded its deadline.
+ opCtx.reset();
+ opCtx = makeOperationContext();
+
+ // The onReceiveRecipientForgetMigration should eventually go through.
+ instance->onReceiveRecipientForgetMigration(opCtx.get());
+ ASSERT_EQ(instance->getDataSyncCompletionFuture().getNoThrow(),
+ ErrorCodes::TenantMigrationForgotten);
+ ASSERT_OK(instance->getCompletionFuture().getNoThrow());
+
+ const auto doc = getStateDoc(instance.get());
+ LOGV2(4881411,
+ "Test migration complete",
+ "preStateDoc"_attr = initialStateDocument.toBSON(),
+ "postStateDoc"_attr = doc.toBSON());
+ ASSERT_EQ(doc.getDonorConnectionString(), replSet.getConnectionString());
+ ASSERT_EQ(doc.getTenantId(), "tenantA");
+ ASSERT_TRUE(doc.getReadPreference().equals(ReadPreferenceSetting(ReadPreference::PrimaryOnly)));
+ ASSERT_TRUE(doc.getState() == TenantMigrationRecipientStateEnum::kDone);
+ ASSERT_TRUE(doc.getExpireAt() != boost::none);
+ ASSERT_TRUE(doc.getExpireAt().get() > opCtx->getServiceContext()->getFastClockSource()->now());
+ ASSERT_TRUE(doc.getStartApplyingDonorOpTime() == boost::none);
+ ASSERT_TRUE(doc.getStartFetchingDonorOpTime() == boost::none);
+ ASSERT_TRUE(doc.getDataConsistentStopDonorOpTime() == boost::none);
+ ASSERT_TRUE(doc.getCloneFinishedRecipientOpTime() == boost::none);
+ checkStateDocPersisted(opCtx.get(), instance.get());
+}
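
The timeout dance above is a reusable pattern: giving the opCtx a short deadline turns a call that blocks on an unfulfilled future into a deterministic timeout assertion, after which the opCtx must be replaced. Condensed from the test above:

    // Bound the blocking call: the state doc is not yet initialized, so
    // onReceiveRecipientForgetMigration cannot return, and the opCtx deadline
    // converts the wait into the opCtx's timeout error.
    opCtx->setDeadlineAfterNowBy(Seconds(2), opCtx->getTimeoutError());
    ASSERT_THROWS_CODE(instance->onReceiveRecipientForgetMigration(opCtx.get()),
                       AssertionException,
                       opCtx->getTimeoutError());
    opCtx.reset();                  // the opCtx has exceeded its deadline
    opCtx = makeOperationContext(); // continue with a fresh one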
+
+TEST_F(TenantMigrationRecipientServiceTest, RecipientForgetMigration_AfterStartOpTimes) {
+ auto fp =
+ globalFailPointRegistry().find("fpAfterRetrievingStartOpTimesMigrationRecipientInstance");
+ auto initialTimesEntered = fp->setMode(FailPoint::alwaysOn,
+ 0,
+ BSON("action"
+ << "hang"));
+
+
+ const UUID migrationUUID = UUID::gen();
+ const OpTime topOfOplogOpTime(Timestamp(5, 1), 1);
+
+ MockReplicaSet replSet("donorSet", 3, true /* hasPrimary */, true /* dollarPrefixHosts */);
+ insertTopOfOplog(&replSet, topOfOplogOpTime);
+
+ TenantMigrationRecipientDocument initialStateDocument(
+ migrationUUID,
+ replSet.getConnectionString(),
+ "tenantA",
+ ReadPreferenceSetting(ReadPreference::PrimaryOnly));
+
+ // Create and start the instance.
+ auto opCtx = makeOperationContext();
+ auto instance = TenantMigrationRecipientService::Instance::getOrCreate(
+ opCtx.get(), _service, initialStateDocument.toBSON());
+ ASSERT(instance.get());
+
+ fp->waitForTimesEntered(initialTimesEntered + 1);
+ instance->onReceiveRecipientForgetMigration(opCtx.get());
+
+ // Skip the cloners in this test by providing an empty list of databases.
+ MockRemoteDBServer* const _donorServer =
+ mongo::MockConnRegistry::get()->getMockRemoteDBServer(replSet.getPrimary());
+ _donorServer->setCommandReply("listDatabases", makeListDatabasesResponse({}));
+ _donorServer->setCommandReply("find", makeFindResponse());
+
+ fp->setMode(FailPoint::off);
+ ASSERT_EQ(instance->getDataSyncCompletionFuture().getNoThrow(),
+ ErrorCodes::TenantMigrationForgotten);
+ ASSERT_OK(instance->getCompletionFuture().getNoThrow());
+
+ const auto doc = getStateDoc(instance.get());
+ LOGV2(4881412,
+ "Test migration complete",
+ "preStateDoc"_attr = initialStateDocument.toBSON(),
+ "postStateDoc"_attr = doc.toBSON());
+ ASSERT_EQ(doc.getDonorConnectionString(), replSet.getConnectionString());
+ ASSERT_EQ(doc.getTenantId(), "tenantA");
+ ASSERT_TRUE(doc.getReadPreference().equals(ReadPreferenceSetting(ReadPreference::PrimaryOnly)));
+ ASSERT_TRUE(doc.getState() == TenantMigrationRecipientStateEnum::kDone);
+ ASSERT_TRUE(doc.getExpireAt() != boost::none);
+ ASSERT_TRUE(doc.getExpireAt().get() > opCtx->getServiceContext()->getFastClockSource()->now());
+ checkStateDocPersisted(opCtx.get(), instance.get());
+}
+
+TEST_F(TenantMigrationRecipientServiceTest, RecipientForgetMigration_AfterConsistent) {
+ // The test fixture forgets the migration automatically; disable the failpoint for this test
+ // so that the migration keeps waiting for the recipientForgetMigration command after
+ // reaching the data consistent state.
+ auto autoForgetFp = globalFailPointRegistry().find("autoRecipientForgetMigration");
+ autoForgetFp->setMode(FailPoint::off);
+
+ auto dataConsistentFp =
+ globalFailPointRegistry().find("fpAfterDataConsistentMigrationRecipientInstance");
+ auto initialTimesEntered = dataConsistentFp->setMode(FailPoint::alwaysOn,
+ 0,
+ BSON("action"
+ << "hang"));
+
+ const UUID migrationUUID = UUID::gen();
+ const OpTime topOfOplogOpTime(Timestamp(5, 1), 1);
+
+ MockReplicaSet replSet("donorSet", 3, true /* hasPrimary */, true /* dollarPrefixHosts */);
+ insertTopOfOplog(&replSet, topOfOplogOpTime);
+
+ TenantMigrationRecipientDocument initialStateDocument(
+ migrationUUID,
+ replSet.getConnectionString(),
+ "tenantA",
+ ReadPreferenceSetting(ReadPreference::PrimaryOnly));
+
+ // Setting these causes us to skip cloning.
+ initialStateDocument.setCloneFinishedRecipientOpTime(topOfOplogOpTime);
+ initialStateDocument.setDataConsistentStopDonorOpTime(topOfOplogOpTime);
+
+ auto opCtx = makeOperationContext();
+ std::shared_ptr<TenantMigrationRecipientService::Instance> instance;
+ {
+ FailPointEnableBlock fp("pauseBeforeRunTenantMigrationRecipientInstance");
+ // Create and start the instance.
+ instance = TenantMigrationRecipientService::Instance::getOrCreate(
+ opCtx.get(), _service, initialStateDocument.toBSON());
+ ASSERT(instance.get());
+ instance->setCreateOplogFetcherFn_forTest(std::make_unique<CreateOplogFetcherMockFn>());
+ }
+ dataConsistentFp->waitForTimesEntered(initialTimesEntered + 1);
+
+ {
+ const auto doc = getStateDoc(instance.get());
+ LOGV2(4881413,
+ "Test migration after consistent",
+ "preStateDoc"_attr = initialStateDocument.toBSON(),
+ "postStateDoc"_attr = doc.toBSON());
+ ASSERT_EQ(doc.getDonorConnectionString(), replSet.getConnectionString());
+ ASSERT_EQ(doc.getTenantId(), "tenantA");
+ ASSERT_TRUE(
+ doc.getReadPreference().equals(ReadPreferenceSetting(ReadPreference::PrimaryOnly)));
+ ASSERT_TRUE(doc.getState() == TenantMigrationRecipientStateEnum::kConsistent);
+ ASSERT_TRUE(doc.getExpireAt() == boost::none);
+ checkStateDocPersisted(opCtx.get(), instance.get());
+ }
+
+ instance->onReceiveRecipientForgetMigration(opCtx.get());
+
+ // Test receiving duplicate recipientForgetMigration requests (sketched after this test).
+ instance->onReceiveRecipientForgetMigration(opCtx.get());
+
+ // Continue past the data consistent state.
+ dataConsistentFp->setMode(FailPoint::off);
+
+ // The data sync should have completed.
+ ASSERT_EQ(instance->getDataSyncCompletionFuture().getNoThrow(),
+ ErrorCodes::TenantMigrationForgotten);
+
+ ASSERT_OK(instance->getCompletionFuture().getNoThrow());
+
+ {
+ const auto doc = getStateDoc(instance.get());
+ LOGV2(4881414,
+ "Test migration complete",
+ "preStateDoc"_attr = initialStateDocument.toBSON(),
+ "postStateDoc"_attr = doc.toBSON());
+ ASSERT_EQ(doc.getDonorConnectionString(), replSet.getConnectionString());
+ ASSERT_EQ(doc.getTenantId(), "tenantA");
+ ASSERT_TRUE(
+ doc.getReadPreference().equals(ReadPreferenceSetting(ReadPreference::PrimaryOnly)));
+ ASSERT_TRUE(doc.getState() == TenantMigrationRecipientStateEnum::kDone);
+ ASSERT_TRUE(doc.getExpireAt() != boost::none);
+ ASSERT_TRUE(doc.getExpireAt().get() >
+ opCtx->getServiceContext()->getFastClockSource()->now());
+ checkStateDocPersisted(opCtx.get(), instance.get());
+ }
+}
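
The back-to-back onReceiveRecipientForgetMigration calls above exercise idempotency. A sketch of the guard that behavior implies, assuming the instance records receipt in a promise (the member name is an assumption):

    // Hypothetical idempotency guard: fulfill the receipt promise only on the
    // first call; duplicate calls are no-ops.
    void onReceiveRecipientForgetMigration(OperationContext* opCtx) {
        stdx::lock_guard lk(_mutex);
        if (!_receivedRecipientForgetMigrationPromise.getFuture().isReady()) {
            _receivedRecipientForgetMigrationPromise.emplaceValue();
        }
    }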
+
+TEST_F(TenantMigrationRecipientServiceTest, RecipientForgetMigration_AfterFail) {
+ // The test fixture forgets the migration automatically; disable the failpoint for this test
+ // so that the migration keeps waiting for the recipientForgetMigration command after the
+ // migration hits an error.
+ auto autoForgetFp = globalFailPointRegistry().find("autoRecipientForgetMigration");
+ autoForgetFp->setMode(FailPoint::off);
+
+ stopFailPointEnableBlock fp("fpAfterCollectionClonerDone");
+ const UUID migrationUUID = UUID::gen();
+ const OpTime topOfOplogOpTime(Timestamp(5, 1), 1);
+
+ MockReplicaSet replSet("donorSet", 3, true /* hasPrimary */, true /* dollarPrefixHosts */);
+ insertTopOfOplog(&replSet, topOfOplogOpTime);
+
+ TenantMigrationRecipientDocument initialStateDocument(
+ migrationUUID,
+ replSet.getConnectionString(),
+ "tenantA",
+ ReadPreferenceSetting(ReadPreference::PrimaryOnly));
+
+ // Setting these causes us to skip cloning.
+ initialStateDocument.setCloneFinishedRecipientOpTime(topOfOplogOpTime);
+ initialStateDocument.setDataConsistentStopDonorOpTime(topOfOplogOpTime);
+
+ auto opCtx = makeOperationContext();
+ std::shared_ptr<TenantMigrationRecipientService::Instance> instance;
+ {
+ FailPointEnableBlock fp("pauseBeforeRunTenantMigrationRecipientInstance");
+ // Create and start the instance.
+ instance = TenantMigrationRecipientService::Instance::getOrCreate(
+ opCtx.get(), _service, initialStateDocument.toBSON());
+ ASSERT(instance.get());
+ instance->setCreateOplogFetcherFn_forTest(std::make_unique<CreateOplogFetcherMockFn>());
+ }
+
+ ASSERT_THROWS_CODE(instance->waitUntilMigrationReachesConsistentState(opCtx.get()),
+ AssertionException,
+ stopFailPointErrorCode);
+
+ {
+ const auto doc = getStateDoc(instance.get());
+ LOGV2(4881415,
+ "Test migration after collection cloner done",
+ "preStateDoc"_attr = initialStateDocument.toBSON(),
+ "postStateDoc"_attr = doc.toBSON());
+ ASSERT_EQ(doc.getDonorConnectionString(), replSet.getConnectionString());
+ ASSERT_EQ(doc.getTenantId(), "tenantA");
+ ASSERT_TRUE(
+ doc.getReadPreference().equals(ReadPreferenceSetting(ReadPreference::PrimaryOnly)));
+ ASSERT_TRUE(doc.getState() == TenantMigrationRecipientStateEnum::kStarted);
+ ASSERT_TRUE(doc.getExpireAt() == boost::none);
+ checkStateDocPersisted(opCtx.get(), instance.get());
+ }
+
+ // The data sync should have completed.
+ ASSERT_EQ(stopFailPointErrorCode, instance->getDataSyncCompletionFuture().getNoThrow().code());
+
+ // The instance should still be running and waiting for the recipientForgetMigration command.
+ instance->onReceiveRecipientForgetMigration(opCtx.get());
+ ASSERT_OK(instance->getCompletionFuture().getNoThrow());
+
+ {
+ const auto doc = getStateDoc(instance.get());
+ LOGV2(4881416,
+ "Test migration complete",
+ "preStateDoc"_attr = initialStateDocument.toBSON(),
+ "postStateDoc"_attr = doc.toBSON());
+ ASSERT_EQ(doc.getDonorConnectionString(), replSet.getConnectionString());
+ ASSERT_EQ(doc.getTenantId(), "tenantA");
+ ASSERT_TRUE(
+ doc.getReadPreference().equals(ReadPreferenceSetting(ReadPreference::PrimaryOnly)));
+ ASSERT_TRUE(doc.getState() == TenantMigrationRecipientStateEnum::kDone);
+ ASSERT_TRUE(doc.getExpireAt() != boost::none);
+ ASSERT_TRUE(doc.getExpireAt().get() >
+ opCtx->getServiceContext()->getFastClockSource()->now());
+ checkStateDocPersisted(opCtx.get(), instance.get());
+ }
+}
+
+TEST_F(TenantMigrationRecipientServiceTest, RecipientForgetMigration_FailToMarkGarbageCollectable) {
+ // The test fixture forgets the migration automatically; disable the failpoint for this test
+ // so that the migration keeps waiting for the recipientForgetMigration command after the
+ // migration hits an error.
+ auto autoForgetFp = globalFailPointRegistry().find("autoRecipientForgetMigration");
+ autoForgetFp->setMode(FailPoint::off);
+
+ stopFailPointEnableBlock fp("fpAfterPersistingTenantMigrationRecipientInstanceStateDoc");
+ const UUID migrationUUID = UUID::gen();
+
+ MockReplicaSet replSet("donorSet", 3, true /* hasPrimary */, true /* dollarPrefixHosts */);
+ TenantMigrationRecipientDocument initialStateDocument(
+ migrationUUID,
+ replSet.getConnectionString(),
+ "tenantA",
+ ReadPreferenceSetting(ReadPreference::PrimaryOnly));
+
+ // Create and start the instance.
+ auto opCtx = makeOperationContext();
+ auto instance = TenantMigrationRecipientService::Instance::getOrCreate(
+ opCtx.get(), _service, initialStateDocument.toBSON());
+ ASSERT(instance.get());
+ ASSERT_EQ(migrationUUID, instance->getMigrationUUID());
+
+ // The data sync should have completed.
+ ASSERT_EQ(stopFailPointErrorCode, instance->getDataSyncCompletionFuture().getNoThrow().code());
+
+ // Fail marking the state doc garbage collectable with a different error code, simulating a
+ // stepDown.
+ stopFailPointEnableBlock fpFailForget("fpAfterReceivingRecipientForgetMigration",
+ ErrorCodes::NotWritablePrimary);
+
+ // The instance should still be running and waiting for the recipientForgetMigration command.
+ instance->onReceiveRecipientForgetMigration(opCtx.get());
+ // Check that it fails to mark the state doc garbage collectable.
+ ASSERT_EQ(ErrorCodes::NotWritablePrimary, instance->getCompletionFuture().getNoThrow().code());
+
+ {
+ const auto doc = getStateDoc(instance.get());
+ LOGV2(4881417,
+ "Test migration complete",
+ "preStateDoc"_attr = initialStateDocument.toBSON(),
+ "postStateDoc"_attr = doc.toBSON());
+ ASSERT_EQ(doc.getDonorConnectionString(), replSet.getConnectionString());
+ ASSERT_EQ(doc.getTenantId(), "tenantA");
+ ASSERT_TRUE(
+ doc.getReadPreference().equals(ReadPreferenceSetting(ReadPreference::PrimaryOnly)));
+ ASSERT_TRUE(doc.getState() == TenantMigrationRecipientStateEnum::kStarted);
+ ASSERT_TRUE(doc.getExpireAt() == boost::none);
+ checkStateDocPersisted(opCtx.get(), instance.get());
+ }
}
} // namespace repl
} // namespace mongo