 src/mongo/db/serverless/shard_split_donor_service.cpp      | 26
 src/mongo/db/serverless/shard_split_donor_service_test.cpp | 42
 2 files changed, 64 insertions(+), 4 deletions(-)
diff --git a/src/mongo/db/serverless/shard_split_donor_service.cpp b/src/mongo/db/serverless/shard_split_donor_service.cpp
index f85e865fd76..9ad1443b8ff 100644
--- a/src/mongo/db/serverless/shard_split_donor_service.cpp
+++ b/src/mongo/db/serverless/shard_split_donor_service.cpp
@@ -57,6 +57,7 @@ namespace mongo {
 namespace {
 
 MONGO_FAIL_POINT_DEFINE(abortShardSplitBeforeLeavingBlockingState);
+MONGO_FAIL_POINT_DEFINE(pauseShardSplitRecipientListenerCompletion);
 MONGO_FAIL_POINT_DEFINE(pauseShardSplitBeforeBlockingState);
 MONGO_FAIL_POINT_DEFINE(pauseShardSplitAfterBlocking);
 MONGO_FAIL_POINT_DEFINE(pauseShardSplitAfterDecision);
@@ -152,7 +153,8 @@ namespace detail {
 SemiFuture<void> makeRecipientAcceptSplitFuture(
     std::shared_ptr<executor::TaskExecutor> taskExecutor,
     const CancellationToken& abortToken,
-    const ConnectionString& recipientConnectionString) {
+    const ConnectionString& recipientConnectionString,
+    const UUID migrationId) {
 
     // build a vector of single server discovery monitors to listen for heartbeats
     auto eventsPublisher = std::make_shared<sdam::TopologyEventsPublisher>(taskExecutor);
@@ -186,13 +188,21 @@ SemiFuture<void> makeRecipientAcceptSplitFuture(
         // Preserve lifetime of listener and monitor until the future is fulfilled and remove the
         // listener.
         .onCompletion(
-            [monitors = std::move(monitors), listener, eventsPublisher, taskExecutor](Status s) {
+            [monitors = std::move(monitors), listener, eventsPublisher, taskExecutor, migrationId](
+                Status s) {
                 eventsPublisher->close();
 
                 for (auto& monitor : monitors) {
                     monitor->shutdown();
                 }
 
+                LOGV2(6634900,
+                      "Shutting down shard split recipient listener.",
+                      "id"_attr = migrationId,
+                      "recipientReady"_attr = listener->getFuture().isReady());
+
+                pauseShardSplitRecipientListenerCompletion.pauseWhileSet();
+
                 return s;
             })
         .semi();
@@ -677,7 +687,7 @@ ExecutorFuture<void> ShardSplitDonorService::DonorStateMachine::_enterBlockingOr
                     "recipientConnectionString"_attr = recipientConnectionString);
 
                 return detail::makeRecipientAcceptSplitFuture(
-                           executor, abortToken, recipientConnectionString)
+                           executor, abortToken, recipientConnectionString, _migrationId)
                     .unsafeToInlineFuture();
             });
 
@@ -850,6 +860,15 @@ ShardSplitDonorService::DonorStateMachine::_handleErrorOrEnterAbortedState(
     const ScopedTaskExecutorPtr& executor,
     const CancellationToken& primaryToken,
     const CancellationToken& abortToken) {
+    ON_BLOCK_EXIT([&] {
+        stdx::lock_guard<Latch> lg(_mutex);
+        if (_abortSource) {
+            // Cancel source to ensure all child threads (RSM monitor, etc)
+            // terminate.
+            _abortSource->cancel();
+        }
+    });
+
     {
         stdx::lock_guard<Latch> lg(_mutex);
         if (isAbortedDocumentPersistent(lg, _stateDoc)) {
@@ -863,7 +882,6 @@ ShardSplitDonorService::DonorStateMachine::_handleErrorOrEnterAbortedState(
     if (ErrorCodes::isNotPrimaryError(status) || ErrorCodes::isShutdownError(status)) {
         // Don't abort the split on retriable errors that may have been generated by the local
         // server shutting/stepping down because it can be resumed when the client retries.
-
         return ExecutorFuture(**executor, statusWithState);
     }
 
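Editor's note: the ON_BLOCK_EXIT block added to _handleErrorOrEnterAbortedState is the core of the fix. It cancels _abortSource on every exit path of the function, so work hanging off the abort token (the recipient-acceptance listener, the replica set monitor, etc.) cannot outlive an aborted split. The standalone sketch below illustrates the same cancel-on-scope-exit pattern in standard C++20; std::stop_source stands in for MongoDB's CancellationSource and the hand-rolled ScopeGuard stands in for ON_BLOCK_EXIT. All names in the sketch are illustrative, not code from this patch.

// Minimal sketch of the cancel-on-scope-exit pattern, in standard C++20.
// std::stop_source stands in for MongoDB's CancellationSource; ScopeGuard
// stands in for ON_BLOCK_EXIT. Illustrative only, not code from the patch.
#include <chrono>
#include <iostream>
#include <stop_token>
#include <thread>
#include <utility>

// Runs a callable when the enclosing scope exits, on every path (normal
// return, early return, exception) -- the property the fix relies on.
template <typename F>
class ScopeGuard {
public:
    explicit ScopeGuard(F fn) : _fn(std::move(fn)) {}
    ~ScopeGuard() {
        _fn();
    }
    ScopeGuard(const ScopeGuard&) = delete;
    ScopeGuard& operator=(const ScopeGuard&) = delete;

private:
    F _fn;
};

// Stands in for the recipient listener: loops until cancelled.
void listenerLoop(std::stop_token token) {
    while (!token.stop_requested()) {
        std::this_thread::sleep_for(std::chrono::milliseconds(10));
    }
    std::cout << "listener terminated\n";
}

void handleErrorOrEnterAbortedState(std::stop_source& abortSource) {
    // Cancel on *every* exit path so the listener cannot outlive the state
    // machine, mirroring the ON_BLOCK_EXIT block added by the patch.
    ScopeGuard cancelOnExit{[&] { abortSource.request_stop(); }};

    // ... error handling / state persistence would happen here; any early
    // return or thrown exception still triggers cancelOnExit ...
}

int main() {
    std::stop_source abortSource;
    std::thread listener(listenerLoop, abortSource.get_token());

    handleErrorOrEnterAbortedState(abortSource);

    listener.join();  // returns promptly because the guard cancelled
    return 0;
}

Without the guard, an error path that returned before reaching the cancellation call would leave listenerLoop spinning forever, which is exactly the leak the regression test below checks for.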
diff --git a/src/mongo/db/serverless/shard_split_donor_service_test.cpp b/src/mongo/db/serverless/shard_split_donor_service_test.cpp
index 51d203ef6db..b1748419510 100644
--- a/src/mongo/db/serverless/shard_split_donor_service_test.cpp
+++ b/src/mongo/db/serverless/shard_split_donor_service_test.cpp
@@ -427,6 +427,48 @@ TEST_F(ShardSplitDonorServiceTest, ShardSplitDonorServiceTimeout) {
     ASSERT_TRUE(serviceInstance->isGarbageCollectable());
 }
 
+// Verify the recipient acceptance listener stops when the shard split aborts due to an error. If
+// it does not, the test will time out.
+TEST_F(ShardSplitDonorServiceTest, ListenerTerminatesOnError) {
+    auto opCtx = makeOperationContext();
+    auto serviceContext = getServiceContext();
+
+    FailPointEnableBlock fp("abortShardSplitBeforeLeavingBlockingState");
+
+    auto listenerCompleted =
+        std::make_unique<FailPointEnableBlock>("pauseShardSplitRecipientListenerCompletion");
+    const auto initialTimes = listenerCompleted->initialTimesEntered();
+
+    // Ensure the listener is created.
+    ShardSplitDonorService::DonorStateMachine::setSplitAcceptanceTaskExecutor_forTest(_executor);
+    _skipAcceptanceFP.reset();
+
+    test::shard_split::ScopedTenantAccessBlocker scopedTenants(_tenantIds, opCtx.get());
+    test::shard_split::reconfigToAddRecipientNodes(
+        serviceContext, _recipientTagName, _replSet.getHosts(), _recipientSet.getHosts());
+
+    auto stateDocument = defaultStateDocument();
+
+    auto serviceInstance = ShardSplitDonorService::DonorStateMachine::getOrCreate(
+        opCtx.get(), _service, stateDocument.toBSON());
+    ASSERT(serviceInstance.get());
+
+    auto result = serviceInstance->decisionFuture().get(opCtx.get());
+
+    ASSERT(!!result.abortReason);
+    ASSERT_EQ(result.abortReason->code(), ErrorCodes::InternalError);
+    ASSERT_EQ(result.state, mongo::ShardSplitDonorStateEnum::kAborted);
+
+    serviceInstance->tryForget();
+
+    ASSERT_OK(serviceInstance->completionFuture().getNoThrow());
+    ASSERT_TRUE(serviceInstance->isGarbageCollectable());
+
+    // If the listener does not stop, this will hang indefinitely and the test will time out.
+    (*listenerCompleted)->waitForTimesEntered(initialTimes + 1);
+    listenerCompleted.reset();
+}
+
 // Abort scenario : abortSplit called before startSplit.
 TEST_F(ShardSplitDonorServiceTest, CreateInstanceInAbortState) {
     auto opCtx = makeOperationContext();
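Editor's note: the new test synchronizes with the listener through fail point counters: initialTimesEntered() snapshots the pauseShardSplitRecipientListenerCompletion counter when the FailPointEnableBlock is created, and waitForTimesEntered(initialTimes + 1) blocks until the listener's completion handler actually executes pauseWhileSet(). A rough, self-contained sketch of that handshake follows; SimpleFailPoint is an illustrative stand-in for mongo::FailPoint, not the real class.

// Self-contained sketch of the fail point handshake used by the new test.
// SimpleFailPoint is an illustrative stand-in for mongo::FailPoint: it
// counts entries and pauses callers while enabled.
#include <atomic>
#include <condition_variable>
#include <cstdint>
#include <iostream>
#include <mutex>
#include <thread>

class SimpleFailPoint {
public:
    // Called by the code under test at the pause point.
    void pauseWhileSet() {
        std::unique_lock lk(_m);
        ++_timesEntered;
        _cv.notify_all();
        _cv.wait(lk, [&] { return !_enabled; });
    }

    // Test side: block until the pause point has been entered `target` times.
    void waitForTimesEntered(std::uint64_t target) {
        std::unique_lock lk(_m);
        _cv.wait(lk, [&] { return _timesEntered >= target; });
    }

    std::uint64_t timesEntered() {
        std::lock_guard lk(_m);
        return _timesEntered;
    }

    void setEnabled(bool on) {
        {
            std::lock_guard lk(_m);
            _enabled = on;
        }
        _cv.notify_all();
    }

private:
    std::mutex _m;
    std::condition_variable _cv;
    bool _enabled = true;
    std::uint64_t _timesEntered = 0;
};

int main() {
    SimpleFailPoint fp;
    const auto initialTimes = fp.timesEntered();

    // Stands in for the recipient listener's completion handler.
    std::thread listener([&] { fp.pauseWhileSet(); });

    // Like the test: if the handler never runs, this wait hangs and the test
    // harness times out; otherwise it proves the listener reached completion.
    fp.waitForTimesEntered(initialTimes + 1);
    std::cout << "listener reached its completion handler\n";

    fp.setEnabled(false);  // ~FailPointEnableBlock equivalent: release the pause
    listener.join();
    return 0;
}

The counter-based wait is what turns "the listener leaked" from a silent bug into a deterministic test timeout: the assertion cannot pass unless the completion handler really ran.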