summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorDidier Nadeau <didier.nadeau@mongodb.com>2022-03-14 21:09:34 +0000
committerEvergreen Agent <no-reply@evergreen.mongodb.com>2022-03-14 21:44:47 +0000
commit30c1271b8675b65fbfb1c78f405aa6d08c5154e8 (patch)
tree41b80a3fa03519588ce7981de48cbb0865228f59
parent1d9bde37450950dbaba7fc2eae912c4e9ba9f679 (diff)
downloadmongo-30c1271b8675b65fbfb1c78f405aa6d08c5154e8.tar.gz
SERVER-64420 Fix split acceptance to successfully complete split operation
-rw-r--r--jstests/serverless/libs/basic_serverless_test.js8
-rw-r--r--jstests/serverless/shard_split_basic_test.js44
-rw-r--r--jstests/serverless/shard_split_enabled.js2
-rw-r--r--src/mongo/db/repl/replication_coordinator_impl_heartbeat.cpp5
-rw-r--r--src/mongo/db/serverless/shard_split_donor_service.cpp121
-rw-r--r--src/mongo/db/serverless/shard_split_donor_service.h16
-rw-r--r--src/mongo/db/serverless/shard_split_donor_service_test.cpp97
-rw-r--r--src/mongo/db/serverless/shard_split_utils.cpp29
-rw-r--r--src/mongo/db/serverless/shard_split_utils.h24
9 files changed, 224 insertions, 122 deletions
diff --git a/jstests/serverless/libs/basic_serverless_test.js b/jstests/serverless/libs/basic_serverless_test.js
index acf0e6e6877..9ad9ce565ac 100644
--- a/jstests/serverless/libs/basic_serverless_test.js
+++ b/jstests/serverless/libs/basic_serverless_test.js
@@ -10,6 +10,7 @@ class BasicServerlessTest {
}
stop() {
+ this.removeAndStopRecipientNodes();
// If we validate, it will try to list all collections and the migrated collections will
// return a TenantMigrationCommitted error.
this.donor.stopSet(undefined /* signal */, false /* forRestart */, {skipValidation: 1});
@@ -57,6 +58,13 @@ class BasicServerlessTest {
print("Removing and stopping recipient nodes");
this.recipientNodes.forEach(node => this.donor.remove(node));
}
+
+ /**
+ * Crafts a tenant database name.
+ */
+ tenantDB(tenantId, dbName) {
+ return `${tenantId}_${dbName}`;
+ }
}
function findMigration(primary, uuid) {
diff --git a/jstests/serverless/shard_split_basic_test.js b/jstests/serverless/shard_split_basic_test.js
new file mode 100644
index 00000000000..333fda6805f
--- /dev/null
+++ b/jstests/serverless/shard_split_basic_test.js
@@ -0,0 +1,44 @@
+/**
+ *
+ * Tests that runs a shard split to completion.
+ * @tags: [requires_fcv_52, featureFlagShardSplit]
+ */
+
+load("jstests/libs/fail_point_util.js"); // for "configureFailPoint"
+load('jstests/libs/parallel_shell_helpers.js'); // for "startParallelShell"
+load("jstests/serverless/libs/basic_serverless_test.js");
+
+(function() {
+"use strict";
+
+const recipientTagName = "recipientNode";
+const recipientSetName = "recipientSetName";
+const test = new BasicServerlessTest({
+ recipientTagName,
+ recipientSetName,
+ nodeOptions: {
+ // Set a short timeout to test that the operation times out waiting for replication
+ setParameter: "shardSplitTimeoutMS=100000"
+ }
+});
+
+test.addRecipientNodes();
+test.donor.awaitSecondaryNodes();
+
+const migrationId = UUID();
+
+jsTestLog("Running the commitShardSplit operation");
+const admin = test.donor.getPrimary().getDB("admin");
+assert.commandWorked(admin.runCommand({
+ commitShardSplit: 1,
+ migrationId,
+ tenantIds: ["tenant1", "tenant2"],
+ recipientTagName,
+ recipientSetName
+}));
+
+jsTestLog("Forgetting shard split");
+assert.commandWorked(test.donor.getPrimary().adminCommand({forgetShardSplit: 1, migrationId}));
+
+test.stop();
+})();
diff --git a/jstests/serverless/shard_split_enabled.js b/jstests/serverless/shard_split_enabled.js
index cb2075421b8..46a310b8574 100644
--- a/jstests/serverless/shard_split_enabled.js
+++ b/jstests/serverless/shard_split_enabled.js
@@ -79,8 +79,6 @@ function makeShardSplitTest() {
6236600,
`forgetShardSplit should reject when featureFlagShardSplit is disabled`);
- // shut down recipient nodes
- test.removeAndStopRecipientNodes();
// shut down replica set
test.stop();
};
diff --git a/src/mongo/db/repl/replication_coordinator_impl_heartbeat.cpp b/src/mongo/db/repl/replication_coordinator_impl_heartbeat.cpp
index 6d356487893..dd7cb413540 100644
--- a/src/mongo/db/repl/replication_coordinator_impl_heartbeat.cpp
+++ b/src/mongo/db/repl/replication_coordinator_impl_heartbeat.cpp
@@ -716,6 +716,11 @@ void ReplicationCoordinatorImpl::_heartbeatReconfigStore(
}
const auto configToApply = statusWithConf.getValue();
+ if (isSplitRecipientConfig) {
+ LOGV2(6309200,
+ "Applying a recipient split config for a shard split operation.",
+ "config"_attr = configToApply);
+ }
const StatusWith<int> myIndex = validateConfigForHeartbeatReconfig(
_externalState.get(), configToApply, getGlobalServiceContext());
diff --git a/src/mongo/db/serverless/shard_split_donor_service.cpp b/src/mongo/db/serverless/shard_split_donor_service.cpp
index e401f565c27..1af4c492723 100644
--- a/src/mongo/db/serverless/shard_split_donor_service.cpp
+++ b/src/mongo/db/serverless/shard_split_donor_service.cpp
@@ -43,8 +43,10 @@
#include "mongo/executor/cancelable_executor.h"
#include "mongo/executor/connection_pool.h"
#include "mongo/executor/network_interface_factory.h"
+#include "mongo/executor/network_interface_thread_pool.h"
#include "mongo/executor/thread_pool_task_executor.h"
#include "mongo/logv2/log.h"
+#include "mongo/rpc/metadata/egress_metadata_hook_list.h"
#include "mongo/util/fail_point.h"
#include "mongo/util/future_util.h"
#include "mongo/util/time_support.h"
@@ -114,83 +116,58 @@ const std::string kTTLIndexName = "ShardSplitDonorTTLIndex";
} // namespace
namespace detail {
-std::function<bool(const std::vector<sdam::ServerDescriptionPtr>&)>
-makeRecipientAcceptSplitPredicate(const ConnectionString& recipientConnectionString) {
- return [recipientConnectionString](const std::vector<sdam::ServerDescriptionPtr>& servers) {
- auto recipientNodeCount =
- static_cast<uint32_t>(recipientConnectionString.getServers().size());
- auto nodesReportingRecipientSetName =
- std::count_if(servers.begin(), servers.end(), [&](const auto& server) {
- return server->getSetName() &&
- *(server->getSetName()) == recipientConnectionString.getSetName();
- });
-
- return nodesReportingRecipientSetName == recipientNodeCount;
- };
-}
-
-SemiFuture<void> makeRecipientAcceptSplitFuture(ExecutorPtr executor,
- const CancellationToken& token,
- const StringData& recipientTagName,
- const StringData& recipientSetName) {
- class RecipientAcceptSplitListener : public sdam::TopologyListener {
- public:
- RecipientAcceptSplitListener(const ConnectionString& recipientConnectionString)
- : _predicate(makeRecipientAcceptSplitPredicate(recipientConnectionString)) {}
- void onTopologyDescriptionChangedEvent(TopologyDescriptionPtr previousDescription,
- TopologyDescriptionPtr newDescription) final {
- stdx::lock_guard<Latch> lg(_mutex);
- if (_fulfilled) {
- return;
- }
-
- if (_predicate(newDescription->getServers())) {
- _fulfilled = true;
- _promise.emplaceValue();
- }
- }
-
- // Fulfilled when all nodes have accepted the split.
- SharedSemiFuture<void> getFuture() const {
- return _promise.getFuture();
- }
- private:
- bool _fulfilled = false;
- std::function<bool(const std::vector<sdam::ServerDescriptionPtr>&)> _predicate;
- SharedPromise<void> _promise;
- mutable Mutex _mutex =
- MONGO_MAKE_LATCH("ShardSplitDonorService::getRecipientAcceptSplitFuture::_mutex");
- };
+SemiFuture<void> makeRecipientAcceptSplitFuture(
+ std::shared_ptr<executor::TaskExecutor> taskExecutor,
+ const CancellationToken& token,
+ const StringData& recipientTagName,
+ const StringData& recipientSetName) {
auto replCoord = repl::ReplicationCoordinator::get(cc().getServiceContext());
invariant(replCoord);
auto recipientConnectionString = serverless::makeRecipientConnectionString(
replCoord->getConfig(), recipientTagName, recipientSetName);
- auto monitor = ReplicaSetMonitor::createIfNeeded(MongoURI{recipientConnectionString});
- invariant(monitor);
-
- // Only StreamableReplicaSetMonitor derives ReplicaSetMonitor. Therefore static cast is
- // possible
- auto streamableMonitor = checked_pointer_cast<StreamableReplicaSetMonitor>(monitor);
- auto listener = std::make_shared<RecipientAcceptSplitListener>(recipientConnectionString);
- streamableMonitor->getEventsPublisher()->registerListener(listener);
+ // build a vector of single server discovery monitors to listen for heartbeats
+ auto eventsPublisher = std::make_shared<sdam::TopologyEventsPublisher>(taskExecutor);
+
+ auto listener = std::make_shared<mongo::serverless::RecipientAcceptSplitListener>(
+ recipientConnectionString);
+ eventsPublisher->registerListener(listener);
+
+ auto managerStats = std::make_shared<ReplicaSetMonitorManagerStats>();
+ auto stats = std::make_shared<ReplicaSetMonitorStats>(managerStats);
+ auto recipientNodes = recipientConnectionString.getServers();
+
+ std::vector<SingleServerDiscoveryMonitorPtr> monitors;
+ for (const auto& server : recipientNodes) {
+ SdamConfiguration sdamConfiguration(std::vector<HostAndPort>{server});
+ auto connectionString = ConnectionString::forStandalones(std::vector<HostAndPort>{server});
+
+ monitors.push_back(
+ std::make_shared<SingleServerDiscoveryMonitor>(MongoURI{connectionString},
+ server,
+ boost::none,
+ sdamConfiguration,
+ eventsPublisher,
+ taskExecutor,
+ stats));
+ monitors.back()->init();
+ }
LOGV2(6142508,
"Monitoring recipient nodes for split acceptance.",
"recipientConnectionString"_attr = recipientConnectionString);
return future_util::withCancellation(listener->getFuture(), token)
- .thenRunOn(executor)
+ .thenRunOn(taskExecutor)
// Preserve lifetime of listener and monitor until the future is fulfilled and remove the
// listener.
- .onCompletion([listener, monitor = streamableMonitor](Status s) {
- monitor->getEventsPublisher()->removeListener(listener);
- return s;
- })
+ .onCompletion([monitors = std::move(monitors), listener, eventsPublisher, taskExecutor](
+ Status s) { return s; })
.semi();
}
+
} // namespace detail
ThreadPool::Limits ShardSplitDonorService::getThreadPoolLimits() const {
@@ -358,10 +335,16 @@ SemiFuture<void> ShardSplitDonorService::DonorStateMachine::run(
.then([this, executor, abortToken] {
checkForTokenInterrupt(abortToken);
_cancelableOpCtxFactory.emplace(abortToken, _markKilledExecutor);
- _createReplicaSetMonitor(executor, abortToken);
+ _createReplicaSetMonitor(abortToken);
return _enterBlockingState(executor, abortToken);
})
- .then([this] { pauseShardSplitAfterBlocking.pauseWhileSet(); })
+ .then([this, abortToken] {
+ if (MONGO_unlikely(pauseShardSplitAfterBlocking.shouldFail())) {
+ auto opCtx = _cancelableOpCtxFactory->makeOperationContext(&cc());
+ pauseShardSplitAfterBlocking.pauseWhileSetAndNotCanceled(opCtx.get(),
+ abortToken);
+ }
+ })
.then([this, executor, abortToken] {
return _waitForRecipientToReachBlockTimestamp(executor, abortToken);
})
@@ -738,7 +721,14 @@ void ShardSplitDonorService::DonorStateMachine::_initiateTimeout(
}
void ShardSplitDonorService::DonorStateMachine::_createReplicaSetMonitor(
- const ScopedTaskExecutorPtr& executor, const CancellationToken& abortToken) {
+ const CancellationToken& abortToken) {
+ {
+ stdx::lock_guard<Latch> lg(_mutex);
+ if (_stateDoc.getState() > ShardSplitDonorStateEnum::kBlocking) {
+ return;
+ }
+ }
+
auto future = [&]() {
stdx::lock_guard<Latch> lg(_mutex);
if (MONGO_unlikely(skipShardSplitWaitForSplitAcceptance.shouldFail())) { // Test-only.
@@ -751,7 +741,10 @@ void ShardSplitDonorService::DonorStateMachine::_createReplicaSetMonitor(
invariant(recipientSetName);
return detail::makeRecipientAcceptSplitFuture(
- **executor, abortToken, *recipientTagName, *recipientSetName);
+ _shardSplitService->getInstanceCleanupExecutor(),
+ abortToken,
+ *recipientTagName,
+ *recipientSetName);
}();
_recipientAcceptedSplit.setFrom(std::move(future).unsafeToInlineFuture());
diff --git a/src/mongo/db/serverless/shard_split_donor_service.h b/src/mongo/db/serverless/shard_split_donor_service.h
index dc69d8cb1eb..d001bba0de1 100644
--- a/src/mongo/db/serverless/shard_split_donor_service.h
+++ b/src/mongo/db/serverless/shard_split_donor_service.h
@@ -41,13 +41,14 @@ namespace mongo {
using ScopedTaskExecutorPtr = std::shared_ptr<executor::ScopedTaskExecutor>;
namespace detail {
-std::function<bool(const std::vector<sdam::ServerDescriptionPtr>&)>
-makeRecipientAcceptSplitPredicate(const ConnectionString& recipientConnectionString);
-SemiFuture<void> makeRecipientAcceptSplitFuture(ExecutorPtr executor,
- const CancellationToken& token,
- const StringData& recipientTagName,
- const StringData& recipientSetName);
+SemiFuture<void> makeRecipientAcceptSplitFuture(
+ ExecutorPtr executor,
+ std::shared_ptr<executor::TaskExecutor> taskExecutor,
+ const CancellationToken& token,
+ const StringData& recipientTagName,
+ const StringData& recipientSetName);
+
}; // namespace detail
class ShardSplitDonorService final : public repl::PrimaryOnlyService {
@@ -182,8 +183,7 @@ private:
void _initiateTimeout(const ScopedTaskExecutorPtr& executor,
const CancellationToken& abortToken);
- void _createReplicaSetMonitor(const ScopedTaskExecutorPtr& executor,
- const CancellationToken& abortToken);
+ void _createReplicaSetMonitor(const CancellationToken& abortToken);
ExecutorFuture<DurableState> _handleErrorOrEnterAbortedState(
StatusWith<DurableState> durableState,
diff --git a/src/mongo/db/serverless/shard_split_donor_service_test.cpp b/src/mongo/db/serverless/shard_split_donor_service_test.cpp
index 415ddbace97..069c4926fb3 100644
--- a/src/mongo/db/serverless/shard_split_donor_service_test.cpp
+++ b/src/mongo/db/serverless/shard_split_donor_service_test.cpp
@@ -231,6 +231,8 @@ protected:
};
TEST_F(ShardSplitDonorServiceTest, BasicShardSplitDonorServiceInstanceCreation) {
+ FailPointEnableBlock fp("skipShardSplitWaitForSplitAcceptance");
+
auto opCtx = makeOperationContext();
test::shard_split::ScopedTenantAccessBlocker scopedTenants(_tenantIds, opCtx.get());
test::shard_split::reconfigToAddRecipientNodes(
@@ -279,6 +281,9 @@ TEST_F(ShardSplitDonorServiceTest, BasicShardSplitDonorServiceInstanceCreation)
}
TEST_F(ShardSplitDonorServiceTest, ShardSplitDonorServiceTimeout) {
+ FailPointEnableBlock fp("pauseShardSplitAfterBlocking");
+ FailPointEnableBlock fp2("skipShardSplitWaitForSplitAcceptance");
+
auto opCtx = makeOperationContext();
auto serviceContext = getServiceContext();
test::shard_split::ScopedTenantAccessBlocker scopedTenants(_tenantIds, opCtx.get());
@@ -509,75 +514,71 @@ protected:
std::string _recipientSetName{_validRepl.getURI().getSetName()};
};
-TEST_F(SplitReplicaSetObserverTest, SupportsCancellation) {
- test::shard_split::reconfigToAddRecipientNodes(
- getServiceContext(), _recipientTagName, _validRepl.getHosts(), _recipientSet.getHosts());
-
- CancellationSource source;
- auto future = detail::makeRecipientAcceptSplitFuture(
- _executor, source.token(), _recipientTagName, _recipientSetName);
+TEST_F(SplitReplicaSetObserverTest, FutureReady) {
+ auto listener =
+ mongo::serverless::RecipientAcceptSplitListener(_validRepl.getURI().connectionString());
- ASSERT_FALSE(future.isReady());
- source.cancel();
+ for (const auto& host : _validRepl.getHosts()) {
+ ASSERT_FALSE(listener.getFuture().isReady());
+ listener.onServerHeartbeatSucceededEvent(host, BSON("setName" << _validRepl.getSetName()));
+ }
- ASSERT_EQ(future.getNoThrow().code(), ErrorCodes::CallbackCanceled);
+ ASSERT_TRUE(listener.getFuture().isReady());
}
-TEST_F(SplitReplicaSetObserverTest, GetRecipientAcceptSplitFutureTest) {
- test::shard_split::reconfigToAddRecipientNodes(
- getServiceContext(), _recipientTagName, _validRepl.getHosts(), _recipientSet.getHosts());
+TEST_F(SplitReplicaSetObserverTest, FutureReadyNameChange) {
+ auto listener =
+ mongo::serverless::RecipientAcceptSplitListener(_validRepl.getURI().connectionString());
- CancellationSource source;
- auto future = detail::makeRecipientAcceptSplitFuture(
- _executor, source.token(), _recipientTagName, _recipientSetName);
+ for (const auto& host : _validRepl.getHosts()) {
+ listener.onServerHeartbeatSucceededEvent(host,
+ BSON("setName"
+ << "donorSetName"));
+ }
- std::shared_ptr<TopologyDescription> topologyDescriptionOld =
- std::make_shared<sdam::TopologyDescription>(sdam::SdamConfiguration());
- std::shared_ptr<TopologyDescription> topologyDescriptionNew =
- makeRecipientTopologyDescription(_validRepl);
+ ASSERT_FALSE(listener.getFuture().isReady());
- _publisher->onTopologyDescriptionChangedEvent(topologyDescriptionOld, topologyDescriptionNew);
+ for (const auto& host : _validRepl.getHosts()) {
+ listener.onServerHeartbeatSucceededEvent(host, BSON("setName" << _validRepl.getSetName()));
+ }
- future.wait();
+ ASSERT_TRUE(listener.getFuture().isReady());
}
TEST_F(SplitReplicaSetObserverTest, FutureNotReadyMissingNodes) {
- auto predicate =
- detail::makeRecipientAcceptSplitPredicate(_validRepl.getURI().connectionString());
+ auto listener =
+ mongo::serverless::RecipientAcceptSplitListener(_validRepl.getURI().connectionString());
- std::shared_ptr<TopologyDescription> topologyDescriptionNew =
- makeRecipientTopologyDescription(_validRepl);
- topologyDescriptionNew->removeServerDescription(_validRepl.getHosts()[0]);
+ for (size_t i = 0; i < _validRepl.getHosts().size() - 1; ++i) {
+ listener.onServerHeartbeatSucceededEvent(_validRepl.getHosts()[i],
+ BSON("setName" << _validRepl.getSetName()));
+ }
- ASSERT_FALSE(predicate(topologyDescriptionNew->getServers()));
+ ASSERT_FALSE(listener.getFuture().isReady());
}
-TEST_F(SplitReplicaSetObserverTest, FutureNotReadyWrongSet) {
- auto predicate =
- detail::makeRecipientAcceptSplitPredicate(_validRepl.getURI().connectionString());
+TEST_F(SplitReplicaSetObserverTest, FutureNotReadyNoSetName) {
+ auto listener =
+ mongo::serverless::RecipientAcceptSplitListener(_validRepl.getURI().connectionString());
- std::shared_ptr<TopologyDescription> topologyDescriptionNew =
- makeRecipientTopologyDescription(_invalidRepl);
+ for (size_t i = 0; i < _validRepl.getHosts().size() - 1; ++i) {
+ listener.onServerHeartbeatSucceededEvent(_validRepl.getHosts()[i], BSONObj());
+ }
- ASSERT_FALSE(predicate(topologyDescriptionNew->getServers()));
+ ASSERT_FALSE(listener.getFuture().isReady());
}
-TEST_F(SplitReplicaSetObserverTest, ExecutorCanceled) {
- test::shard_split::reconfigToAddRecipientNodes(
- getServiceContext(), _recipientTagName, _validRepl.getHosts(), _recipientSet.getHosts());
-
- CancellationSource source;
- auto future = detail::makeRecipientAcceptSplitFuture(
- _executor, source.token(), _recipientTagName, _recipientSetName);
-
- _executor->shutdown();
- _executor->join();
+TEST_F(SplitReplicaSetObserverTest, FutureNotReadyWrongSet) {
+ auto listener =
+ mongo::serverless::RecipientAcceptSplitListener(_validRepl.getURI().connectionString());
- ASSERT_FALSE(future.isReady());
+ for (const auto& host : _validRepl.getHosts()) {
+ listener.onServerHeartbeatSucceededEvent(host,
+ BSON("setName"
+ << "wrongSetName"));
+ }
- // Ensure the test does not hang.
- source.cancel();
- ASSERT_EQ(future.getNoThrow().code(), ErrorCodes::ShutdownInProgress);
+ ASSERT_FALSE(listener.getFuture().isReady());
}
class ShardSplitRecipientCleanupTest : public ShardSplitDonorServiceTest {
diff --git a/src/mongo/db/serverless/shard_split_utils.cpp b/src/mongo/db/serverless/shard_split_utils.cpp
index f2db7aaf4d1..16e9b7f5a59 100644
--- a/src/mongo/db/serverless/shard_split_utils.cpp
+++ b/src/mongo/db/serverless/shard_split_utils.cpp
@@ -238,5 +238,34 @@ bool shouldRemoveStateDocumentOnRecipient(OperationContext* opCtx,
stateDocument.getState() == ShardSplitDonorStateEnum::kBlocking;
}
+RecipientAcceptSplitListener::RecipientAcceptSplitListener(
+ const ConnectionString& recipientConnectionString)
+ : _numberOfRecipient(recipientConnectionString.getServers().size()),
+ _recipientSetName(recipientConnectionString.getSetName()) {}
+
+void RecipientAcceptSplitListener::onServerHeartbeatSucceededEvent(const HostAndPort& hostAndPort,
+ const BSONObj reply) {
+ if (_fulfilled.load() || !reply["setName"]) {
+ return;
+ }
+
+ stdx::lock_guard<Latch> lg(_mutex);
+ _reportedSetNames[hostAndPort] = reply["setName"].str();
+
+ auto allReportCorrectly =
+ std::all_of(_reportedSetNames.begin(),
+ _reportedSetNames.end(),
+ [&](const auto& entry) { return entry.second == _recipientSetName; }) &&
+ _reportedSetNames.size() == _numberOfRecipient;
+ if (allReportCorrectly) {
+ _fulfilled.store(true);
+ _promise.emplaceValue();
+ }
+}
+
+SharedSemiFuture<void> RecipientAcceptSplitListener::getFuture() const {
+ return _promise.getFuture();
+}
+
} // namespace serverless
} // namespace mongo
diff --git a/src/mongo/db/serverless/shard_split_utils.h b/src/mongo/db/serverless/shard_split_utils.h
index ec2f1becd58..19978fba1da 100644
--- a/src/mongo/db/serverless/shard_split_utils.h
+++ b/src/mongo/db/serverless/shard_split_utils.h
@@ -29,6 +29,7 @@
#pragma once
+#include "mongo/client/sdam/topology_listener.h"
#include "mongo/db/repl/repl_set_config.h"
#include "mongo/db/serverless/shard_split_state_machine_gen.h"
@@ -116,6 +117,29 @@ StatusWith<ShardSplitDonorDocument> getStateDocument(OperationContext* opCtx,
bool shouldRemoveStateDocumentOnRecipient(OperationContext* opCtx,
const ShardSplitDonorDocument& stateDocument);
+/**
+ * Listener that receives heartbeat events and fulfills a future once it sees the expected number
+ * of nodes in the recipient replica set to monitor.
+ */
+class RecipientAcceptSplitListener : public sdam::TopologyListener {
+public:
+ RecipientAcceptSplitListener(const ConnectionString& recipientConnectionString);
+
+ void onServerHeartbeatSucceededEvent(const HostAndPort& hostAndPort, BSONObj reply) final;
+
+ // Fulfilled when all nodes have accepted the split.
+ SharedSemiFuture<void> getFuture() const;
+
+private:
+ mutable Mutex _mutex =
+ MONGO_MAKE_LATCH("ShardSplitDonorService::getRecipientAcceptSplitFuture::_mutex");
+
+ AtomicWord<bool> _fulfilled{false};
+ const size_t _numberOfRecipient;
+ std::string _recipientSetName;
+ std::map<HostAndPort, std::string> _reportedSetNames;
+ SharedPromise<void> _promise;
+};
} // namespace serverless
} // namespace mongo