summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorA. Jesse Jiryu Davis <jesse@mongodb.com>2021-11-04 23:23:41 +0000
committerEvergreen Agent <no-reply@evergreen.mongodb.com>2021-11-05 00:03:21 +0000
commit0eda431672cedbbe62e3b487d2389b1f5c5695df (patch)
treeec4b8aab3c3b2a15c1f61516e8ca2b570a7adfd1
parente47af06d2f144eab78746daac71005bbb73e69f4 (diff)
downloadmongo-0eda431672cedbbe62e3b487d2389b1f5c5695df.tar.gz
SERVER-60969 Race in tenant migration state machine
-rw-r--r--src/mongo/db/repl/tenant_migration_recipient_service.cpp15
-rw-r--r--src/mongo/db/repl/tenant_migration_recipient_service_test.cpp78
2 files changed, 90 insertions, 3 deletions
diff --git a/src/mongo/db/repl/tenant_migration_recipient_service.cpp b/src/mongo/db/repl/tenant_migration_recipient_service.cpp
index 30c98b48af7..19f8a191489 100644
--- a/src/mongo/db/repl/tenant_migration_recipient_service.cpp
+++ b/src/mongo/db/repl/tenant_migration_recipient_service.cpp
@@ -525,14 +525,23 @@ TenantMigrationRecipientService::Instance::waitUntilMigrationReachesReturnAfterR
opCtx, returnAfterReachingTimestamp, donorRecipientOpTimePair.recipientOpTime));
}
_stopOrHangOnFailPoint(&fpBeforePersistingRejectReadsBeforeTimestamp, opCtx);
- uassertStatusOK(tenantMigrationRecipientEntryHelpers::updateStateDoc(opCtx, _stateDoc));
- auto writeOpTime = repl::ReplClientInfo::forClient(opCtx->getClient()).getLastOp();
+ auto lastOpBeforeUpdate = repl::ReplClientInfo::forClient(opCtx->getClient()).getLastOp();
+ uassertStatusOK(tenantMigrationRecipientEntryHelpers::updateStateDoc(opCtx, _stateDoc));
+ auto lastOpAfterUpdate = repl::ReplClientInfo::forClient(opCtx->getClient()).getLastOp();
auto replCoord = repl::ReplicationCoordinator::get(_serviceContext);
+ if (lastOpBeforeUpdate == lastOpAfterUpdate) {
+ // updateStateDoc was a no-op, but we still must ensure it's all-replicated.
+ lastOpAfterUpdate = uassertStatusOK(replCoord->getLatestWriteOpTime(opCtx));
+ LOGV2(6096900,
+ "Fixed write timestamp for recording rejectReadsBeforeTimestamp",
+ "newWriteOpTime"_attr = lastOpAfterUpdate);
+ }
+
WriteConcernOptions writeConcern(repl::ReplSetConfig::kConfigAllWriteConcernName,
WriteConcernOptions::SyncMode::NONE,
opCtx->getWriteConcern().wTimeout);
- uassertStatusOK(replCoord->awaitReplication(opCtx, writeOpTime, writeConcern).status);
+ uassertStatusOK(replCoord->awaitReplication(opCtx, lastOpAfterUpdate, writeConcern).status);
_stopOrHangOnFailPoint(&fpAfterWaitForRejectReadsBeforeTimestamp, opCtx);
diff --git a/src/mongo/db/repl/tenant_migration_recipient_service_test.cpp b/src/mongo/db/repl/tenant_migration_recipient_service_test.cpp
index b30dee191a0..7cd67c527a1 100644
--- a/src/mongo/db/repl/tenant_migration_recipient_service_test.cpp
+++ b/src/mongo/db/repl/tenant_migration_recipient_service_test.cpp
@@ -3318,6 +3318,84 @@ TEST_F(TenantMigrationRecipientServiceTest, WaitUntilMigrationReachesReturnAfter
ASSERT_GTE(lastAppliedOpTime, newOpTime);
}
+TEST_F(TenantMigrationRecipientServiceTest, DuplicateReturnAfterReachingTimestamp) {
+ const UUID migrationUUID = UUID::gen();
+ const OpTime topOfOplogOpTime(Timestamp(5, 1), 1);
+
+ ThreadPool::Options threadPoolOptions;
+ threadPoolOptions.threadNamePrefix = "TenantMigrationRecipientServiceTest-";
+ threadPoolOptions.poolName = "TenantMigrationRecipientServiceTestThreadPool";
+ threadPoolOptions.onCreateThread = [](const std::string& threadName) {
+ Client::initThread(threadName.c_str());
+ };
+
+ auto hookList = std::make_unique<rpc::EgressMetadataHookList>();
+ auto executor = std::make_shared<executor::ThreadPoolTaskExecutor>(
+ std::make_unique<ThreadPool>(threadPoolOptions),
+ executor::makeNetworkInterface(
+ "TenantMigrationRecipientServiceTestNetwork", nullptr, std::move(hookList)));
+ executor->startup();
+
+ MockReplicaSet replSet("donorSet", 3, true /* hasPrimary */, true /* dollarPrefixHosts */);
+ getTopologyManager()->setTopologyDescription(replSet.getTopologyDescription(clock()));
+ insertTopOfOplog(&replSet, topOfOplogOpTime);
+
+ TenantMigrationRecipientDocument initialStateDocument(
+ migrationUUID,
+ replSet.getConnectionString(),
+ "tenantA",
+ kDefaultStartMigrationTimestamp,
+ ReadPreferenceSetting(ReadPreference::PrimaryOnly));
+ initialStateDocument.setRecipientCertificateForDonor(kRecipientPEMPayload);
+
+ // Skip the cloners in this test, so we provide an empty list of databases.
+ MockRemoteDBServer* const _donorServer =
+ mongo::MockConnRegistry::get()->getMockRemoteDBServer(replSet.getPrimary());
+ _donorServer->setCommandReply("listDatabases", makeListDatabasesResponse({}));
+ _donorServer->setCommandReply("find", makeFindResponse());
+
+ auto opCtx = makeOperationContext();
+ std::shared_ptr<TenantMigrationRecipientService::Instance> instance;
+ {
+ FailPointEnableBlock fp("pauseBeforeRunTenantMigrationRecipientInstance");
+ // Create and start the instance.
+ instance = TenantMigrationRecipientService::Instance::getOrCreate(
+ opCtx.get(), _service, initialStateDocument.toBSON());
+ ASSERT(instance.get());
+ instance->setCreateOplogFetcherFn_forTest(std::make_unique<CreateOplogFetcherMockFn>());
+ }
+
+ instance->waitUntilMigrationReachesConsistentState(opCtx.get());
+ checkStateDocPersisted(opCtx.get(), instance.get());
+
+ // Simulate recipient receiving a donor timestamp.
+ auto returnAfterReachingTimestamp =
+ ReplicationCoordinator::get(getServiceContext())->getMyLastAppliedOpTime().getTimestamp() +
+ 1;
+
+ // Both tasks write the same rejectReadsBeforeTimestamp value. Whichever task writes it second
+ // is therefore a no-op.
+ startCapturingLogMessages();
+ auto future1 = ExecutorFuture<void>(executor).then([&]() {
+ auto innerOpCtx = makeOperationContext();
+ instance->waitUntilMigrationReachesReturnAfterReachingTimestamp(
+ innerOpCtx.get(), returnAfterReachingTimestamp);
+ });
+ auto future2 = ExecutorFuture<void>(executor).then([&]() {
+ auto innerOpCtx = makeOperationContext();
+ instance->waitUntilMigrationReachesReturnAfterReachingTimestamp(
+ innerOpCtx.get(), returnAfterReachingTimestamp);
+ });
+
+ future1.wait();
+ future2.wait();
+ executor->shutdown();
+ executor->join();
+ stopCapturingLogMessages();
+ // "Fixed write timestamp for recording rejectReadsBeforeTimestamp".
+ ASSERT_EQ(1, countBSONFormatLogLinesIsSubset(BSON("id" << 6096900)));
+}
+
TEST_F(TenantMigrationRecipientServiceTest, RecipientReceivesRetriableFetcherError) {
stopFailPointEnableBlock stopFp("fpAfterCollectionClonerDone");
auto fp =