summaryrefslogtreecommitdiff
path: root/src/mongo/db/s/balancer
diff options
context:
space:
mode:
Diffstat (limited to 'src/mongo/db/s/balancer')
-rw-r--r--src/mongo/db/s/balancer/balancer_commands_scheduler_impl.cpp27
-rw-r--r--src/mongo/db/s/balancer/balancer_commands_scheduler_impl.h6
2 files changed, 12 insertions, 21 deletions
diff --git a/src/mongo/db/s/balancer/balancer_commands_scheduler_impl.cpp b/src/mongo/db/s/balancer/balancer_commands_scheduler_impl.cpp
index 1234acfa6b0..797d4c00bbc 100644
--- a/src/mongo/db/s/balancer/balancer_commands_scheduler_impl.cpp
+++ b/src/mongo/db/s/balancer/balancer_commands_scheduler_impl.cpp
@@ -175,8 +175,7 @@ void BalancerCommandsSchedulerImpl::start(OperationContext* opCtx,
stdx::lock_guard<Latch> lg(_mutex);
invariant(!_workerThreadHandle.joinable());
if (!_executor) {
- _executor = std::make_unique<executor::ScopedTaskExecutor>(
- Grid::get(opCtx)->getExecutorPool()->getFixedExecutor());
+ _executor = Grid::get(opCtx)->getExecutorPool()->getFixedExecutor();
}
auto requestsToRecover = rebuildRequestsFromRecoveryInfo(opCtx, defaultValues);
_numRequestsToRecover = requestsToRecover.size();
@@ -421,7 +420,7 @@ CommandSubmissionResult BalancerCommandsSchedulerImpl::_submit(
}
auto swRemoteCommandHandle =
- (*_executor)->scheduleRemoteCommand(remoteCommand, onRemoteResponseReceived);
+ _executor->scheduleRemoteCommand(remoteCommand, onRemoteResponseReceived);
return CommandSubmissionResult(params.id, distLockTaken, swRemoteCommandHandle.getStatus());
}
@@ -466,8 +465,7 @@ void BalancerCommandsSchedulerImpl::_applyCommandResponse(
void BalancerCommandsSchedulerImpl::_performDeferredCleanup(
OperationContext* opCtx,
- const stdx::unordered_map<UUID, RequestData, UUID::Hash>& requestsHoldingResources,
- bool includePersistedData) {
+ const stdx::unordered_map<UUID, RequestData, UUID::Hash>& requestsHoldingResources) {
if (requestsHoldingResources.empty()) {
return;
}
@@ -477,7 +475,7 @@ void BalancerCommandsSchedulerImpl::_performDeferredCleanup(
if (request.holdsDistributedLock()) {
_distributedLocks.releaseFor(opCtx, request.getNamespace());
}
- if (includePersistedData && request.requiresRecoveryCleanupOnCompletion()) {
+ if (request.requiresRecoveryCleanupOnCompletion()) {
deletePersistedRecoveryInfo(dbClient, request.getCommandInfo());
}
}
@@ -539,8 +537,7 @@ void BalancerCommandsSchedulerImpl::_workerThread() {
// 2.a Free any resource acquired by already completed/aborted requests.
{
auto opCtxHolder = cc().makeOperationContext();
- _performDeferredCleanup(
- opCtxHolder.get(), completedRequestsToCleanUp, true /*includePersistedData*/);
+ _performDeferredCleanup(opCtxHolder.get(), completedRequestsToCleanUp);
completedRequestsToCleanUp.clear();
}
@@ -568,21 +565,17 @@ void BalancerCommandsSchedulerImpl::_workerThread() {
}
}
// Wait for each outstanding command to complete, clean out its resources and leave.
- (*_executor)->shutdown();
- (*_executor)->join();
-
- stdx::unordered_map<UUID, RequestData, UUID::Hash> cancelledRequests;
+ stdx::unordered_map<UUID, RequestData, UUID::Hash> requestsToClean;
{
stdx::unique_lock<Latch> ul(_mutex);
- cancelledRequests.swap(_requests);
+ _stateUpdatedCV.wait(
+ ul, [this] { return (_requests.size() == _recentlyCompletedRequestIds.size()); });
+ requestsToClean.swap(_requests);
_requests.clear();
_recentlyCompletedRequestIds.clear();
- _executor.reset();
}
auto opCtxHolder = cc().makeOperationContext();
- // Ensure that the clean up won't delete any request recovery document (the commands will be
- // reissued once the scheduler is restarted)
- _performDeferredCleanup(opCtxHolder.get(), cancelledRequests, false /*includePersistedData*/);
+ _performDeferredCleanup(opCtxHolder.get(), requestsToClean);
}
diff --git a/src/mongo/db/s/balancer/balancer_commands_scheduler_impl.h b/src/mongo/db/s/balancer/balancer_commands_scheduler_impl.h
index 92f9f074441..ba85ee9a6e3 100644
--- a/src/mongo/db/s/balancer/balancer_commands_scheduler_impl.h
+++ b/src/mongo/db/s/balancer/balancer_commands_scheduler_impl.h
@@ -34,7 +34,6 @@
#include "mongo/db/s/balancer/type_migration.h"
#include "mongo/db/s/forwardable_operation_metadata.h"
#include "mongo/db/service_context.h"
-#include "mongo/executor/scoped_task_executor.h"
#include "mongo/platform/mutex.h"
#include "mongo/rpc/get_status_from_command_result.h"
#include "mongo/s/client/shard.h"
@@ -594,7 +593,7 @@ public:
private:
enum class SchedulerState { Recovering, Running, Stopping, Stopped };
- std::unique_ptr<executor::ScopedTaskExecutor> _executor{nullptr};
+ std::shared_ptr<executor::TaskExecutor> _executor{nullptr};
// Protects the in-memory state of the Scheduler
// (_state, _requests, _unsubmittedRequestIds, _recentlyCompletedRequests).
@@ -648,8 +647,7 @@ private:
*/
void _performDeferredCleanup(
OperationContext* opCtx,
- const stdx::unordered_map<UUID, RequestData, UUID::Hash>& requestsHoldingResources,
- bool includePersistedData);
+ const stdx::unordered_map<UUID, RequestData, UUID::Hash>& requestsHoldingResources);
CommandSubmissionResult _submit(OperationContext* opCtx,
const CommandSubmissionParameters& data);