author     Matt Broadstone <mbroadst@mongodb.com>            2022-04-02 11:56:52 +0000
committer  Evergreen Agent <no-reply@evergreen.mongodb.com>  2022-04-02 12:24:51 +0000
commit     c78ba79626722ae69ea9b64762ffd1dc075ce960 (patch)
tree       697f1c1b4bfb3629017687b940f86ad6e6076382
parent     22f72dd4701f6a6a0d6cda810a4fde4cb36bf895 (diff)
download   mongo-c78ba79626722ae69ea9b64762ffd1dc075ce960.tar.gz
SERVER-65059 Store recipient connection string in state document
-rw-r--r--  src/mongo/db/repl/primary_only_service_test_fixture.cpp     |  17
-rw-r--r--  src/mongo/db/repl/primary_only_service_test_fixture.h       |   2
-rw-r--r--  src/mongo/db/s/add_shard_cmd.idl                            |  10
-rw-r--r--  src/mongo/db/serverless/shard_split_donor_service.cpp       | 137
-rw-r--r--  src/mongo/db/serverless/shard_split_donor_service.h         |  53
-rw-r--r--  src/mongo/db/serverless/shard_split_donor_service_test.cpp  | 257
-rw-r--r--  src/mongo/db/serverless/shard_split_state_machine.idl       |   6
-rw-r--r--  src/mongo/idl/basic_types.idl                               |   7
8 files changed, 248 insertions(+), 241 deletions(-)
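In short: instead of recomputing the recipient connection string from the replica set config on every step-up, the donor now computes it once while entering the blocking state and persists it in the shard split state document, so a resumed instance can simply read it back. A minimal sketch (not from this commit) of the persisted document, with field names taken from shard_split_state_machine.idl below and illustrative values:

    #include "mongo/bson/bsonobjbuilder.h"
    #include "mongo/util/uuid.h"

    // Sketch of the donor state document once the split enters the blocking
    // state; the helper and all values are illustrative.
    mongo::BSONObj exampleDonorStateDoc(const mongo::UUID& migrationId) {
        mongo::BSONObjBuilder bob;
        migrationId.appendToBuilder(&bob, "_id");
        bob.append("state", "blocking");
        bob.append("recipientTagName", "recipientNode");
        bob.append("recipientSetName", "recipientSetForTest");
        bob.append("recipientConnectionString",
                   "recipientSetForTest/node1:27017,node2:27017,node3:27017");
        bob.append("tenantIds", std::vector<std::string>{"tenant1", "tenantAB"});
        return bob.obj();
    }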
diff --git a/src/mongo/db/repl/primary_only_service_test_fixture.cpp b/src/mongo/db/repl/primary_only_service_test_fixture.cpp
index 70f7453cf9e..cb21b22f243 100644
--- a/src/mongo/db/repl/primary_only_service_test_fixture.cpp
+++ b/src/mongo/db/repl/primary_only_service_test_fixture.cpp
@@ -124,22 +124,5 @@ void PrimaryOnlyServiceMongoDTest::stepDown() {
_registry->onStepDown();
}
-std::shared_ptr<executor::TaskExecutor> makeTestExecutor() {
- ThreadPool::Options threadPoolOptions;
- threadPoolOptions.threadNamePrefix = "PrimaryOnlyServiceTest-";
- threadPoolOptions.poolName = "PrimaryOnlyServiceTestThreadPool";
- threadPoolOptions.onCreateThread = [](const std::string& threadName) {
- Client::initThread(threadName.c_str());
- };
-
- auto hookList = std::make_unique<rpc::EgressMetadataHookList>();
- auto executor = std::make_shared<executor::ThreadPoolTaskExecutor>(
- std::make_unique<ThreadPool>(threadPoolOptions),
- executor::makeNetworkInterface(
- "PrimaryOnlyServiceTestNetwork", nullptr, std::move(hookList)));
- executor->startup();
- return executor;
-}
-
} // namespace repl
} // namespace mongo
diff --git a/src/mongo/db/repl/primary_only_service_test_fixture.h b/src/mongo/db/repl/primary_only_service_test_fixture.h
index 8ca3dae3bb2..906cbd15297 100644
--- a/src/mongo/db/repl/primary_only_service_test_fixture.h
+++ b/src/mongo/db/repl/primary_only_service_test_fixture.h
@@ -81,7 +81,5 @@ protected:
long long _term = 0;
};
-std::shared_ptr<executor::TaskExecutor> makeTestExecutor();
-
} // namespace repl
} // namespace mongo
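With the shared makeTestExecutor() helper removed from the fixture, tests that need an executor now build one inline; the donor service test later in this commit does so over a mocked network interface. A condensed sketch of that pattern, assuming only the mock executor headers:

    #include "mongo/executor/network_interface_mock.h"
    #include "mongo/executor/thread_pool_mock.h"
    #include "mongo/executor/thread_pool_task_executor.h"

    // Condensed from the test fixture changes below: an executor backed by a
    // mocked network, so a test can script remote responses deterministically.
    std::shared_ptr<mongo::executor::ThreadPoolTaskExecutor> makeMockNetworkExecutor(
        mongo::executor::NetworkInterfaceMock*& net) {
        auto network = std::make_unique<mongo::executor::NetworkInterfaceMock>();
        net = network.get();
        auto executor = std::make_shared<mongo::executor::ThreadPoolTaskExecutor>(
            std::make_unique<mongo::executor::ThreadPoolMock>(
                net, 1 /* prngSeed */, mongo::executor::ThreadPoolMock::Options{}),
            std::move(network));
        executor->startup();
        return executor;
    }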
diff --git a/src/mongo/db/s/add_shard_cmd.idl b/src/mongo/db/s/add_shard_cmd.idl
index b57a2edc14d..4955d8e57a5 100644
--- a/src/mongo/db/s/add_shard_cmd.idl
+++ b/src/mongo/db/s/add_shard_cmd.idl
@@ -33,14 +33,6 @@ global:
imports:
- "mongo/idl/basic_types.idl"
-types:
- connectionstring:
- bson_serialization_type: string
- description: "A MongoDB ConnectionString"
- cpp_type: "mongo::ConnectionString"
- serializer: mongo::ConnectionString::toString
- deserializer: mongo::ConnectionString::deserialize
-
structs:
ShardIdentity:
description: "Contains all the information needed to identify a shard and lookup the shard identity document from storage"
@@ -53,7 +45,7 @@ structs:
type: objectid
configsvrConnectionString:
description: "Connection string to the config server"
- type: connectionstring
+ type: connection_string
commands:
_addShard:
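The file-private connectionstring type deleted above reappears as the shared connection_string type in basic_types.idl (last hunk of this commit). A hedged sketch of the round-trip contract the serializer/deserializer pair implies, assuming ConnectionString::parse returns a StatusWith and ConnectionString::deserialize throws on bad input:

    #include "mongo/client/connection_string.h"
    #include "mongo/util/assert_util.h"

    // The IDL pair (serializer: toString, deserializer: deserialize) implies a
    // round trip: toString() output must parse back to an equivalent value.
    void connectionStringRoundTrip() {
        auto cs = uassertStatusOK(
            mongo::ConnectionString::parse("configRS/cfg1:27019,cfg2:27019,cfg3:27019"));
        auto parsed = mongo::ConnectionString::deserialize(cs.toString());
        invariant(parsed.toString() == cs.toString());
    }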
diff --git a/src/mongo/db/serverless/shard_split_donor_service.cpp b/src/mongo/db/serverless/shard_split_donor_service.cpp
index d240f9c5496..defb74e9a3c 100644
--- a/src/mongo/db/serverless/shard_split_donor_service.cpp
+++ b/src/mongo/db/serverless/shard_split_donor_service.cpp
@@ -116,15 +116,8 @@ void insertTenantAccessBlocker(WithLock lk,
ShardSplitDonorDocument donorStateDoc) {
auto optionalTenants = donorStateDoc.getTenantIds();
invariant(optionalTenants);
-
- auto recipientTagName = donorStateDoc.getRecipientTagName();
- auto recipientSetName = donorStateDoc.getRecipientSetName();
- invariant(recipientTagName);
- invariant(recipientSetName);
-
- auto config = repl::ReplicationCoordinator::get(cc().getServiceContext())->getConfig();
- auto recipientConnectionString =
- serverless::makeRecipientConnectionString(config, *recipientTagName, *recipientSetName);
+ auto recipientConnectionString = donorStateDoc.getRecipientConnectionString();
+ invariant(recipientConnectionString);
for (const auto& tenantId : optionalTenants.get()) {
auto mtab = std::make_shared<TenantMigrationDonorAccessBlocker>(
@@ -132,7 +125,7 @@ void insertTenantAccessBlocker(WithLock lk,
donorStateDoc.getId(),
tenantId.toString(),
MigrationProtocolEnum::kMultitenantMigrations,
- recipientConnectionString.toString());
+ recipientConnectionString->toString());
TenantMigrationAccessBlockerRegistry::get(opCtx->getServiceContext()).add(tenantId, mtab);
@@ -154,13 +147,7 @@ namespace detail {
SemiFuture<void> makeRecipientAcceptSplitFuture(
std::shared_ptr<executor::TaskExecutor> taskExecutor,
const CancellationToken& token,
- const StringData& recipientTagName,
- const StringData& recipientSetName) {
-
- auto replCoord = repl::ReplicationCoordinator::get(cc().getServiceContext());
- invariant(replCoord);
- auto recipientConnectionString = serverless::makeRecipientConnectionString(
- replCoord->getConfig(), recipientTagName, recipientSetName);
+ const ConnectionString& recipientConnectionString) {
// build a vector of single server discovery monitors to listen for heartbeats
auto eventsPublisher = std::make_shared<sdam::TopologyEventsPublisher>(taskExecutor);
@@ -257,6 +244,8 @@ ExecutorFuture<void> ShardSplitDonorService::_rebuildService(
return _createStateDocumentTTLIndex(executor, token);
}
+boost::optional<TaskExecutorPtr>
+ ShardSplitDonorService::DonorStateMachine::_splitAcceptanceTaskExecutorForTest;
ShardSplitDonorService::DonorStateMachine::DonorStateMachine(
ServiceContext* serviceContext,
ShardSplitDonorService* splitService,
@@ -383,15 +372,13 @@ SemiFuture<void> ShardSplitDonorService::DonorStateMachine::run(
"id"_attr = _migrationId,
"timeout"_attr = repl::shardSplitTimeoutMS.load());
- _createReplicaSetMonitor(abortToken);
-
_decisionPromise.setWith([&] {
return ExecutorFuture(**executor)
.then([this, executor, primaryToken] {
// Note we do not use the abort split token here because the abortShardSplit
// command waits for a decision to be persisted which will not happen if
// inserting the initial state document fails.
- return _writeInitialDocument(executor, primaryToken);
+ return _enterBlockingOrAbortedState(executor, primaryToken);
})
.then([this, executor, abortToken] {
checkForTokenInterrupt(abortToken);
@@ -561,7 +548,7 @@ ExecutorFuture<void> ShardSplitDonorService::DonorStateMachine::_waitForRecipien
}
return ExecutorFuture(**executor)
- .then([&]() { return _recipientAcceptedSplit.getFuture(); })
+ .then([&]() { return _splitAcceptancePromise.getFuture(); })
.then([this, executor, token] {
LOGV2(6142503,
"Recipient has accepted the split, committing decision.",
@@ -574,13 +561,12 @@ ExecutorFuture<void> ShardSplitDonorService::DonorStateMachine::_waitForRecipien
});
}
-ExecutorFuture<void> ShardSplitDonorService::DonorStateMachine::_writeInitialDocument(
+ExecutorFuture<void> ShardSplitDonorService::DonorStateMachine::_enterBlockingOrAbortedState(
const ScopedTaskExecutorPtr& executor, const CancellationToken& primaryServiceToken) {
ShardSplitDonorStateEnum nextState;
{
stdx::lock_guard<Latch> lg(_mutex);
-
if (_stateDoc.getState() == ShardSplitDonorStateEnum::kAborted) {
if (isAbortedDocumentPersistent(lg, _stateDoc)) {
// Node has stepped up and created an instance using a document in abort state. No need
@@ -594,14 +580,42 @@ ExecutorFuture<void> ShardSplitDonorService::DonorStateMachine::_writeInitialDoc
_abortReason->serializeErrorToBSON(&bob);
_stateDoc.setAbortReason(bob.obj());
nextState = ShardSplitDonorStateEnum::kAborted;
+ } else {
+ auto recipientTagName = _stateDoc.getRecipientTagName();
+ invariant(recipientTagName);
+ auto recipientSetName = _stateDoc.getRecipientSetName();
+ invariant(recipientSetName);
+ auto config = repl::ReplicationCoordinator::get(cc().getServiceContext())->getConfig();
+ auto recipientConnectionString = serverless::makeRecipientConnectionString(
+ config, *recipientTagName, *recipientSetName);
+
+ // Always start the replica set monitor if we haven't reached a decision yet
+ _splitAcceptancePromise.setWith([&]() -> Future<void> {
+ if (_stateDoc.getState() > ShardSplitDonorStateEnum::kBlocking ||
+ MONGO_unlikely(skipShardSplitWaitForSplitAcceptance.shouldFail())) {
+ return SemiFuture<void>::makeReady().unsafeToInlineFuture();
+ }
+
+ // Optionally select a task executor for unit testing
+ auto executor = _splitAcceptanceTaskExecutorForTest
+ ? *_splitAcceptanceTaskExecutorForTest
+ : _shardSplitService->getInstanceCleanupExecutor();
+
+ return detail::makeRecipientAcceptSplitFuture(
+ executor, primaryServiceToken, recipientConnectionString)
+ .unsafeToInlineFuture();
+ });
+
+ if (_stateDoc.getState() > ShardSplitDonorStateEnum::kUninitialized) {
+ // Node has stepped up and resumed a shard split. No need to write the document as it
+ // already exists.
+ return ExecutorFuture(**executor);
+ }
- } else if (_stateDoc.getState() == ShardSplitDonorStateEnum::kUninitialized) {
+ // Otherwise, record the recipient connection string
+ _stateDoc.setRecipientConnectionString(recipientConnectionString);
_stateDoc.setState(ShardSplitDonorStateEnum::kBlocking);
nextState = ShardSplitDonorStateEnum::kBlocking;
- } else {
- // Node has step up and resumed a shard split. No need to write the document as it
- // already exists.
- return ExecutorFuture(**executor);
}
}
@@ -756,36 +770,6 @@ void ShardSplitDonorService::DonorStateMachine::_initiateTimeout(
.semi();
}
-void ShardSplitDonorService::DonorStateMachine::_createReplicaSetMonitor(
- const CancellationToken& abortToken) {
- {
- stdx::lock_guard<Latch> lg(_mutex);
- if (_stateDoc.getState() > ShardSplitDonorStateEnum::kBlocking) {
- return;
- }
- }
-
- auto future = [&]() {
- stdx::lock_guard<Latch> lg(_mutex);
- if (MONGO_unlikely(skipShardSplitWaitForSplitAcceptance.shouldFail())) { // Test-only.
- return SemiFuture<void>::makeReady();
- }
-
- auto recipientTagName = _stateDoc.getRecipientTagName();
- auto recipientSetName = _stateDoc.getRecipientSetName();
- invariant(recipientTagName);
- invariant(recipientSetName);
-
- return detail::makeRecipientAcceptSplitFuture(
- _shardSplitService->getInstanceCleanupExecutor(),
- abortToken,
- *recipientTagName,
- *recipientSetName);
- }();
-
- _recipientAcceptedSplit.setFrom(std::move(future).unsafeToInlineFuture());
-}
-
ExecutorFuture<ShardSplitDonorService::DonorStateMachine::DurableState>
ShardSplitDonorService::DonorStateMachine::_handleErrorOrEnterAbortedState(
StatusWith<DurableState> statusWithState,
@@ -846,26 +830,6 @@ ShardSplitDonorService::DonorStateMachine::_handleErrorOrEnterAbortedState(
});
}
-ExecutorFuture<repl::OpTime>
-ShardSplitDonorService::DonorStateMachine::_markStateDocAsGarbageCollectable(
- std::shared_ptr<executor::ScopedTaskExecutor> executor, const CancellationToken& token) {
- stdx::lock_guard<Latch> lg(_mutex);
-
- _stateDoc.setExpireAt(_serviceContext->getFastClockSource()->now() +
- Milliseconds{repl::shardSplitGarbageCollectionDelayMS.load()});
-
- return AsyncTry([this, self = shared_from_this()] {
- auto opCtxHolder = cc().makeOperationContext();
- auto opCtx = opCtxHolder.get();
-
- uassertStatusOK(serverless::updateStateDoc(opCtx, _stateDoc));
- return repl::ReplClientInfo::forClient(opCtx->getClient()).getLastOp();
- })
- .until([](StatusWith<repl::OpTime> swOpTime) { return swOpTime.getStatus().isOK(); })
- .withBackoffBetweenIterations(kExponentialBackoff)
- .on(**executor, token);
-}
-
ExecutorFuture<void>
ShardSplitDonorService::DonorStateMachine::_waitForForgetCmdThenMarkGarbageCollectible(
const ScopedTaskExecutorPtr& executor, const CancellationToken& token) {
@@ -888,7 +852,22 @@ ShardSplitDonorService::DonorStateMachine::_waitForForgetCmdThenMarkGarbageColle
LOGV2(6236606,
"Marking shard split as garbage-collectable.",
"migrationId"_attr = _migrationId);
- return _markStateDocAsGarbageCollectable(executor, token);
+
+ stdx::lock_guard<Latch> lg(_mutex);
+ _stateDoc.setExpireAt(_serviceContext->getFastClockSource()->now() +
+ Milliseconds{repl::shardSplitGarbageCollectionDelayMS.load()});
+
+ return AsyncTry([this, self = shared_from_this()] {
+ auto opCtxHolder = cc().makeOperationContext();
+ auto opCtx = opCtxHolder.get();
+
+ uassertStatusOK(serverless::updateStateDoc(opCtx, _stateDoc));
+ return repl::ReplClientInfo::forClient(opCtx->getClient()).getLastOp();
+ })
+ .until(
+ [](StatusWith<repl::OpTime> swOpTime) { return swOpTime.getStatus().isOK(); })
+ .withBackoffBetweenIterations(kExponentialBackoff)
+ .on(**executor, token);
})
.then([this, self = shared_from_this(), executor, token](repl::OpTime opTime) {
return _waitForMajorityWriteConcern(executor, std::move(opTime), token);
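serverless::makeRecipientConnectionString, called from _enterBlockingOrAbortedState above, is not part of this diff. A purely hypothetical sketch of its shape, assuming it collects the hosts of config members carrying the recipient tag and names the set after recipientSetName:

    #include "mongo/client/connection_string.h"
    #include "mongo/db/repl/repl_set_config.h"

    // Hypothetical stand-in for the real tag check, which would consult the
    // config's tag definitions; declared here only so the sketch is complete.
    bool memberHasTag(const mongo::repl::MemberConfig& member, mongo::StringData tagName);

    // Hypothetical sketch only; the real helper lives in src/mongo/db/serverless.
    mongo::ConnectionString makeRecipientConnectionStringSketch(
        const mongo::repl::ReplSetConfig& config,
        mongo::StringData recipientTagName,
        mongo::StringData recipientSetName) {
        std::vector<mongo::HostAndPort> recipientHosts;
        for (auto it = config.membersBegin(); it != config.membersEnd(); ++it) {
            if (memberHasTag(*it, recipientTagName))
                recipientHosts.push_back(it->getHostAndPort());
        }
        return mongo::ConnectionString::forReplicaSet(recipientSetName,
                                                      std::move(recipientHosts));
    }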
diff --git a/src/mongo/db/serverless/shard_split_donor_service.h b/src/mongo/db/serverless/shard_split_donor_service.h
index 6af6d79ad49..43759fa5dee 100644
--- a/src/mongo/db/serverless/shard_split_donor_service.h
+++ b/src/mongo/db/serverless/shard_split_donor_service.h
@@ -38,19 +38,9 @@
namespace mongo {
+using TaskExecutorPtr = std::shared_ptr<executor::TaskExecutor>;
using ScopedTaskExecutorPtr = std::shared_ptr<executor::ScopedTaskExecutor>;
-namespace detail {
-
-SemiFuture<void> makeRecipientAcceptSplitFuture(
- ExecutorPtr executor,
- std::shared_ptr<executor::TaskExecutor> taskExecutor,
- const CancellationToken& token,
- const StringData& recipientTagName,
- const StringData& recipientSetName);
-
-}; // namespace detail
-
class ShardSplitDonorService final : public repl::PrimaryOnlyService {
public:
static constexpr StringData kServiceName = "ShardSplitDonorService"_sd;
@@ -150,8 +140,18 @@ public:
return !!_stateDoc.getExpireAt();
}
+ /**
+ * Only used for testing. Allows setting a custom task executor for observing split acceptance.
+ */
+ static void setSplitAcceptanceTaskExecutor_forTest(TaskExecutorPtr taskExecutor) {
+ _splitAcceptanceTaskExecutorForTest = taskExecutor;
+ }
+
private:
// Tasks
+ ExecutorFuture<void> _enterBlockingOrAbortedState(const ScopedTaskExecutorPtr& executor,
+ const CancellationToken& token);
+
ExecutorFuture<void> _waitForRecipientToReachBlockTimestamp(
const ScopedTaskExecutorPtr& executor, const CancellationToken& token);
@@ -161,10 +161,16 @@ private:
ExecutorFuture<void> _waitForRecipientToAcceptSplit(const ScopedTaskExecutorPtr& executor,
const CancellationToken& token);
- // Helpers
- ExecutorFuture<void> _writeInitialDocument(const ScopedTaskExecutorPtr& executor,
- const CancellationToken& token);
+ ExecutorFuture<void> _waitForForgetCmdThenMarkGarbageCollectible(
+ const ScopedTaskExecutorPtr& executor, const CancellationToken& token);
+
+ ExecutorFuture<DurableState> _handleErrorOrEnterAbortedState(
+ StatusWith<DurableState> durableState,
+ const ScopedTaskExecutorPtr& executor,
+ const CancellationToken& instanceAbortToken,
+ const CancellationToken& abortToken);
+ // Helpers
ExecutorFuture<repl::OpTime> _updateStateDocument(const ScopedTaskExecutorPtr& executor,
const CancellationToken& token,
ShardSplitDonorStateEnum nextState);
@@ -176,20 +182,6 @@ private:
void _initiateTimeout(const ScopedTaskExecutorPtr& executor,
const CancellationToken& abortToken);
- void _createReplicaSetMonitor(const CancellationToken& abortToken);
-
- ExecutorFuture<DurableState> _handleErrorOrEnterAbortedState(
- StatusWith<DurableState> durableState,
- const ScopedTaskExecutorPtr& executor,
- const CancellationToken& instanceAbortToken,
- const CancellationToken& abortToken);
-
- ExecutorFuture<repl::OpTime> _markStateDocAsGarbageCollectable(
- std::shared_ptr<executor::ScopedTaskExecutor> executor, const CancellationToken& token);
-
- ExecutorFuture<void> _waitForForgetCmdThenMarkGarbageCollectible(
- const ScopedTaskExecutorPtr& executor, const CancellationToken& token);
-
/*
* We need to call this method when we find out the replica set name is the same as the state
* doc recipient set name and the current state doc state is blocking.
@@ -223,10 +215,13 @@ private:
SharedPromise<void> _completionPromise;
// A promise fulfilled when all recipient nodes have accepted the split.
- SharedPromise<void> _recipientAcceptedSplit;
+ SharedPromise<void> _splitAcceptancePromise;
// A promise fulfilled when tryForget is called.
SharedPromise<void> _forgetShardSplitReceivedPromise;
+
+ // A task executor used for the split acceptance future in tests
+ static boost::optional<TaskExecutorPtr> _splitAcceptanceTaskExecutorForTest;
};
} // namespace mongo
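The static hook above is how the donor service test below wires in its mock-network executor; a minimal usage sketch, condensed from that test:

    // Condensed from the test changes below: inject the mock executor before the
    // state machine instance is created, and disable the fail point that would
    // otherwise skip waiting for split acceptance.
    ShardSplitDonorService::DonorStateMachine::setSplitAcceptanceTaskExecutor_forTest(_executor);
    _skipAcceptanceFP.reset();

    auto serviceInstance = ShardSplitDonorService::DonorStateMachine::getOrCreate(
        opCtx.get(), _service, defaultStateDocument().toBSON());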
diff --git a/src/mongo/db/serverless/shard_split_donor_service_test.cpp b/src/mongo/db/serverless/shard_split_donor_service_test.cpp
index 65abd349de0..36a2b03c4a5 100644
--- a/src/mongo/db/serverless/shard_split_donor_service_test.cpp
+++ b/src/mongo/db/serverless/shard_split_donor_service_test.cpp
@@ -200,8 +200,22 @@ public:
ClockSourceMock clockSource;
clockSource.advance(Milliseconds(1000));
- // Fake replSet just for creating consistent URI for monitor
- _rsmMonitor.setup(_replSet.getURI());
+ // Set up mock networking for split acceptance
+ auto network = std::make_unique<executor::NetworkInterfaceMock>();
+ _net = network.get();
+ _executor = std::make_shared<executor::ThreadPoolTaskExecutor>(
+ std::make_unique<executor::ThreadPoolMock>(
+ _net, 1, executor::ThreadPoolMock::Options{}),
+ std::move(network));
+ _executor->startup();
+ }
+
+ void tearDown() override {
+ _net->exitNetwork();
+ _executor->shutdown();
+ _executor->join();
+
+ repl::PrimaryOnlyServiceMongoDTest::tearDown();
}
protected:
@@ -227,24 +241,80 @@ protected:
"recipientSetForTest", 3, true /* hasPrimary */, false /* dollarPrefixHosts */};
const NamespaceString _nss{"testDB2", "testColl2"};
std::vector<std::string> _tenantIds = {"tenant1", "tenantAB"};
- StreamableReplicaSetMonitorForTesting _rsmMonitor;
std::string _recipientTagName{"$recipientNode"};
- std::string _recipientSetName{_replSet.getURI().getSetName()};
- FailPointEnableBlock _skipAcceptanceFP{"skipShardSplitWaitForSplitAcceptance"};
+ std::string _recipientSetName{_recipientSet.getURI().getSetName()};
+
+ std::unique_ptr<FailPointEnableBlock> _skipAcceptanceFP =
+ std::make_unique<FailPointEnableBlock>("skipShardSplitWaitForSplitAcceptance");
+
+
+ // for mocking split acceptance
+ executor::NetworkInterfaceMock* _net;
+ TaskExecutorPtr _executor;
};
+executor::RemoteCommandRequest assertRemoteCommandNameEquals(
+ StringData cmdName, const executor::RemoteCommandRequest& request) {
+ auto&& cmdObj = request.cmdObj;
+ ASSERT_FALSE(cmdObj.isEmpty());
+ if (cmdName != cmdObj.firstElementFieldName()) {
+ std::string msg = str::stream()
+ << "Expected command name \"" << cmdName << "\" in remote command request but found \""
+ << cmdObj.firstElementFieldName() << "\" instead: " << request.toString();
+ FAIL(msg);
+ }
+ return request;
+}
+
+void processHelloRequest(executor::NetworkInterfaceMock* net, MockReplicaSet* replSet) {
+ ASSERT(net->hasReadyRequests());
+ net->runReadyNetworkOperations();
+ auto noi = net->getNextReadyRequest();
+ auto request = noi->getRequest();
+
+ assertRemoteCommandNameEquals("isMaster", request);
+ auto requestHost = request.target.toString();
+ const auto node = replSet->getNode(requestHost);
+ if (node->isRunning()) {
+ const auto opmsg = OpMsgRequest::fromDBAndBody(request.dbname, request.cmdObj);
+ const auto reply = node->runCommand(request.id, opmsg)->getCommandReply();
+ net->scheduleSuccessfulResponse(noi,
+ executor::RemoteCommandResponse(reply, Milliseconds(0)));
+ } else {
+ net->scheduleErrorResponse(noi, Status(ErrorCodes::HostUnreachable, ""));
+ }
+}
+
+void waitForHelloRequest(executor::NetworkInterfaceMock* net) {
+ while (!net->hasReadyRequests()) {
+ net->advanceTime(net->now() + Milliseconds{1});
+ }
+}
+
TEST_F(ShardSplitDonorServiceTest, BasicShardSplitDonorServiceInstanceCreation) {
auto opCtx = makeOperationContext();
test::shard_split::ScopedTenantAccessBlocker scopedTenants(_tenantIds, opCtx.get());
test::shard_split::reconfigToAddRecipientNodes(
getServiceContext(), _recipientTagName, _replSet.getHosts(), _recipientSet.getHosts());
+ ShardSplitDonorService::DonorStateMachine::setSplitAcceptanceTaskExecutor_forTest(_executor);
+ _skipAcceptanceFP.reset();
+
// Create and start the instance.
auto serviceInstance = ShardSplitDonorService::DonorStateMachine::getOrCreate(
opCtx.get(), _service, defaultStateDocument().toBSON());
ASSERT(serviceInstance.get());
ASSERT_EQ(_uuid, serviceInstance->getId());
+ // Wait for monitors to start and enqueue successful hello responses
+ _net->enterNetwork();
+ waitForHelloRequest(_net);
+ processHelloRequest(_net, &_recipientSet);
+ processHelloRequest(_net, &_recipientSet);
+ processHelloRequest(_net, &_recipientSet);
+ _net->runReadyNetworkOperations();
+ _net->exitNetwork();
+
auto decisionFuture = serviceInstance->decisionFuture();
decisionFuture.wait();
@@ -459,63 +529,28 @@ TEST_F(ShardSplitDonorServiceTest, AbortDueToRecipientNodesValidation) {
ASSERT_TRUE(!serviceInstance->isGarbageCollectable());
}
-class SplitReplicaSetObserverTest : public ServiceContextTest {
-public:
- void setUp() override {
- ServiceContextTest::setUp();
-
- // we need a mock replication coordinator in order to identify recipient nodes
- auto serviceContext = getServiceContext();
- auto replCoord = std::make_unique<repl::ReplicationCoordinatorMock>(serviceContext);
- repl::ReplicationCoordinator::set(serviceContext, std::move(replCoord));
-
- _rsmMonitor.setup(_validRepl.getURI());
- _otherRsmMonitor.setup(_invalidRepl.getURI());
-
- _executor = repl::makeTestExecutor();
-
- // Retrieve monitor installed by _rsmMonitor.setup(...)
- auto monitor = checked_pointer_cast<StreamableReplicaSetMonitor>(
- ReplicaSetMonitor::createIfNeeded(_validRepl.getURI()));
- invariant(monitor);
- _publisher = monitor->getEventsPublisher();
- }
-
-protected:
- MockReplicaSet _validRepl{
- "replInScope", 3, true /* hasPrimary */, false /* dollarPrefixHosts */};
- MockReplicaSet _recipientSet{
- "recipientReplSet", 3, true /* hasPrimary */, false /* dollarPrefixHosts */};
- MockReplicaSet _invalidRepl{
- "replNotInScope", 3, true /* hasPrimary */, false /* dollarPrefixHosts */};
-
- StreamableReplicaSetMonitorForTesting _rsmMonitor;
- StreamableReplicaSetMonitorForTesting _otherRsmMonitor;
- std::shared_ptr<executor::TaskExecutor> _executor;
- std::shared_ptr<sdam::TopologyEventsPublisher> _publisher;
- std::string _recipientTagName{"$recipientNode"};
- std::string _recipientSetName{_validRepl.getURI().getSetName()};
-};
-
-TEST_F(SplitReplicaSetObserverTest, FutureReady) {
+TEST(RecipientAcceptSplitListenerTest, FutureReady) {
+ MockReplicaSet donor{"donor", 3, true /* hasPrimary */, false /* dollarPrefixHosts */};
auto listener =
- mongo::serverless::RecipientAcceptSplitListener(_validRepl.getURI().connectionString());
+ mongo::serverless::RecipientAcceptSplitListener(donor.getURI().connectionString());
- for (const auto& host : _validRepl.getHosts()) {
+ for (const auto& host : donor.getHosts()) {
ASSERT_FALSE(listener.getFuture().isReady());
- listener.onServerHeartbeatSucceededEvent(host, BSON("setName" << _validRepl.getSetName()));
+ listener.onServerHeartbeatSucceededEvent(host, BSON("setName" << donor.getSetName()));
}
+ ASSERT_FALSE(listener.getFuture().isReady());
+
listener.onServerHeartbeatSucceededEvent(
- _validRepl.getHosts().front(),
- BSON("setName" << _validRepl.getSetName() << "ismaster" << true));
+ donor.getHosts().front(), BSON("setName" << donor.getSetName() << "ismaster" << true));
ASSERT_TRUE(listener.getFuture().isReady());
}
-TEST_F(SplitReplicaSetObserverTest, FutureReadyNameChange) {
+TEST(RecipientAcceptSplitListenerTest, FutureReadyNameChange) {
+ MockReplicaSet donor{"donor", 3, true /* hasPrimary */, false /* dollarPrefixHosts */};
auto listener =
- mongo::serverless::RecipientAcceptSplitListener(_validRepl.getURI().connectionString());
+ mongo::serverless::RecipientAcceptSplitListener(donor.getURI().connectionString());
- for (const auto& host : _validRepl.getHosts()) {
+ for (const auto& host : donor.getHosts()) {
listener.onServerHeartbeatSucceededEvent(host,
BSON("setName"
<< "donorSetName"));
@@ -523,44 +558,54 @@ TEST_F(SplitReplicaSetObserverTest, FutureReadyNameChange) {
ASSERT_FALSE(listener.getFuture().isReady());
- for (const auto& host : _validRepl.getHosts()) {
- listener.onServerHeartbeatSucceededEvent(host, BSON("setName" << _validRepl.getSetName()));
+ for (const auto& host : donor.getHosts()) {
+ listener.onServerHeartbeatSucceededEvent(host, BSON("setName" << donor.getSetName()));
}
listener.onServerHeartbeatSucceededEvent(
- _validRepl.getHosts().front(),
- BSON("setName" << _validRepl.getSetName() << "ismaster" << true));
+ donor.getHosts().front(), BSON("setName" << donor.getSetName() << "ismaster" << true));
ASSERT_TRUE(listener.getFuture().isReady());
}
-TEST_F(SplitReplicaSetObserverTest, FutureNotReadyMissingNodes) {
+TEST(RecipientAcceptSplitListenerTest, FutureNotReadyMissingNodes) {
+ MockReplicaSet donor{"donor", 3, false /* hasPrimary */, false /* dollarPrefixHosts */};
auto listener =
- mongo::serverless::RecipientAcceptSplitListener(_validRepl.getURI().connectionString());
+ mongo::serverless::RecipientAcceptSplitListener(donor.getURI().connectionString());
- for (size_t i = 0; i < _validRepl.getHosts().size() - 1; ++i) {
- listener.onServerHeartbeatSucceededEvent(_validRepl.getHosts()[i],
- BSON("setName" << _validRepl.getSetName()));
+
+ for (size_t i = 0; i < donor.getHosts().size() - 1; ++i) {
+ listener.onServerHeartbeatSucceededEvent(donor.getHosts()[i],
+ BSON("setName" << donor.getSetName()));
}
ASSERT_FALSE(listener.getFuture().isReady());
+ listener.onServerHeartbeatSucceededEvent(donor.getHosts()[donor.getHosts().size() - 1],
+ BSON("setName" << donor.getSetName()));
+ ASSERT_FALSE(listener.getFuture().isReady());
+ donor.setPrimary(donor.getHosts()[0].host());
+ listener.onServerHeartbeatSucceededEvent(
+ donor.getHosts()[0], BSON("setName" << donor.getSetName() << "ismaster" << true));
+ ASSERT_TRUE(listener.getFuture().isReady());
}
-TEST_F(SplitReplicaSetObserverTest, FutureNotReadyNoSetName) {
+TEST(RecipientAcceptSplitListenerTest, FutureNotReadyNoSetName) {
+ MockReplicaSet donor{"donor", 3, true /* hasPrimary */, false /* dollarPrefixHosts */};
auto listener =
- mongo::serverless::RecipientAcceptSplitListener(_validRepl.getURI().connectionString());
+ mongo::serverless::RecipientAcceptSplitListener(donor.getURI().connectionString());
- for (size_t i = 0; i < _validRepl.getHosts().size() - 1; ++i) {
- listener.onServerHeartbeatSucceededEvent(_validRepl.getHosts()[i], BSONObj());
+ for (size_t i = 0; i < donor.getHosts().size() - 1; ++i) {
+ listener.onServerHeartbeatSucceededEvent(donor.getHosts()[i], BSONObj());
}
ASSERT_FALSE(listener.getFuture().isReady());
}
-TEST_F(SplitReplicaSetObserverTest, FutureNotReadyWrongSet) {
+TEST(RecipientAcceptSplitListenerTest, FutureNotReadyWrongSet) {
+ MockReplicaSet donor{"donor", 3, true /* hasPrimary */, false /* dollarPrefixHosts */};
auto listener =
- mongo::serverless::RecipientAcceptSplitListener(_validRepl.getURI().connectionString());
+ mongo::serverless::RecipientAcceptSplitListener(donor.getURI().connectionString());
- for (const auto& host : _validRepl.getHosts()) {
+ for (const auto& host : donor.getHosts()) {
listener.onServerHeartbeatSucceededEvent(host,
BSON("setName"
<< "wrongSetName"));
@@ -569,36 +614,6 @@ TEST_F(SplitReplicaSetObserverTest, FutureNotReadyWrongSet) {
ASSERT_FALSE(listener.getFuture().isReady());
}
-class ShardSplitPersistenceTest : public ShardSplitDonorServiceTest {
-public:
- void setUpPersistence(OperationContext* opCtx) override {
-
- // We need to allow writes during the test's setup.
- auto replCoord = dynamic_cast<repl::ReplicationCoordinatorMock*>(
- repl::ReplicationCoordinator::get(opCtx->getServiceContext()));
- replCoord->alwaysAllowWrites(true);
-
- replCoord->setGetConfigReturnValue(initialDonorConfig());
-
- _recStateDoc = initialStateDocument();
- uassertStatusOK(serverless::insertStateDoc(opCtx, _recStateDoc));
-
- _pauseBeforeRecipientCleanupFp =
- std::make_unique<FailPointEnableBlock>("pauseShardSplitBeforeRecipientCleanup");
-
- _initialTimesEntered = _pauseBeforeRecipientCleanupFp->initialTimesEntered();
- }
-
- virtual repl::ReplSetConfig initialDonorConfig() = 0;
-
- virtual ShardSplitDonorDocument initialStateDocument() = 0;
-
-protected:
- ShardSplitDonorDocument _recStateDoc;
- std::unique_ptr<FailPointEnableBlock> _pauseBeforeRecipientCleanupFp;
- FailPoint::EntryCountT _initialTimesEntered;
-};
-
TEST_F(ShardSplitDonorServiceTest, ResumeAfterStepdownTest) {
auto opCtx = makeOperationContext();
test::shard_split::ScopedTenantAccessBlocker scopedTenants(_tenantIds, opCtx.get());
@@ -624,6 +639,7 @@ TEST_F(ShardSplitDonorServiceTest, ResumeAfterStepdownTest) {
ASSERT_FALSE(result.isOK());
ASSERT_EQ(ErrorCodes::InterruptedDueToReplStateChange, result.getStatus().code());
+ // verify that the state document exists
ASSERT_OK(serverless::getStateDocument(opCtx.get(), _uuid).getStatus());
auto fp = std::make_unique<FailPointEnableBlock>("pauseShardSplitAfterBlocking");
@@ -633,20 +649,51 @@ TEST_F(ShardSplitDonorServiceTest, ResumeAfterStepdownTest) {
fp->failPoint()->waitForTimesEntered(initialTimesEntered + 1);
- std::shared_ptr<ShardSplitDonorService::DonorStateMachine> serviceInstance =
- ShardSplitDonorService::DonorStateMachine::getOrCreate(
- opCtx.get(), _service, defaultStateDocument().toBSON());
- ASSERT(serviceInstance.get());
-
+ // verify that the state document exists
ASSERT_OK(serverless::getStateDocument(opCtx.get(), _uuid).getStatus());
+ auto donor = ShardSplitDonorService::DonorStateMachine::lookup(
+ opCtx.get(), _service, BSON("_id" << _uuid));
+ ASSERT(donor);
fp.reset();
- ASSERT_OK(serviceInstance->decisionFuture().getNoThrow().getStatus());
+ ASSERT_OK((*donor)->decisionFuture().getNoThrow().getStatus());
- serviceInstance->tryForget();
+ (*donor)->tryForget();
+ ASSERT_OK((*donor)->completionFuture().getNoThrow());
+ ASSERT_TRUE((*donor)->isGarbageCollectable());
}
+class ShardSplitPersistenceTest : public ShardSplitDonorServiceTest {
+public:
+ void setUpPersistence(OperationContext* opCtx) override {
+
+ // We need to allow writes during the test's setup.
+ auto replCoord = dynamic_cast<repl::ReplicationCoordinatorMock*>(
+ repl::ReplicationCoordinator::get(opCtx->getServiceContext()));
+ replCoord->alwaysAllowWrites(true);
+
+ replCoord->setGetConfigReturnValue(initialDonorConfig());
+
+ _recStateDoc = initialStateDocument();
+ uassertStatusOK(serverless::insertStateDoc(opCtx, _recStateDoc));
+
+ _pauseBeforeRecipientCleanupFp =
+ std::make_unique<FailPointEnableBlock>("pauseShardSplitBeforeRecipientCleanup");
+
+ _initialTimesEntered = _pauseBeforeRecipientCleanupFp->initialTimesEntered();
+ }
+
+ virtual repl::ReplSetConfig initialDonorConfig() = 0;
+
+ virtual ShardSplitDonorDocument initialStateDocument() = 0;
+
+protected:
+ ShardSplitDonorDocument _recStateDoc;
+ std::unique_ptr<FailPointEnableBlock> _pauseBeforeRecipientCleanupFp;
+ FailPoint::EntryCountT _initialTimesEntered;
+};
+
class ShardSplitRecipientCleanupTest : public ShardSplitPersistenceTest {
public:
repl::ReplSetConfig initialDonorConfig() override {
diff --git a/src/mongo/db/serverless/shard_split_state_machine.idl b/src/mongo/db/serverless/shard_split_state_machine.idl
index 81cef079bff..8aa65017c1b 100644
--- a/src/mongo/db/serverless/shard_split_state_machine.idl
+++ b/src/mongo/db/serverless/shard_split_state_machine.idl
@@ -61,6 +61,12 @@ structs:
type: string
description: "The replica set tag that identifies recipient nodes."
optional: true
+ recipientConnectionString:
+ type: connection_string
+ description: >-
+ The connection string generated by the donor which it uses to
+ monitor the recipient for split acceptance.
+ optional: true
tenantIds:
type: array<string>
optional: true
diff --git a/src/mongo/idl/basic_types.idl b/src/mongo/idl/basic_types.idl
index b635ca97b0e..d124f29cdae 100644
--- a/src/mongo/idl/basic_types.idl
+++ b/src/mongo/idl/basic_types.idl
@@ -225,6 +225,13 @@ types:
serializer: mongo::NamespaceString::toString
deserializer: mongo::NamespaceString
+ connection_string:
+ bson_serialization_type: string
+ description: "A MongoDB ConnectionString"
+ cpp_type: "mongo::ConnectionString"
+ serializer: mongo::ConnectionString::toString
+ deserializer: mongo::ConnectionString::deserialize
+
fcv_string:
bson_serialization_type: string
description: >-
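For completeness, a sketch of how a connection_string field surfaces in generated C++. The accessor signatures are presumed from the donor code above, which invariants on the optional getter before dereferencing it:

    // Presumed IDL-generated accessor shape for the optional
    // recipientConnectionString field added in this commit.
    void inspectStateDoc(const mongo::ShardSplitDonorDocument& stateDoc) {
        boost::optional<mongo::ConnectionString> cs = stateDoc.getRecipientConnectionString();
        invariant(cs);
        // e.g. "recipientSetForTest/node1:27017,node2:27017,node3:27017"
        std::string uri = cs->toString();
    }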