From c78ba79626722ae69ea9b64762ffd1dc075ce960 Mon Sep 17 00:00:00 2001 From: Matt Broadstone Date: Sat, 2 Apr 2022 11:56:52 +0000 Subject: SERVER-65059 Store recipient connection string in state document --- .../db/repl/primary_only_service_test_fixture.cpp | 17 -- .../db/repl/primary_only_service_test_fixture.h | 2 - src/mongo/db/s/add_shard_cmd.idl | 10 +- .../db/serverless/shard_split_donor_service.cpp | 137 +++++------ .../db/serverless/shard_split_donor_service.h | 53 ++--- .../serverless/shard_split_donor_service_test.cpp | 257 ++++++++++++--------- .../db/serverless/shard_split_state_machine.idl | 6 + src/mongo/idl/basic_types.idl | 7 + 8 files changed, 248 insertions(+), 241 deletions(-) diff --git a/src/mongo/db/repl/primary_only_service_test_fixture.cpp b/src/mongo/db/repl/primary_only_service_test_fixture.cpp index 70f7453cf9e..cb21b22f243 100644 --- a/src/mongo/db/repl/primary_only_service_test_fixture.cpp +++ b/src/mongo/db/repl/primary_only_service_test_fixture.cpp @@ -124,22 +124,5 @@ void PrimaryOnlyServiceMongoDTest::stepDown() { _registry->onStepDown(); } -std::shared_ptr makeTestExecutor() { - ThreadPool::Options threadPoolOptions; - threadPoolOptions.threadNamePrefix = "PrimaryOnlyServiceTest-"; - threadPoolOptions.poolName = "PrimaryOnlyServiceTestThreadPool"; - threadPoolOptions.onCreateThread = [](const std::string& threadName) { - Client::initThread(threadName.c_str()); - }; - - auto hookList = std::make_unique(); - auto executor = std::make_shared( - std::make_unique(threadPoolOptions), - executor::makeNetworkInterface( - "PrimaryOnlyServiceTestNetwork", nullptr, std::move(hookList))); - executor->startup(); - return executor; -} - } // namespace repl } // namespace mongo diff --git a/src/mongo/db/repl/primary_only_service_test_fixture.h b/src/mongo/db/repl/primary_only_service_test_fixture.h index 8ca3dae3bb2..906cbd15297 100644 --- a/src/mongo/db/repl/primary_only_service_test_fixture.h +++ b/src/mongo/db/repl/primary_only_service_test_fixture.h @@ -81,7 +81,5 @@ protected: long long _term = 0; }; -std::shared_ptr makeTestExecutor(); - } // namespace repl } // namespace mongo diff --git a/src/mongo/db/s/add_shard_cmd.idl b/src/mongo/db/s/add_shard_cmd.idl index b57a2edc14d..4955d8e57a5 100644 --- a/src/mongo/db/s/add_shard_cmd.idl +++ b/src/mongo/db/s/add_shard_cmd.idl @@ -33,14 +33,6 @@ global: imports: - "mongo/idl/basic_types.idl" -types: - connectionstring: - bson_serialization_type: string - description: "A MongoDB ConnectionString" - cpp_type: "mongo::ConnectionString" - serializer: mongo::ConnectionString::toString - deserializer: mongo::ConnectionString::deserialize - structs: ShardIdentity: description: "Contains all the information needed to identify a shard and lookup the shard identity document from storage" @@ -53,7 +45,7 @@ structs: type: objectid configsvrConnectionString: description: "Connection string to the config server" - type: connectionstring + type: connection_string commands: _addShard: diff --git a/src/mongo/db/serverless/shard_split_donor_service.cpp b/src/mongo/db/serverless/shard_split_donor_service.cpp index d240f9c5496..defb74e9a3c 100644 --- a/src/mongo/db/serverless/shard_split_donor_service.cpp +++ b/src/mongo/db/serverless/shard_split_donor_service.cpp @@ -116,15 +116,8 @@ void insertTenantAccessBlocker(WithLock lk, ShardSplitDonorDocument donorStateDoc) { auto optionalTenants = donorStateDoc.getTenantIds(); invariant(optionalTenants); - - auto recipientTagName = donorStateDoc.getRecipientTagName(); - auto recipientSetName = donorStateDoc.getRecipientSetName(); - invariant(recipientTagName); - invariant(recipientSetName); - - auto config = repl::ReplicationCoordinator::get(cc().getServiceContext())->getConfig(); - auto recipientConnectionString = - serverless::makeRecipientConnectionString(config, *recipientTagName, *recipientSetName); + auto recipientConnectionString = donorStateDoc.getRecipientConnectionString(); + invariant(recipientConnectionString); for (const auto& tenantId : optionalTenants.get()) { auto mtab = std::make_shared( @@ -132,7 +125,7 @@ void insertTenantAccessBlocker(WithLock lk, donorStateDoc.getId(), tenantId.toString(), MigrationProtocolEnum::kMultitenantMigrations, - recipientConnectionString.toString()); + recipientConnectionString->toString()); TenantMigrationAccessBlockerRegistry::get(opCtx->getServiceContext()).add(tenantId, mtab); @@ -154,13 +147,7 @@ namespace detail { SemiFuture makeRecipientAcceptSplitFuture( std::shared_ptr taskExecutor, const CancellationToken& token, - const StringData& recipientTagName, - const StringData& recipientSetName) { - - auto replCoord = repl::ReplicationCoordinator::get(cc().getServiceContext()); - invariant(replCoord); - auto recipientConnectionString = serverless::makeRecipientConnectionString( - replCoord->getConfig(), recipientTagName, recipientSetName); + const ConnectionString& recipientConnectionString) { // build a vector of single server discovery monitors to listen for heartbeats auto eventsPublisher = std::make_shared(taskExecutor); @@ -257,6 +244,8 @@ ExecutorFuture ShardSplitDonorService::_rebuildService( return _createStateDocumentTTLIndex(executor, token); } +boost::optional + ShardSplitDonorService::DonorStateMachine::_splitAcceptanceTaskExecutorForTest; ShardSplitDonorService::DonorStateMachine::DonorStateMachine( ServiceContext* serviceContext, ShardSplitDonorService* splitService, @@ -383,15 +372,13 @@ SemiFuture ShardSplitDonorService::DonorStateMachine::run( "id"_attr = _migrationId, "timeout"_attr = repl::shardSplitTimeoutMS.load()); - _createReplicaSetMonitor(abortToken); - _decisionPromise.setWith([&] { return ExecutorFuture(**executor) .then([this, executor, primaryToken] { // Note we do not use the abort split token here because the abortShardSplit // command waits for a decision to be persisted which will not happen if // inserting the initial state document fails. - return _writeInitialDocument(executor, primaryToken); + return _enterBlockingOrAbortedState(executor, primaryToken); }) .then([this, executor, abortToken] { checkForTokenInterrupt(abortToken); @@ -561,7 +548,7 @@ ExecutorFuture ShardSplitDonorService::DonorStateMachine::_waitForRecipien } return ExecutorFuture(**executor) - .then([&]() { return _recipientAcceptedSplit.getFuture(); }) + .then([&]() { return _splitAcceptancePromise.getFuture(); }) .then([this, executor, token] { LOGV2(6142503, "Recipient has accepted the split, committing decision.", @@ -574,13 +561,12 @@ ExecutorFuture ShardSplitDonorService::DonorStateMachine::_waitForRecipien }); } -ExecutorFuture ShardSplitDonorService::DonorStateMachine::_writeInitialDocument( +ExecutorFuture ShardSplitDonorService::DonorStateMachine::_enterBlockingOrAbortedState( const ScopedTaskExecutorPtr& executor, const CancellationToken& primaryServiceToken) { ShardSplitDonorStateEnum nextState; { stdx::lock_guard lg(_mutex); - if (_stateDoc.getState() == ShardSplitDonorStateEnum::kAborted) { if (isAbortedDocumentPersistent(lg, _stateDoc)) { // Node has step up and created an instance using a document in abort state. No need @@ -594,14 +580,42 @@ ExecutorFuture ShardSplitDonorService::DonorStateMachine::_writeInitialDoc _abortReason->serializeErrorToBSON(&bob); _stateDoc.setAbortReason(bob.obj()); nextState = ShardSplitDonorStateEnum::kAborted; + } else { + auto recipientTagName = _stateDoc.getRecipientTagName(); + invariant(recipientTagName); + auto recipientSetName = _stateDoc.getRecipientSetName(); + invariant(recipientSetName); + auto config = repl::ReplicationCoordinator::get(cc().getServiceContext())->getConfig(); + auto recipientConnectionString = serverless::makeRecipientConnectionString( + config, *recipientTagName, *recipientSetName); + + // Always start the replica set monitor if we haven't reached a decision yet + _splitAcceptancePromise.setWith([&]() -> Future { + if (_stateDoc.getState() > ShardSplitDonorStateEnum::kBlocking || + MONGO_unlikely(skipShardSplitWaitForSplitAcceptance.shouldFail())) { + return SemiFuture::makeReady().unsafeToInlineFuture(); + } + + // Optionally select a task executor for unit testing + auto executor = _splitAcceptanceTaskExecutorForTest + ? *_splitAcceptanceTaskExecutorForTest + : _shardSplitService->getInstanceCleanupExecutor(); + + return detail::makeRecipientAcceptSplitFuture( + executor, primaryServiceToken, recipientConnectionString) + .unsafeToInlineFuture(); + }); + + if (_stateDoc.getState() > ShardSplitDonorStateEnum::kUninitialized) { + // Node has step up and resumed a shard split. No need to write the document as it + // already exists. + return ExecutorFuture(**executor); + } - } else if (_stateDoc.getState() == ShardSplitDonorStateEnum::kUninitialized) { + // Otherwise, record the recipient connection string + _stateDoc.setRecipientConnectionString(recipientConnectionString); _stateDoc.setState(ShardSplitDonorStateEnum::kBlocking); nextState = ShardSplitDonorStateEnum::kBlocking; - } else { - // Node has step up and resumed a shard split. No need to write the document as it - // already exists. - return ExecutorFuture(**executor); } } @@ -756,36 +770,6 @@ void ShardSplitDonorService::DonorStateMachine::_initiateTimeout( .semi(); } -void ShardSplitDonorService::DonorStateMachine::_createReplicaSetMonitor( - const CancellationToken& abortToken) { - { - stdx::lock_guard lg(_mutex); - if (_stateDoc.getState() > ShardSplitDonorStateEnum::kBlocking) { - return; - } - } - - auto future = [&]() { - stdx::lock_guard lg(_mutex); - if (MONGO_unlikely(skipShardSplitWaitForSplitAcceptance.shouldFail())) { // Test-only. - return SemiFuture::makeReady(); - } - - auto recipientTagName = _stateDoc.getRecipientTagName(); - auto recipientSetName = _stateDoc.getRecipientSetName(); - invariant(recipientTagName); - invariant(recipientSetName); - - return detail::makeRecipientAcceptSplitFuture( - _shardSplitService->getInstanceCleanupExecutor(), - abortToken, - *recipientTagName, - *recipientSetName); - }(); - - _recipientAcceptedSplit.setFrom(std::move(future).unsafeToInlineFuture()); -} - ExecutorFuture ShardSplitDonorService::DonorStateMachine::_handleErrorOrEnterAbortedState( StatusWith statusWithState, @@ -846,26 +830,6 @@ ShardSplitDonorService::DonorStateMachine::_handleErrorOrEnterAbortedState( }); } -ExecutorFuture -ShardSplitDonorService::DonorStateMachine::_markStateDocAsGarbageCollectable( - std::shared_ptr executor, const CancellationToken& token) { - stdx::lock_guard lg(_mutex); - - _stateDoc.setExpireAt(_serviceContext->getFastClockSource()->now() + - Milliseconds{repl::shardSplitGarbageCollectionDelayMS.load()}); - - return AsyncTry([this, self = shared_from_this()] { - auto opCtxHolder = cc().makeOperationContext(); - auto opCtx = opCtxHolder.get(); - - uassertStatusOK(serverless::updateStateDoc(opCtx, _stateDoc)); - return repl::ReplClientInfo::forClient(opCtx->getClient()).getLastOp(); - }) - .until([](StatusWith swOpTime) { return swOpTime.getStatus().isOK(); }) - .withBackoffBetweenIterations(kExponentialBackoff) - .on(**executor, token); -} - ExecutorFuture ShardSplitDonorService::DonorStateMachine::_waitForForgetCmdThenMarkGarbageCollectible( const ScopedTaskExecutorPtr& executor, const CancellationToken& token) { @@ -888,7 +852,22 @@ ShardSplitDonorService::DonorStateMachine::_waitForForgetCmdThenMarkGarbageColle LOGV2(6236606, "Marking shard split as garbage-collectable.", "migrationId"_attr = _migrationId); - return _markStateDocAsGarbageCollectable(executor, token); + + stdx::lock_guard lg(_mutex); + _stateDoc.setExpireAt(_serviceContext->getFastClockSource()->now() + + Milliseconds{repl::shardSplitGarbageCollectionDelayMS.load()}); + + return AsyncTry([this, self = shared_from_this()] { + auto opCtxHolder = cc().makeOperationContext(); + auto opCtx = opCtxHolder.get(); + + uassertStatusOK(serverless::updateStateDoc(opCtx, _stateDoc)); + return repl::ReplClientInfo::forClient(opCtx->getClient()).getLastOp(); + }) + .until( + [](StatusWith swOpTime) { return swOpTime.getStatus().isOK(); }) + .withBackoffBetweenIterations(kExponentialBackoff) + .on(**executor, token); }) .then([this, self = shared_from_this(), executor, token](repl::OpTime opTime) { return _waitForMajorityWriteConcern(executor, std::move(opTime), token); diff --git a/src/mongo/db/serverless/shard_split_donor_service.h b/src/mongo/db/serverless/shard_split_donor_service.h index 6af6d79ad49..43759fa5dee 100644 --- a/src/mongo/db/serverless/shard_split_donor_service.h +++ b/src/mongo/db/serverless/shard_split_donor_service.h @@ -38,19 +38,9 @@ namespace mongo { +using TaskExecutorPtr = std::shared_ptr; using ScopedTaskExecutorPtr = std::shared_ptr; -namespace detail { - -SemiFuture makeRecipientAcceptSplitFuture( - ExecutorPtr executor, - std::shared_ptr taskExecutor, - const CancellationToken& token, - const StringData& recipientTagName, - const StringData& recipientSetName); - -}; // namespace detail - class ShardSplitDonorService final : public repl::PrimaryOnlyService { public: static constexpr StringData kServiceName = "ShardSplitDonorService"_sd; @@ -150,8 +140,18 @@ public: return !!_stateDoc.getExpireAt(); } + /** + * Only used for testing. Allows settinga custom task executor for observing split acceptance. + */ + static void setSplitAcceptanceTaskExecutor_forTest(TaskExecutorPtr taskExecutor) { + _splitAcceptanceTaskExecutorForTest = taskExecutor; + } + private: // Tasks + ExecutorFuture _enterBlockingOrAbortedState(const ScopedTaskExecutorPtr& executor, + const CancellationToken& token); + ExecutorFuture _waitForRecipientToReachBlockTimestamp( const ScopedTaskExecutorPtr& executor, const CancellationToken& token); @@ -161,10 +161,16 @@ private: ExecutorFuture _waitForRecipientToAcceptSplit(const ScopedTaskExecutorPtr& executor, const CancellationToken& token); - // Helpers - ExecutorFuture _writeInitialDocument(const ScopedTaskExecutorPtr& executor, - const CancellationToken& token); + ExecutorFuture _waitForForgetCmdThenMarkGarbageCollectible( + const ScopedTaskExecutorPtr& executor, const CancellationToken& token); + + ExecutorFuture _handleErrorOrEnterAbortedState( + StatusWith durableState, + const ScopedTaskExecutorPtr& executor, + const CancellationToken& instanceAbortToken, + const CancellationToken& abortToken); + // Helpers ExecutorFuture _updateStateDocument(const ScopedTaskExecutorPtr& executor, const CancellationToken& token, ShardSplitDonorStateEnum nextState); @@ -176,20 +182,6 @@ private: void _initiateTimeout(const ScopedTaskExecutorPtr& executor, const CancellationToken& abortToken); - void _createReplicaSetMonitor(const CancellationToken& abortToken); - - ExecutorFuture _handleErrorOrEnterAbortedState( - StatusWith durableState, - const ScopedTaskExecutorPtr& executor, - const CancellationToken& instanceAbortToken, - const CancellationToken& abortToken); - - ExecutorFuture _markStateDocAsGarbageCollectable( - std::shared_ptr executor, const CancellationToken& token); - - ExecutorFuture _waitForForgetCmdThenMarkGarbageCollectible( - const ScopedTaskExecutorPtr& executor, const CancellationToken& token); - /* * We need to call this method when we find out the replica set name is the same as the state * doc recipient set name and the current state doc state is blocking. @@ -223,10 +215,13 @@ private: SharedPromise _completionPromise; // A promise fulfilled when all recipient nodes have accepted the split. - SharedPromise _recipientAcceptedSplit; + SharedPromise _splitAcceptancePromise; // A promise fulfilled when tryForget is called. SharedPromise _forgetShardSplitReceivedPromise; + + // A task executor used for the split acceptance future in tests + static boost::optional _splitAcceptanceTaskExecutorForTest; }; } // namespace mongo diff --git a/src/mongo/db/serverless/shard_split_donor_service_test.cpp b/src/mongo/db/serverless/shard_split_donor_service_test.cpp index 65abd349de0..36a2b03c4a5 100644 --- a/src/mongo/db/serverless/shard_split_donor_service_test.cpp +++ b/src/mongo/db/serverless/shard_split_donor_service_test.cpp @@ -200,8 +200,22 @@ public: ClockSourceMock clockSource; clockSource.advance(Milliseconds(1000)); - // Fake replSet just for creating consistent URI for monitor - _rsmMonitor.setup(_replSet.getURI()); + // setup mock networking for split acceptance + auto network = std::make_unique(); + _net = network.get(); + _executor = std::make_shared( + std::make_unique( + _net, 1, executor::ThreadPoolMock::Options{}), + std::move(network)); + _executor->startup(); + } + + void tearDown() override { + _net->exitNetwork(); + _executor->shutdown(); + _executor->join(); + + repl::PrimaryOnlyServiceMongoDTest::tearDown(); } protected: @@ -227,24 +241,80 @@ protected: "recipientSetForTest", 3, true /* hasPrimary */, false /* dollarPrefixHosts */}; const NamespaceString _nss{"testDB2", "testColl2"}; std::vector _tenantIds = {"tenant1", "tenantAB"}; - StreamableReplicaSetMonitorForTesting _rsmMonitor; std::string _recipientTagName{"$recipientNode"}; - std::string _recipientSetName{_replSet.getURI().getSetName()}; - FailPointEnableBlock _skipAcceptanceFP{"skipShardSplitWaitForSplitAcceptance"}; + std::string _recipientSetName{_recipientSet.getURI().getSetName()}; + + std::unique_ptr _skipAcceptanceFP = + std::make_unique("skipShardSplitWaitForSplitAcceptance"); + + + // for mocking split acceptance + executor::NetworkInterfaceMock* _net; + TaskExecutorPtr _executor; }; +executor::RemoteCommandRequest assertRemoteCommandNameEquals( + StringData cmdName, const executor::RemoteCommandRequest& request) { + auto&& cmdObj = request.cmdObj; + ASSERT_FALSE(cmdObj.isEmpty()); + if (cmdName != cmdObj.firstElementFieldName()) { + std::string msg = str::stream() + << "Expected command name \"" << cmdName << "\" in remote command request but found \"" + << cmdObj.firstElementFieldName() << "\" instead: " << request.toString(); + FAIL(msg); + } + return request; +} + +void processHelloRequest(executor::NetworkInterfaceMock* net, MockReplicaSet* replSet) { + ASSERT(net->hasReadyRequests()); + net->runReadyNetworkOperations(); + auto noi = net->getNextReadyRequest(); + auto request = noi->getRequest(); + + assertRemoteCommandNameEquals("isMaster", request); + auto requestHost = request.target.toString(); + const auto node = replSet->getNode(requestHost); + if (node->isRunning()) { + const auto opmsg = OpMsgRequest::fromDBAndBody(request.dbname, request.cmdObj); + const auto reply = node->runCommand(request.id, opmsg)->getCommandReply(); + net->scheduleSuccessfulResponse(noi, + executor::RemoteCommandResponse(reply, Milliseconds(0))); + } else { + net->scheduleErrorResponse(noi, Status(ErrorCodes::HostUnreachable, "")); + } +} + +void waitForHelloRequest(executor::NetworkInterfaceMock* net) { + while (!net->hasReadyRequests()) { + net->advanceTime(net->now() + Milliseconds{1}); + } +} + TEST_F(ShardSplitDonorServiceTest, BasicShardSplitDonorServiceInstanceCreation) { auto opCtx = makeOperationContext(); test::shard_split::ScopedTenantAccessBlocker scopedTenants(_tenantIds, opCtx.get()); test::shard_split::reconfigToAddRecipientNodes( getServiceContext(), _recipientTagName, _replSet.getHosts(), _recipientSet.getHosts()); + ShardSplitDonorService::DonorStateMachine::setSplitAcceptanceTaskExecutor_forTest(_executor); + _skipAcceptanceFP.reset(); + // Create and start the instance. auto serviceInstance = ShardSplitDonorService::DonorStateMachine::getOrCreate( opCtx.get(), _service, defaultStateDocument().toBSON()); ASSERT(serviceInstance.get()); ASSERT_EQ(_uuid, serviceInstance->getId()); + // Wait for monitors to start, and enqueue successfull hello responses + _net->enterNetwork(); + waitForHelloRequest(_net); + processHelloRequest(_net, &_recipientSet); + processHelloRequest(_net, &_recipientSet); + processHelloRequest(_net, &_recipientSet); + _net->runReadyNetworkOperations(); + _net->exitNetwork(); + auto decisionFuture = serviceInstance->decisionFuture(); decisionFuture.wait(); @@ -459,63 +529,28 @@ TEST_F(ShardSplitDonorServiceTest, AbortDueToRecipientNodesValidation) { ASSERT_TRUE(!serviceInstance->isGarbageCollectable()); } -class SplitReplicaSetObserverTest : public ServiceContextTest { -public: - void setUp() override { - ServiceContextTest::setUp(); - - // we need a mock replication coordinator in order to identify recipient nodes - auto serviceContext = getServiceContext(); - auto replCoord = std::make_unique(serviceContext); - repl::ReplicationCoordinator::set(serviceContext, std::move(replCoord)); - - _rsmMonitor.setup(_validRepl.getURI()); - _otherRsmMonitor.setup(_invalidRepl.getURI()); - - _executor = repl::makeTestExecutor(); - - // Retrieve monitor installed by _rsmMonitor.setup(...) - auto monitor = checked_pointer_cast( - ReplicaSetMonitor::createIfNeeded(_validRepl.getURI())); - invariant(monitor); - _publisher = monitor->getEventsPublisher(); - } - -protected: - MockReplicaSet _validRepl{ - "replInScope", 3, true /* hasPrimary */, false /* dollarPrefixHosts */}; - MockReplicaSet _recipientSet{ - "recipientReplSet", 3, true /* hasPrimary */, false /* dollarPrefixHosts */}; - MockReplicaSet _invalidRepl{ - "replNotInScope", 3, true /* hasPrimary */, false /* dollarPrefixHosts */}; - - StreamableReplicaSetMonitorForTesting _rsmMonitor; - StreamableReplicaSetMonitorForTesting _otherRsmMonitor; - std::shared_ptr _executor; - std::shared_ptr _publisher; - std::string _recipientTagName{"$recipientNode"}; - std::string _recipientSetName{_validRepl.getURI().getSetName()}; -}; - -TEST_F(SplitReplicaSetObserverTest, FutureReady) { +TEST(RecipientAcceptSplitListenerTest, FutureReady) { + MockReplicaSet donor{"donor", 3, true /* hasPrimary */, false /* dollarPrefixHosts */}; auto listener = - mongo::serverless::RecipientAcceptSplitListener(_validRepl.getURI().connectionString()); + mongo::serverless::RecipientAcceptSplitListener(donor.getURI().connectionString()); - for (const auto& host : _validRepl.getHosts()) { + for (const auto& host : donor.getHosts()) { ASSERT_FALSE(listener.getFuture().isReady()); - listener.onServerHeartbeatSucceededEvent(host, BSON("setName" << _validRepl.getSetName())); + listener.onServerHeartbeatSucceededEvent(host, BSON("setName" << donor.getSetName())); } + ASSERT_FALSE(listener.getFuture().isReady()); + listener.onServerHeartbeatSucceededEvent( - _validRepl.getHosts().front(), - BSON("setName" << _validRepl.getSetName() << "ismaster" << true)); + donor.getHosts().front(), BSON("setName" << donor.getSetName() << "ismaster" << true)); ASSERT_TRUE(listener.getFuture().isReady()); } -TEST_F(SplitReplicaSetObserverTest, FutureReadyNameChange) { +TEST(RecipientAcceptSplitListenerTest, FutureReadyNameChange) { + MockReplicaSet donor{"donor", 3, true /* hasPrimary */, false /* dollarPrefixHosts */}; auto listener = - mongo::serverless::RecipientAcceptSplitListener(_validRepl.getURI().connectionString()); + mongo::serverless::RecipientAcceptSplitListener(donor.getURI().connectionString()); - for (const auto& host : _validRepl.getHosts()) { + for (const auto& host : donor.getHosts()) { listener.onServerHeartbeatSucceededEvent(host, BSON("setName" << "donorSetName")); @@ -523,44 +558,54 @@ TEST_F(SplitReplicaSetObserverTest, FutureReadyNameChange) { ASSERT_FALSE(listener.getFuture().isReady()); - for (const auto& host : _validRepl.getHosts()) { - listener.onServerHeartbeatSucceededEvent(host, BSON("setName" << _validRepl.getSetName())); + for (const auto& host : donor.getHosts()) { + listener.onServerHeartbeatSucceededEvent(host, BSON("setName" << donor.getSetName())); } listener.onServerHeartbeatSucceededEvent( - _validRepl.getHosts().front(), - BSON("setName" << _validRepl.getSetName() << "ismaster" << true)); + donor.getHosts().front(), BSON("setName" << donor.getSetName() << "ismaster" << true)); ASSERT_TRUE(listener.getFuture().isReady()); } -TEST_F(SplitReplicaSetObserverTest, FutureNotReadyMissingNodes) { +TEST(RecipientAcceptSplitListenerTest, FutureNotReadyMissingNodes) { + MockReplicaSet donor{"donor", 3, false /* hasPrimary */, false /* dollarPrefixHosts */}; auto listener = - mongo::serverless::RecipientAcceptSplitListener(_validRepl.getURI().connectionString()); + mongo::serverless::RecipientAcceptSplitListener(donor.getURI().connectionString()); - for (size_t i = 0; i < _validRepl.getHosts().size() - 1; ++i) { - listener.onServerHeartbeatSucceededEvent(_validRepl.getHosts()[i], - BSON("setName" << _validRepl.getSetName())); + + for (size_t i = 0; i < donor.getHosts().size() - 1; ++i) { + listener.onServerHeartbeatSucceededEvent(donor.getHosts()[i], + BSON("setName" << donor.getSetName())); } ASSERT_FALSE(listener.getFuture().isReady()); + listener.onServerHeartbeatSucceededEvent(donor.getHosts()[donor.getHosts().size() - 1], + BSON("setName" << donor.getSetName())); + ASSERT_FALSE(listener.getFuture().isReady()); + donor.setPrimary(donor.getHosts()[0].host()); + listener.onServerHeartbeatSucceededEvent( + donor.getHosts()[0], BSON("setName" << donor.getSetName() << "ismaster" << true)); + ASSERT_TRUE(listener.getFuture().isReady()); } -TEST_F(SplitReplicaSetObserverTest, FutureNotReadyNoSetName) { +TEST(RecipientAcceptSplitListenerTest, FutureNotReadyNoSetName) { + MockReplicaSet donor{"donor", 3, true /* hasPrimary */, false /* dollarPrefixHosts */}; auto listener = - mongo::serverless::RecipientAcceptSplitListener(_validRepl.getURI().connectionString()); + mongo::serverless::RecipientAcceptSplitListener(donor.getURI().connectionString()); - for (size_t i = 0; i < _validRepl.getHosts().size() - 1; ++i) { - listener.onServerHeartbeatSucceededEvent(_validRepl.getHosts()[i], BSONObj()); + for (size_t i = 0; i < donor.getHosts().size() - 1; ++i) { + listener.onServerHeartbeatSucceededEvent(donor.getHosts()[i], BSONObj()); } ASSERT_FALSE(listener.getFuture().isReady()); } -TEST_F(SplitReplicaSetObserverTest, FutureNotReadyWrongSet) { +TEST(RecipientAcceptSplitListenerTest, FutureNotReadyWrongSet) { + MockReplicaSet donor{"donor", 3, true /* hasPrimary */, false /* dollarPrefixHosts */}; auto listener = - mongo::serverless::RecipientAcceptSplitListener(_validRepl.getURI().connectionString()); + mongo::serverless::RecipientAcceptSplitListener(donor.getURI().connectionString()); - for (const auto& host : _validRepl.getHosts()) { + for (const auto& host : donor.getHosts()) { listener.onServerHeartbeatSucceededEvent(host, BSON("setName" << "wrongSetName")); @@ -569,36 +614,6 @@ TEST_F(SplitReplicaSetObserverTest, FutureNotReadyWrongSet) { ASSERT_FALSE(listener.getFuture().isReady()); } -class ShardSplitPersistenceTest : public ShardSplitDonorServiceTest { -public: - void setUpPersistence(OperationContext* opCtx) override { - - // We need to allow writes during the test's setup. - auto replCoord = dynamic_cast( - repl::ReplicationCoordinator::get(opCtx->getServiceContext())); - replCoord->alwaysAllowWrites(true); - - replCoord->setGetConfigReturnValue(initialDonorConfig()); - - _recStateDoc = initialStateDocument(); - uassertStatusOK(serverless::insertStateDoc(opCtx, _recStateDoc)); - - _pauseBeforeRecipientCleanupFp = - std::make_unique("pauseShardSplitBeforeRecipientCleanup"); - - _initialTimesEntered = _pauseBeforeRecipientCleanupFp->initialTimesEntered(); - } - - virtual repl::ReplSetConfig initialDonorConfig() = 0; - - virtual ShardSplitDonorDocument initialStateDocument() = 0; - -protected: - ShardSplitDonorDocument _recStateDoc; - std::unique_ptr _pauseBeforeRecipientCleanupFp; - FailPoint::EntryCountT _initialTimesEntered; -}; - TEST_F(ShardSplitDonorServiceTest, ResumeAfterStepdownTest) { auto opCtx = makeOperationContext(); test::shard_split::ScopedTenantAccessBlocker scopedTenants(_tenantIds, opCtx.get()); @@ -624,6 +639,7 @@ TEST_F(ShardSplitDonorServiceTest, ResumeAfterStepdownTest) { ASSERT_FALSE(result.isOK()); ASSERT_EQ(ErrorCodes::InterruptedDueToReplStateChange, result.getStatus().code()); + // verify that the state document exists ASSERT_OK(serverless::getStateDocument(opCtx.get(), _uuid).getStatus()); auto fp = std::make_unique("pauseShardSplitAfterBlocking"); @@ -633,20 +649,51 @@ TEST_F(ShardSplitDonorServiceTest, ResumeAfterStepdownTest) { fp->failPoint()->waitForTimesEntered(initialTimesEntered + 1); - std::shared_ptr serviceInstance = - ShardSplitDonorService::DonorStateMachine::getOrCreate( - opCtx.get(), _service, defaultStateDocument().toBSON()); - ASSERT(serviceInstance.get()); - + // verify that the state document exists ASSERT_OK(serverless::getStateDocument(opCtx.get(), _uuid).getStatus()); + auto donor = ShardSplitDonorService::DonorStateMachine::lookup( + opCtx.get(), _service, BSON("_id" << _uuid)); + ASSERT(donor); fp.reset(); - ASSERT_OK(serviceInstance->decisionFuture().getNoThrow().getStatus()); + ASSERT_OK((*donor)->decisionFuture().getNoThrow().getStatus()); - serviceInstance->tryForget(); + (*donor)->tryForget(); + ASSERT_OK((*donor)->completionFuture().getNoThrow()); + ASSERT_TRUE((*donor)->isGarbageCollectable()); } +class ShardSplitPersistenceTest : public ShardSplitDonorServiceTest { +public: + void setUpPersistence(OperationContext* opCtx) override { + + // We need to allow writes during the test's setup. + auto replCoord = dynamic_cast( + repl::ReplicationCoordinator::get(opCtx->getServiceContext())); + replCoord->alwaysAllowWrites(true); + + replCoord->setGetConfigReturnValue(initialDonorConfig()); + + _recStateDoc = initialStateDocument(); + uassertStatusOK(serverless::insertStateDoc(opCtx, _recStateDoc)); + + _pauseBeforeRecipientCleanupFp = + std::make_unique("pauseShardSplitBeforeRecipientCleanup"); + + _initialTimesEntered = _pauseBeforeRecipientCleanupFp->initialTimesEntered(); + } + + virtual repl::ReplSetConfig initialDonorConfig() = 0; + + virtual ShardSplitDonorDocument initialStateDocument() = 0; + +protected: + ShardSplitDonorDocument _recStateDoc; + std::unique_ptr _pauseBeforeRecipientCleanupFp; + FailPoint::EntryCountT _initialTimesEntered; +}; + class ShardSplitRecipientCleanupTest : public ShardSplitPersistenceTest { public: repl::ReplSetConfig initialDonorConfig() override { diff --git a/src/mongo/db/serverless/shard_split_state_machine.idl b/src/mongo/db/serverless/shard_split_state_machine.idl index 81cef079bff..8aa65017c1b 100644 --- a/src/mongo/db/serverless/shard_split_state_machine.idl +++ b/src/mongo/db/serverless/shard_split_state_machine.idl @@ -61,6 +61,12 @@ structs: type: string description: "The replica set tag that identifies recipient nodes." optional: true + recipientConnectionString: + type: connection_string + description: >- + The connection string generated by the donor which it uses to + monitor the recipient for split acceptance. + optional: true tenantIds: type: array optional: true diff --git a/src/mongo/idl/basic_types.idl b/src/mongo/idl/basic_types.idl index b635ca97b0e..d124f29cdae 100644 --- a/src/mongo/idl/basic_types.idl +++ b/src/mongo/idl/basic_types.idl @@ -225,6 +225,13 @@ types: serializer: mongo::NamespaceString::toString deserializer: mongo::NamespaceString + connection_string: + bson_serialization_type: string + description: "A MongoDB ConnectionString" + cpp_type: "mongo::ConnectionString" + serializer: mongo::ConnectionString::toString + deserializer: mongo::ConnectionString::deserialize + fcv_string: bson_serialization_type: string description: >- -- cgit v1.2.1