commit    71e85a2029d7e93f2915191ea28be10e2d869859
tree      baf9d30111ae572cf0b5a593cc40a4802d093328
parent    ef75c91cacef77b0c478ff6eab53e40a3cf9a99a
author    A. Jesse Jiryu Davis <jesse@mongodb.com>  2021-11-11 12:00:00 +0000
committer Evergreen Agent <no-reply@evergreen.mongodb.com>  2021-11-16 01:42:28 +0000
SERVER-60969 Race in tenant migration state machine, try 2
(cherry picked from commit 65b6cd2fc68021388b992fad6d46fa349324f2a2)
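
This change has two halves. First, it closes the race: two threads can call waitUntilMigrationReachesReturnAfterReachingTimestamp with the same timestamp, the second thread's updateStateDoc call is then a no-op, and that thread's ReplClientInfo::getLastOp() stays null, so it would await replication of the wrong opTime. The fix samples lastOp before and after the update and, when the two match, falls back to the replication coordinator's latest write opTime. Below is a minimal sketch of that pattern, assuming simplified stand-in types; OpTime, ReplCoord, and opTimeToAwait here are illustrative placeholders, not the real mongo::repl classes.

    // Sketch only: stand-ins for the real mongo::repl types.
    #include <iostream>

    struct OpTime {
        long long term = -1;
        long long timestamp = 0;
        bool operator==(const OpTime& other) const {
            return term == other.term && timestamp == other.timestamp;
        }
    };

    struct ReplCoord {
        OpTime latestWrite;  // newest write in this node's oplog
        OpTime getLatestWriteOpTime() const {
            return latestWrite;
        }
    };

    // Decide which opTime to pass to awaitReplication. If the state-doc
    // update was a no-op, this client's lastOp did not move, so wait on the
    // newest oplog write instead: at this point that is the concurrent
    // thread's identical update.
    OpTime opTimeToAwait(const OpTime& lastOpBeforeUpdate,
                         const OpTime& lastOpAfterUpdate,
                         const ReplCoord& replCoord) {
        if (lastOpBeforeUpdate == lastOpAfterUpdate) {
            return replCoord.getLatestWriteOpTime();
        }
        return lastOpAfterUpdate;
    }

    int main() {
        ReplCoord coord{OpTime{1, 42}};
        OpTime unchanged{1, 40};
        // No-op update: fall back to the coordinator's latest write.
        OpTime result = opTimeToAwait(unchanged, unchanged, coord);
        std::cout << result.timestamp << "\n";  // prints 42
    }

Waiting on the latest write opTime is safe here because the concurrent thread's identical update is the newest relevant write; awaiting it with the same w:all write concern gives the no-op writer the guarantee it needed. This matches the new comment in the diff: "updateStateDoc was a no-op, but we still must ensure it's all-replicated."
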
 src/mongo/db/repl/tenant_migration_recipient_service.cpp      | 31
 src/mongo/db/repl/tenant_migration_recipient_service_test.cpp | 89
 2 files changed, 111 insertions(+), 9 deletions(-)
diff --git a/src/mongo/db/repl/tenant_migration_recipient_service.cpp b/src/mongo/db/repl/tenant_migration_recipient_service.cpp
index 1529c05fa2a..6978555c310 100644
--- a/src/mongo/db/repl/tenant_migration_recipient_service.cpp
+++ b/src/mongo/db/repl/tenant_migration_recipient_service.cpp
@@ -514,14 +514,23 @@ TenantMigrationRecipientService::Instance::waitUntilMigrationReachesReturnAfterR
             opCtx, returnAfterReachingTimestamp, donorRecipientOpTimePair.recipientOpTime));
     }
 
     _stopOrHangOnFailPoint(&fpBeforePersistingRejectReadsBeforeTimestamp, opCtx);
 
-    uassertStatusOK(tenantMigrationRecipientEntryHelpers::updateStateDoc(opCtx, _stateDoc));
-    auto writeOpTime = repl::ReplClientInfo::forClient(opCtx->getClient()).getLastOp();
+    auto lastOpBeforeUpdate = repl::ReplClientInfo::forClient(opCtx->getClient()).getLastOp();
+    uassertStatusOK(tenantMigrationRecipientEntryHelpers::updateStateDoc(opCtx, _stateDoc));
+    auto lastOpAfterUpdate = repl::ReplClientInfo::forClient(opCtx->getClient()).getLastOp();
     auto replCoord = repl::ReplicationCoordinator::get(_serviceContext);
+    if (lastOpBeforeUpdate == lastOpAfterUpdate) {
+        // updateStateDoc was a no-op, but we still must ensure it's all-replicated.
+        lastOpAfterUpdate = uassertStatusOK(replCoord->getLatestWriteOpTime(opCtx));
+        LOGV2(6096900,
+              "Fixed write timestamp for recording rejectReadsBeforeTimestamp",
+              "newWriteOpTime"_attr = lastOpAfterUpdate);
+    }
+
     WriteConcernOptions writeConcern(repl::ReplSetConfig::kConfigAllWriteConcernName,
                                      WriteConcernOptions::SyncMode::NONE,
                                      opCtx->getWriteConcern().wTimeout);
-    uassertStatusOK(replCoord->awaitReplication(opCtx, writeOpTime, writeConcern).status);
+    uassertStatusOK(replCoord->awaitReplication(opCtx, lastOpAfterUpdate, writeConcern).status);
 
     _stopOrHangOnFailPoint(&fpAfterWaitForRejectReadsBeforeTimestamp, opCtx);
@@ -1460,6 +1469,7 @@ void TenantMigrationRecipientService::Instance::_oplogFetcherCallback(Status opl
 
 void TenantMigrationRecipientService::Instance::_stopOrHangOnFailPoint(FailPoint* fp,
                                                                        OperationContext* opCtx) {
+    auto shouldHang = false;
     fp->executeIf(
         [&](const BSONObj& data) {
             LOGV2(4881103,
@@ -1469,11 +1479,8 @@ void TenantMigrationRecipientService::Instance::_stopOrHangOnFailPoint(FailPoint
                   "name"_attr = fp->getName(),
                   "args"_attr = data);
             if (data["action"].str() == "hang") {
-                if (opCtx) {
-                    fp->pauseWhileSet(opCtx);
-                } else {
-                    fp->pauseWhileSet();
-                }
+                // fp is locked. If we call pauseWhileSet here, another thread can't disable fp.
+                shouldHang = true;
             } else {
                 uasserted(data["stopErrorCode"].numberInt(),
                           "Skipping remaining processing due to fail point");
@@ -1483,6 +1490,14 @@ void TenantMigrationRecipientService::Instance::_stopOrHangOnFailPoint(FailPoint
             auto action = data["action"].str();
             return (action == "hang" || action == "stop");
         });
+
+    if (shouldHang) {
+        if (opCtx) {
+            fp->pauseWhileSet(opCtx);
+        } else {
+            fp->pauseWhileSet();
+        }
+    }
 }
 
 bool TenantMigrationRecipientService::Instance::_isCloneCompletedMarkerSet(WithLock) const {
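
The second half of the fix, above, moves the blocking pauseWhileSet call out of the FailPoint::executeIf callback. While executeIf runs the callback the failpoint is held, as the new comment in the diff notes, so a thread hanging there prevents another thread from disabling the failpoint. The fix records the decision in shouldHang and blocks only after executeIf returns. Below is a rough sketch of that ordering problem, using a plain mutex-guarded gate as a stand-in; Gate, pauseWhileEnabled, and stopOrHang are hypothetical names, not the real mongo::FailPoint API.

    // Sketch only: a mutex models the failpoint's internal synchronization.
    #include <condition_variable>
    #include <functional>
    #include <mutex>

    class Gate {
        std::mutex _m;
        std::condition_variable _cv;
        bool _enabled = true;

    public:
        // Runs the callback while the gate's lock is held, the way
        // FailPoint::executeIf holds the failpoint during its callback.
        void executeIf(const std::function<void()>& cb) {
            std::lock_guard lk(_m);
            if (_enabled) {
                cb();
            }
        }

        // Needs the same lock, so a callback that blocks inside executeIf
        // prevents disable() from ever running.
        void disable() {
            {
                std::lock_guard lk(_m);
                _enabled = false;
            }
            _cv.notify_all();
        }

        // Blocks the caller until disable() is called. Re-acquires the
        // lock, so calling it from inside executeIf's callback deadlocks.
        void pauseWhileEnabled() {
            std::unique_lock lk(_m);
            _cv.wait(lk, [&] { return !_enabled; });
        }
    };

    void stopOrHang(Gate& gate) {
        bool shouldHang = false;
        // Only record the decision inside the callback; blocking here
        // would hold the lock against disable().
        gate.executeIf([&] { shouldHang = true; });
        // Block after executeIf has released the lock, mirroring the moved
        // pauseWhileSet call in the diff above.
        if (shouldHang) {
            gate.pauseWhileEnabled();
        }
    }
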
diff --git a/src/mongo/db/repl/tenant_migration_recipient_service_test.cpp b/src/mongo/db/repl/tenant_migration_recipient_service_test.cpp
index b30dee191a0..009a6043623 100644
--- a/src/mongo/db/repl/tenant_migration_recipient_service_test.cpp
+++ b/src/mongo/db/repl/tenant_migration_recipient_service_test.cpp
@@ -61,7 +61,7 @@
 #include "mongo/dbtests/mock/mock_conn_registry.h"
 #include "mongo/dbtests/mock/mock_replica_set.h"
 #include "mongo/executor/network_interface.h"
-#include "mongo/executor/network_interface_factory.h"
+#include "mongo/executor/network_interface_mock.h"
 #include "mongo/executor/thread_pool_task_executor.h"
 #include "mongo/logv2/log.h"
 #include "mongo/rpc/metadata/egress_metadata_hook_list.h"
@@ -3318,6 +3318,93 @@ TEST_F(TenantMigrationRecipientServiceTest, WaitUntilMigrationReachesReturnAfter
     ASSERT_GTE(lastAppliedOpTime, newOpTime);
 }
 
+// When the recipientSyncData command is called with returnAfterReachingTimestamp, mongod updates
+// the state document's rejectReadsBeforeTimestamp value and waits for w:all acknowledgment.
+// If there are two concurrent calls to the command with the same
+// returnAfterReachingTimestamp, then two threads may write the same rejectReadsBeforeTimestamp
+// value. Whichever thread writes it second is therefore a no-op, and its
+// ReplClientInfo::getLastOp() will be null. Test that the second thread detects the problem.
+TEST_F(TenantMigrationRecipientServiceTest, DuplicateReturnAfterReachingTimestamp) {
+    const UUID migrationUUID = UUID::gen();
+    const OpTime topOfOplogOpTime(Timestamp(5, 1), 1);
+
+    ThreadPool::Options threadPoolOptions;
+    threadPoolOptions.threadNamePrefix = "TenantMigrationRecipientServiceTest-";
+    threadPoolOptions.poolName = "TenantMigrationRecipientServiceTestThreadPool";
+    threadPoolOptions.onCreateThread = [](const std::string& threadName) {
+        Client::initThread(threadName.c_str());
+    };
+
+    auto executor = std::make_shared<executor::ThreadPoolTaskExecutor>(
+        std::make_unique<ThreadPool>(threadPoolOptions),
+        std::make_unique<executor::NetworkInterfaceMock>());
+    executor->startup();
+
+    MockReplicaSet replSet("donorSet", 3, true /* hasPrimary */, true /* dollarPrefixHosts */);
+    getTopologyManager()->setTopologyDescription(replSet.getTopologyDescription(clock()));
+    insertTopOfOplog(&replSet, topOfOplogOpTime);
+
+    TenantMigrationRecipientDocument initialStateDocument(
+        migrationUUID,
+        replSet.getConnectionString(),
+        "tenantA",
+        kDefaultStartMigrationTimestamp,
+        ReadPreferenceSetting(ReadPreference::PrimaryOnly));
+    initialStateDocument.setRecipientCertificateForDonor(kRecipientPEMPayload);
+
+    // Skip the cloners in this test, so we provide an empty list of databases.
+    MockRemoteDBServer* const _donorServer =
+        mongo::MockConnRegistry::get()->getMockRemoteDBServer(replSet.getPrimary());
+    _donorServer->setCommandReply("listDatabases", makeListDatabasesResponse({}));
+    _donorServer->setCommandReply("find", makeFindResponse());
+
+    auto opCtx = makeOperationContext();
+    std::shared_ptr<TenantMigrationRecipientService::Instance> instance;
+    {
+        FailPointEnableBlock fp("pauseBeforeRunTenantMigrationRecipientInstance");
+        // Create and start the instance.
+        instance = TenantMigrationRecipientService::Instance::getOrCreate(
+            opCtx.get(), _service, initialStateDocument.toBSON());
+        ASSERT(instance.get());
+        instance->setCreateOplogFetcherFn_forTest(std::make_unique<CreateOplogFetcherMockFn>());
+    }
+
+    instance->waitUntilMigrationReachesConsistentState(opCtx.get());
+    checkStateDocPersisted(opCtx.get(), instance.get());
+
+    // Simulate recipient receiving a donor timestamp.
+    auto returnAfterReachingTimestamp =
+        ReplicationCoordinator::get(getServiceContext())->getMyLastAppliedOpTime().getTimestamp() +
+        1;
+
+    auto fp = globalFailPointRegistry().find("fpBeforePersistingRejectReadsBeforeTimestamp");
+    auto timesEntered = fp->setMode(FailPoint::alwaysOn,
+                                    0,
+                                    BSON("action"
+                                         << "hang"));
+    startCapturingLogMessages();
+    auto future1 = ExecutorFuture<void>(executor).then([&]() {
+        auto innerOpCtx = makeOperationContext();
+        instance->waitUntilMigrationReachesReturnAfterReachingTimestamp(
+            innerOpCtx.get(), returnAfterReachingTimestamp);
+    });
+    auto future2 = ExecutorFuture<void>(executor).then([&]() {
+        auto innerOpCtx = makeOperationContext();
+        instance->waitUntilMigrationReachesReturnAfterReachingTimestamp(
+            innerOpCtx.get(), returnAfterReachingTimestamp);
+    });
+
+    fp->waitForTimesEntered(timesEntered + 2);
+    fp->setMode(FailPoint::off);
+    advanceTime(Seconds(10));  // Wake threads that were blocked on fp.
+    future1.wait();
+    future2.wait();
+    ASSERT_EQ(1, countBSONFormatLogLinesIsSubset(BSON("id" << 6096900)));
+    executor->shutdown();
+    executor->join();
+    stopCapturingLogMessages();
+}
+
 TEST_F(TenantMigrationRecipientServiceTest, RecipientReceivesRetriableFetcherError) {
     stopFailPointEnableBlock stopFp("fpAfterCollectionClonerDone");
     auto fp =
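
One subtlety in the new test: after the failpoint is switched off, the two hung threads are woken by advanceTime(Seconds(10)) rather than by the setMode call itself. The test's own comment ("Wake threads that were blocked on fp") points at the reason: pauseWhileSet re-checks the failpoint on a timed sleep, and under the fixture's mocked clock a sleeping thread only wakes when test code moves time forward. A simplified sketch of that polling wait follows; MockClock and the free function pauseWhileSet below are illustrative stand-ins, not MongoDB's implementation.

    // Sketch only: sleepers on this mock clock wake solely via advance().
    #include <atomic>
    #include <chrono>
    #include <condition_variable>
    #include <mutex>

    class MockClock {
        std::mutex _m;
        std::condition_variable _cv;
        std::chrono::milliseconds _now{0};

    public:
        void advance(std::chrono::milliseconds d) {
            {
                std::lock_guard lk(_m);
                _now += d;
            }
            _cv.notify_all();
        }
        void sleepUntil(std::chrono::milliseconds deadline) {
            std::unique_lock lk(_m);
            _cv.wait(lk, [&] { return _now >= deadline; });
        }
        std::chrono::milliseconds now() {
            std::lock_guard lk(_m);
            return _now;
        }
    };

    // pauseWhileSet-style loop: poll the flag, sleeping on the clock
    // between checks. If the test clears the flag but never advances the
    // clock, the thread stays parked in sleepUntil, which is why the test
    // calls advanceTime(Seconds(10)) after disabling the failpoint.
    void pauseWhileSet(MockClock& clock, std::atomic<bool>& failPointEnabled) {
        while (failPointEnabled.load()) {
            clock.sleepUntil(clock.now() + std::chrono::milliseconds(100));
        }
    }
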