From 0eda431672cedbbe62e3b487d2389b1f5c5695df Mon Sep 17 00:00:00 2001
From: "A. Jesse Jiryu Davis" <jesse@mongodb.com>
Date: Thu, 4 Nov 2021 23:23:41 +0000
Subject: SERVER-60969 Race in tenant migration state machine

---
 .../db/repl/tenant_migration_recipient_service.cpp | 15 ++++-
 .../tenant_migration_recipient_service_test.cpp    | 78 ++++++++++++++++++++++
 2 files changed, 90 insertions(+), 3 deletions(-)

diff --git a/src/mongo/db/repl/tenant_migration_recipient_service.cpp b/src/mongo/db/repl/tenant_migration_recipient_service.cpp
index 30c98b48af7..19f8a191489 100644
--- a/src/mongo/db/repl/tenant_migration_recipient_service.cpp
+++ b/src/mongo/db/repl/tenant_migration_recipient_service.cpp
@@ -525,14 +525,23 @@ TenantMigrationRecipientService::Instance::waitUntilMigrationReachesReturnAfterR
             opCtx, returnAfterReachingTimestamp, donorRecipientOpTimePair.recipientOpTime));
     }
 
     _stopOrHangOnFailPoint(&fpBeforePersistingRejectReadsBeforeTimestamp, opCtx);
 
-    uassertStatusOK(tenantMigrationRecipientEntryHelpers::updateStateDoc(opCtx, _stateDoc));
-    auto writeOpTime = repl::ReplClientInfo::forClient(opCtx->getClient()).getLastOp();
+    auto lastOpBeforeUpdate = repl::ReplClientInfo::forClient(opCtx->getClient()).getLastOp();
+    uassertStatusOK(tenantMigrationRecipientEntryHelpers::updateStateDoc(opCtx, _stateDoc));
+    auto lastOpAfterUpdate = repl::ReplClientInfo::forClient(opCtx->getClient()).getLastOp();
     auto replCoord = repl::ReplicationCoordinator::get(_serviceContext);
+    if (lastOpBeforeUpdate == lastOpAfterUpdate) {
+        // updateStateDoc was a no-op, but we still must ensure it's all-replicated.
+        lastOpAfterUpdate = uassertStatusOK(replCoord->getLatestWriteOpTime(opCtx));
+        LOGV2(6096900,
+              "Fixed write timestamp for recording rejectReadsBeforeTimestamp",
+              "newWriteOpTime"_attr = lastOpAfterUpdate);
+    }
+
     WriteConcernOptions writeConcern(repl::ReplSetConfig::kConfigAllWriteConcernName,
                                      WriteConcernOptions::SyncMode::NONE,
                                      opCtx->getWriteConcern().wTimeout);
-    uassertStatusOK(replCoord->awaitReplication(opCtx, writeOpTime, writeConcern).status);
+    uassertStatusOK(replCoord->awaitReplication(opCtx, lastOpAfterUpdate, writeConcern).status);
 
     _stopOrHangOnFailPoint(&fpAfterWaitForRejectReadsBeforeTimestamp, opCtx);
diff --git a/src/mongo/db/repl/tenant_migration_recipient_service_test.cpp b/src/mongo/db/repl/tenant_migration_recipient_service_test.cpp
index b30dee191a0..7cd67c527a1 100644
--- a/src/mongo/db/repl/tenant_migration_recipient_service_test.cpp
+++ b/src/mongo/db/repl/tenant_migration_recipient_service_test.cpp
@@ -3318,6 +3318,84 @@ TEST_F(TenantMigrationRecipientServiceTest, WaitUntilMigrationReachesReturnAfter
     ASSERT_GTE(lastAppliedOpTime, newOpTime);
 }
 
