author     A. Jesse Jiryu Davis <jesse@mongodb.com>          2021-11-11 12:00:00 +0000
committer  Evergreen Agent <no-reply@evergreen.mongodb.com>  2021-11-16 01:42:28 +0000
commit     71e85a2029d7e93f2915191ea28be10e2d869859 (patch)
tree       baf9d30111ae572cf0b5a593cc40a4802d093328
parent     ef75c91cacef77b0c478ff6eab53e40a3cf9a99a (diff)
download   mongo-71e85a2029d7e93f2915191ea28be10e2d869859.tar.gz
SERVER-60969 Race in tenant migration state machine, try 2
(cherry picked from commit 65b6cd2fc68021388b992fad6d46fa349324f2a2)
-rw-r--r--  src/mongo/db/repl/tenant_migration_recipient_service.cpp       | 31
-rw-r--r--  src/mongo/db/repl/tenant_migration_recipient_service_test.cpp  | 89
2 files changed, 111 insertions(+), 9 deletions(-)
diff --git a/src/mongo/db/repl/tenant_migration_recipient_service.cpp b/src/mongo/db/repl/tenant_migration_recipient_service.cpp
index 1529c05fa2a..6978555c310 100644
--- a/src/mongo/db/repl/tenant_migration_recipient_service.cpp
+++ b/src/mongo/db/repl/tenant_migration_recipient_service.cpp
@@ -514,14 +514,23 @@ TenantMigrationRecipientService::Instance::waitUntilMigrationReachesReturnAfterR
opCtx, returnAfterReachingTimestamp, donorRecipientOpTimePair.recipientOpTime));
}
_stopOrHangOnFailPoint(&fpBeforePersistingRejectReadsBeforeTimestamp, opCtx);
- uassertStatusOK(tenantMigrationRecipientEntryHelpers::updateStateDoc(opCtx, _stateDoc));
- auto writeOpTime = repl::ReplClientInfo::forClient(opCtx->getClient()).getLastOp();
+ auto lastOpBeforeUpdate = repl::ReplClientInfo::forClient(opCtx->getClient()).getLastOp();
+ uassertStatusOK(tenantMigrationRecipientEntryHelpers::updateStateDoc(opCtx, _stateDoc));
+ auto lastOpAfterUpdate = repl::ReplClientInfo::forClient(opCtx->getClient()).getLastOp();
auto replCoord = repl::ReplicationCoordinator::get(_serviceContext);
+ if (lastOpBeforeUpdate == lastOpAfterUpdate) {
+ // updateStateDoc was a no-op, but we still must ensure it's all-replicated.
+ lastOpAfterUpdate = uassertStatusOK(replCoord->getLatestWriteOpTime(opCtx));
+ LOGV2(6096900,
+ "Fixed write timestamp for recording rejectReadsBeforeTimestamp",
+ "newWriteOpTime"_attr = lastOpAfterUpdate);
+ }
+
WriteConcernOptions writeConcern(repl::ReplSetConfig::kConfigAllWriteConcernName,
WriteConcernOptions::SyncMode::NONE,
opCtx->getWriteConcern().wTimeout);
- uassertStatusOK(replCoord->awaitReplication(opCtx, writeOpTime, writeConcern).status);
+ uassertStatusOK(replCoord->awaitReplication(opCtx, lastOpAfterUpdate, writeConcern).status);
_stopOrHangOnFailPoint(&fpAfterWaitForRejectReadsBeforeTimestamp, opCtx);
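
The hunk above is the first half of the fix: when two callers race to persist the same
rejectReadsBeforeTimestamp, the second updateStateDoc call is a no-op, the client's lastOp does
not advance, and awaiting replication on that stale opTime proves nothing. The pattern, as a
minimal sketch (simplified names, not the exact server code; updateStateDoc here stands in for
tenantMigrationRecipientEntryHelpers::updateStateDoc):

    // Sketch: detect a no-op write by comparing the client's lastOp before and
    // after the update, then wait on an opTime that actually covers the data.
    auto& clientInfo = repl::ReplClientInfo::forClient(opCtx->getClient());
    const auto before = clientInfo.getLastOp();
    uassertStatusOK(updateStateDoc(opCtx, stateDoc));  // may not write anything
    auto waitOpTime = clientInfo.getLastOp();
    if (waitOpTime == before) {
        // No oplog entry was produced on this client; fall back to the newest
        // write in the oplog so awaitReplication() still gives a real guarantee.
        waitOpTime = uassertStatusOK(replCoord->getLatestWriteOpTime(opCtx));
    }
    uassertStatusOK(replCoord->awaitReplication(opCtx, waitOpTime, writeConcern).status);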
@@ -1460,6 +1469,7 @@ void TenantMigrationRecipientService::Instance::_oplogFetcherCallback(Status opl
void TenantMigrationRecipientService::Instance::_stopOrHangOnFailPoint(FailPoint* fp,
OperationContext* opCtx) {
+ auto shouldHang = false;
fp->executeIf(
[&](const BSONObj& data) {
LOGV2(4881103,
@@ -1469,11 +1479,8 @@ void TenantMigrationRecipientService::Instance::_stopOrHangOnFailPoint(FailPoint
"name"_attr = fp->getName(),
"args"_attr = data);
if (data["action"].str() == "hang") {
- if (opCtx) {
- fp->pauseWhileSet(opCtx);
- } else {
- fp->pauseWhileSet();
- }
+ // fp is locked. If we call pauseWhileSet here, another thread can't disable fp.
+ shouldHang = true;
} else {
uasserted(data["stopErrorCode"].numberInt(),
"Skipping remaining processing due to fail point");
@@ -1483,6 +1490,14 @@ void TenantMigrationRecipientService::Instance::_stopOrHangOnFailPoint(FailPoint
auto action = data["action"].str();
return (action == "hang" || action == "stop");
});
+
+ if (shouldHang) {
+ if (opCtx) {
+ fp->pauseWhileSet(opCtx);
+ } else {
+ fp->pauseWhileSet();
+ }
+ }
}
bool TenantMigrationRecipientService::Instance::_isCloneCompletedMarkerSet(WithLock) const {
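
The second half of the fix changes how _stopOrHangOnFailPoint hangs. FailPoint::executeIf runs
its callback while the fail point is held, so calling pauseWhileSet inside that callback blocks
with the fail point locked and another thread cannot turn it off. The hunks above therefore only
record the decision inside executeIf and perform the pause afterwards. The pattern, as a minimal
sketch (simplified, not the exact server code):

    // Sketch: decide inside executeIf(), but never block there; pause only
    // after executeIf() returns and the fail point is released again.
    bool shouldHang = false;
    fp->executeIf(
        [&](const BSONObj& data) {
            if (data["action"].str() == "hang") {
                shouldHang = true;  // defer the pause
            }
        },
        [&](const BSONObj& data) { return data["action"].str() == "hang"; });

    if (shouldHang) {
        // Safe to block now: another thread can disable the fail point,
        // which lets pauseWhileSet() return.
        if (opCtx) {
            fp->pauseWhileSet(opCtx);
        } else {
            fp->pauseWhileSet();
        }
    }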
diff --git a/src/mongo/db/repl/tenant_migration_recipient_service_test.cpp b/src/mongo/db/repl/tenant_migration_recipient_service_test.cpp
index b30dee191a0..009a6043623 100644
--- a/src/mongo/db/repl/tenant_migration_recipient_service_test.cpp
+++ b/src/mongo/db/repl/tenant_migration_recipient_service_test.cpp
@@ -61,7 +61,7 @@
#include "mongo/dbtests/mock/mock_conn_registry.h"
#include "mongo/dbtests/mock/mock_replica_set.h"
#include "mongo/executor/network_interface.h"
-#include "mongo/executor/network_interface_factory.h"
+#include "mongo/executor/network_interface_mock.h"
#include "mongo/executor/thread_pool_task_executor.h"
#include "mongo/logv2/log.h"
#include "mongo/rpc/metadata/egress_metadata_hook_list.h"
@@ -3318,6 +3318,93 @@ TEST_F(TenantMigrationRecipientServiceTest, WaitUntilMigrationReachesReturnAfter
ASSERT_GTE(lastAppliedOpTime, newOpTime);
}
+// When the recipientSyncData command is called with returnAfterReachingTimestamp, mongod updates
+// the state document's rejectReadsBeforeTimestamp value and waits for w:all acknowledgment.
+// If there are two concurrent calls to the command with the same
+// returnAfterReachingTimestamp, then two threads may write the same rejectReadsBeforeTimestamp
+// value. Whichever thread writes second performs a no-op update, and its
+// ReplClientInfo::getLastOp() will be null. Test that the second thread detects the problem.
+TEST_F(TenantMigrationRecipientServiceTest, DuplicateReturnAfterReachingTimestamp) {
+ const UUID migrationUUID = UUID::gen();
+ const OpTime topOfOplogOpTime(Timestamp(5, 1), 1);
+
+ ThreadPool::Options threadPoolOptions;
+ threadPoolOptions.threadNamePrefix = "TenantMigrationRecipientServiceTest-";
+ threadPoolOptions.poolName = "TenantMigrationRecipientServiceTestThreadPool";
+ threadPoolOptions.onCreateThread = [](const std::string& threadName) {
+ Client::initThread(threadName.c_str());
+ };
+
+ auto executor = std::make_shared<executor::ThreadPoolTaskExecutor>(
+ std::make_unique<ThreadPool>(threadPoolOptions),
+ std::make_unique<executor::NetworkInterfaceMock>());
+ executor->startup();
+
+ MockReplicaSet replSet("donorSet", 3, true /* hasPrimary */, true /* dollarPrefixHosts */);
+ getTopologyManager()->setTopologyDescription(replSet.getTopologyDescription(clock()));
+ insertTopOfOplog(&replSet, topOfOplogOpTime);
+
+ TenantMigrationRecipientDocument initialStateDocument(
+ migrationUUID,
+ replSet.getConnectionString(),
+ "tenantA",
+ kDefaultStartMigrationTimestamp,
+ ReadPreferenceSetting(ReadPreference::PrimaryOnly));
+ initialStateDocument.setRecipientCertificateForDonor(kRecipientPEMPayload);
+
+ // Skip the cloners in this test, so we provide an empty list of databases.
+ MockRemoteDBServer* const _donorServer =
+ mongo::MockConnRegistry::get()->getMockRemoteDBServer(replSet.getPrimary());
+ _donorServer->setCommandReply("listDatabases", makeListDatabasesResponse({}));
+ _donorServer->setCommandReply("find", makeFindResponse());
+
+ auto opCtx = makeOperationContext();
+ std::shared_ptr<TenantMigrationRecipientService::Instance> instance;
+ {
+ FailPointEnableBlock fp("pauseBeforeRunTenantMigrationRecipientInstance");
+ // Create and start the instance.
+ instance = TenantMigrationRecipientService::Instance::getOrCreate(
+ opCtx.get(), _service, initialStateDocument.toBSON());
+ ASSERT(instance.get());
+ instance->setCreateOplogFetcherFn_forTest(std::make_unique<CreateOplogFetcherMockFn>());
+ }
+
+ instance->waitUntilMigrationReachesConsistentState(opCtx.get());
+ checkStateDocPersisted(opCtx.get(), instance.get());
+
+ // Simulate recipient receiving a donor timestamp.
+ auto returnAfterReachingTimestamp =
+ ReplicationCoordinator::get(getServiceContext())->getMyLastAppliedOpTime().getTimestamp() +
+ 1;
+
+ auto fp = globalFailPointRegistry().find("fpBeforePersistingRejectReadsBeforeTimestamp");
+ auto timesEntered = fp->setMode(FailPoint::alwaysOn,
+ 0,
+ BSON("action"
+ << "hang"));
+ startCapturingLogMessages();
+ auto future1 = ExecutorFuture<void>(executor).then([&]() {
+ auto innerOpCtx = makeOperationContext();
+ instance->waitUntilMigrationReachesReturnAfterReachingTimestamp(
+ innerOpCtx.get(), returnAfterReachingTimestamp);
+ });
+ auto future2 = ExecutorFuture<void>(executor).then([&]() {
+ auto innerOpCtx = makeOperationContext();
+ instance->waitUntilMigrationReachesReturnAfterReachingTimestamp(
+ innerOpCtx.get(), returnAfterReachingTimestamp);
+ });
+
+ fp->waitForTimesEntered(timesEntered + 2);
+ fp->setMode(FailPoint::off);
+ advanceTime(Seconds(10)); // Wake threads that were blocked on fp.
+ future1.wait();
+ future2.wait();
+ ASSERT_EQ(1, countBSONFormatLogLinesIsSubset(BSON("id" << 6096900)));
+ executor->shutdown();
+ executor->join();
+ stopCapturingLogMessages();
+}
+
TEST_F(TenantMigrationRecipientServiceTest, RecipientReceivesRetriableFetcherError) {
stopFailPointEnableBlock stopFp("fpAfterCollectionClonerDone");
auto fp =