diff options
-rw-r--r-- | jstests/serverless/libs/basic_serverless_test.js | 8 | ||||
-rw-r--r-- | jstests/serverless/shard_split_basic_test.js | 44 | ||||
-rw-r--r-- | jstests/serverless/shard_split_enabled.js | 2 | ||||
-rw-r--r-- | src/mongo/db/repl/replication_coordinator_impl_heartbeat.cpp | 5 | ||||
-rw-r--r-- | src/mongo/db/serverless/shard_split_donor_service.cpp | 121 | ||||
-rw-r--r-- | src/mongo/db/serverless/shard_split_donor_service.h | 16 | ||||
-rw-r--r-- | src/mongo/db/serverless/shard_split_donor_service_test.cpp | 97 | ||||
-rw-r--r-- | src/mongo/db/serverless/shard_split_utils.cpp | 29 | ||||
-rw-r--r-- | src/mongo/db/serverless/shard_split_utils.h | 24 |
9 files changed, 224 insertions, 122 deletions
diff --git a/jstests/serverless/libs/basic_serverless_test.js b/jstests/serverless/libs/basic_serverless_test.js index acf0e6e6877..9ad9ce565ac 100644 --- a/jstests/serverless/libs/basic_serverless_test.js +++ b/jstests/serverless/libs/basic_serverless_test.js @@ -10,6 +10,7 @@ class BasicServerlessTest { } stop() { + this.removeAndStopRecipientNodes(); // If we validate, it will try to list all collections and the migrated collections will // return a TenantMigrationCommitted error. this.donor.stopSet(undefined /* signal */, false /* forRestart */, {skipValidation: 1}); @@ -57,6 +58,13 @@ class BasicServerlessTest { print("Removing and stopping recipient nodes"); this.recipientNodes.forEach(node => this.donor.remove(node)); } + + /** + * Crafts a tenant database name. + */ + tenantDB(tenantId, dbName) { + return `${tenantId}_${dbName}`; + } } function findMigration(primary, uuid) { diff --git a/jstests/serverless/shard_split_basic_test.js b/jstests/serverless/shard_split_basic_test.js new file mode 100644 index 00000000000..333fda6805f --- /dev/null +++ b/jstests/serverless/shard_split_basic_test.js @@ -0,0 +1,44 @@ +/** + * + * Tests that runs a shard split to completion. + * @tags: [requires_fcv_52, featureFlagShardSplit] + */ + +load("jstests/libs/fail_point_util.js"); // for "configureFailPoint" +load('jstests/libs/parallel_shell_helpers.js'); // for "startParallelShell" +load("jstests/serverless/libs/basic_serverless_test.js"); + +(function() { +"use strict"; + +const recipientTagName = "recipientNode"; +const recipientSetName = "recipientSetName"; +const test = new BasicServerlessTest({ + recipientTagName, + recipientSetName, + nodeOptions: { + // Set a short timeout to test that the operation times out waiting for replication + setParameter: "shardSplitTimeoutMS=100000" + } +}); + +test.addRecipientNodes(); +test.donor.awaitSecondaryNodes(); + +const migrationId = UUID(); + +jsTestLog("Running the commitShardSplit operation"); +const admin = test.donor.getPrimary().getDB("admin"); +assert.commandWorked(admin.runCommand({ + commitShardSplit: 1, + migrationId, + tenantIds: ["tenant1", "tenant2"], + recipientTagName, + recipientSetName +})); + +jsTestLog("Forgetting shard split"); +assert.commandWorked(test.donor.getPrimary().adminCommand({forgetShardSplit: 1, migrationId})); + +test.stop(); +})(); diff --git a/jstests/serverless/shard_split_enabled.js b/jstests/serverless/shard_split_enabled.js index cb2075421b8..46a310b8574 100644 --- a/jstests/serverless/shard_split_enabled.js +++ b/jstests/serverless/shard_split_enabled.js @@ -79,8 +79,6 @@ function makeShardSplitTest() { 6236600, `forgetShardSplit should reject when featureFlagShardSplit is disabled`); - // shut down recipient nodes - test.removeAndStopRecipientNodes(); // shut down replica set test.stop(); }; diff --git a/src/mongo/db/repl/replication_coordinator_impl_heartbeat.cpp b/src/mongo/db/repl/replication_coordinator_impl_heartbeat.cpp index 6d356487893..dd7cb413540 100644 --- a/src/mongo/db/repl/replication_coordinator_impl_heartbeat.cpp +++ b/src/mongo/db/repl/replication_coordinator_impl_heartbeat.cpp @@ -716,6 +716,11 @@ void ReplicationCoordinatorImpl::_heartbeatReconfigStore( } const auto configToApply = statusWithConf.getValue(); + if (isSplitRecipientConfig) { + LOGV2(6309200, + "Applying a recipient split config for a shard split operation.", + "config"_attr = configToApply); + } const StatusWith<int> myIndex = validateConfigForHeartbeatReconfig( _externalState.get(), configToApply, getGlobalServiceContext()); diff --git a/src/mongo/db/serverless/shard_split_donor_service.cpp b/src/mongo/db/serverless/shard_split_donor_service.cpp index e401f565c27..1af4c492723 100644 --- a/src/mongo/db/serverless/shard_split_donor_service.cpp +++ b/src/mongo/db/serverless/shard_split_donor_service.cpp @@ -43,8 +43,10 @@ #include "mongo/executor/cancelable_executor.h" #include "mongo/executor/connection_pool.h" #include "mongo/executor/network_interface_factory.h" +#include "mongo/executor/network_interface_thread_pool.h" #include "mongo/executor/thread_pool_task_executor.h" #include "mongo/logv2/log.h" +#include "mongo/rpc/metadata/egress_metadata_hook_list.h" #include "mongo/util/fail_point.h" #include "mongo/util/future_util.h" #include "mongo/util/time_support.h" @@ -114,83 +116,58 @@ const std::string kTTLIndexName = "ShardSplitDonorTTLIndex"; } // namespace namespace detail { -std::function<bool(const std::vector<sdam::ServerDescriptionPtr>&)> -makeRecipientAcceptSplitPredicate(const ConnectionString& recipientConnectionString) { - return [recipientConnectionString](const std::vector<sdam::ServerDescriptionPtr>& servers) { - auto recipientNodeCount = - static_cast<uint32_t>(recipientConnectionString.getServers().size()); - auto nodesReportingRecipientSetName = - std::count_if(servers.begin(), servers.end(), [&](const auto& server) { - return server->getSetName() && - *(server->getSetName()) == recipientConnectionString.getSetName(); - }); - - return nodesReportingRecipientSetName == recipientNodeCount; - }; -} - -SemiFuture<void> makeRecipientAcceptSplitFuture(ExecutorPtr executor, - const CancellationToken& token, - const StringData& recipientTagName, - const StringData& recipientSetName) { - class RecipientAcceptSplitListener : public sdam::TopologyListener { - public: - RecipientAcceptSplitListener(const ConnectionString& recipientConnectionString) - : _predicate(makeRecipientAcceptSplitPredicate(recipientConnectionString)) {} - void onTopologyDescriptionChangedEvent(TopologyDescriptionPtr previousDescription, - TopologyDescriptionPtr newDescription) final { - stdx::lock_guard<Latch> lg(_mutex); - if (_fulfilled) { - return; - } - - if (_predicate(newDescription->getServers())) { - _fulfilled = true; - _promise.emplaceValue(); - } - } - - // Fulfilled when all nodes have accepted the split. - SharedSemiFuture<void> getFuture() const { - return _promise.getFuture(); - } - private: - bool _fulfilled = false; - std::function<bool(const std::vector<sdam::ServerDescriptionPtr>&)> _predicate; - SharedPromise<void> _promise; - mutable Mutex _mutex = - MONGO_MAKE_LATCH("ShardSplitDonorService::getRecipientAcceptSplitFuture::_mutex"); - }; +SemiFuture<void> makeRecipientAcceptSplitFuture( + std::shared_ptr<executor::TaskExecutor> taskExecutor, + const CancellationToken& token, + const StringData& recipientTagName, + const StringData& recipientSetName) { auto replCoord = repl::ReplicationCoordinator::get(cc().getServiceContext()); invariant(replCoord); auto recipientConnectionString = serverless::makeRecipientConnectionString( replCoord->getConfig(), recipientTagName, recipientSetName); - auto monitor = ReplicaSetMonitor::createIfNeeded(MongoURI{recipientConnectionString}); - invariant(monitor); - - // Only StreamableReplicaSetMonitor derives ReplicaSetMonitor. Therefore static cast is - // possible - auto streamableMonitor = checked_pointer_cast<StreamableReplicaSetMonitor>(monitor); - auto listener = std::make_shared<RecipientAcceptSplitListener>(recipientConnectionString); - streamableMonitor->getEventsPublisher()->registerListener(listener); + // build a vector of single server discovery monitors to listen for heartbeats + auto eventsPublisher = std::make_shared<sdam::TopologyEventsPublisher>(taskExecutor); + + auto listener = std::make_shared<mongo::serverless::RecipientAcceptSplitListener>( + recipientConnectionString); + eventsPublisher->registerListener(listener); + + auto managerStats = std::make_shared<ReplicaSetMonitorManagerStats>(); + auto stats = std::make_shared<ReplicaSetMonitorStats>(managerStats); + auto recipientNodes = recipientConnectionString.getServers(); + + std::vector<SingleServerDiscoveryMonitorPtr> monitors; + for (const auto& server : recipientNodes) { + SdamConfiguration sdamConfiguration(std::vector<HostAndPort>{server}); + auto connectionString = ConnectionString::forStandalones(std::vector<HostAndPort>{server}); + + monitors.push_back( + std::make_shared<SingleServerDiscoveryMonitor>(MongoURI{connectionString}, + server, + boost::none, + sdamConfiguration, + eventsPublisher, + taskExecutor, + stats)); + monitors.back()->init(); + } LOGV2(6142508, "Monitoring recipient nodes for split acceptance.", "recipientConnectionString"_attr = recipientConnectionString); return future_util::withCancellation(listener->getFuture(), token) - .thenRunOn(executor) + .thenRunOn(taskExecutor) // Preserve lifetime of listener and monitor until the future is fulfilled and remove the // listener. - .onCompletion([listener, monitor = streamableMonitor](Status s) { - monitor->getEventsPublisher()->removeListener(listener); - return s; - }) + .onCompletion([monitors = std::move(monitors), listener, eventsPublisher, taskExecutor]( + Status s) { return s; }) .semi(); } + } // namespace detail ThreadPool::Limits ShardSplitDonorService::getThreadPoolLimits() const { @@ -358,10 +335,16 @@ SemiFuture<void> ShardSplitDonorService::DonorStateMachine::run( .then([this, executor, abortToken] { checkForTokenInterrupt(abortToken); _cancelableOpCtxFactory.emplace(abortToken, _markKilledExecutor); - _createReplicaSetMonitor(executor, abortToken); + _createReplicaSetMonitor(abortToken); return _enterBlockingState(executor, abortToken); }) - .then([this] { pauseShardSplitAfterBlocking.pauseWhileSet(); }) + .then([this, abortToken] { + if (MONGO_unlikely(pauseShardSplitAfterBlocking.shouldFail())) { + auto opCtx = _cancelableOpCtxFactory->makeOperationContext(&cc()); + pauseShardSplitAfterBlocking.pauseWhileSetAndNotCanceled(opCtx.get(), + abortToken); + } + }) .then([this, executor, abortToken] { return _waitForRecipientToReachBlockTimestamp(executor, abortToken); }) @@ -738,7 +721,14 @@ void ShardSplitDonorService::DonorStateMachine::_initiateTimeout( } void ShardSplitDonorService::DonorStateMachine::_createReplicaSetMonitor( - const ScopedTaskExecutorPtr& executor, const CancellationToken& abortToken) { + const CancellationToken& abortToken) { + { + stdx::lock_guard<Latch> lg(_mutex); + if (_stateDoc.getState() > ShardSplitDonorStateEnum::kBlocking) { + return; + } + } + auto future = [&]() { stdx::lock_guard<Latch> lg(_mutex); if (MONGO_unlikely(skipShardSplitWaitForSplitAcceptance.shouldFail())) { // Test-only. @@ -751,7 +741,10 @@ void ShardSplitDonorService::DonorStateMachine::_createReplicaSetMonitor( invariant(recipientSetName); return detail::makeRecipientAcceptSplitFuture( - **executor, abortToken, *recipientTagName, *recipientSetName); + _shardSplitService->getInstanceCleanupExecutor(), + abortToken, + *recipientTagName, + *recipientSetName); }(); _recipientAcceptedSplit.setFrom(std::move(future).unsafeToInlineFuture()); diff --git a/src/mongo/db/serverless/shard_split_donor_service.h b/src/mongo/db/serverless/shard_split_donor_service.h index dc69d8cb1eb..d001bba0de1 100644 --- a/src/mongo/db/serverless/shard_split_donor_service.h +++ b/src/mongo/db/serverless/shard_split_donor_service.h @@ -41,13 +41,14 @@ namespace mongo { using ScopedTaskExecutorPtr = std::shared_ptr<executor::ScopedTaskExecutor>; namespace detail { -std::function<bool(const std::vector<sdam::ServerDescriptionPtr>&)> -makeRecipientAcceptSplitPredicate(const ConnectionString& recipientConnectionString); -SemiFuture<void> makeRecipientAcceptSplitFuture(ExecutorPtr executor, - const CancellationToken& token, - const StringData& recipientTagName, - const StringData& recipientSetName); +SemiFuture<void> makeRecipientAcceptSplitFuture( + ExecutorPtr executor, + std::shared_ptr<executor::TaskExecutor> taskExecutor, + const CancellationToken& token, + const StringData& recipientTagName, + const StringData& recipientSetName); + }; // namespace detail class ShardSplitDonorService final : public repl::PrimaryOnlyService { @@ -182,8 +183,7 @@ private: void _initiateTimeout(const ScopedTaskExecutorPtr& executor, const CancellationToken& abortToken); - void _createReplicaSetMonitor(const ScopedTaskExecutorPtr& executor, - const CancellationToken& abortToken); + void _createReplicaSetMonitor(const CancellationToken& abortToken); ExecutorFuture<DurableState> _handleErrorOrEnterAbortedState( StatusWith<DurableState> durableState, diff --git a/src/mongo/db/serverless/shard_split_donor_service_test.cpp b/src/mongo/db/serverless/shard_split_donor_service_test.cpp index 415ddbace97..069c4926fb3 100644 --- a/src/mongo/db/serverless/shard_split_donor_service_test.cpp +++ b/src/mongo/db/serverless/shard_split_donor_service_test.cpp @@ -231,6 +231,8 @@ protected: }; TEST_F(ShardSplitDonorServiceTest, BasicShardSplitDonorServiceInstanceCreation) { + FailPointEnableBlock fp("skipShardSplitWaitForSplitAcceptance"); + auto opCtx = makeOperationContext(); test::shard_split::ScopedTenantAccessBlocker scopedTenants(_tenantIds, opCtx.get()); test::shard_split::reconfigToAddRecipientNodes( @@ -279,6 +281,9 @@ TEST_F(ShardSplitDonorServiceTest, BasicShardSplitDonorServiceInstanceCreation) } TEST_F(ShardSplitDonorServiceTest, ShardSplitDonorServiceTimeout) { + FailPointEnableBlock fp("pauseShardSplitAfterBlocking"); + FailPointEnableBlock fp2("skipShardSplitWaitForSplitAcceptance"); + auto opCtx = makeOperationContext(); auto serviceContext = getServiceContext(); test::shard_split::ScopedTenantAccessBlocker scopedTenants(_tenantIds, opCtx.get()); @@ -509,75 +514,71 @@ protected: std::string _recipientSetName{_validRepl.getURI().getSetName()}; }; -TEST_F(SplitReplicaSetObserverTest, SupportsCancellation) { - test::shard_split::reconfigToAddRecipientNodes( - getServiceContext(), _recipientTagName, _validRepl.getHosts(), _recipientSet.getHosts()); - - CancellationSource source; - auto future = detail::makeRecipientAcceptSplitFuture( - _executor, source.token(), _recipientTagName, _recipientSetName); +TEST_F(SplitReplicaSetObserverTest, FutureReady) { + auto listener = + mongo::serverless::RecipientAcceptSplitListener(_validRepl.getURI().connectionString()); - ASSERT_FALSE(future.isReady()); - source.cancel(); + for (const auto& host : _validRepl.getHosts()) { + ASSERT_FALSE(listener.getFuture().isReady()); + listener.onServerHeartbeatSucceededEvent(host, BSON("setName" << _validRepl.getSetName())); + } - ASSERT_EQ(future.getNoThrow().code(), ErrorCodes::CallbackCanceled); + ASSERT_TRUE(listener.getFuture().isReady()); } -TEST_F(SplitReplicaSetObserverTest, GetRecipientAcceptSplitFutureTest) { - test::shard_split::reconfigToAddRecipientNodes( - getServiceContext(), _recipientTagName, _validRepl.getHosts(), _recipientSet.getHosts()); +TEST_F(SplitReplicaSetObserverTest, FutureReadyNameChange) { + auto listener = + mongo::serverless::RecipientAcceptSplitListener(_validRepl.getURI().connectionString()); - CancellationSource source; - auto future = detail::makeRecipientAcceptSplitFuture( - _executor, source.token(), _recipientTagName, _recipientSetName); + for (const auto& host : _validRepl.getHosts()) { + listener.onServerHeartbeatSucceededEvent(host, + BSON("setName" + << "donorSetName")); + } - std::shared_ptr<TopologyDescription> topologyDescriptionOld = - std::make_shared<sdam::TopologyDescription>(sdam::SdamConfiguration()); - std::shared_ptr<TopologyDescription> topologyDescriptionNew = - makeRecipientTopologyDescription(_validRepl); + ASSERT_FALSE(listener.getFuture().isReady()); - _publisher->onTopologyDescriptionChangedEvent(topologyDescriptionOld, topologyDescriptionNew); + for (const auto& host : _validRepl.getHosts()) { + listener.onServerHeartbeatSucceededEvent(host, BSON("setName" << _validRepl.getSetName())); + } - future.wait(); + ASSERT_TRUE(listener.getFuture().isReady()); } TEST_F(SplitReplicaSetObserverTest, FutureNotReadyMissingNodes) { - auto predicate = - detail::makeRecipientAcceptSplitPredicate(_validRepl.getURI().connectionString()); + auto listener = + mongo::serverless::RecipientAcceptSplitListener(_validRepl.getURI().connectionString()); - std::shared_ptr<TopologyDescription> topologyDescriptionNew = - makeRecipientTopologyDescription(_validRepl); - topologyDescriptionNew->removeServerDescription(_validRepl.getHosts()[0]); + for (size_t i = 0; i < _validRepl.getHosts().size() - 1; ++i) { + listener.onServerHeartbeatSucceededEvent(_validRepl.getHosts()[i], + BSON("setName" << _validRepl.getSetName())); + } - ASSERT_FALSE(predicate(topologyDescriptionNew->getServers())); + ASSERT_FALSE(listener.getFuture().isReady()); } -TEST_F(SplitReplicaSetObserverTest, FutureNotReadyWrongSet) { - auto predicate = - detail::makeRecipientAcceptSplitPredicate(_validRepl.getURI().connectionString()); +TEST_F(SplitReplicaSetObserverTest, FutureNotReadyNoSetName) { + auto listener = + mongo::serverless::RecipientAcceptSplitListener(_validRepl.getURI().connectionString()); - std::shared_ptr<TopologyDescription> topologyDescriptionNew = - makeRecipientTopologyDescription(_invalidRepl); + for (size_t i = 0; i < _validRepl.getHosts().size() - 1; ++i) { + listener.onServerHeartbeatSucceededEvent(_validRepl.getHosts()[i], BSONObj()); + } - ASSERT_FALSE(predicate(topologyDescriptionNew->getServers())); + ASSERT_FALSE(listener.getFuture().isReady()); } -TEST_F(SplitReplicaSetObserverTest, ExecutorCanceled) { - test::shard_split::reconfigToAddRecipientNodes( - getServiceContext(), _recipientTagName, _validRepl.getHosts(), _recipientSet.getHosts()); - - CancellationSource source; - auto future = detail::makeRecipientAcceptSplitFuture( - _executor, source.token(), _recipientTagName, _recipientSetName); - - _executor->shutdown(); - _executor->join(); +TEST_F(SplitReplicaSetObserverTest, FutureNotReadyWrongSet) { + auto listener = + mongo::serverless::RecipientAcceptSplitListener(_validRepl.getURI().connectionString()); - ASSERT_FALSE(future.isReady()); + for (const auto& host : _validRepl.getHosts()) { + listener.onServerHeartbeatSucceededEvent(host, + BSON("setName" + << "wrongSetName")); + } - // Ensure the test does not hang. - source.cancel(); - ASSERT_EQ(future.getNoThrow().code(), ErrorCodes::ShutdownInProgress); + ASSERT_FALSE(listener.getFuture().isReady()); } class ShardSplitRecipientCleanupTest : public ShardSplitDonorServiceTest { diff --git a/src/mongo/db/serverless/shard_split_utils.cpp b/src/mongo/db/serverless/shard_split_utils.cpp index f2db7aaf4d1..16e9b7f5a59 100644 --- a/src/mongo/db/serverless/shard_split_utils.cpp +++ b/src/mongo/db/serverless/shard_split_utils.cpp @@ -238,5 +238,34 @@ bool shouldRemoveStateDocumentOnRecipient(OperationContext* opCtx, stateDocument.getState() == ShardSplitDonorStateEnum::kBlocking; } +RecipientAcceptSplitListener::RecipientAcceptSplitListener( + const ConnectionString& recipientConnectionString) + : _numberOfRecipient(recipientConnectionString.getServers().size()), + _recipientSetName(recipientConnectionString.getSetName()) {} + +void RecipientAcceptSplitListener::onServerHeartbeatSucceededEvent(const HostAndPort& hostAndPort, + const BSONObj reply) { + if (_fulfilled.load() || !reply["setName"]) { + return; + } + + stdx::lock_guard<Latch> lg(_mutex); + _reportedSetNames[hostAndPort] = reply["setName"].str(); + + auto allReportCorrectly = + std::all_of(_reportedSetNames.begin(), + _reportedSetNames.end(), + [&](const auto& entry) { return entry.second == _recipientSetName; }) && + _reportedSetNames.size() == _numberOfRecipient; + if (allReportCorrectly) { + _fulfilled.store(true); + _promise.emplaceValue(); + } +} + +SharedSemiFuture<void> RecipientAcceptSplitListener::getFuture() const { + return _promise.getFuture(); +} + } // namespace serverless } // namespace mongo diff --git a/src/mongo/db/serverless/shard_split_utils.h b/src/mongo/db/serverless/shard_split_utils.h index ec2f1becd58..19978fba1da 100644 --- a/src/mongo/db/serverless/shard_split_utils.h +++ b/src/mongo/db/serverless/shard_split_utils.h @@ -29,6 +29,7 @@ #pragma once +#include "mongo/client/sdam/topology_listener.h" #include "mongo/db/repl/repl_set_config.h" #include "mongo/db/serverless/shard_split_state_machine_gen.h" @@ -116,6 +117,29 @@ StatusWith<ShardSplitDonorDocument> getStateDocument(OperationContext* opCtx, bool shouldRemoveStateDocumentOnRecipient(OperationContext* opCtx, const ShardSplitDonorDocument& stateDocument); +/** + * Listener that receives heartbeat events and fulfills a future once it sees the expected number + * of nodes in the recipient replica set to monitor. + */ +class RecipientAcceptSplitListener : public sdam::TopologyListener { +public: + RecipientAcceptSplitListener(const ConnectionString& recipientConnectionString); + + void onServerHeartbeatSucceededEvent(const HostAndPort& hostAndPort, BSONObj reply) final; + + // Fulfilled when all nodes have accepted the split. + SharedSemiFuture<void> getFuture() const; + +private: + mutable Mutex _mutex = + MONGO_MAKE_LATCH("ShardSplitDonorService::getRecipientAcceptSplitFuture::_mutex"); + + AtomicWord<bool> _fulfilled{false}; + const size_t _numberOfRecipient; + std::string _recipientSetName; + std::map<HostAndPort, std::string> _reportedSetNames; + SharedPromise<void> _promise; +}; } // namespace serverless } // namespace mongo |