+TEST_F(TenantMigrationRecipientServiceTest, DuplicateReturnAfterReachingTimestamp) {
+    const UUID migrationUUID = UUID::gen();
+    const OpTime topOfOplogOpTime(Timestamp(5, 1), 1);
+
+    ThreadPool::Options threadPoolOptions;
+    threadPoolOptions.threadNamePrefix = "TenantMigrationRecipientServiceTest-";
+    threadPoolOptions.poolName = "TenantMigrationRecipientServiceTestThreadPool";
+    threadPoolOptions.onCreateThread = [](const std::string& threadName) {
+        Client::initThread(threadName.c_str());
+    };
+
+    auto hookList = std::make_unique<rpc::EgressMetadataHookList>();
+    auto executor = std::make_shared<executor::ThreadPoolTaskExecutor>(
+        std::make_unique<ThreadPool>(threadPoolOptions),
+        executor::makeNetworkInterface(
+            "TenantMigrationRecipientServiceTestNetwork", nullptr, std::move(hookList)));
+    executor->startup();
+
+    MockReplicaSet replSet("donorSet", 3, true /* hasPrimary */, true /* dollarPrefixHosts */);
+    getTopologyManager()->setTopologyDescription(replSet.getTopologyDescription(clock()));
+    insertTopOfOplog(&replSet, topOfOplogOpTime);
+
+    TenantMigrationRecipientDocument initialStateDocument(
+        migrationUUID,
+        replSet.getConnectionString(),
+        "tenantA",
+        kDefaultStartMigrationTimestamp,
+        ReadPreferenceSetting(ReadPreference::PrimaryOnly));
+    initialStateDocument.setRecipientCertificateForDonor(kRecipientPEMPayload);
+
+    // Skip the cloners in this test, so we provide an empty list of databases.
+    MockRemoteDBServer* const _donorServer =
+        mongo::MockConnRegistry::get()->getMockRemoteDBServer(replSet.getPrimary());
+    _donorServer->setCommandReply("listDatabases", makeListDatabasesResponse({}));
+    _donorServer->setCommandReply("find", makeFindResponse());
+
+    auto opCtx = makeOperationContext();
+    std::shared_ptr<TenantMigrationRecipientService::Instance> instance;
+    {
+        FailPointEnableBlock fp("pauseBeforeRunTenantMigrationRecipientInstance");
+        // Create and start the instance.
+        instance = TenantMigrationRecipientService::Instance::getOrCreate(
+            opCtx.get(), _service, initialStateDocument.toBSON());
+        ASSERT(instance.get());
+        instance->setCreateOplogFetcherFn_forTest(std::make_unique<CreateOplogFetcherMockFn>());
+    }
+
+    instance->waitUntilMigrationReachesConsistentState(opCtx.get());
+    checkStateDocPersisted(opCtx.get(), instance.get());
+
+    // Simulate recipient receiving a donor timestamp.
+    auto returnAfterReachingTimestamp =
+        ReplicationCoordinator::get(getServiceContext())->getMyLastAppliedOpTime().getTimestamp() +
+        1;
+
+    // Both tasks write the same rejectReadsBeforeTimestamp value. Whichever task writes it second
+    // is therefore a no-op.
+    startCapturingLogMessages();
+    auto future1 = ExecutorFuture(executor).then([&]() {
+        auto innerOpCtx = makeOperationContext();
+        instance->waitUntilMigrationReachesReturnAfterReachingTimestamp(
+            innerOpCtx.get(), returnAfterReachingTimestamp);
+    });
+    auto future2 = ExecutorFuture(executor).then([&]() {
+        auto innerOpCtx = makeOperationContext();
+        instance->waitUntilMigrationReachesReturnAfterReachingTimestamp(
+            innerOpCtx.get(), returnAfterReachingTimestamp);
+    });
+
+    future1.wait();
+    future2.wait();
+    executor->shutdown();
+    executor->join();
+    stopCapturingLogMessages();
+    // "Fixed write timestamp for recording rejectReadsBeforeTimestamp".
+    ASSERT_EQ(1, countBSONFormatLogLinesIsSubset(BSON("id" << 6096900)));
+}
+
 TEST_F(TenantMigrationRecipientServiceTest, RecipientReceivesRetriableFetcherError) {
     stopFailPointEnableBlock stopFp("fpAfterCollectionClonerDone");
     auto fp =
-- 
cgit v1.2.1