From 07d0be8c575c796065fdc18d05723bd92f66c8f2 Mon Sep 17 00:00:00 2001 From: Paolo Polato Date: Tue, 1 Mar 2022 10:52:15 +0000 Subject: SERVER-60920 BalancesCommandsScheduler completes every outstanding command before stopping --- .../balancer/balancer_commands_scheduler_impl.cpp | 80 ++++++++++------------ .../s/balancer/balancer_commands_scheduler_impl.h | 36 +++------- .../balancer/balancer_commands_scheduler_test.cpp | 2 +- 3 files changed, 50 insertions(+), 68 deletions(-) (limited to 'src/mongo/db/s') diff --git a/src/mongo/db/s/balancer/balancer_commands_scheduler_impl.cpp b/src/mongo/db/s/balancer/balancer_commands_scheduler_impl.cpp index e62202236ba..40ed0cc032f 100644 --- a/src/mongo/db/s/balancer/balancer_commands_scheduler_impl.cpp +++ b/src/mongo/db/s/balancer/balancer_commands_scheduler_impl.cpp @@ -404,10 +404,7 @@ CommandSubmissionResult BalancerCommandsSchedulerImpl::_submit( auto swRemoteCommandHandle = _executor->scheduleRemoteCommand(remoteCommand, onRemoteResponseReceived); - return ( - swRemoteCommandHandle.isOK() - ? CommandSubmissionResult(params.id, distLockTaken, swRemoteCommandHandle.getValue()) - : CommandSubmissionResult(params.id, distLockTaken, swRemoteCommandHandle.getStatus())); + return CommandSubmissionResult(params.id, distLockTaken, swRemoteCommandHandle.getStatus()); } void BalancerCommandsSchedulerImpl::_applySubmissionResult( @@ -430,10 +427,7 @@ void BalancerCommandsSchedulerImpl::_applyCommandResponse( UUID requestId, const executor::RemoteCommandResponse& response) { { stdx::lock_guard lg(_mutex); - if (_state == SchedulerState::Stopping || _state == SchedulerState::Stopped) { - // Drop the response - the request is being cancelled in the worker thread. - return; - } + invariant(_state != SchedulerState::Stopped); auto requestIt = _requests.find(requestId); invariant(requestIt != _requests.end()); auto& request = requestIt->second; @@ -453,9 +447,14 @@ void BalancerCommandsSchedulerImpl::_applyCommandResponse( } void BalancerCommandsSchedulerImpl::_performDeferredCleanup( - OperationContext* opCtx, std::vector&& requestsHoldingResources) { + OperationContext* opCtx, + const stdx::unordered_map& requestsHoldingResources) { + if (requestsHoldingResources.empty()) { + return; + } + DBDirectClient dbClient(opCtx); - for (const auto& request : requestsHoldingResources) { + for (const auto& [_, request] : requestsHoldingResources) { if (request.holdsDistributedLock()) { _distributedLocks.releaseFor(opCtx, request.getNamespace()); } @@ -463,9 +462,8 @@ void BalancerCommandsSchedulerImpl::_performDeferredCleanup( deletePersistedRecoveryInfo(dbClient, request.getCommandInfo()); } } - if (!requestsHoldingResources.empty()) { - deferredCleanupCompletedCheckpoint.pauseWhileSet(); - } + + deferredCleanupCompletedCheckpoint.pauseWhileSet(); } void BalancerCommandsSchedulerImpl::_workerThread() { @@ -478,19 +476,18 @@ void BalancerCommandsSchedulerImpl::_workerThread() { Client::initThread("BalancerCommandsScheduler"); bool stopWorkerRequested = false; - stdx::unordered_map requestsToCleanUpOnExit; LOGV2(5847205, "Balancer scheduler thread started"); while (!stopWorkerRequested) { std::vector commandsToSubmit; std::vector submissionResults; - std::vector completedRequestsToCleanUp; + stdx::unordered_map completedRequestsToCleanUp; // 1. Check the internal state and plan for the actions to be taken ont this round. { stdx::unique_lock ul(_mutex); invariant(_state != SchedulerState::Stopped); - _stateUpdatedCV.wait(ul, [this, &ul] { + _stateUpdatedCV.wait(ul, [this] { return ((!_unsubmittedRequestIds.empty() && !MONGO_likely(pauseSubmissionsFailPoint.shouldFail())) || _state == SchedulerState::Stopping || @@ -499,30 +496,32 @@ void BalancerCommandsSchedulerImpl::_workerThread() { for (const auto& requestId : _recentlyCompletedRequestIds) { auto it = _requests.find(requestId); - completedRequestsToCleanUp.emplace_back(std::move(it->second)); + completedRequestsToCleanUp.emplace(it->first, std::move(it->second)); _requests.erase(it); } _recentlyCompletedRequestIds.clear(); - if (_state == SchedulerState::Stopping) { - // Reset the internal state and prepare to leave - _unsubmittedRequestIds.clear(); - _requests.swap(requestsToCleanUpOnExit); - stopWorkerRequested = true; - } else { - // Pick up new commands to be submitted - for (const auto& requestId : _unsubmittedRequestIds) { - const auto& requestData = _requests.at(requestId); + for (const auto& requestId : _unsubmittedRequestIds) { + auto& requestData = _requests.at(requestId); + if (_state != SchedulerState::Stopping) { commandsToSubmit.push_back(requestData.getSubmissionParameters()); + } else { + requestData.setOutcome( + Status(ErrorCodes::BalancerInterrupted, + "Request cancelled - balancer scheduler is stopping")); + completedRequestsToCleanUp.emplace(requestId, std::move(requestData)); + _requests.erase(requestId); } - _unsubmittedRequestIds.clear(); } + _unsubmittedRequestIds.clear(); + stopWorkerRequested = _state == SchedulerState::Stopping; } // 2.a Free any resource acquired by already completed/aborted requests. { auto opCtxHolder = cc().makeOperationContext(); - _performDeferredCleanup(opCtxHolder.get(), std::move(completedRequestsToCleanUp)); + _performDeferredCleanup(opCtxHolder.get(), completedRequestsToCleanUp); + completedRequestsToCleanUp.clear(); } // 2.b Serve the picked up requests, submitting their related commands. @@ -532,11 +531,11 @@ void BalancerCommandsSchedulerImpl::_workerThread() { submissionInfo.commandInfo.get()->attachOperationMetadataTo(opCtxHolder.get()); } submissionResults.push_back(_submit(opCtxHolder.get(), submissionInfo)); - if (!submissionResults.back().context.isOK()) { + if (!submissionResults.back().outcome.isOK()) { LOGV2(5847206, "Submission for scheduler command request failed", "reqId"_attr = submissionResults.back().id, - "cause"_attr = submissionResults.back().context.getStatus()); + "cause"_attr = submissionResults.back().outcome); } } @@ -548,18 +547,15 @@ void BalancerCommandsSchedulerImpl::_workerThread() { } } } - - // In case of clean exit, cancel all the pending/running command requests - // (but keep the related descriptor documents to ensure they will be reissued on recovery). - auto opCtxHolder = cc().makeOperationContext(); - for (auto& idAndRequest : requestsToCleanUpOnExit) { - idAndRequest.second.setOutcome(Status( - ErrorCodes::BalancerInterrupted, "Request cancelled - balancer scheduler is stopping")); - const auto& cancelHandle = idAndRequest.second.getExecutionContext(); - if (cancelHandle) { - _executor->cancel(*cancelHandle); - } - _distributedLocks.releaseFor(opCtxHolder.get(), idAndRequest.second.getNamespace()); + // Wait for each outstanding command to complete, clean out its resources and leave. + { + stdx::unique_lock ul(_mutex); + _stateUpdatedCV.wait( + ul, [this] { return (_requests.size() == _recentlyCompletedRequestIds.size()); }); + auto opCtxHolder = cc().makeOperationContext(); + _performDeferredCleanup(opCtxHolder.get(), _requests); + _requests.clear(); + _recentlyCompletedRequestIds.clear(); } } diff --git a/src/mongo/db/s/balancer/balancer_commands_scheduler_impl.h b/src/mongo/db/s/balancer/balancer_commands_scheduler_impl.h index e127767a69c..536687848c0 100644 --- a/src/mongo/db/s/balancer/balancer_commands_scheduler_impl.h +++ b/src/mongo/db/s/balancer/balancer_commands_scheduler_impl.h @@ -399,21 +399,17 @@ struct CommandSubmissionParameters { const std::shared_ptr commandInfo; }; - -using ExecutionContext = executor::TaskExecutor::CallbackHandle; - /** * Helper data structure for storing the outcome of a Command submission. */ struct CommandSubmissionResult { - CommandSubmissionResult(UUID id, bool acquiredDistLock, StatusWith&& context) - : id(id), acquiredDistLock(acquiredDistLock), context(std::move(context)) {} - CommandSubmissionResult(CommandSubmissionResult&& rhs) - : id(rhs.id), acquiredDistLock(rhs.acquiredDistLock), context(std::move(rhs.context)) {} + CommandSubmissionResult(UUID id, bool acquiredDistLock, const Status& outcome) + : id(id), acquiredDistLock(acquiredDistLock), outcome(outcome) {} + CommandSubmissionResult(CommandSubmissionResult&& rhs) = default; CommandSubmissionResult(const CommandSubmissionResult& rhs) = delete; UUID id; bool acquiredDistLock; - StatusWith context; + Status outcome; }; /** @@ -427,8 +423,7 @@ public: _completedOrAborted(false), _holdingDistLock(false), _commandInfo(std::move(commandInfo)), - _responsePromise{NonNullPromiseTag{}}, - _executionContext(boost::none) { + _responsePromise{NonNullPromiseTag{}} { invariant(_commandInfo); } @@ -437,8 +432,7 @@ public: _completedOrAborted(rhs._completedOrAborted), _holdingDistLock(rhs._holdingDistLock), _commandInfo(std::move(rhs._commandInfo)), - _responsePromise(std::move(rhs._responsePromise)), - _executionContext(std::move(rhs._executionContext)) {} + _responsePromise(std::move(rhs._responsePromise)) {} ~RequestData() = default; @@ -458,21 +452,14 @@ public: // Keep the original outcome and continue the workflow. return Status::OK(); } - auto submissionStatus = submissionResult.context.getStatus(); - if (submissionStatus.isOK()) { - // store the execution context to be able to serve future cancel requests. - _executionContext = std::move(submissionResult.context.getValue()); - } else { + const auto& submissionStatus = submissionResult.outcome; + if (!submissionStatus.isOK()) { // cascade the submission failure setOutcome(submissionStatus); } return submissionStatus; } - const boost::optional& getExecutionContext() { - return _executionContext; - } - const CommandInfo& getCommandInfo() const { return *_commandInfo; } @@ -512,8 +499,6 @@ private: std::shared_ptr _commandInfo; Promise _responsePromise; - - boost::optional _executionContext; }; /** @@ -618,8 +603,9 @@ private: void _enqueueRequest(WithLock, RequestData&& request); - void _performDeferredCleanup(OperationContext* opCtx, - std::vector&& requestsHoldingResources); + void _performDeferredCleanup( + OperationContext* opCtx, + const stdx::unordered_map& requestsHoldingResources); CommandSubmissionResult _submit(OperationContext* opCtx, const CommandSubmissionParameters& data); diff --git a/src/mongo/db/s/balancer/balancer_commands_scheduler_test.cpp b/src/mongo/db/s/balancer/balancer_commands_scheduler_test.cpp index f65638ecb22..18c5dc086a1 100644 --- a/src/mongo/db/s/balancer/balancer_commands_scheduler_test.cpp +++ b/src/mongo/db/s/balancer/balancer_commands_scheduler_test.cpp @@ -323,7 +323,7 @@ TEST_F(BalancerCommandsSchedulerTest, CommandFailsWhenSchedulerIsStopped) { } } -TEST_F(BalancerCommandsSchedulerTest, CommandCanceledIfBalancerStops) { +TEST_F(BalancerCommandsSchedulerTest, CommandCanceledIfUnsubmittedBeforeBalancerStops) { SemiFuture futureResponse; { FailPointEnableBlock failPoint("pauseSubmissionsFailPoint"); -- cgit v1.2.1