diff options
Diffstat (limited to 'src/mongo/db/s/balancer')
-rw-r--r-- | src/mongo/db/s/balancer/balancer_commands_scheduler_impl.cpp | 27 | ||||
-rw-r--r-- | src/mongo/db/s/balancer/balancer_commands_scheduler_impl.h | 6 |
2 files changed, 12 insertions, 21 deletions
diff --git a/src/mongo/db/s/balancer/balancer_commands_scheduler_impl.cpp b/src/mongo/db/s/balancer/balancer_commands_scheduler_impl.cpp index 1234acfa6b0..797d4c00bbc 100644 --- a/src/mongo/db/s/balancer/balancer_commands_scheduler_impl.cpp +++ b/src/mongo/db/s/balancer/balancer_commands_scheduler_impl.cpp @@ -175,8 +175,7 @@ void BalancerCommandsSchedulerImpl::start(OperationContext* opCtx, stdx::lock_guard<Latch> lg(_mutex); invariant(!_workerThreadHandle.joinable()); if (!_executor) { - _executor = std::make_unique<executor::ScopedTaskExecutor>( - Grid::get(opCtx)->getExecutorPool()->getFixedExecutor()); + _executor = Grid::get(opCtx)->getExecutorPool()->getFixedExecutor(); } auto requestsToRecover = rebuildRequestsFromRecoveryInfo(opCtx, defaultValues); _numRequestsToRecover = requestsToRecover.size(); @@ -421,7 +420,7 @@ CommandSubmissionResult BalancerCommandsSchedulerImpl::_submit( } auto swRemoteCommandHandle = - (*_executor)->scheduleRemoteCommand(remoteCommand, onRemoteResponseReceived); + _executor->scheduleRemoteCommand(remoteCommand, onRemoteResponseReceived); return CommandSubmissionResult(params.id, distLockTaken, swRemoteCommandHandle.getStatus()); } @@ -466,8 +465,7 @@ void BalancerCommandsSchedulerImpl::_applyCommandResponse( void BalancerCommandsSchedulerImpl::_performDeferredCleanup( OperationContext* opCtx, - const stdx::unordered_map<UUID, RequestData, UUID::Hash>& requestsHoldingResources, - bool includePersistedData) { + const stdx::unordered_map<UUID, RequestData, UUID::Hash>& requestsHoldingResources) { if (requestsHoldingResources.empty()) { return; } @@ -477,7 +475,7 @@ void BalancerCommandsSchedulerImpl::_performDeferredCleanup( if (request.holdsDistributedLock()) { _distributedLocks.releaseFor(opCtx, request.getNamespace()); } - if (includePersistedData && request.requiresRecoveryCleanupOnCompletion()) { + if (request.requiresRecoveryCleanupOnCompletion()) { deletePersistedRecoveryInfo(dbClient, request.getCommandInfo()); } } @@ -539,8 +537,7 @@ void BalancerCommandsSchedulerImpl::_workerThread() { // 2.a Free any resource acquired by already completed/aborted requests. { auto opCtxHolder = cc().makeOperationContext(); - _performDeferredCleanup( - opCtxHolder.get(), completedRequestsToCleanUp, true /*includePersistedData*/); + _performDeferredCleanup(opCtxHolder.get(), completedRequestsToCleanUp); completedRequestsToCleanUp.clear(); } @@ -568,21 +565,17 @@ void BalancerCommandsSchedulerImpl::_workerThread() { } } // Wait for each outstanding command to complete, clean out its resources and leave. - (*_executor)->shutdown(); - (*_executor)->join(); - - stdx::unordered_map<UUID, RequestData, UUID::Hash> cancelledRequests; + stdx::unordered_map<UUID, RequestData, UUID::Hash> requestsToClean; { stdx::unique_lock<Latch> ul(_mutex); - cancelledRequests.swap(_requests); + _stateUpdatedCV.wait( + ul, [this] { return (_requests.size() == _recentlyCompletedRequestIds.size()); }); + requestsToClean.swap(_requests); _requests.clear(); _recentlyCompletedRequestIds.clear(); - _executor.reset(); } auto opCtxHolder = cc().makeOperationContext(); - // Ensure that the clean up won't delete any request recovery document (the commands will be - // reissued once the scheduler is restarted) - _performDeferredCleanup(opCtxHolder.get(), cancelledRequests, false /*includePersistedData*/); + _performDeferredCleanup(opCtxHolder.get(), requestsToClean); } diff --git a/src/mongo/db/s/balancer/balancer_commands_scheduler_impl.h b/src/mongo/db/s/balancer/balancer_commands_scheduler_impl.h index 92f9f074441..ba85ee9a6e3 100644 --- a/src/mongo/db/s/balancer/balancer_commands_scheduler_impl.h +++ b/src/mongo/db/s/balancer/balancer_commands_scheduler_impl.h @@ -34,7 +34,6 @@ #include "mongo/db/s/balancer/type_migration.h" #include "mongo/db/s/forwardable_operation_metadata.h" #include "mongo/db/service_context.h" -#include "mongo/executor/scoped_task_executor.h" #include "mongo/platform/mutex.h" #include "mongo/rpc/get_status_from_command_result.h" #include "mongo/s/client/shard.h" @@ -594,7 +593,7 @@ public: private: enum class SchedulerState { Recovering, Running, Stopping, Stopped }; - std::unique_ptr<executor::ScopedTaskExecutor> _executor{nullptr}; + std::shared_ptr<executor::TaskExecutor> _executor{nullptr}; // Protects the in-memory state of the Scheduler // (_state, _requests, _unsubmittedRequestIds, _recentlyCompletedRequests). @@ -648,8 +647,7 @@ private: */ void _performDeferredCleanup( OperationContext* opCtx, - const stdx::unordered_map<UUID, RequestData, UUID::Hash>& requestsHoldingResources, - bool includePersistedData); + const stdx::unordered_map<UUID, RequestData, UUID::Hash>& requestsHoldingResources); CommandSubmissionResult _submit(OperationContext* opCtx, const CommandSubmissionParameters& data); |