| author | Paolo Polato <paolo.polato@mongodb.com> | 2021-11-11 21:58:55 +0000 |
|---|---|---|
| committer | Evergreen Agent <no-reply@evergreen.mongodb.com> | 2021-11-11 23:13:16 +0000 |
| commit | bf817cb2c73a75eb51c5ff7de327b6d2cdeccbef (patch) | |
| tree | 1cdb6fe2f765f6b29446d20be07b63d246daeb58 /src | |
| parent | 84d485c25a0d238467048200bb7e894ada67bedc (diff) | |
| download | mongo-bf817cb2c73a75eb51c5ff7de327b6d2cdeccbef.tar.gz | |
SERVER-60921 Simplify and optimise the BalancerCommandsScheduler
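At its core, the change collapses the scheduler's four bookkeeping containers (`_incompleteRequests`, `_pendingRequestIds`, `_runningRequestIds`, plus the deferred-cleanup vectors) into a single `_requests` map and two phase lists, with a `_numRequestsToRecover` counter driving the `Recovering` to `Running` transition. The following toy model illustrates that lifecycle. It is a minimal sketch rather than the patched classes: `ToyScheduler`, `Request`, and plain `int` IDs stand in for `BalancerCommandsSchedulerImpl`, `RequestData`, and `UUID`, and all locking and asynchrony is omitted.

```cpp
#include <cstddef>
#include <iostream>
#include <unordered_map>
#include <vector>

// Toy model of the reworked bookkeeping: a single map owns every request for
// its whole lifetime, while two ID lists record which phase each request is in.
enum class State { Recovering, Running, Stopping, Stopped };

struct Request {
    int id;
    bool holdsDistLock = false;  // mirrors RequestData::_holdingDistLock
    bool recoverable = false;    // mirrors CommandInfo::requiresRecoveryOnCrash()
};

class ToyScheduler {
public:
    explicit ToyScheduler(std::size_t numToRecover)
        : _numRequestsToRecover(numToRecover),
          _state(numToRecover == 0 ? State::Running : State::Recovering) {}

    void enqueue(Request r) {
        int id = r.id;
        _requests.emplace(id, std::move(r));
        _unsubmittedRequestIds.push_back(id);  // picked up by the next worker round
    }

    // One worker round, with the remote execution collapsed to a print:
    // submit everything unsubmitted, mark it completed, then clean up.
    void round() {
        for (int id : _unsubmittedRequestIds)
            std::cout << "submit request " << id << "\n";
        _unsubmittedRequestIds.clear();
        for (const auto& entry : _requests) {
            _recentlyCompletedRequestIds.push_back(entry.first);
            if (_state == State::Recovering && --_numRequestsToRecover == 0)
                _state = State::Running;  // the recovery-complete transition
        }
        cleanup();
    }

private:
    // Deferred cleanup: release the locks and recovery documents held by
    // completed requests, then forget the requests entirely.
    void cleanup() {
        for (int id : _recentlyCompletedRequestIds) {
            auto it = _requests.find(id);
            if (it->second.holdsDistLock)
                std::cout << "release dist lock for request " << id << "\n";
            if (it->second.recoverable)
                std::cout << "delete recovery document for request " << id << "\n";
            _requests.erase(it);
        }
        _recentlyCompletedRequestIds.clear();
    }

    std::unordered_map<int, Request> _requests;
    std::vector<int> _unsubmittedRequestIds;
    std::vector<int> _recentlyCompletedRequestIds;
    std::size_t _numRequestsToRecover;
    State _state;
};

int main() {
    ToyScheduler scheduler(/*numToRecover=*/0);
    scheduler.enqueue({1, /*holdsDistLock=*/true, /*recoverable=*/true});
    scheduler.round();
}
```

Because a completed request stays in the map until the cleanup pass has released its distributed lock and removed its recovery document, the patch can drop the separate `_obsoleteRecoveryDocumentIds` and `_lockedReferencesToRelease` vectors.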
Diffstat (limited to 'src')
3 files changed, 163 insertions, 198 deletions
```diff
diff --git a/src/mongo/db/s/balancer/balancer_commands_scheduler_impl.cpp b/src/mongo/db/s/balancer/balancer_commands_scheduler_impl.cpp
index 1f923f4edbc..e835b8c73bd 100644
--- a/src/mongo/db/s/balancer/balancer_commands_scheduler_impl.cpp
+++ b/src/mongo/db/s/balancer/balancer_commands_scheduler_impl.cpp
@@ -42,7 +42,7 @@ namespace mongo {
 
-MONGO_FAIL_POINT_DEFINE(pauseBalancerWorkerThread);
+MONGO_FAIL_POINT_DEFINE(pauseSubmissionsFailPoint);
 MONGO_FAIL_POINT_DEFINE(deferredCleanupCompletedCheckpoint);
 
 const std::string MergeChunksCommandInfo::kCommandName = "mergeChunks";
@@ -81,15 +81,11 @@ BalancerCommandsSchedulerImpl::~BalancerCommandsSchedulerImpl() {
 
 void BalancerCommandsSchedulerImpl::start(OperationContext* opCtx) {
     LOGV2(5847200, "Balancer command scheduler start requested");
-    stdx::lock_guard<Latch> lgss(_startStopMutex);
     stdx::lock_guard<Latch> lg(_mutex);
-    if (_state == SchedulerState::Recovering || _state == SchedulerState::Running) {
-        return;
-    }
-
     invariant(!_workerThreadHandle.joinable());
     auto requestsToRecover = _loadRequestsToRecover(opCtx);
-    _state = requestsToRecover.empty() ? SchedulerState::Running : SchedulerState::Recovering;
+    _numRequestsToRecover = requestsToRecover.size();
+    _state = _numRequestsToRecover == 0 ? SchedulerState::Running : SchedulerState::Recovering;
 
     for (auto& requestToRecover : requestsToRecover) {
         _enqueueRequest(lg, std::move(requestToRecover));
@@ -100,7 +96,6 @@ void BalancerCommandsSchedulerImpl::stop() {
     LOGV2(5847201, "Balancer command scheduler stop requested");
-    stdx::lock_guard<Latch> lgss(_startStopMutex);
     {
         stdx::lock_guard<Latch> lg(_mutex);
         if (_state == SchedulerState::Stopped) {
@@ -260,8 +255,8 @@ ResponseHandle BalancerCommandsSchedulerImpl::_enqueueRequest(WithLock, RequestD
     auto requestId = request.getId();
     auto deferredResponseHandle = request.getResponseHandle();
     if (_state == SchedulerState::Recovering || _state == SchedulerState::Running) {
-        _incompleteRequests.emplace(std::make_pair(requestId, std::move(request)));
-        _pendingRequestIds.push_back(requestId);
+        _requests.emplace(std::make_pair(requestId, std::move(request)));
+        _unsubmittedRequestIds.push_back(requestId);
         _stateUpdatedCV.notify_all();
     } else {
         deferredResponseHandle.handle->set(Status(
@@ -271,90 +266,65 @@ ResponseHandle BalancerCommandsSchedulerImpl::_enqueueRequest(WithLock, RequestD
     return deferredResponseHandle;
 }
 
-bool BalancerCommandsSchedulerImpl::_canSubmitNewRequests(WithLock) {
-    return (!_pendingRequestIds.empty() && MONGO_likely(!pauseBalancerWorkerThread.shouldFail()));
-}
-
-bool BalancerCommandsSchedulerImpl::_deferredCleanupRequired(WithLock) {
-    return (!_obsoleteRecoveryDocumentIds.empty() || !_lockedReferencesToRelease.empty());
-}
-
 CommandSubmissionResult BalancerCommandsSchedulerImpl::_submit(
-    OperationContext* opCtx, const CommandSubmissionHandle& handle) {
-    LOGV2(5847203, "Balancer command request submitted for execution", "reqId"_attr = handle.id);
+    OperationContext* opCtx, const CommandSubmissionParameters& params) {
+    LOGV2(5847203, "Balancer command request submitted for execution", "reqId"_attr = params.id);
+    bool distLockTaken = false;
 
     auto executor = Grid::get(opCtx)->getExecutorPool()->getFixedExecutor();
 
     const auto shardWithStatus =
-        Grid::get(opCtx)->shardRegistry()->getShard(opCtx, handle.commandInfo->getTarget());
+        Grid::get(opCtx)->shardRegistry()->getShard(opCtx, params.commandInfo->getTarget());
     if (!shardWithStatus.isOK()) {
-        return CommandSubmissionResult(handle.id, false, shardWithStatus.getStatus());
+        return CommandSubmissionResult(params.id, distLockTaken, shardWithStatus.getStatus());
     }
 
     const auto shardHostWithStatus = shardWithStatus.getValue()->getTargeter()->findHost(
         opCtx, ReadPreferenceSetting{ReadPreference::PrimaryOnly});
     if (!shardHostWithStatus.isOK()) {
-        return CommandSubmissionResult(handle.id, false, shardHostWithStatus.getStatus());
+        return CommandSubmissionResult(params.id, distLockTaken, shardHostWithStatus.getStatus());
     }
 
     const executor::RemoteCommandRequest remoteCommand(shardHostWithStatus.getValue(),
                                                        NamespaceString::kAdminDb.toString(),
-                                                       handle.commandInfo->serialise(),
+                                                       params.commandInfo->serialise(),
                                                        opCtx);
 
     auto onRemoteResponseReceived =
         [this,
-         requestId = handle.id](const executor::TaskExecutor::RemoteCommandCallbackArgs& args) {
+         requestId = params.id](const executor::TaskExecutor::RemoteCommandCallbackArgs& args) {
             _applyCommandResponse(requestId, args.response);
         };
 
-    if (handle.commandInfo->requiresDistributedLock()) {
+    if (params.commandInfo->requiresDistributedLock()) {
         Status lockAcquisitionResponse =
-            _distributedLocks.acquireFor(opCtx, handle.commandInfo->getNameSpace());
+            _distributedLocks.acquireFor(opCtx, params.commandInfo->getNameSpace());
         if (!lockAcquisitionResponse.isOK()) {
-            return CommandSubmissionResult(handle.id, false, lockAcquisitionResponse);
+            return CommandSubmissionResult(params.id, distLockTaken, lockAcquisitionResponse);
         }
+        distLockTaken = true;
     }
 
-    auto remoteCommandHandleWithStatus =
+    auto swRemoteCommandHandle =
         executor->scheduleRemoteCommand(remoteCommand, onRemoteResponseReceived);
     return (
-        remoteCommandHandleWithStatus.isOK()
-            ? CommandSubmissionResult(handle.id, true, remoteCommandHandleWithStatus.getValue())
-            : CommandSubmissionResult(handle.id, true, remoteCommandHandleWithStatus.getStatus()));
+        swRemoteCommandHandle.isOK()
+            ? CommandSubmissionResult(params.id, distLockTaken, swRemoteCommandHandle.getValue())
+            : CommandSubmissionResult(params.id, distLockTaken, swRemoteCommandHandle.getStatus()));
 }
 
 void BalancerCommandsSchedulerImpl::_applySubmissionResult(
     WithLock, CommandSubmissionResult&& submissionResult) {
-    auto requestToUpdateIt = _incompleteRequests.find(submissionResult.id);
-    if (requestToUpdateIt == _incompleteRequests.end()) {
-        LOGV2(5847209,
-              "Skipping _applySubmissionResult: request already completed/canceled",
-              "reqId"_attr = submissionResult.id);
-        return;
-    }
-    /*
-     * Rules for processing the outcome of a command submission:
-     * If successful,
-     * - set the execution context on the RequestData to be able to serve future cancel requests
-     * - update the data structure describing running command requests
-     * If failed,
-     * - store the outcome into the deferred response
-     * - remove the RequestData and its persisted info (if any)
-     */
-    auto& requestToUpdate = requestToUpdateIt->second;
-    if (submissionResult.context.isOK()) {
-        requestToUpdate.addExecutionContext(std::move(submissionResult.context.getValue()));
-        _runningRequestIds.insert(submissionResult.id);
-    } else {
-        const auto& submittedCommandInfo = requestToUpdate.getCommandInfo();
-        if (submissionResult.acquiredDistLock) {
-            _lockedReferencesToRelease.emplace_back(submittedCommandInfo.getNameSpace());
-        }
-        if (submittedCommandInfo.requiresRecoveryOnCrash()) {
-            _obsoleteRecoveryDocumentIds.push_back(submissionResult.id);
+    auto submittedRequestIt = _requests.find(submissionResult.id);
+    invariant(submittedRequestIt != _requests.end());
+    auto& submittedRequest = submittedRequestIt->second;
+    auto submissionOutcome = submittedRequest.applySubmissionResult(std::move(submissionResult));
+    if (!submissionOutcome.isOK()) {
+        // The request was resolved as failed on submission time - move it to the complete list.
+        _recentlyCompletedRequestIds.emplace_back(submittedRequestIt->first);
+        if (_state == SchedulerState::Recovering && --_numRequestsToRecover == 0) {
+            LOGV2(5847213, "Balancer scheduler recovery complete. Switching to regular execution");
+            _state = SchedulerState::Running;
         }
-        requestToUpdate.setOutcome(submissionResult.context.getStatus());
-        _incompleteRequests.erase(requestToUpdateIt);
     }
 }
 
@@ -362,25 +332,17 @@ void BalancerCommandsSchedulerImpl::_applyCommandResponse(
     UUID requestId, const executor::TaskExecutor::ResponseStatus& response) {
     {
         stdx::lock_guard<Latch> lg(_mutex);
-        auto requestToCompleteIt = _incompleteRequests.find(requestId);
-        if (_state == SchedulerState::Stopping || _state == SchedulerState::Stopped ||
-            requestToCompleteIt == _incompleteRequests.end()) {
+        if (_state == SchedulerState::Stopping || _state == SchedulerState::Stopped) {
             // Drop the response - the request is being cancelled in the worker thread.
             return;
         }
-        auto& requestToComplete = requestToCompleteIt->second;
-        requestToComplete.setOutcome(response);
-        auto& commandInfo = requestToComplete.getCommandInfo();
-        if (commandInfo.requiresDistributedLock()) {
-            _lockedReferencesToRelease.emplace_back(commandInfo.getNameSpace());
-        }
-        if (commandInfo.requiresRecoveryOnCrash()) {
-            _obsoleteRecoveryDocumentIds.push_back(requestId);
-        }
-        _runningRequestIds.erase(requestId);
-        _incompleteRequests.erase(requestToCompleteIt);
-        if (_incompleteRequests.empty() && _state == SchedulerState::Recovering) {
-            LOGV2(5847213, "Balancer scheduler recovery complete. Switching to regular execution");
+        auto requestIt = _requests.find(requestId);
+        invariant(requestIt != _requests.end());
+        auto& request = requestIt->second;
+        request.setOutcome(response);
+        _recentlyCompletedRequestIds.emplace_back(request.getId());
+        if (_state == SchedulerState::Recovering && --_numRequestsToRecover == 0) {
+            LOGV2(5847207, "Balancer scheduler recovery complete. Switching to regular execution");
             _state = SchedulerState::Running;
         }
         _stateUpdatedCV.notify_all();
@@ -409,59 +371,43 @@ std::vector<RequestData> BalancerCommandsSchedulerImpl::_loadRequestsToRecover(
         dbClient.query(
             documentProcessor, NamespaceString::kConfigBalancerCommandsNamespace, BSONObj());
     } catch (const DBException& e) {
-        LOGV2(5847225, "Failed to load requests to recover", "error"_attr = redact(e));
+        LOGV2(5847215, "Failed to load requests to recover", "error"_attr = redact(e));
     }
 
     return requestsToRecover;
 }
 
-void BalancerCommandsSchedulerImpl::_performDeferredCleanup(WithLock, OperationContext* opCtx) {
-    auto recoveryDocsDeleted = [this, opCtx] {
-        if (_obsoleteRecoveryDocumentIds.empty()) {
-            return false;
+void BalancerCommandsSchedulerImpl::_performDeferredCleanup(
+    OperationContext* opCtx, std::vector<RequestData>&& requestsHoldingResources) {
+    std::vector<UUID> persistedRequestsIds;
+    for (const auto& request : requestsHoldingResources) {
+        if (request.holdsDistributedLock()) {
+            _distributedLocks.releaseFor(opCtx, request.getNamespace());
         }
-        BSONObjBuilder queryBuilder;
-        if (_obsoleteRecoveryDocumentIds.size() == 1) {
-            _obsoleteRecoveryDocumentIds[0].appendToBuilder(
-                &queryBuilder, PersistedBalancerCommand::kRequestIdFieldName);
-        } else {
-            BSONObjBuilder requestIdClause(
-                queryBuilder.subobjStart(PersistedBalancerCommand::kRequestIdFieldName));
-            BSONArrayBuilder valuesBuilder(requestIdClause.subarrayStart("$in"));
-            for (const auto& requestId : _obsoleteRecoveryDocumentIds) {
-                requestId.appendToArrayBuilder(&valuesBuilder);
-            }
+        if (request.isRecoverable()) {
+            persistedRequestsIds.emplace_back(request.getId());
         }
+    }
 
-        _obsoleteRecoveryDocumentIds.clear();
-
-        auto query = queryBuilder.obj();
-        DBDirectClient dbClient(opCtx);
-
-        auto reply = dbClient.removeAcknowledged(
-            NamespaceString::kConfigBalancerCommandsNamespace.toString(), query);
-
-        LOGV2(5847211,
-              "Clean up of obsolete document info performed",
-              "query"_attr = query,
-              "reply"_attr = reply);
-        return true;
-    }();
-
-    auto distributedLocksReleased = [this, opCtx] {
-        if (_lockedReferencesToRelease.empty()) {
-            return false;
-        }
-        for (const auto& nss : _lockedReferencesToRelease) {
-            _distributedLocks.releaseFor(opCtx, nss);
-        }
-        _lockedReferencesToRelease.clear();
-        return true;
-    }();
+    if (persistedRequestsIds.empty()) {
+        return;
+    }
+    BSONArrayBuilder idsToRemoveBuilder;
+    for (const auto& requestId : persistedRequestsIds) {
+        requestId.appendToArrayBuilder(&idsToRemoveBuilder);
+    }
+    BSONObjBuilder queryBuilder;
+    queryBuilder.append(PersistedBalancerCommand::kRequestIdFieldName,
+                        BSON("$in" << idsToRemoveBuilder.arr()));
 
-    if (recoveryDocsDeleted || distributedLocksReleased) {
-        deferredCleanupCompletedCheckpoint.pauseWhileSet();
+    auto query = queryBuilder.obj();
+    DBDirectClient dbClient(opCtx);
+    try {
+        dbClient.remove(NamespaceString::kConfigBalancerCommandsNamespace.toString(), query);
+    } catch (const DBException& e) {
+        LOGV2(5847214, "Failed to remove recovery info", "error"_attr = redact(e));
     }
+    deferredCleanupCompletedCheckpoint.pauseWhileSet();
 }
 
 void BalancerCommandsSchedulerImpl::_workerThread() {
@@ -473,46 +419,55 @@ void BalancerCommandsSchedulerImpl::_workerThread() {
     });
 
     Client::initThread("BalancerCommandsScheduler");
+    bool stopWorkerRequested = false;
     stdx::unordered_map<UUID, RequestData, UUID::Hash> requestsToCleanUpOnExit;
     LOGV2(5847205, "Balancer scheduler thread started");
 
-    while (true) {
-        std::vector<CommandSubmissionHandle> commandsToSubmit;
+    while (!stopWorkerRequested) {
+        std::vector<CommandSubmissionParameters> commandsToSubmit;
+        std::vector<CommandSubmissionResult> submissionResults;
+        std::vector<RequestData> completedRequestsToCleanUp;
+
+        // 1. Check the internal state and plan for the actions to be taken on this round.
         {
             stdx::unique_lock<Latch> ul(_mutex);
            invariant(_state != SchedulerState::Stopped);
            _stateUpdatedCV.wait(ul, [this, &ul] {
-                return (_state == SchedulerState::Stopping || _canSubmitNewRequests(ul) ||
-                        _deferredCleanupRequired(ul));
+                return ((!_unsubmittedRequestIds.empty() &&
+                         !MONGO_likely(pauseSubmissionsFailPoint.shouldFail())) ||
+                        _state == SchedulerState::Stopping ||
+                        !_recentlyCompletedRequestIds.empty());
            });
 
-            // Completed commands defer the clean up of acquired resources
-            {
-                auto opCtxHolder = cc().makeOperationContext();
-                _performDeferredCleanup(ul, opCtxHolder.get());
+            for (const auto& requestId : _recentlyCompletedRequestIds) {
+                auto it = _requests.find(requestId);
+                completedRequestsToCleanUp.emplace_back(std::move(it->second));
+                _requests.erase(it);
             }
+            _recentlyCompletedRequestIds.clear();
 
            if (_state == SchedulerState::Stopping) {
-                _runningRequestIds.clear();
-                _pendingRequestIds.clear();
-                _incompleteRequests.swap(requestsToCleanUpOnExit);
-                break;
-            }
-
-            if (!_canSubmitNewRequests(ul)) {
-                continue;
+                // reset the internal state and
+                _unsubmittedRequestIds.clear();
+                _requests.swap(requestsToCleanUpOnExit);
+                stopWorkerRequested = true;
+            } else {
+                // Pick up new commands to be submitted
+                for (const auto& requestId : _unsubmittedRequestIds) {
+                    const auto& requestData = _requests.at(requestId);
+                    commandsToSubmit.push_back(requestData.getSubmissionParameters());
+                }
+                _unsubmittedRequestIds.clear();
            }
+        }
 
-            // 1. Pick up new requests to be submitted from the pending list
-            for (auto pendingRequestId : _pendingRequestIds) {
-                const auto& requestData = _incompleteRequests.at(pendingRequestId);
-                commandsToSubmit.push_back(requestData.getSubmissionInfo());
-            }
-            _pendingRequestIds.clear();
+        // 2.a Free any resource acquired by already completed/aborted requests.
+        {
+            auto opCtxHolder = cc().makeOperationContext();
+            _performDeferredCleanup(opCtxHolder.get(), std::move(completedRequestsToCleanUp));
        }
 
-        // 2. Serve the picked up requests, submitting their related commands.
-        std::vector<CommandSubmissionResult> submissionResults;
+        // 2.b Serve the picked up requests, submitting their related commands.
        for (auto& submissionInfo : commandsToSubmit) {
            auto opCtxHolder = cc().makeOperationContext();
            if (submissionInfo.commandInfo) {
@@ -528,21 +483,12 @@ void BalancerCommandsSchedulerImpl::_workerThread() {
         }
 
         // 3. Process the outcome of each submission.
-        int numRunningRequests = 0;
-        int numPendingRequests = 0;
-        {
+        if (!submissionResults.empty()) {
             stdx::lock_guard<Latch> lg(_mutex);
             for (auto& submissionResult : submissionResults) {
                 _applySubmissionResult(lg, std::move(submissionResult));
             }
-            numRunningRequests = _runningRequestIds.size();
-            numPendingRequests = _pendingRequestIds.size();
         }
-        LOGV2_DEBUG(5847207,
-                    1,
-                    "Ending balancer command scheduler round",
-                    "numRunningRequests"_attr = numRunningRequests,
-                    "numPendingRequests"_attr = numPendingRequests);
     }
 
     // In case of clean exit, cancel all the pending/running command requests
@@ -556,8 +502,7 @@ void BalancerCommandsSchedulerImpl::_workerThread() {
             if (cancelHandle) {
                 executor->cancel(*cancelHandle);
             }
-            _distributedLocks.releaseFor(opCtxHolder.get(),
-                                         idAndRequest.second.getCommandInfo().getNameSpace());
+            _distributedLocks.releaseFor(opCtxHolder.get(), idAndRequest.second.getNamespace());
         }
     }
diff --git a/src/mongo/db/s/balancer/balancer_commands_scheduler_impl.h b/src/mongo/db/s/balancer/balancer_commands_scheduler_impl.h
index 79fc7aac793..9a36443236b 100644
--- a/src/mongo/db/s/balancer/balancer_commands_scheduler_impl.h
+++ b/src/mongo/db/s/balancer/balancer_commands_scheduler_impl.h
@@ -580,14 +580,14 @@ private:
 };
 
 /**
- * Helper data structure for submitting the shard command associated to a Request to the
- * BalancerCommandsScheduler.
+ * Helper data structure for submitting the remote command associated to a BalancerCommandsScheduler
+ * Request.
  */
-struct CommandSubmissionHandle {
-    CommandSubmissionHandle(UUID id, const std::shared_ptr<CommandInfo>& commandInfo)
+struct CommandSubmissionParameters {
+    CommandSubmissionParameters(UUID id, const std::shared_ptr<CommandInfo>& commandInfo)
         : id(id), commandInfo(commandInfo) {}
 
-    CommandSubmissionHandle(CommandSubmissionHandle&& rhs)
+    CommandSubmissionParameters(CommandSubmissionParameters&& rhs)
         : id(rhs.id), commandInfo(std::move(rhs.commandInfo)) {}
 
     const UUID id;
@@ -614,12 +614,12 @@ struct CommandSubmissionResult {
 /**
  * The class encapsulating all the properties supporting a request to BalancerCommandsSchedulerImpl
  * as it gets created, executed and completed/cancelled.
- * It offers helper methods to generate supporting classes to support the request processing.
  */
 class RequestData {
 public:
     RequestData(UUID id, std::shared_ptr<CommandInfo>&& commandInfo)
         : _id(id),
+          _holdingDistLock(false),
           _commandInfo(std::move(commandInfo)),
           _deferredResponse(std::make_shared<Notification<executor::RemoteCommandResponse>>()),
           _executionContext(boost::none) {
@@ -628,6 +628,7 @@ public:
 
     RequestData(RequestData&& rhs)
         : _id(rhs._id),
+          _holdingDistLock(rhs._holdingDistLock),
           _commandInfo(std::move(rhs._commandInfo)),
           _deferredResponse(std::move(rhs._deferredResponse)),
           _executionContext(std::move(rhs._executionContext)) {}
@@ -638,16 +639,27 @@ public:
         return _id;
     }
 
-    const CommandInfo& getCommandInfo() const {
-        return *_commandInfo;
+    CommandSubmissionParameters getSubmissionParameters() const {
+        return CommandSubmissionParameters(_id, _commandInfo);
     }
 
-    CommandSubmissionHandle getSubmissionInfo() const {
-        return CommandSubmissionHandle(_id, _commandInfo);
-    }
-
-    void addExecutionContext(ExecutionContext&& executionContext) {
-        _executionContext = std::move(executionContext);
+    Status applySubmissionResult(CommandSubmissionResult&& submissionResult) {
+        invariant(_id == submissionResult.id);
+        _holdingDistLock = submissionResult.acquiredDistLock;
+        if (!!(*_deferredResponse)) {
+            // The request has been already completed by the time the submission gets processed.
+            // Keep the original outcome and continue the workflow.
+            return Status::OK();
+        }
+        auto submissionStatus = submissionResult.context.getStatus();
+        if (submissionStatus.isOK()) {
+            // store the execution context to be able to serve future cancel requests.
+            _executionContext = std::move(submissionResult.context.getValue());
+        } else {
+            // cascade the submission failure
+            setOutcome(submissionStatus);
+        }
+        return submissionStatus;
     }
 
     const boost::optional<executor::TaskExecutor::CallbackHandle>& getExecutionContext() {
@@ -658,6 +670,18 @@ public:
         return ResponseHandle(_id, _deferredResponse);
     }
 
+    const NamespaceString& getNamespace() const {
+        return _commandInfo->getNameSpace();
+    }
+
+    const bool holdsDistributedLock() const {
+        return _holdingDistLock;
+    }
+
+    const bool isRecoverable() const {
+        return _commandInfo->requiresRecoveryOnCrash();
+    }
+
     void setOutcome(const executor::TaskExecutor::ResponseStatus& response) {
         _deferredResponse->set(response);
     }
@@ -669,6 +693,8 @@ private:
 
     const UUID _id;
 
+    bool _holdingDistLock;
+
     std::shared_ptr<CommandInfo> _commandInfo;
 
     std::shared_ptr<Notification<executor::RemoteCommandResponse>> _deferredResponse;
@@ -729,11 +755,9 @@ private:
     enum class SchedulerState { Recovering, Running, Stopping, Stopped };
 
     // Protects the in-memory state of the Scheduler
+    // (_state, _requests, _unsubmittedRequestIds, _recentlyCompletedRequests).
     Mutex _mutex = MONGO_MAKE_LATCH("BalancerCommandsSchedulerImpl::_mutex");
 
-    // Ensures that concurrent calls to start() and stop() get serialised
-    Mutex _startStopMutex = MONGO_MAKE_LATCH("BalancerCommandsSchedulerImpl::_startStopMutex");
-
     SchedulerState _state{SchedulerState::Stopped};
 
     stdx::condition_variable _stateUpdatedCV;
@@ -741,24 +765,21 @@ private:
     stdx::thread _workerThreadHandle;
 
     /**
-     * Collection of all pending + running requests currently managed by
+     * List of all unsubmitted + submitted + completed (but not yet cleaned up) requests managed by
      * BalancerCommandsSchedulerImpl, organized by ID.
      */
-    stdx::unordered_map<UUID, RequestData, UUID::Hash> _incompleteRequests;
+    stdx::unordered_map<UUID, RequestData, UUID::Hash> _requests;
 
     /**
-     * List of request IDs that have not been yet submitted.
+     * List of request IDs that have not yet been submitted for remote execution.
      */
-    std::list<UUID> _pendingRequestIds;
+    std::vector<UUID> _unsubmittedRequestIds;
 
     /**
-     * List of request IDs that are currently running (submitted, but not yet completed).
+     * List of completed/cancelled request IDs that may still hold synchronisation resources or
+     * persisted state that the scheduler needs to release/clean up.
      */
-    stdx::unordered_set<UUID, UUID::Hash> _runningRequestIds;
-
-    std::vector<UUID> _obsoleteRecoveryDocumentIds;
-
-    std::vector<NamespaceString> _lockedReferencesToRelease;
+    std::vector<UUID> _recentlyCompletedRequestIds;
 
     /**
      * Centralised accessor for all the distributed locks required by the Scheduler.
@@ -766,18 +787,22 @@ private:
      */
     BalancerDistLocks _distributedLocks;
 
+    /*
+     * Counter of outstanding requests that were interrupted by a prior step-down/crash event,
+     * and that the scheduler is currently submitting as part of its initial recovery phase.
+     */
+    size_t _numRequestsToRecover{0};
+
     ResponseHandle _buildAndEnqueueNewRequest(OperationContext* opCtx,
                                               std::shared_ptr<CommandInfo>&& commandInfo);
 
     ResponseHandle _enqueueRequest(WithLock, RequestData&& request);
 
-    bool _canSubmitNewRequests(WithLock);
-
-    bool _deferredCleanupRequired(WithLock);
-
-    void _performDeferredCleanup(WithLock, OperationContext* opCtx);
+    void _performDeferredCleanup(OperationContext* opCtx,
+                                 std::vector<RequestData>&& requestsHoldingResources);
 
-    CommandSubmissionResult _submit(OperationContext* opCtx, const CommandSubmissionHandle& data);
+    CommandSubmissionResult _submit(OperationContext* opCtx,
+                                    const CommandSubmissionParameters& data);
 
     void _applySubmissionResult(WithLock, CommandSubmissionResult&& submissionResult);
diff --git a/src/mongo/db/s/balancer/balancer_commands_scheduler_test.cpp b/src/mongo/db/s/balancer/balancer_commands_scheduler_test.cpp
index 51ea5d95995..339ce137003 100644
--- a/src/mongo/db/s/balancer/balancer_commands_scheduler_test.cpp
+++ b/src/mongo/db/s/balancer/balancer_commands_scheduler_test.cpp
@@ -117,11 +117,6 @@ TEST_F(BalancerCommandsSchedulerTest, StartAndStopScheduler) {
     _scheduler.stop();
 }
 
-TEST_F(BalancerCommandsSchedulerTest, ResilientToMultipleStarts) {
-    _scheduler.start(operationContext());
-    _scheduler.start(operationContext());
-}
-
 TEST_F(BalancerCommandsSchedulerTest, SuccessfulMoveChunkCommand) {
     auto deferredCleanupCompletedCheckpoint =
         globalFailPointRegistry().find("deferredCleanupCompletedCheckpoint");
@@ -308,7 +303,7 @@ TEST_F(BalancerCommandsSchedulerTest, CommandFailsWhenSchedulerIsStopped) {
 TEST_F(BalancerCommandsSchedulerTest, CommandCanceledIfBalancerStops) {
     std::unique_ptr<MoveChunkResponse> resp;
     {
-        FailPointEnableBlock failPoint("pauseBalancerWorkerThread");
+        FailPointEnableBlock failPoint("pauseSubmissionsFailPoint");
         _scheduler.start(operationContext());
         ChunkType moveChunk = makeChunk(0, kShardId0);
         resp = _scheduler.requestMoveChunk(operationContext(),
@@ -335,7 +330,7 @@ TEST_F(BalancerCommandsSchedulerTest, CommandCanceledIfBalancerStops) {
 TEST_F(BalancerCommandsSchedulerTest, MoveChunkCommandGetsPersistedOnDiskWhenRequestIsSubmitted) {
     // This prevents the request from being submitted by the scheduler worker thread.
-    FailPointEnableBlock failPoint("pauseBalancerWorkerThread");
+    FailPointEnableBlock failPoint("pauseSubmissionsFailPoint");
     auto opCtx = operationContext();
     _scheduler.start(opCtx);
@@ -374,7 +369,7 @@ TEST_F(BalancerCommandsSchedulerTest, MoveChunkCommandGetsPersistedOnDiskWhenReq
 }
 
 TEST_F(BalancerCommandsSchedulerTest, PersistedCommandsAreReissuedWhenRecoveringFromCrash) {
-    FailPoint* failpoint = globalFailPointRegistry().find("pauseBalancerWorkerThread");
+    FailPoint* failpoint = globalFailPointRegistry().find("pauseSubmissionsFailPoint");
     failpoint->setMode(FailPoint::Mode::alwaysOn);
     auto opCtx = operationContext();
     _scheduler.start(opCtx);
@@ -431,7 +426,7 @@ TEST_F(BalancerCommandsSchedulerTest, PersistedCommandsAreReissuedWhenRecovering
 TEST_F(BalancerCommandsSchedulerTest, DistLockPreventsMoveChunkWithConcurrentDDL) {
     OperationContext* opCtx;
-    FailPoint* failpoint = globalFailPointRegistry().find("pauseBalancerWorkerThread");
+    FailPoint* failpoint = globalFailPointRegistry().find("pauseSubmissionsFailPoint");
     failpoint->setMode(FailPoint::Mode::alwaysOn);
     {
         _scheduler.start(operationContext());
```
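For reference, the reshaped `_workerThread` now runs a fixed cadence per round: (1) plan under the mutex, (2.a) deferred cleanup, (2.b) submissions, (3) apply submission results. Below is a self-contained sketch of the wait-and-drain pattern at the heart of step 1, with plain `std::` primitives standing in for MongoDB's `Latch`/`stdx` wrappers; the names `WorkerState` and `workerRound` are illustrative and not part of the patch.

```cpp
#include <condition_variable>
#include <iostream>
#include <mutex>
#include <vector>

// Sketch of one worker-loop round: sleep until there is something to submit,
// something to clean up, or a stop request; drain the work lists under the
// mutex; perform all blocking work outside of it.
struct WorkerState {
    std::mutex mutex;
    std::condition_variable stateUpdatedCV;
    std::vector<int> unsubmittedRequestIds;
    std::vector<int> recentlyCompletedRequestIds;
    bool stopping = false;
    bool submissionsPaused = false;  // stands in for pauseSubmissionsFailPoint
};

void workerRound(WorkerState& s) {
    std::vector<int> toSubmit;
    std::vector<int> toCleanUp;
    {
        std::unique_lock<std::mutex> ul(s.mutex);
        // 1. Wait until the round has something to do (the same predicate
        //    shape as the patched _stateUpdatedCV.wait()).
        s.stateUpdatedCV.wait(ul, [&s] {
            return (!s.unsubmittedRequestIds.empty() && !s.submissionsPaused) ||
                s.stopping || !s.recentlyCompletedRequestIds.empty();
        });
        // Drain both lists while holding the lock...
        toCleanUp.swap(s.recentlyCompletedRequestIds);
        if (!s.stopping) {
            toSubmit.swap(s.unsubmittedRequestIds);
        }
    }
    // ...so that cleanup (2.a) and submission (2.b) run without the mutex held.
    for (int id : toCleanUp)
        std::cout << "release resources for request " << id << "\n";
    for (int id : toSubmit)
        std::cout << "submit request " << id << "\n";
}

int main() {
    WorkerState s;
    s.unsubmittedRequestIds.push_back(42);
    workerRound(s);  // predicate is already true, so the wait returns at once
}
```

Draining under the lock and acting outside it is what lets the patch delete the `_canSubmitNewRequests` and `_deferredCleanupRequired` helpers: the wait predicate now inspects the two ID lists directly.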