diff options
author | Wenbin Zhu <wenbin.zhu@mongodb.com> | 2021-04-20 20:59:17 +0000 |
---|---|---|
committer | Evergreen Agent <no-reply@evergreen.mongodb.com> | 2021-04-21 01:47:48 +0000 |
commit | d06ab1800e38439afbb71ae67e49b0e22a6a5c5c (patch) | |
tree | 1a299e2d09183706655d0f461629f3ba3ce7620a /src | |
parent | 31babedc11b394be2db40f33fe087af202d80be0 (diff) | |
download | mongo-d06ab1800e38439afbb71ae67e49b0e22a6a5c5c.tar.gz |
SERVER-53790 Test tenant migration recipient respects the provided read preference on donor.
Diffstat (limited to 'src')
-rw-r--r-- | src/mongo/db/repl/tenant_migration_recipient_service_test.cpp | 157 |
1 files changed, 152 insertions, 5 deletions
diff --git a/src/mongo/db/repl/tenant_migration_recipient_service_test.cpp b/src/mongo/db/repl/tenant_migration_recipient_service_test.cpp index 019467c5b49..6a208f439a7 100644 --- a/src/mongo/db/repl/tenant_migration_recipient_service_test.cpp +++ b/src/mongo/db/repl/tenant_migration_recipient_service_test.cpp @@ -605,7 +605,7 @@ TEST_F(TenantMigrationRecipientServiceTest, TenantMigrationRecipientConnection_S } TEST_F(TenantMigrationRecipientServiceTest, - TenantMigrationRecipientConnect_ExcludedPrimaryHostPrimaryOnly) { + TenantMigrationRecipientConnection_ExcludedPrimaryHostPrimaryOnly) { FailPointEnableBlock skipRetriesFp("skipRetriesWhenConnectingToDonorHost"); auto taskFp = globalFailPointRegistry().find("hangBeforeTaskCompletion"); @@ -680,7 +680,7 @@ TEST_F(TenantMigrationRecipientServiceTest, } TEST_F(TenantMigrationRecipientServiceTest, - TenantMigrationRecipientConnect_ExcludedPrimaryHostExpires) { + TenantMigrationRecipientConnection_ExcludedPrimaryHostExpires) { stopFailPointEnableBlock fp("fpAfterConnectingTenantMigrationRecipientInstance"); auto taskFp = globalFailPointRegistry().find("hangBeforeTaskCompletion"); @@ -752,7 +752,7 @@ TEST_F(TenantMigrationRecipientServiceTest, } TEST_F(TenantMigrationRecipientServiceTest, - TenantMigrationRecipientConnect_ExcludedAllHostsNearest) { + TenantMigrationRecipientConnection_ExcludedAllHostsNearest) { FailPointEnableBlock skipRetriesFp("skipRetriesWhenConnectingToDonorHost"); auto taskFp = globalFailPointRegistry().find("hangBeforeTaskCompletion"); @@ -830,7 +830,7 @@ TEST_F(TenantMigrationRecipientServiceTest, } TEST_F(TenantMigrationRecipientServiceTest, - TenantMigrationRecipientConnect_ExcludedPrimaryWithPrimaryPreferred) { + TenantMigrationRecipientConnection_ExcludedPrimaryWithPrimaryPreferred) { stopFailPointEnableBlock fp("fpAfterConnectingTenantMigrationRecipientInstance"); auto taskFp = globalFailPointRegistry().find("hangBeforeTaskCompletion"); @@ -899,7 +899,7 @@ TEST_F(TenantMigrationRecipientServiceTest, } TEST_F(TenantMigrationRecipientServiceTest, - TenantMigrationRecipientConnect_ExcludedPrimaryExpiresWithPrimaryPreferred) { + TenantMigrationRecipientConnection_ExcludedPrimaryExpiresWithPrimaryPreferred) { stopFailPointEnableBlock fp("fpAfterConnectingTenantMigrationRecipientInstance"); auto taskFp = globalFailPointRegistry().find("hangBeforeTaskCompletion"); @@ -971,6 +971,153 @@ TEST_F(TenantMigrationRecipientServiceTest, } TEST_F(TenantMigrationRecipientServiceTest, + TenantMigrationRecipientConnection_SecondariesDownOrExcludedSecondaryOnly) { + FailPointEnableBlock skipRetriesFp("skipRetriesWhenConnectingToDonorHost"); + + auto taskFp = globalFailPointRegistry().find("hangBeforeTaskCompletion"); + auto taskFpInitialTimesEntered = taskFp->setMode(FailPoint::alwaysOn); + + const UUID migrationUUID = UUID::gen(); + + MockReplicaSet replSet("donorSet", 3); + + insertTopOfOplog(&replSet, OpTime(kDefaultStartMigrationTimestamp, 1)); + + TenantMigrationRecipientDocument initialStateDocument( + migrationUUID, + replSet.getConnectionString(), + "tenantA", + kDefaultStartMigrationTimestamp, + ReadPreferenceSetting(ReadPreference::SecondaryOnly)); + initialStateDocument.setRecipientCertificateForDonor(kRecipientPEMPayload); + + // Hang the migration before attempting to connect to clients. + auto hangFp = + globalFailPointRegistry().find("fpAfterPersistingTenantMigrationRecipientInstanceStateDoc"); + auto hangFpInitialTimesEntered = hangFp->setMode(FailPoint::alwaysOn, + 0, + BSON("action" + << "hang")); + // Create and start the instance. + auto opCtx = makeOperationContext(); + auto instance = TenantMigrationRecipientService::Instance::getOrCreate( + opCtx.get(), _service, initialStateDocument.toBSON()); + ASSERT(instance.get()); + + hangFp->waitForTimesEntered(hangFpInitialTimesEntered + 1); + + // Shutdown one secondary and mark the other secondary as excluded. + auto hosts = replSet.getHosts(); + replSet.kill(hosts[1].toString()); + auto now = opCtx->getServiceContext()->getFastClockSource()->now(); + auto excludeTime = Milliseconds(500); + instance->excludeDonorHost_forTest(hosts.at(2), now + excludeTime); + + AtomicWord<bool> runReplMonitor{true}; + // Keep scanning the replica set while waiting to reach the failpoint. This would normally + // be automatic but that doesn't work with mock replica sets. + stdx::thread replMonitorThread([&] { + Client::initThread("replMonitorThread"); + while (runReplMonitor.load()) { + auto monitor = ReplicaSetMonitor::get(replSet.getSetName()); + // Monitor may not have been created yet. + if (monitor) { + monitor->runScanForMockReplicaSet(); + } + mongo::sleepmillis(100); + } + }); + + hangFp->setMode(FailPoint::off); + taskFp->waitForTimesEntered(taskFpInitialTimesEntered + 1); + runReplMonitor.store(false); + replMonitorThread.join(); + + auto* client = getClient(instance.get()); + auto* oplogFetcherClient = getOplogFetcherClient(instance.get()); + // Neither client should be populated. + ASSERT_FALSE(client); + ASSERT_FALSE(oplogFetcherClient); + + taskFp->setMode(FailPoint::off); + + // Wait for task completion failure. + ASSERT_EQUALS(ErrorCodes::FailedToSatisfyReadPreference, + instance->getDataSyncCompletionFuture().getNoThrow().code()); + ASSERT_OK(instance->getCompletionFuture().getNoThrow()); +} + +TEST_F(TenantMigrationRecipientServiceTest, + TenantMigrationRecipientConnection_SecondariesDownOrExcludedSecondaryPreferred) { + stopFailPointEnableBlock fp("fpAfterConnectingTenantMigrationRecipientInstance"); + + auto taskFp = globalFailPointRegistry().find("hangBeforeTaskCompletion"); + auto taskFpInitialTimesEntered = taskFp->setMode(FailPoint::alwaysOn); + + const UUID migrationUUID = UUID::gen(); + + MockReplicaSet replSet("donorSet", 3, true /* hasPrimary */, true /* dollarPrefixHosts */); + + insertTopOfOplog(&replSet, OpTime(kDefaultStartMigrationTimestamp, 1)); + + TenantMigrationRecipientDocument initialStateDocument( + migrationUUID, + replSet.getConnectionString(), + "tenantA", + kDefaultStartMigrationTimestamp, + ReadPreferenceSetting(ReadPreference::SecondaryPreferred)); + initialStateDocument.setRecipientCertificateForDonor(kRecipientPEMPayload); + + // Hang the migration before attempting to connect to clients. + auto hangFp = + globalFailPointRegistry().find("fpAfterPersistingTenantMigrationRecipientInstanceStateDoc"); + auto hangFpInitialTimesEntered = hangFp->setMode(FailPoint::alwaysOn, + 0, + BSON("action" + << "hang")); + + // Create and start the instance. + auto opCtx = makeOperationContext(); + auto instance = TenantMigrationRecipientService::Instance::getOrCreate( + opCtx.get(), _service, initialStateDocument.toBSON()); + ASSERT(instance.get()); + + hangFp->waitForTimesEntered(hangFpInitialTimesEntered + 1); + + // Shutdown one secondary and mark the other secondary as excluded. + auto hosts = replSet.getHosts(); + replSet.kill(hosts[1].toString()); + auto now = opCtx->getServiceContext()->getFastClockSource()->now(); + auto excludeTime = Milliseconds(500); + instance->excludeDonorHost_forTest(hosts.at(2), now + excludeTime); + + hangFp->setMode(FailPoint::off); + taskFp->waitForTimesEntered(taskFpInitialTimesEntered + 1); + + auto* client = getClient(instance.get()); + auto* oplogFetcherClient = getOplogFetcherClient(instance.get()); + // Both clients should be populated. + ASSERT(client); + ASSERT(oplogFetcherClient); + + // Clients should be distinct. + ASSERT(client != oplogFetcherClient); + + // Clients should be connected to the primary. + auto primary = replSet.getHosts()[0].toString(); + ASSERT_EQ(primary, client->getServerAddress()); + ASSERT(client->isStillConnected()); + ASSERT_EQ(primary, oplogFetcherClient->getServerAddress()); + ASSERT(oplogFetcherClient->isStillConnected()); + + taskFp->setMode(FailPoint::off); + + // Wait for task completion. + ASSERT_EQ(stopFailPointErrorCode, instance->getDataSyncCompletionFuture().getNoThrow().code()); + ASSERT_OK(instance->getCompletionFuture().getNoThrow()); +} + +TEST_F(TenantMigrationRecipientServiceTest, TenantMigrationRecipientConnection_RemoteMajorityOpTimeBehindStartApplying) { stopFailPointEnableBlock fp("fpAfterConnectingTenantMigrationRecipientInstance"); |