diff options
author | Matt Broadstone <mbroadst@mongodb.com> | 2022-02-04 13:23:59 +0000 |
---|---|---|
committer | Evergreen Agent <no-reply@evergreen.mongodb.com> | 2022-02-04 13:54:45 +0000 |
commit | 2e878f0d1f4234199fe0602878de0ac1ce7aa0cc (patch) | |
tree | 727c8804b0c6258af883f87a6a957eb37275da25 /src/mongo/db/serverless | |
parent | 08ca09bda76f5d1082b7d8fea5e874c1113a5542 (diff) | |
download | mongo-2e878f0d1f4234199fe0602878de0ac1ce7aa0cc.tar.gz |
SERVER-62482 Build recipient connection string from tag/set name
Diffstat (limited to 'src/mongo/db/serverless')
-rw-r--r-- | src/mongo/db/serverless/SConscript | 3 | ||||
-rw-r--r-- | src/mongo/db/serverless/shard_split_commands.cpp | 3 | ||||
-rw-r--r-- | src/mongo/db/serverless/shard_split_commands.idl | 9 | ||||
-rw-r--r-- | src/mongo/db/serverless/shard_split_donor_op_observer.cpp | 12 | ||||
-rw-r--r-- | src/mongo/db/serverless/shard_split_donor_op_observer_test.cpp | 99 | ||||
-rw-r--r-- | src/mongo/db/serverless/shard_split_donor_service.cpp | 74 | ||||
-rw-r--r-- | src/mongo/db/serverless/shard_split_donor_service.h | 11 | ||||
-rw-r--r-- | src/mongo/db/serverless/shard_split_donor_service_test.cpp | 219 | ||||
-rw-r--r-- | src/mongo/db/serverless/shard_split_state_machine.idl | 10 | ||||
-rw-r--r-- | src/mongo/db/serverless/shard_split_test_utils.cpp | 49 | ||||
-rw-r--r-- | src/mongo/db/serverless/shard_split_test_utils.h | 23 | ||||
-rw-r--r-- | src/mongo/db/serverless/shard_split_utils.cpp | 120 | ||||
-rw-r--r-- | src/mongo/db/serverless/shard_split_utils.h | 70 | ||||
-rw-r--r-- | src/mongo/db/serverless/shard_split_utils_test.cpp | 176 |
14 files changed, 561 insertions, 317 deletions
diff --git a/src/mongo/db/serverless/SConscript b/src/mongo/db/serverless/SConscript index 1c8fb9c93a8..3d56f201203 100644 --- a/src/mongo/db/serverless/SConscript +++ b/src/mongo/db/serverless/SConscript @@ -50,9 +50,11 @@ env.Library( source=[ 'shard_split_donor_service.cpp', 'shard_split_donor_op_observer.cpp', + 'shard_split_utils.cpp' ], LIBDEPS=[ '$BUILD_DIR/mongo/db/repl/primary_only_service', + '$BUILD_DIR/mongo/db/repl/replica_set_messages', 'shard_split_state_machine', ], LIBDEPS_PRIVATE=[ @@ -70,6 +72,7 @@ env.CppUnitTest( source=[ 'shard_split_donor_op_observer_test.cpp', 'shard_split_donor_service_test.cpp', + 'shard_split_utils_test.cpp', 'shard_split_test_utils.cpp', ], LIBDEPS=[ diff --git a/src/mongo/db/serverless/shard_split_commands.cpp b/src/mongo/db/serverless/shard_split_commands.cpp index d18e1bd2c45..32143d5bb4c 100644 --- a/src/mongo/db/serverless/shard_split_commands.cpp +++ b/src/mongo/db/serverless/shard_split_commands.cpp @@ -61,7 +61,8 @@ public: const auto& cmd = request(); auto stateDoc = ShardSplitDonorDocument(cmd.getMigrationId()); stateDoc.setTenantIds(cmd.getTenantIds()); - stateDoc.setRecipientConnectionString(cmd.getRecipientConnectionString()); + stateDoc.setRecipientTagName(cmd.getRecipientTagName()); + stateDoc.setRecipientSetName(cmd.getRecipientSetName()); opCtx->setAlwaysInterruptAtStepDownOrUp(); diff --git a/src/mongo/db/serverless/shard_split_commands.idl b/src/mongo/db/serverless/shard_split_commands.idl index dbd0dae05a7..f7aa6b1cf98 100644 --- a/src/mongo/db/serverless/shard_split_commands.idl +++ b/src/mongo/db/serverless/shard_split_commands.idl @@ -59,11 +59,12 @@ commands: migrationId: description: "Unique identifier for the shard split operation." type: uuid - recipientConnectionString: - description: "The connection string for the recipient slice." + recipientSetName: + description: "The replica set name for the recipient." + type: string + recipientTagName: + description: "The replica set tag that identifies recipient nodes." type: string - validator: - callback: "tenant_migration_util::validateConnectionString" tenantIds: description: "The identifier for the list of tenants being migrated." type: array<string> diff --git a/src/mongo/db/serverless/shard_split_donor_op_observer.cpp b/src/mongo/db/serverless/shard_split_donor_op_observer.cpp index ee64dc49a27..602e3f46ff9 100644 --- a/src/mongo/db/serverless/shard_split_donor_op_observer.cpp +++ b/src/mongo/db/serverless/shard_split_donor_op_observer.cpp @@ -33,6 +33,7 @@ #include "mongo/db/repl/tenant_migration_access_blocker_util.h" #include "mongo/db/serverless/shard_split_donor_op_observer.h" #include "mongo/db/serverless/shard_split_state_machine_gen.h" +#include "mongo/db/serverless/shard_split_utils.h" namespace mongo { namespace { @@ -88,12 +89,21 @@ void onBlockerInitialization(OperationContext* opCtx, auto optionalTenants = donorStateDoc.getTenantIds(); invariant(optionalTenants); + auto recipientTagName = donorStateDoc.getRecipientTagName(); + auto recipientSetName = donorStateDoc.getRecipientSetName(); + invariant(recipientTagName); + invariant(recipientSetName); + + auto config = repl::ReplicationCoordinator::get(cc().getServiceContext())->getConfig(); + auto recipientConnectionString = + repl::makeRecipientConnectionString(config, *recipientTagName, *recipientSetName); + for (const auto& tenantId : optionalTenants.get()) { auto mtab = std::make_shared<TenantMigrationDonorAccessBlocker>( opCtx->getServiceContext(), tenantId.toString(), MigrationProtocolEnum::kMultitenantMigrations, - donorStateDoc.getRecipientConnectionString()->toString()); + recipientConnectionString.toString()); TenantMigrationAccessBlockerRegistry::get(opCtx->getServiceContext()).add(tenantId, mtab); diff --git a/src/mongo/db/serverless/shard_split_donor_op_observer_test.cpp b/src/mongo/db/serverless/shard_split_donor_op_observer_test.cpp index 28da14e4954..8c4efbf629b 100644 --- a/src/mongo/db/serverless/shard_split_donor_op_observer_test.cpp +++ b/src/mongo/db/serverless/shard_split_donor_op_observer_test.cpp @@ -56,15 +56,9 @@ void startBlockingWrites( } } -/** - * This test suite validates that when the default OpObserver chain is set up (which happens to - * include the ShardingMongodOpObserver), writes to the 'admin.system.version' collection (and the - * shardIdentity document specifically) will invoke the sharding initialization code. - */ class ShardSplitDonorOpObserverTest : public ServiceContextMongoDTest { public: void setUp() override { - // Set up mongod. ServiceContextMongoDTest::setUp(); auto service = getServiceContext(); @@ -99,12 +93,12 @@ public: protected: void runInsertTestCase( - ShardSplitDonorDocument document, - std::vector<std::string> tenants, + ShardSplitDonorDocument stateDocument, + const std::vector<std::string>& tenants, std::function<void(std::shared_ptr<TenantMigrationAccessBlocker>)> mtabVerifier) { std::vector<InsertStatement> inserts; - inserts.emplace_back(_oplogSlot++, document.toBSON()); + inserts.emplace_back(_oplogSlot++, stateDocument.toBSON()); WriteUnitOfWork wow(_opCtx.get()); _observer->onInserts(_opCtx.get(), _nss, _uuid, inserts.begin(), inserts.end(), false); @@ -114,8 +108,8 @@ protected: } void runUpdateTestCase( - ShardSplitDonorDocument document, - std::vector<std::string> tenants, + ShardSplitDonorDocument stateDocument, + const std::vector<std::string>& tenants, std::vector<std::shared_ptr<TenantMigrationDonorAccessBlocker>> blockers, std::function<void(std::shared_ptr<TenantMigrationAccessBlocker>)> mtabVerifier) { ASSERT_EQ(tenants.size(), blockers.size()); @@ -131,12 +125,12 @@ protected: CollectionUpdateArgs updateArgs; updateArgs.stmtIds = {}; - updateArgs.updatedDoc = document.toBSON(); + updateArgs.updatedDoc = stateDocument.toBSON(); updateArgs.update = BSON("$set" << BSON(ShardSplitDonorDocument::kStateFieldName - << ShardSplitDonorState_serializer(document.getState()))); - updateArgs.criteria = BSON("_id" << document.getId()); - OplogUpdateEntryArgs update(&updateArgs, _nss, document.getId()); + << ShardSplitDonorState_serializer(stateDocument.getState()))); + updateArgs.criteria = BSON("_id" << stateDocument.getId()); + OplogUpdateEntryArgs update(&updateArgs, _nss, stateDocument.getId()); WriteUnitOfWork wuow(_opCtx.get()); _observer->onUpdate(_opCtx.get(), update); @@ -164,6 +158,12 @@ protected: return blockers; } + ShardSplitDonorDocument defaultStateDocument() const { + return ShardSplitDonorDocument::parse( + {"donor.document"}, + BSON("_id" << _uuid << "tenantIds" << _tenantIds << "recipientTagName" + << _recipientTagName << "recipientSetName" << _recipientSetName)); + } protected: MockReplicaSet _replSet = @@ -172,6 +172,8 @@ protected: std::vector<std::string> _tenantIds = {"tenant1", "tenantAB"}; std::string _connectionStr = _replSet.getConnectionString(); UUID _uuid = UUID::gen(); + std::string _recipientTagName{"$recipientNode"}; + std::string _recipientSetName{_replSet.getURI().getSetName()}; std::unique_ptr<ShardSplitDonorOpObserver> _observer; std::shared_ptr<OperationContext> _opCtx; @@ -225,28 +227,28 @@ TEST_F(ShardSplitDonorOpObserverTest, InitialInsertInvalidState) { ShardSplitDonorStateEnum::kDataSync}; for (auto state : states) { - auto document = test::shard_split::createDocument(_uuid, state, _tenantIds, _connectionStr); + auto stateDocument = defaultStateDocument(); + stateDocument.setState(state); auto mtabVerifier = [](std::shared_ptr<TenantMigrationAccessBlocker>) {}; - ASSERT_THROWS(runInsertTestCase(document, _tenantIds, mtabVerifier), DBException); + ASSERT_THROWS(runInsertTestCase(stateDocument, _tenantIds, mtabVerifier), DBException); } } TEST_F(ShardSplitDonorOpObserverTest, InsertValidAbortedDocument) { - - auto document = test::shard_split::createDocument( - _uuid, ShardSplitDonorStateEnum::kAborted, _tenantIds, _connectionStr); + auto stateDocument = defaultStateDocument(); + stateDocument.setState(ShardSplitDonorStateEnum::kAborted); Status status(ErrorCodes::CallbackCanceled, "Split has been aborted"); BSONObjBuilder bob; status.serializeErrorToBSON(&bob); - document.setAbortReason(bob.obj()); + stateDocument.setAbortReason(bob.obj()); - document.setCommitOrAbortOpTime(repl::OpTime(Timestamp(1), 1)); + stateDocument.setCommitOrAbortOpTime(repl::OpTime(Timestamp(1), 1)); std::vector<InsertStatement> inserts; - inserts.emplace_back(_oplogSlot++, document.toBSON()); + inserts.emplace_back(_oplogSlot++, stateDocument.toBSON()); WriteUnitOfWork wow(_opCtx.get()); _observer->onInserts(_opCtx.get(), _nss, _uuid, inserts.begin(), inserts.end(), false); @@ -260,9 +262,10 @@ TEST_F(ShardSplitDonorOpObserverTest, InsertValidAbortedDocument) { } TEST_F(ShardSplitDonorOpObserverTest, InsertDocument) { - auto document = test::shard_split::createDocument( - _uuid, ShardSplitDonorStateEnum::kUninitialized, _tenantIds, _connectionStr); + test::shard_split::reconfigToAddRecipientNodes( + getServiceContext(), _recipientTagName, _replSet.getHosts()); + auto stateDocument = defaultStateDocument(); auto mtabVerifier = [opCtx = _opCtx.get()](std::shared_ptr<TenantMigrationAccessBlocker> mtab) { ASSERT_TRUE(mtab); ASSERT_OK(mtab->checkIfCanWrite(Timestamp(1)).code()); @@ -270,12 +273,12 @@ TEST_F(ShardSplitDonorOpObserverTest, InsertDocument) { ASSERT_EQ(mtab->checkIfCanBuildIndex().code(), ErrorCodes::TenantMigrationConflict); }; - runInsertTestCase(document, _tenantIds, mtabVerifier); + runInsertTestCase(stateDocument, _tenantIds, mtabVerifier); } TEST_F(ShardSplitDonorOpObserverTest, TransitionToDataSync) { - auto document = test::shard_split::createDocument( - _uuid, ShardSplitDonorStateEnum::kDataSync, _tenantIds, _connectionStr); + auto stateDocument = defaultStateDocument(); + stateDocument.setState(ShardSplitDonorStateEnum::kDataSync); auto blockers = createBlockers(_tenantIds, _opCtx.get(), _connectionStr); @@ -287,13 +290,13 @@ TEST_F(ShardSplitDonorOpObserverTest, TransitionToDataSync) { ASSERT_EQ(mtab->checkIfCanBuildIndex().code(), ErrorCodes::TenantMigrationConflict); }; - runUpdateTestCase(document, _tenantIds, blockers, mtabVerifier); + runUpdateTestCase(stateDocument, _tenantIds, blockers, mtabVerifier); } TEST_F(ShardSplitDonorOpObserverTest, TransitionToBlockingPrimary) { - auto document = test::shard_split::createDocument( - _uuid, ShardSplitDonorStateEnum::kBlocking, _tenantIds, _connectionStr); - document.setBlockTimestamp(Timestamp(1, 1)); + auto stateDocument = defaultStateDocument(); + stateDocument.setState(ShardSplitDonorStateEnum::kBlocking); + stateDocument.setBlockTimestamp(Timestamp(1, 1)); auto blockers = createBlockers(_tenantIds, _opCtx.get(), _connectionStr); for (auto& blocker : blockers) { @@ -311,16 +314,16 @@ TEST_F(ShardSplitDonorOpObserverTest, TransitionToBlockingPrimary) { ASSERT_EQ(mtab->checkIfCanBuildIndex().code(), ErrorCodes::TenantMigrationConflict); }; - runUpdateTestCase(document, _tenantIds, blockers, mtabVerifier); + runUpdateTestCase(stateDocument, _tenantIds, blockers, mtabVerifier); } TEST_F(ShardSplitDonorOpObserverTest, TransitionToBlockingSecondary) { // This indicates the instance is secondary for the OpObserver. repl::UnreplicatedWritesBlock setSecondary(_opCtx.get()); - auto document = test::shard_split::createDocument( - _uuid, ShardSplitDonorStateEnum::kBlocking, _tenantIds, _connectionStr); - document.setBlockTimestamp(Timestamp(1, 1)); + auto stateDocument = defaultStateDocument(); + stateDocument.setState(ShardSplitDonorStateEnum::kBlocking); + stateDocument.setBlockTimestamp(Timestamp(1, 1)); auto blockers = createBlockers(_tenantIds, _opCtx.get(), _connectionStr); @@ -334,7 +337,7 @@ TEST_F(ShardSplitDonorOpObserverTest, TransitionToBlockingSecondary) { ASSERT_EQ(mtab->checkIfCanBuildIndex().code(), ErrorCodes::TenantMigrationConflict); }; - runUpdateTestCase(document, _tenantIds, blockers, mtabVerifier); + runUpdateTestCase(stateDocument, _tenantIds, blockers, mtabVerifier); } TEST_F(ShardSplitDonorOpObserverTest, TransitionToCommit) { @@ -342,10 +345,10 @@ TEST_F(ShardSplitDonorOpObserverTest, TransitionToCommit) { auto commitOpTime = mongo::repl::OpTime(Timestamp(1, 3), 2); _replicationCoordinatorMock->setCurrentCommittedSnapshotOpTime(commitOpTime); - auto document = test::shard_split::createDocument( - _uuid, ShardSplitDonorStateEnum::kCommitted, _tenantIds, _connectionStr); - document.setBlockTimestamp(Timestamp(1, 2)); - document.setCommitOrAbortOpTime(commitOpTime); + auto stateDocument = defaultStateDocument(); + stateDocument.setState(ShardSplitDonorStateEnum::kCommitted); + stateDocument.setBlockTimestamp(Timestamp(1, 2)); + stateDocument.setCommitOrAbortOpTime(commitOpTime); auto blockers = createBlockers(_tenantIds, _opCtx.get(), _connectionStr); startBlockingWrites(blockers); @@ -362,7 +365,7 @@ TEST_F(ShardSplitDonorOpObserverTest, TransitionToCommit) { ASSERT_EQ(mtab->checkIfCanBuildIndex().code(), ErrorCodes::TenantMigrationCommitted); }; - runUpdateTestCase(document, _tenantIds, blockers, mtabVerifier); + runUpdateTestCase(stateDocument, _tenantIds, blockers, mtabVerifier); } TEST_F(ShardSplitDonorOpObserverTest, TransitionToAbort) { @@ -374,11 +377,11 @@ TEST_F(ShardSplitDonorOpObserverTest, TransitionToAbort) { BSONObjBuilder bob; status.serializeErrorToBSON(&bob); - auto document = test::shard_split::createDocument( - _uuid, ShardSplitDonorStateEnum::kAborted, _tenantIds, _connectionStr); - document.setBlockTimestamp(Timestamp(1, 2)); - document.setCommitOrAbortOpTime(commitOpTime); - document.setAbortReason(bob.obj()); + auto stateDocument = defaultStateDocument(); + stateDocument.setState(ShardSplitDonorStateEnum::kAborted); + stateDocument.setBlockTimestamp(Timestamp(1, 2)); + stateDocument.setCommitOrAbortOpTime(commitOpTime); + stateDocument.setAbortReason(bob.obj()); auto blockers = createBlockers(_tenantIds, _opCtx.get(), _connectionStr); startBlockingWrites(blockers); @@ -394,7 +397,7 @@ TEST_F(ShardSplitDonorOpObserverTest, TransitionToAbort) { ASSERT_OK(mtab->checkIfCanBuildIndex().code()); }; - runUpdateTestCase(document, _tenantIds, blockers, mtabVerifier); + runUpdateTestCase(stateDocument, _tenantIds, blockers, mtabVerifier); } } // namespace diff --git a/src/mongo/db/serverless/shard_split_donor_service.cpp b/src/mongo/db/serverless/shard_split_donor_service.cpp index ef033c43bae..a569faa022a 100644 --- a/src/mongo/db/serverless/shard_split_donor_service.cpp +++ b/src/mongo/db/serverless/shard_split_donor_service.cpp @@ -38,6 +38,7 @@ #include "mongo/db/repl/repl_client_info.h" #include "mongo/db/repl/tenant_migration_access_blocker_util.h" #include "mongo/db/repl/wait_for_majority_service.h" +#include "mongo/db/serverless/shard_split_utils.h" #include "mongo/executor/cancelable_executor.h" #include "mongo/executor/connection_pool.h" #include "mongo/executor/network_interface_factory.h" @@ -101,27 +102,34 @@ void setMtabToBlockingForTenants(ServiceContext* context, MONGO_FAIL_POINT_DEFINE(pauseShardSplitAfterInitialSync); MONGO_FAIL_POINT_DEFINE(pauseShardSplitAfterBlocking); +MONGO_FAIL_POINT_DEFINE(skipShardSplitWaitForSplitAcceptance); } // namespace +namespace detail { std::function<bool(const std::vector<sdam::ServerDescriptionPtr>&)> -makeRecipientAcceptSplitPredicate(std::string name, int expectedSize) { - return [name = std::move(name), - expectedSize](const std::vector<sdam::ServerDescriptionPtr>& servers) { - return expectedSize == +makeRecipientAcceptSplitPredicate(const ConnectionString& recipientConnectionString) { + return [recipientConnectionString](const std::vector<sdam::ServerDescriptionPtr>& servers) { + auto recipientNodeCount = + static_cast<uint32_t>(recipientConnectionString.getServers().size()); + auto nodesReportingRecipientSetName = std::count_if(servers.begin(), servers.end(), [&](const auto& server) { - return server->getSetName() && *(server->getSetName()) == name; - }); + return server->getSetName() && + *(server->getSetName()) == recipientConnectionString.getSetName(); + }); + + return nodesReportingRecipientSetName == recipientNodeCount; }; } -SemiFuture<void> getRecipientAcceptSplitFuture(ExecutorPtr executor, - const CancellationToken& token, - MongoURI recipientConnectionString) { +SemiFuture<void> makeRecipientAcceptSplitFuture(ExecutorPtr executor, + const CancellationToken& token, + const StringData& recipientTagName, + const StringData& recipientSetName) { class RecipientAcceptSplitListener : public sdam::TopologyListener { public: - RecipientAcceptSplitListener(int expectedSize, std::string rsName) - : _predicate(makeRecipientAcceptSplitPredicate(std::move(rsName), expectedSize)) {} + RecipientAcceptSplitListener(const ConnectionString& recipientConnectionString) + : _predicate(makeRecipientAcceptSplitPredicate(recipientConnectionString)) {} void onTopologyDescriptionChangedEvent(TopologyDescriptionPtr previousDescription, TopologyDescriptionPtr newDescription) final { stdx::lock_guard<Latch> lg(_mutex); @@ -148,19 +156,24 @@ SemiFuture<void> getRecipientAcceptSplitFuture(ExecutorPtr executor, MONGO_MAKE_LATCH("ShardSplitDonorService::getRecipientAcceptSplitFuture::_mutex"); }; - auto monitor = ReplicaSetMonitor::createIfNeeded(recipientConnectionString); + auto replCoord = repl::ReplicationCoordinator::get(cc().getServiceContext()); + invariant(replCoord); + auto recipientConnectionString = repl::makeRecipientConnectionString( + replCoord->getConfig(), recipientTagName, recipientSetName); + auto monitor = ReplicaSetMonitor::createIfNeeded(MongoURI{recipientConnectionString}); invariant(monitor); // Only StreamableReplicaSetMonitor derives ReplicaSetMonitor. Therefore static cast is // possible auto streamableMonitor = checked_pointer_cast<StreamableReplicaSetMonitor>(monitor); - auto listener = std::make_shared<RecipientAcceptSplitListener>( - recipientConnectionString.getServers().size(), - recipientConnectionString.getReplicaSetName()); - + auto listener = std::make_shared<RecipientAcceptSplitListener>(recipientConnectionString); streamableMonitor->getEventsPublisher()->registerListener(listener); + LOGV2(6142508, + "Monitoring recipient nodes for split acceptance.", + "recipientConnectionString"_attr = recipientConnectionString); + return future_util::withCancellation(listener->getFuture(), token) .thenRunOn(executor) // Preserve lifetime of listener and monitor until the future is fulfilled and remove the @@ -171,6 +184,7 @@ SemiFuture<void> getRecipientAcceptSplitFuture(ExecutorPtr executor, }) .semi(); } +} // namespace detail ThreadPool::Limits ShardSplitDonorService::getThreadPoolLimits() const { return ThreadPool::Limits(); @@ -209,7 +223,8 @@ Status ShardSplitDonorService::DonorStateMachine::checkIfOptionsConflict( invariant(stateDoc.getId() == _stateDoc.getId()); if (_stateDoc.getTenantIds() == stateDoc.getTenantIds() && - _stateDoc.getRecipientConnectionString() == stateDoc.getRecipientConnectionString()) { + _stateDoc.getRecipientTagName() == stateDoc.getRecipientTagName() && + _stateDoc.getRecipientSetName() == stateDoc.getRecipientSetName()) { return Status::OK(); } @@ -550,24 +565,23 @@ void ShardSplitDonorService::DonorStateMachine::_initiateTimeout( void ShardSplitDonorService::DonorStateMachine::_createReplicaSetMonitor( const ExecutorPtr& executor, const CancellationToken& abortToken) { - - auto connectionString = [&]() { + auto future = [&]() { stdx::lock_guard<Latch> lg(_mutex); + if (MONGO_unlikely(skipShardSplitWaitForSplitAcceptance.shouldFail())) { // Test-only. + return SemiFuture<void>::makeReady(); + } - return _stateDoc.getRecipientConnectionString(); - }(); + auto recipientTagName = _stateDoc.getRecipientTagName(); + auto recipientSetName = _stateDoc.getRecipientSetName(); + invariant(recipientTagName); + invariant(recipientSetName); - invariant(connectionString); + return detail::makeRecipientAcceptSplitFuture( + executor, abortToken, *recipientTagName, *recipientSetName); + }(); - _recipientAcceptedSplit.setFrom( - getRecipientAcceptSplitFuture( - executor, abortToken, MongoURI::parse(connectionString.get()).getValue()) - .unsafeToInlineFuture()); + _recipientAcceptedSplit.setFrom(std::move(future).unsafeToInlineFuture()); _replicaSetMonitorCreatedPromise.emplaceValue(); - - LOGV2(6142508, - "Monitoring recipient nodes for split acceptance.", - "recipientConnectionString"_attr = connectionString.get()); } ExecutorFuture<ShardSplitDonorService::DonorStateMachine::DurableState> diff --git a/src/mongo/db/serverless/shard_split_donor_service.h b/src/mongo/db/serverless/shard_split_donor_service.h index 521550330b3..85809dbab2c 100644 --- a/src/mongo/db/serverless/shard_split_donor_service.h +++ b/src/mongo/db/serverless/shard_split_donor_service.h @@ -39,12 +39,15 @@ namespace mongo { using ScopedTaskExecutorPtr = std::shared_ptr<executor::ScopedTaskExecutor>; +namespace detail { std::function<bool(const std::vector<sdam::ServerDescriptionPtr>&)> -makeRecipientAcceptSplitPredicate(std::string name, int expectedSize); +makeRecipientAcceptSplitPredicate(const ConnectionString& recipientConnectionString); -SemiFuture<void> getRecipientAcceptSplitFuture(ExecutorPtr executor, - const CancellationToken& token, - MongoURI recipientConnectionString); +SemiFuture<void> makeRecipientAcceptSplitFuture(ExecutorPtr executor, + const CancellationToken& token, + const StringData& recipientTagName, + const StringData& recipientSetName); +}; // namespace detail class ShardSplitDonorService final : public repl::PrimaryOnlyService { public: diff --git a/src/mongo/db/serverless/shard_split_donor_service_test.cpp b/src/mongo/db/serverless/shard_split_donor_service_test.cpp index 3a12f6e2fbe..71bb0b07b1e 100644 --- a/src/mongo/db/serverless/shard_split_donor_service_test.cpp +++ b/src/mongo/db/serverless/shard_split_donor_service_test.cpp @@ -69,9 +69,7 @@ namespace mongo { namespace { -constexpr std::int32_t stopFailPointErrorCode = 9822402; - -sdam::TopologyDescriptionPtr createTopologyDescription(const MockReplicaSet& set) { +sdam::TopologyDescriptionPtr makeRecipientTopologyDescription(const MockReplicaSet& set) { std::shared_ptr<TopologyDescription> topologyDescription = std::make_shared<sdam::TopologyDescription>(sdam::SdamConfiguration( set.getHosts(), sdam::TopologyType::kReplicaSetNoPrimary, set.getSetName())); @@ -112,140 +110,66 @@ std::ostringstream& operator<<(std::ostringstream& builder, return builder; } -class ShardSplitDonorServiceTest : public ServiceContextMongoDTest { +class ShardSplitDonorServiceTest : public repl::PrimaryOnlyServiceMongoDTest { public: void setUp() override { - ServiceContextMongoDTest::setUp(); - auto serviceContext = getServiceContext(); - - // Fake replSet just for creating consistent URI for monitor - _rsmMonitor.setup(_replSet.getURI()); - - ConnectionString::setConnectionHook(mongo::MockConnRegistry::get()->getConnStrHook()); - - // Set up clocks. - serviceContext->setFastClockSource(std::make_unique<SharedClockSourceAdapter>(_clkSource)); - serviceContext->setPreciseClockSource( - std::make_unique<SharedClockSourceAdapter>(_clkSource)); - - WaitForMajorityService::get(serviceContext).startup(serviceContext); + repl::PrimaryOnlyServiceMongoDTest::setUp(); + // The database needs to be open before using shard split donor service. { auto opCtx = cc().makeOperationContext(); - auto replCoord = std::make_unique<repl::ReplicationCoordinatorMock>(serviceContext); - repl::ReplicationCoordinator::set(serviceContext, std::move(replCoord)); - - repl::createOplog(opCtx.get()); - { - Lock::GlobalWrite lk(opCtx.get()); - OldClientContext ctx(opCtx.get(), NamespaceString::kRsOplogNamespace.ns()); - tenant_migration_util::createOplogViewForTenantMigrations(opCtx.get(), ctx.db()); - } - - // Need real (non-mock) storage for the oplog buffer. - repl::StorageInterface::set(serviceContext, - std::make_unique<repl::StorageInterfaceImpl>()); - - // The DropPendingCollectionReaper is required to drop the oplog buffer collection. - repl::DropPendingCollectionReaper::set( - serviceContext, - std::make_unique<repl::DropPendingCollectionReaper>( - repl::StorageInterface::get(serviceContext))); - - // Set up OpObserver so that repl::logOp() will store the oplog entry's optime in - // ReplClientInfo. - OpObserverRegistry* opObserverRegistry = - dynamic_cast<OpObserverRegistry*>(serviceContext->getOpObserver()); - opObserverRegistry->addObserver(std::make_unique<OpObserverImpl>()); - opObserverRegistry->addObserver( - std::make_unique<repl::PrimaryOnlyServiceOpObserver>(serviceContext)); - opObserverRegistry->addObserver(std::make_unique<ShardSplitDonorOpObserver>()); - - _registry = repl::PrimaryOnlyServiceRegistry::get(getServiceContext()); - std::unique_ptr<ShardSplitDonorService> service = - std::make_unique<ShardSplitDonorService>(getServiceContext()); - _registry->registerService(std::move(service)); - _registry->onStartup(opCtx.get()); + AutoGetDb autoDb( + opCtx.get(), NamespaceString::kTenantSplitDonorsNamespace.db(), MODE_X); + auto db = autoDb.ensureDbExists(opCtx.get()); + ASSERT_TRUE(db); } - _openDatabase(); - stepUp(); - - _service = _registry->lookupServiceByName(ShardSplitDonorService::kServiceName); - ASSERT(_service); - // Timestamps of "0 seconds" are not allowed, so we must advance our clock mock to the first - // real second. - _clkSource->advance(Milliseconds(1000)); - } - - void tearDown() override { - WaitForMajorityService::get(getServiceContext()).shutDown(); - - _registry->onShutdown(); - _service = nullptr; + // real second. Don't save an instance, since this just internally modified the global + // immortal ClockSourceMockImpl. + ClockSourceMock clockSource; + clockSource.advance(Milliseconds(1000)); - repl::StorageInterface::set(getServiceContext(), {}); - - ServiceContextMongoDTest::tearDown(); + // Fake replSet just for creating consistent URI for monitor + _rsmMonitor.setup(_replSet.getURI()); } - void stepUp() { - auto opCtx = cc().makeOperationContext(); - auto replCoord = repl::ReplicationCoordinator::get(getServiceContext()); - - // Advance term - _term++; - - ASSERT_OK(replCoord->setFollowerMode(repl::MemberState::RS_PRIMARY)); - ASSERT_OK(replCoord->updateTerm(opCtx.get(), _term)); - replCoord->setMyLastAppliedOpTimeAndWallTime( - repl::OpTimeAndWallTime(repl::OpTime(Timestamp(1, 1), _term), Date_t())); +protected: + std::unique_ptr<repl::PrimaryOnlyService> makeService(ServiceContext* serviceContext) override { + return std::make_unique<ShardSplitDonorService>(serviceContext); + } - _registry->onStepUpComplete(opCtx.get(), _term); + void setUpOpObserverRegistry(OpObserverRegistry* opObserverRegistry) override { + opObserverRegistry->addObserver(std::make_unique<ShardSplitDonorOpObserver>()); } - void stepDown() { - ASSERT_OK(repl::ReplicationCoordinator::get(getServiceContext()) - ->setFollowerMode(repl::MemberState::RS_SECONDARY)); - _registry->onStepDown(); + ShardSplitDonorDocument defaultStateDocument() const { + return ShardSplitDonorDocument::parse( + {"donor.document"}, + BSON("_id" << _uuid << "tenantIds" << _tenantIds << "recipientTagName" + << _recipientTagName << "recipientSetName" << _recipientSetName)); } -protected: - repl::PrimaryOnlyServiceRegistry* _registry; - repl::PrimaryOnlyService* _service; - long long _term = 0; + UUID _uuid = UUID::gen(); MockReplicaSet _replSet{ "donorSetForTest", 3, true /* hasPrimary */, false /* dollarPrefixHosts */}; - const NamespaceString _nss = NamespaceString("testDB2", "testColl2"); + const NamespaceString _nss{"testDB2", "testColl2"}; std::vector<std::string> _tenantIds = {"tenant1", "tenantAB"}; - std::string _connectionStr = _replSet.getConnectionString(); - -private: - void _openDatabase() { - auto opCtx = cc().makeOperationContext(); - - // The DB needs to be open before using shard split donor service. - AutoGetDb autoDb(opCtx.get(), NamespaceString::kTenantSplitDonorsNamespace.db(), MODE_X); - auto db = autoDb.ensureDbExists(opCtx.get()); - ASSERT_TRUE(db); - } - - std::shared_ptr<ClockSourceMock> _clkSource = std::make_shared<ClockSourceMock>(); StreamableReplicaSetMonitorForTesting _rsmMonitor; + std::string _recipientTagName{"$recipientNode"}; + std::string _recipientSetName{_replSet.getURI().getSetName()}; }; TEST_F(ShardSplitDonorServiceTest, BasicShardSplitDonorServiceInstanceCreation) { auto opCtx = makeOperationContext(); test::shard_split::ScopedTenantAccessBlocker scopedTenants(_tenantIds, opCtx.get()); - - auto document = test::shard_split::createDocument( - _uuid, ShardSplitDonorStateEnum::kUninitialized, _tenantIds, _connectionStr); + test::shard_split::reconfigToAddRecipientNodes( + getServiceContext(), _recipientTagName, _replSet.getHosts()); // Create and start the instance. auto serviceInstance = ShardSplitDonorService::DonorStateMachine::getOrCreate( - opCtx.get(), _service, document.toBSON()); + opCtx.get(), _service, defaultStateDocument().toBSON()); ASSERT(serviceInstance.get()); ASSERT_EQ(_uuid, serviceInstance->getId()); @@ -254,7 +178,7 @@ TEST_F(ShardSplitDonorServiceTest, BasicShardSplitDonorServiceInstanceCreation) std::shared_ptr<TopologyDescription> topologyDescriptionOld = std::make_shared<sdam::TopologyDescription>(sdam::SdamConfiguration()); std::shared_ptr<TopologyDescription> topologyDescriptionNew = - createTopologyDescription(_replSet); + makeRecipientTopologyDescription(_replSet); // Wait until the RSM has been created by the instance. auto replicaSetMonitorCreatedFuture = serviceInstance->replicaSetMonitorCreatedFuture(); @@ -278,16 +202,21 @@ TEST_F(ShardSplitDonorServiceTest, BasicShardSplitDonorServiceInstanceCreation) TEST_F(ShardSplitDonorServiceTest, ShardSplitDonorServiceTimeout) { auto opCtx = makeOperationContext(); test::shard_split::ScopedTenantAccessBlocker scopedTenants(_tenantIds, opCtx.get()); + test::shard_split::reconfigToAddRecipientNodes( + getServiceContext(), _recipientTagName, _replSet.getHosts()); + + auto stateDocument = defaultStateDocument(); - auto document = test::shard_split::createDocument( - _uuid, ShardSplitDonorStateEnum::kUninitialized, _tenantIds, _connectionStr); + // Set a timeout of 200 ms, and make sure we reset after this test is run + ON_BLOCK_EXIT([splitTimout = repl::shardSplitTimeoutMS.load()] { + repl::shardSplitTimeoutMS.store(splitTimout); + }); - // Set a timeout of 200 ms. repl::shardSplitTimeoutMS.store(200); // Create and start the instance. auto serviceInstance = ShardSplitDonorService::DonorStateMachine::getOrCreate( - opCtx.get(), _service, document.toBSON()); + opCtx.get(), _service, stateDocument.toBSON()); ASSERT(serviceInstance.get()); ASSERT_EQ(_uuid, serviceInstance->getId()); @@ -305,11 +234,11 @@ TEST_F(ShardSplitDonorServiceTest, CreateInstanceInAbortState) { test::shard_split::ScopedTenantAccessBlocker scopedTenants(_tenantIds, opCtx.get()); - auto document = ShardSplitDonorDocument(_uuid); - document.setState(ShardSplitDonorStateEnum::kAborted); + auto stateDocument = defaultStateDocument(); + stateDocument.setState(ShardSplitDonorStateEnum::kAborted); auto serviceInstance = ShardSplitDonorService::DonorStateMachine::getOrCreate( - opCtx.get(), _service, document.toBSON()); + opCtx.get(), _service, stateDocument.toBSON()); ASSERT(serviceInstance.get()); auto result = serviceInstance->completionFuture().get(opCtx.get()); @@ -324,9 +253,8 @@ TEST_F(ShardSplitDonorServiceTest, CreateInstanceThenAbort) { auto opCtx = makeOperationContext(); test::shard_split::ScopedTenantAccessBlocker scopedTenants(_tenantIds, opCtx.get()); - - auto document = test::shard_split::createDocument( - _uuid, ShardSplitDonorStateEnum::kUninitialized, _tenantIds, _connectionStr); + test::shard_split::reconfigToAddRecipientNodes( + getServiceContext(), _recipientTagName, _replSet.getHosts()); std::shared_ptr<ShardSplitDonorService::DonorStateMachine> serviceInstance; { @@ -334,7 +262,7 @@ TEST_F(ShardSplitDonorServiceTest, CreateInstanceThenAbort) { auto initialTimesEntered = fp.initialTimesEntered(); serviceInstance = ShardSplitDonorService::DonorStateMachine::getOrCreate( - opCtx.get(), _service, document.toBSON()); + opCtx.get(), _service, defaultStateDocument().toBSON()); ASSERT(serviceInstance.get()); fp->waitForTimesEntered(initialTimesEntered + 1); @@ -351,9 +279,8 @@ TEST_F(ShardSplitDonorServiceTest, CreateInstanceThenAbort) { TEST_F(ShardSplitDonorServiceTest, StepDownTest) { auto opCtx = makeOperationContext(); test::shard_split::ScopedTenantAccessBlocker scopedTenants(_tenantIds, opCtx.get()); - - auto document = test::shard_split::createDocument( - _uuid, ShardSplitDonorStateEnum::kUninitialized, _tenantIds, _connectionStr); + test::shard_split::reconfigToAddRecipientNodes( + getServiceContext(), _recipientTagName, _replSet.getHosts()); std::shared_ptr<ShardSplitDonorService::DonorStateMachine> serviceInstance; @@ -362,7 +289,7 @@ TEST_F(ShardSplitDonorServiceTest, StepDownTest) { auto initialTimesEntered = fp.initialTimesEntered(); serviceInstance = ShardSplitDonorService::DonorStateMachine::getOrCreate( - opCtx.get(), _service, document.toBSON()); + opCtx.get(), _service, defaultStateDocument().toBSON()); ASSERT(serviceInstance.get()); fp->waitForTimesEntered(initialTimesEntered + 1); @@ -380,6 +307,11 @@ public: void setUp() override { ServiceContextTest::setUp(); + // we need a mock replication coordinator in order to identify recipient nodes + auto serviceContext = getServiceContext(); + auto replCoord = std::make_unique<repl::ReplicationCoordinatorMock>(serviceContext); + repl::ReplicationCoordinator::set(serviceContext, std::move(replCoord)); + _rsmMonitor.setup(_validRepl.getURI()); _otherRsmMonitor.setup(_invalidRepl.getURI()); @@ -402,12 +334,17 @@ protected: StreamableReplicaSetMonitorForTesting _otherRsmMonitor; std::shared_ptr<executor::TaskExecutor> _executor; std::shared_ptr<sdam::TopologyEventsPublisher> _publisher; + std::string _recipientTagName{"$recipientNode"}; + std::string _recipientSetName{_validRepl.getURI().getSetName()}; }; TEST_F(SplitReplicaSetObserverTest, SupportsCancellation) { + test::shard_split::reconfigToAddRecipientNodes( + getServiceContext(), _recipientTagName, _validRepl.getHosts()); + CancellationSource source; - auto future = getRecipientAcceptSplitFuture( - _executor, source.token(), MongoURI::parse(_validRepl.getConnectionString()).getValue()); + auto future = detail::makeRecipientAcceptSplitFuture( + _executor, source.token(), _recipientTagName, _recipientSetName); ASSERT_FALSE(future.isReady()); source.cancel(); @@ -416,15 +353,17 @@ TEST_F(SplitReplicaSetObserverTest, SupportsCancellation) { } TEST_F(SplitReplicaSetObserverTest, GetRecipientAcceptSplitFutureTest) { - CancellationSource source; + test::shard_split::reconfigToAddRecipientNodes( + getServiceContext(), _recipientTagName, _validRepl.getHosts()); - auto future = getRecipientAcceptSplitFuture( - _executor, source.token(), MongoURI::parse(_validRepl.getConnectionString()).getValue()); + CancellationSource source; + auto future = detail::makeRecipientAcceptSplitFuture( + _executor, source.token(), _recipientTagName, _recipientSetName); std::shared_ptr<TopologyDescription> topologyDescriptionOld = std::make_shared<sdam::TopologyDescription>(sdam::SdamConfiguration()); std::shared_ptr<TopologyDescription> topologyDescriptionNew = - createTopologyDescription(_validRepl); + makeRecipientTopologyDescription(_validRepl); _publisher->onTopologyDescriptionChangedEvent(topologyDescriptionOld, topologyDescriptionNew); @@ -432,35 +371,33 @@ TEST_F(SplitReplicaSetObserverTest, GetRecipientAcceptSplitFutureTest) { } TEST_F(SplitReplicaSetObserverTest, FutureNotReadyMissingNodes) { - auto uri = MongoURI::parse(_validRepl.getConnectionString()).getValue(); - auto predicate = - makeRecipientAcceptSplitPredicate(uri.getReplicaSetName(), uri.getServers().size()); + detail::makeRecipientAcceptSplitPredicate(_validRepl.getURI().connectionString()); std::shared_ptr<TopologyDescription> topologyDescriptionNew = - createTopologyDescription(_validRepl); + makeRecipientTopologyDescription(_validRepl); topologyDescriptionNew->removeServerDescription(_validRepl.getHosts()[0]); ASSERT_FALSE(predicate(topologyDescriptionNew->getServers())); } TEST_F(SplitReplicaSetObserverTest, FutureNotReadyWrongSet) { - auto uri = MongoURI::parse(_validRepl.getConnectionString()).getValue(); - auto predicate = - makeRecipientAcceptSplitPredicate(uri.getReplicaSetName(), uri.getServers().size()); + detail::makeRecipientAcceptSplitPredicate(_validRepl.getURI().connectionString()); std::shared_ptr<TopologyDescription> topologyDescriptionNew = - createTopologyDescription(_invalidRepl); + makeRecipientTopologyDescription(_invalidRepl); ASSERT_FALSE(predicate(topologyDescriptionNew->getServers())); } TEST_F(SplitReplicaSetObserverTest, ExecutorCanceled) { - CancellationSource source; + test::shard_split::reconfigToAddRecipientNodes( + getServiceContext(), _recipientTagName, _validRepl.getHosts()); - auto future = getRecipientAcceptSplitFuture( - _executor, source.token(), MongoURI::parse(_validRepl.getConnectionString()).getValue()); + CancellationSource source; + auto future = detail::makeRecipientAcceptSplitFuture( + _executor, source.token(), _recipientTagName, _recipientSetName); _executor->shutdown(); _executor->join(); diff --git a/src/mongo/db/serverless/shard_split_state_machine.idl b/src/mongo/db/serverless/shard_split_state_machine.idl index 47f5e4b521d..54283a662f2 100644 --- a/src/mongo/db/serverless/shard_split_state_machine.idl +++ b/src/mongo/db/serverless/shard_split_state_machine.idl @@ -54,12 +54,14 @@ structs: type: uuid description: "Unique identifier for the shard split operation." cpp_name: id - recipientConnectionString: + recipientSetName: type: string - description: "The connection string for the recipient slice." + description: "The replica set name for the recipient." + optional: true + recipientTagName: + type: string + description: "The replica set tag that identifies recipient nodes." optional: true - validator: - callback: "tenant_migration_util::validateConnectionString" tenantIds: type: array<string> optional: true diff --git a/src/mongo/db/serverless/shard_split_test_utils.cpp b/src/mongo/db/serverless/shard_split_test_utils.cpp index e6638c73f2f..8d3ce4b4d40 100644 --- a/src/mongo/db/serverless/shard_split_test_utils.cpp +++ b/src/mongo/db/serverless/shard_split_test_utils.cpp @@ -28,39 +28,18 @@ */ #include "mongo/db/serverless/shard_split_test_utils.h" +#include "mongo/db/repl/replication_coordinator_mock.h" #include "mongo/db/repl/tenant_migration_access_blocker_registry.h" +#include "mongo/db/serverless/shard_split_state_machine_gen.h" +#include "mongo/util/uuid.h" namespace mongo { namespace test { namespace shard_split { -std::vector<StringData> toStringData(const std::vector<std::string>& data) { - std::vector<StringData> out; - out.reserve(data.size()); - - std::transform(data.begin(), - data.end(), - std::back_inserter(out), - [](const std::string& tenant) { return StringData(tenant); }); - - return out; -} - -ShardSplitDonorDocument createDocument(UUID id, - ShardSplitDonorStateEnum state, - const std::vector<std::string>& tenantIds, - const std::string& connectionStr) { - ShardSplitDonorDocument document(id); - document.setState(state); - document.setTenantIds(toStringData(tenantIds)); - document.setRecipientConnectionString(StringData(connectionStr)); - - return document; -} - -ScopedTenantAccessBlocker::ScopedTenantAccessBlocker(std::vector<std::string> tenants, +ScopedTenantAccessBlocker::ScopedTenantAccessBlocker(const std::vector<std::string>& tenants, OperationContext* opCtx) - : _tenants(std::move(tenants)), _opCtx(opCtx) {} + : _tenants(tenants), _opCtx(opCtx) {} ScopedTenantAccessBlocker::~ScopedTenantAccessBlocker() { for (const auto& tenant : _tenants) { @@ -73,6 +52,24 @@ void ScopedTenantAccessBlocker::dismiss() { _tenants.clear(); } +void reconfigToAddRecipientNodes(ServiceContext* serviceContext, + const std::string& recipientTagName, + const std::vector<HostAndPort>& nodes) { + BSONArrayBuilder members; + for (auto node : nodes) { + members.append(BSON("_id" << 1 << "host" << node.toString() << "tags" + << BSON(recipientTagName << UUID::gen().toString()))); + } + + auto newConfig = repl::ReplSetConfig::parse(BSON("_id" + << "donor" + << "version" << 1 << "protocolVersion" << 1 + << "members" << members.arr())); + + auto replCoord = repl::ReplicationCoordinator::get(serviceContext); + dynamic_cast<repl::ReplicationCoordinatorMock*>(replCoord)->setGetConfigReturnValue(newConfig); +} + } // namespace shard_split } // namespace test } // namespace mongo diff --git a/src/mongo/db/serverless/shard_split_test_utils.h b/src/mongo/db/serverless/shard_split_test_utils.h index 22aea96da0a..c5cc6bda4a8 100644 --- a/src/mongo/db/serverless/shard_split_test_utils.h +++ b/src/mongo/db/serverless/shard_split_test_utils.h @@ -29,26 +29,21 @@ #pragma once -#include "mongo/db/serverless/shard_split_state_machine_gen.h" -#include "mongo/util/uuid.h" +#include "mongo/util/net/hostandport.h" namespace mongo { +class OperationContext; +class ServiceContext; + namespace test { namespace shard_split { -std::vector<StringData> toStringData(const std::vector<std::string>& data); - -ShardSplitDonorDocument createDocument(UUID id, - ShardSplitDonorStateEnum state, - const std::vector<std::string>& tenantIds, - const std::string& connectionStr); - // Scoped guard to ensure tenant blockers are removed in case a test case fails and throws an // exception. If we do not remove the blockers, it triggers an invariant upon destruction of the // test fixture, which introduces additional errors in the test and makes debugging harder. class ScopedTenantAccessBlocker { public: - ScopedTenantAccessBlocker(std::vector<std::string> tenants, OperationContext* opCtx); + ScopedTenantAccessBlocker(const std::vector<std::string>& tenants, OperationContext* opCtx); ~ScopedTenantAccessBlocker(); void dismiss(); @@ -58,6 +53,14 @@ private: OperationContext* _opCtx; }; +/** + * Build a new configuration with tagged nodes, and install it in ReplicationCoordinatorMock + */ +void reconfigToAddRecipientNodes(ServiceContext* serviceContext, + const std::string& recipientTagName, + const std::vector<HostAndPort>& nodes); + + } // namespace shard_split } // namespace test } // namespace mongo diff --git a/src/mongo/db/serverless/shard_split_utils.cpp b/src/mongo/db/serverless/shard_split_utils.cpp new file mode 100644 index 00000000000..35c138122b3 --- /dev/null +++ b/src/mongo/db/serverless/shard_split_utils.cpp @@ -0,0 +1,120 @@ +/** + * Copyright (C) 2022-present MongoDB, Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the Server Side Public License, version 1, + * as published by MongoDB, Inc. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * Server Side Public License for more details. + * + * You should have received a copy of the Server Side Public License + * along with this program. If not, see + * <http://www.mongodb.com/licensing/server-side-public-license>. + * + * As a special exception, the copyright holders give permission to link the + * code of portions of this program with the OpenSSL library under certain + * conditions as described in each individual source file and distribute + * linked combinations including the program with the OpenSSL library. You + * must comply with the Server Side Public License in all respects for + * all of the code used other than as permitted herein. If you modify file(s) + * with this exception, you may extend this exception to your version of the + * file(s), but you are not obligated to do so. If you do not wish to do so, + * delete this exception statement from your version. If you delete this + * exception statement from all source files in the program, then also delete + * it in the license file. + */ + +#include "mongo/db/serverless/shard_split_utils.h" +#include "mongo/db/repl/repl_set_config.h" + +namespace mongo { +namespace repl { +std::vector<MemberConfig> getRecipientMembers(const ReplSetConfig& config, + const StringData& recipientTagName) { + std::vector<MemberConfig> result; + const auto& tagConfig = config.getTagConfig(); + for (auto member : config.members()) { + auto matchesTag = + std::any_of(member.tagsBegin(), member.tagsEnd(), [&](const ReplSetTag& tag) { + return tagConfig.getTagKey(tag) == recipientTagName; + }); + + if (matchesTag) { + result.emplace_back(member); + } + } + + return result; +} + + +ConnectionString makeRecipientConnectionString(const ReplSetConfig& config, + const StringData& recipientTagName, + const StringData& recipientSetName) { + auto recipientMembers = getRecipientMembers(config, recipientTagName); + std::vector<HostAndPort> recipientNodes; + std::transform(recipientMembers.cbegin(), + recipientMembers.cend(), + std::back_inserter(recipientNodes), + [](const MemberConfig& member) { return member.getHostAndPort(); }); + + return ConnectionString::forReplicaSet(recipientSetName.toString(), recipientNodes); +} + +ReplSetConfig makeSplitConfig(const ReplSetConfig& config, + const std::string& recipientSetName, + const std::string& recipientTagName) { + dassert(!recipientSetName.empty() && recipientSetName != config.getReplSetName()); + uassert(6201800, + "We can not make a split config of an existing split config.", + !config.isSplitConfig()); + + const auto& tagConfig = config.getTagConfig(); + std::vector<BSONObj> recipientMembers, donorMembers; + int donorIndex = 0, recipientIndex = 0; + for (const auto& member : config.members()) { + bool isRecipient = + std::any_of(member.tagsBegin(), member.tagsEnd(), [&](const ReplSetTag& tag) { + return tagConfig.getTagKey(tag) == recipientTagName; + }); + if (isRecipient) { + BSONObjBuilder bob( + member.toBSON().removeField("votes").removeField("priority").removeField("_id")); + bob.appendNumber("_id", recipientIndex); + recipientMembers.push_back(bob.obj()); + recipientIndex++; + } else { + BSONObjBuilder bob(member.toBSON().removeField("_id")); + bob.appendNumber("_id", donorIndex); + donorMembers.push_back(bob.obj()); + donorIndex++; + } + } + + uassert(6201801, "No recipient members found for split config.", !recipientMembers.empty()); + uassert(6201802, "No donor members found for split config.", !donorMembers.empty()); + + const auto configNoMembersBson = config.toBSON().removeField("members"); + + BSONObjBuilder recipientConfigBob( + configNoMembersBson.removeField("_id").removeField("settings")); + recipientConfigBob.append("_id", recipientSetName).append("members", recipientMembers); + if (configNoMembersBson.hasField("settings") && + configNoMembersBson.getField("settings").isABSONObj()) { + BSONObj settings = configNoMembersBson.getField("settings").Obj(); + if (settings.hasField("replicaSetId")) { + recipientConfigBob.append("settings", settings.removeField("replicaSetId")); + } + } + + BSONObjBuilder splitConfigBob(configNoMembersBson); + splitConfigBob.append("members", donorMembers); + splitConfigBob.append("recipientConfig", recipientConfigBob.obj()); + + return ReplSetConfig::parse(splitConfigBob.obj()); +} +} // namespace repl +} // namespace mongo diff --git a/src/mongo/db/serverless/shard_split_utils.h b/src/mongo/db/serverless/shard_split_utils.h index 4355c7359d9..ce3a4e6783f 100644 --- a/src/mongo/db/serverless/shard_split_utils.h +++ b/src/mongo/db/serverless/shard_split_utils.h @@ -35,57 +35,31 @@ namespace mongo { namespace repl { -static ReplSetConfig makeSplitConfig(const ReplSetConfig& config, - const std::string& recipientSetName, - const std::string& recipientTagName) { - dassert(!recipientSetName.empty() && recipientSetName != config.getReplSetName()); - uassert(6201800, - "We can not make a split config on an existing split config.", - !config.isSplitConfig()); - - const auto& tagConfig = config.getTagConfig(); - std::vector<BSONObj> recipientMembers, donorMembers; - int donorIndex = 0, recipientIndex = 0; - for (const auto& member : config.members()) { - bool isRecipient = - std::any_of(member.tagsBegin(), member.tagsEnd(), [&](const ReplSetTag& tag) { - return tagConfig.getTagKey(tag) == recipientTagName; - }); - if (isRecipient) { - BSONObjBuilder bob( - member.toBSON().removeField("votes").removeField("priority").removeField("_id")); - bob.appendNumber("_id", recipientIndex); - recipientMembers.push_back(bob.obj()); - recipientIndex++; - } else { - BSONObjBuilder bob(member.toBSON().removeField("_id")); - bob.appendNumber("_id", donorIndex); - donorMembers.push_back(bob.obj()); - donorIndex++; - } - } - - uassert(6201801, "No recipient members found for split config.", !recipientMembers.empty()); - uassert(6201802, "No donor members found for split config.", !donorMembers.empty()); +/** + * @returns A list of `MemberConfig` for member nodes which match a provided replica set tag name + */ +std::vector<MemberConfig> getRecipientMembers(const ReplSetConfig& config, + const StringData& recipientTagName); - const auto configNoMembersBson = config.toBSON().removeField("members"); - BSONObjBuilder recipientConfigBob( - configNoMembersBson.removeField("_id").removeField("settings")); - recipientConfigBob.append("_id", recipientSetName).append("members", recipientMembers); - if (configNoMembersBson.hasField("settings") && - configNoMembersBson.getField("settings").isABSONObj()) { - BSONObj settings = configNoMembersBson.getField("settings").Obj(); - if (settings.hasField("replicaSetId")) { - recipientConfigBob.append("settings", settings.removeField("replicaSetId")); - } - } +/** + * Builds a connection string for a shard split recipient by filtering local member nodes by + * `recipientTagName`. The `recipientSetName` is the `replSet` parameter of the recipient + * connection string. + */ +ConnectionString makeRecipientConnectionString(const ReplSetConfig& config, + const StringData& recipientTagName, + const StringData& recipientSetName); - BSONObjBuilder splitConfigBob(configNoMembersBson); - splitConfigBob.append("members", donorMembers); - splitConfigBob.append("recipientConfig", recipientConfigBob.obj()); +/** + * Builds a split config, which is a ReplSetConfig with a subdocument identifying a recipient config + * to be applied to a recipient shard during a shard split operation. The `recipientTagName` is used + * to filter the local member list for recipient nodes. The `recipientSetName` is used to validate + * that we are indeed generating a config for a recipient set with a new name. + */ +ReplSetConfig makeSplitConfig(const ReplSetConfig& config, + const std::string& recipientSetName, + const std::string& recipientTagName); - return ReplSetConfig::parse(splitConfigBob.obj()); -} } // namespace repl } // namespace mongo diff --git a/src/mongo/db/serverless/shard_split_utils_test.cpp b/src/mongo/db/serverless/shard_split_utils_test.cpp new file mode 100644 index 00000000000..12994379de3 --- /dev/null +++ b/src/mongo/db/serverless/shard_split_utils_test.cpp @@ -0,0 +1,176 @@ +/** + * Copyright (C) 2022-present MongoDB, Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the Server Side Public License, version 1, + * as published by MongoDB, Inc. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * Server Side Public License for more details. + * + * You should have received a copy of the Server Side Public License + * along with this program. If not, see + * <http://www.mongodb.com/licensing/server-side-public-license>. + * + * As a special exception, the copyright holders give permission to link the + * code of portions of this program with the OpenSSL library under certain + * conditions as described in each individual source file and distribute + * linked combinations including the program with the OpenSSL library. You + * must comply with the Server Side Public License in all respects for + * all of the code used other than as permitted herein. If you modify file(s) + * with this exception, you may extend this exception to your version of the + * file(s), but you are not obligated to do so. If you do not wish to do so, + * delete this exception statement from your version. If you delete this + * exception statement from all source files in the program, then also delete + * it in the license file. + */ + +#include "mongo/db/repl/repl_set_config_test.h" +#include "mongo/db/serverless/shard_split_utils.h" +#include "mongo/unittest/unittest.h" + +namespace mongo { +namespace repl { +namespace { +TEST(MakeSplitConfig, toBSONRoundTripAbility) { + ReplSetConfig configA; + ReplSetConfig configB; + const std::string recipientTagName{"recipient"}; + const auto donorReplSetId = OID::gen(); + const auto recipientMemberBSON = + BSON("_id" << 1 << "host" + << "localhost:20002" + << "priority" << 0 << "votes" << 0 << "tags" << BSON(recipientTagName << "one")); + + configA = ReplSetConfig::parse(BSON("_id" + << "rs0" + << "version" << 1 << "protocolVersion" << 1 << "members" + << BSON_ARRAY(BSON("_id" << 0 << "host" + << "localhost:12345") + << recipientMemberBSON) + << "settings" + << BSON("heartbeatIntervalMillis" + << 5000 << "heartbeatTimeoutSecs" << 20 + << "replicaSetId" << donorReplSetId))); + configB = ReplSetConfig::parse(configA.toBSON()); + ASSERT_TRUE(configA == configB); + + // here we will test that the result from the method `makeSplitConfig` matches the hardcoded + // resultSplitConfigBSON. We will also check that the recipient from the splitConfig matches + // the hardcoded recipientConfig. + const std::string recipientConfigSetName{"newSet"}; + BSONObj resultRecipientConfigBSON = BSON( + "_id" << recipientConfigSetName << "version" << 1 << "protocolVersion" << 1 << "members" + << BSON_ARRAY(BSON("_id" << 0 << "host" + << "localhost:20002" + << "priority" << 1 << "votes" << 1 << "tags" + << BSON(recipientTagName << "one"))) + << "settings" + << BSON("heartbeatIntervalMillis" << 5000 << "heartbeatTimeoutSecs" << 20)); + + BSONObj resultSplitConfigBSON = BSON("_id" + << "rs0" + << "version" << 1 << "protocolVersion" << 1 << "members" + << BSON_ARRAY(BSON("_id" << 0 << "host" + << "localhost:12345")) + << "settings" + << BSON("heartbeatIntervalMillis" + << 5000 << "heartbeatTimeoutSecs" << 20 + << "replicaSetId" << donorReplSetId) + << "recipientConfig" << resultRecipientConfigBSON); + + const ReplSetConfig splitConfigResult = + repl::makeSplitConfig(configA, recipientConfigSetName, recipientTagName); + + ASSERT_OK(splitConfigResult.validate()); + ASSERT_TRUE(splitConfigResult == ReplSetConfig::parse(splitConfigResult.toBSON())); + + auto resultSplitConfig = ReplSetConfig::parse(resultSplitConfigBSON); + ASSERT_OK(resultSplitConfig.validate()); + ASSERT_TRUE(splitConfigResult == resultSplitConfig); + + auto recipientConfigResultPtr = splitConfigResult.getRecipientConfig(); + // we use getReplicaSetId to match the newly replicaSetId created from makeSplitConfig on the + // recipientConfig since configA had a replicaSetId in its config. + ASSERT_TRUE(*recipientConfigResultPtr == ReplSetConfig::parse(resultRecipientConfigBSON)); +} + +TEST(MakeSplitConfig, ValidateSplitConfigIntegrityTest) { + const std::string recipientTagName{"recipient"}; + const std::string donorConfigSetName{"rs0"}; + const std::string recipientConfigSetName{"newSet"}; + const ReplSetConfig config = ReplSetConfig::parse( + BSON("_id" << donorConfigSetName << "version" << 1 << "protocolVersion" << 1 << "members" + << BSON_ARRAY(BSON("_id" << 0 << "host" + << "localhost:20001" + << "priority" << 1 << "tags" + << BSON("NYC" + << "NY")) + << BSON("_id" << 1 << "host" + << "localhost:20002" + << "priority" << 0 << "votes" << 0 << "tags" + << BSON(recipientTagName << "one")) + << BSON("_id" << 2 << "host" + << "localhost:20003" + << "priority" << 6)) + << "settings" << BSON("electionTimeoutMillis" << 1000))); + + + const ReplSetConfig splitConfig = + repl::makeSplitConfig(config, recipientConfigSetName, recipientTagName); + ASSERT_OK(splitConfig.validate()); + ASSERT_EQ(splitConfig.getReplSetName(), donorConfigSetName); + ASSERT_TRUE(splitConfig.toBSON().hasField("members")); + ASSERT_EQUALS(2, splitConfig.getNumMembers()); + ASSERT_TRUE(splitConfig.isSplitConfig()); + + auto recipientConfigPtr = splitConfig.getRecipientConfig(); + ASSERT_OK(recipientConfigPtr->validate()); + ASSERT_TRUE(recipientConfigPtr->toBSON().hasField("members")); + ASSERT_EQUALS(1, recipientConfigPtr->getNumMembers()); + + ASSERT_FALSE(recipientConfigPtr->isSplitConfig()); + ASSERT_TRUE(recipientConfigPtr->getRecipientConfig() == nullptr); + ASSERT_EQ(recipientConfigPtr->getReplSetName(), recipientConfigSetName); + + ASSERT_THROWS_CODE(repl::makeSplitConfig(splitConfig, recipientConfigSetName, recipientTagName), + AssertionException, + 6201800 /*calling on a splitconfig*/); +} + +TEST(MakeSplitConfig, SplitConfigAssertionsTest) { + const std::string recipientConfigSetName{"newSet"}; + const std::string recipientTagName{"recipient"}; + auto baseConfigBSON = BSON("_id" + << "rs0" + << "version" << 1 << "protocolVersion" << 1 << "members" + << BSON_ARRAY(BSON("_id" << 1 << "host" + << "localhost:20002" + << "priority" << 0 << "votes" << 0))); + + ASSERT_THROWS_CODE(repl::makeSplitConfig(ReplSetConfig::parse(baseConfigBSON), + recipientConfigSetName, + recipientTagName), + AssertionException, + 6201801 /*no recipient members created*/); + + baseConfigBSON = BSON("_id" + << "rs0" + << "version" << 1 << "protocolVersion" << 1 << "members" + << BSON_ARRAY(BSON("_id" << 1 << "host" + << "localhost:20002" + << "priority" << 0 << "votes" << 0 << "tags" + << BSON(recipientTagName << "one"))) + << "settings" << BSON("electionTimeoutMillis" << 1000)); + + ASSERT_THROWS_CODE(repl::makeSplitConfig(ReplSetConfig::parse(baseConfigBSON), + recipientConfigSetName, + recipientTagName), + AssertionException, + 6201802 /*no donor members created*/); +} +} // namespace +} // namespace repl +} // namespace mongo |