summary refs log tree commitdiff
diff options
context:
space:
mode:
-rw-r--r-- src/mongo/db/serverless/shard_split_donor_service.cpp 26
-rw-r--r-- src/mongo/db/serverless/shard_split_donor_service_test.cpp 42
2 files changed, 64 insertions, 4 deletions
diff --git a/src/mongo/db/serverless/shard_split_donor_service.cpp b/src/mongo/db/serverless/shard_split_donor_service.cpp
index f85e865fd76..9ad1443b8ff 100644
--- a/src/mongo/db/serverless/shard_split_donor_service.cpp
+++ b/src/mongo/db/serverless/shard_split_donor_service.cpp
@@ -57,6 +57,7 @@ namespace mongo {
namespace {
MONGO_FAIL_POINT_DEFINE(abortShardSplitBeforeLeavingBlockingState);
+MONGO_FAIL_POINT_DEFINE(pauseShardSplitRecipientListenerCompletion);
MONGO_FAIL_POINT_DEFINE(pauseShardSplitBeforeBlockingState);
MONGO_FAIL_POINT_DEFINE(pauseShardSplitAfterBlocking);
MONGO_FAIL_POINT_DEFINE(pauseShardSplitAfterDecision);
@@ -152,7 +153,8 @@ namespace detail {
SemiFuture<void> makeRecipientAcceptSplitFuture(
std::shared_ptr<executor::TaskExecutor> taskExecutor,
const CancellationToken& abortToken,
- const ConnectionString& recipientConnectionString) {
+ const ConnectionString& recipientConnectionString,
+ const UUID migrationId) {
// build a vector of single server discovery monitors to listen for heartbeats
auto eventsPublisher = std::make_shared<sdam::TopologyEventsPublisher>(taskExecutor);
@@ -186,13 +188,21 @@ SemiFuture<void> makeRecipientAcceptSplitFuture(
// Preserve lifetime of listener and monitor until the future is fulfilled and remove the
// listener.
.onCompletion(
- [monitors = std::move(monitors), listener, eventsPublisher, taskExecutor](Status s) {
+ [monitors = std::move(monitors), listener, eventsPublisher, taskExecutor, migrationId](
+ Status s) {
eventsPublisher->close();
for (auto& monitor : monitors) {
monitor->shutdown();
}
+ LOGV2(6634900,
+ "Shutting down shard split recipient listener.",
+ "id"_attr = migrationId,
+ "recipientReady"_attr = listener->getFuture().isReady());
+
+ pauseShardSplitRecipientListenerCompletion.pauseWhileSet();
+
return s;
})
.semi();
@@ -677,7 +687,7 @@ ExecutorFuture<void> ShardSplitDonorService::DonorStateMachine::_enterBlockingOr
"recipientConnectionString"_attr = recipientConnectionString);
return detail::makeRecipientAcceptSplitFuture(
- executor, abortToken, recipientConnectionString)
+ executor, abortToken, recipientConnectionString, _migrationId)
.unsafeToInlineFuture();
});
@@ -850,6 +860,15 @@ ShardSplitDonorService::DonorStateMachine::_handleErrorOrEnterAbortedState(
const ScopedTaskExecutorPtr& executor,
const CancellationToken& primaryToken,
const CancellationToken& abortToken) {
+ ON_BLOCK_EXIT([&] {
+ stdx::lock_guard<Latch> lg(_mutex);
+ if (_abortSource) {
+ // Cancel source to ensure all child threads (RSM monitor, etc)
+ // terminate.
+ _abortSource->cancel();
+ }
+ });
+
{
stdx::lock_guard<Latch> lg(_mutex);
if (isAbortedDocumentPersistent(lg, _stateDoc)) {
@@ -863,7 +882,6 @@ ShardSplitDonorService::DonorStateMachine::_handleErrorOrEnterAbortedState(
if (ErrorCodes::isNotPrimaryError(status) || ErrorCodes::isShutdownError(status)) {
// Don't abort the split on retriable errors that may have been generated by the local
// server shutting/stepping down because it can be resumed when the client retries.
-
return ExecutorFuture(**executor, statusWithState);
}
diff --git a/src/mongo/db/serverless/shard_split_donor_service_test.cpp b/src/mongo/db/serverless/shard_split_donor_service_test.cpp
index 51d203ef6db..b1748419510 100644
--- a/src/mongo/db/serverless/shard_split_donor_service_test.cpp
+++ b/src/mongo/db/serverless/shard_split_donor_service_test.cpp
@@ -427,6 +427,48 @@ TEST_F(ShardSplitDonorServiceTest, ShardSplitDonorServiceTimeout) {
ASSERT_TRUE(serviceInstance->isGarbageCollectable());
}
+// Verify the recipient acceptance listener stops when the shard split aborts due to an error. If it
+// does not, the test will timeout.
+TEST_F(ShardSplitDonorServiceTest, ListenerTerminatesOnError) {
+ auto opCtx = makeOperationContext();
+ auto serviceContext = getServiceContext();
+
+ FailPointEnableBlock fp("abortShardSplitBeforeLeavingBlockingState");
+
+ auto listenerCompleted =
+ std::make_unique<FailPointEnableBlock>("pauseShardSplitRecipientListenerCompletion");
+ const auto initialTimes = listenerCompleted->initialTimesEntered();
+
+ // Ensure the listener is created.
+ ShardSplitDonorService::DonorStateMachine::setSplitAcceptanceTaskExecutor_forTest(_executor);
+ _skipAcceptanceFP.reset();
+
+ test::shard_split::ScopedTenantAccessBlocker scopedTenants(_tenantIds, opCtx.get());
+ test::shard_split::reconfigToAddRecipientNodes(
+ serviceContext, _recipientTagName, _replSet.getHosts(), _recipientSet.getHosts());
+
+ auto stateDocument = defaultStateDocument();
+
+ auto serviceInstance = ShardSplitDonorService::DonorStateMachine::getOrCreate(
+ opCtx.get(), _service, stateDocument.toBSON());
+ ASSERT(serviceInstance.get());
+
+ auto result = serviceInstance->decisionFuture().get(opCtx.get());
+
+ ASSERT(!!result.abortReason);
+ ASSERT_EQ(result.abortReason->code(), ErrorCodes::InternalError);
+ ASSERT_EQ(result.state, mongo::ShardSplitDonorStateEnum::kAborted);
+
+ serviceInstance->tryForget();
+
+ ASSERT_OK(serviceInstance->completionFuture().getNoThrow());
+ ASSERT_TRUE(serviceInstance->isGarbageCollectable());
+
+ // If the listener does not stop, this will hang indefinitely and the test will timeout.
+ (*listenerCompleted)->waitForTimesEntered(initialTimes + 1);
+ listenerCompleted.reset();
+}
+
// Abort scenario : abortSplit called before startSplit.
TEST_F(ShardSplitDonorServiceTest, CreateInstanceInAbortState) {
auto opCtx = makeOperationContext();