author     Paolo Polato <paolo.polato@mongodb.com>           2021-11-11 21:58:55 +0000
committer  Evergreen Agent <no-reply@evergreen.mongodb.com>  2021-11-11 23:13:16 +0000
commit     bf817cb2c73a75eb51c5ff7de327b6d2cdeccbef (patch)
tree       1cdb6fe2f765f6b29446d20be07b63d246daeb58
parent     84d485c25a0d238467048200bb7e894ada67bedc (diff)
download   mongo-bf817cb2c73a75eb51c5ff7de327b6d2cdeccbef.tar.gz
SERVER-60921 Simplify and optimise the BalancerCommandsScheduler
-rw-r--r--  src/mongo/db/s/balancer/balancer_commands_scheduler_impl.cpp  257
-rw-r--r--  src/mongo/db/s/balancer/balancer_commands_scheduler_impl.h     91
-rw-r--r--  src/mongo/db/s/balancer/balancer_commands_scheduler_test.cpp   13
3 files changed, 163 insertions(+), 198 deletions(-)
diff --git a/src/mongo/db/s/balancer/balancer_commands_scheduler_impl.cpp b/src/mongo/db/s/balancer/balancer_commands_scheduler_impl.cpp
index 1f923f4edbc..e835b8c73bd 100644
--- a/src/mongo/db/s/balancer/balancer_commands_scheduler_impl.cpp
+++ b/src/mongo/db/s/balancer/balancer_commands_scheduler_impl.cpp
@@ -42,7 +42,7 @@
namespace mongo {
-MONGO_FAIL_POINT_DEFINE(pauseBalancerWorkerThread);
+MONGO_FAIL_POINT_DEFINE(pauseSubmissionsFailPoint);
MONGO_FAIL_POINT_DEFINE(deferredCleanupCompletedCheckpoint);
const std::string MergeChunksCommandInfo::kCommandName = "mergeChunks";
@@ -81,15 +81,11 @@ BalancerCommandsSchedulerImpl::~BalancerCommandsSchedulerImpl() {
void BalancerCommandsSchedulerImpl::start(OperationContext* opCtx) {
LOGV2(5847200, "Balancer command scheduler start requested");
- stdx::lock_guard<Latch> lgss(_startStopMutex);
stdx::lock_guard<Latch> lg(_mutex);
- if (_state == SchedulerState::Recovering || _state == SchedulerState::Running) {
- return;
- }
-
invariant(!_workerThreadHandle.joinable());
auto requestsToRecover = _loadRequestsToRecover(opCtx);
- _state = requestsToRecover.empty() ? SchedulerState::Running : SchedulerState::Recovering;
+ _numRequestsToRecover = requestsToRecover.size();
+ _state = _numRequestsToRecover == 0 ? SchedulerState::Running : SchedulerState::Recovering;
for (auto& requestToRecover : requestsToRecover) {
_enqueueRequest(lg, std::move(requestToRecover));
@@ -100,7 +96,6 @@ void BalancerCommandsSchedulerImpl::start(OperationContext* opCtx) {
void BalancerCommandsSchedulerImpl::stop() {
LOGV2(5847201, "Balancer command scheduler stop requested");
- stdx::lock_guard<Latch> lgss(_startStopMutex);
{
stdx::lock_guard<Latch> lg(_mutex);
if (_state == SchedulerState::Stopped) {
@@ -260,8 +255,8 @@ ResponseHandle BalancerCommandsSchedulerImpl::_enqueueRequest(WithLock, RequestD
auto requestId = request.getId();
auto deferredResponseHandle = request.getResponseHandle();
if (_state == SchedulerState::Recovering || _state == SchedulerState::Running) {
- _incompleteRequests.emplace(std::make_pair(requestId, std::move(request)));
- _pendingRequestIds.push_back(requestId);
+ _requests.emplace(std::make_pair(requestId, std::move(request)));
+ _unsubmittedRequestIds.push_back(requestId);
_stateUpdatedCV.notify_all();
} else {
deferredResponseHandle.handle->set(Status(
@@ -271,90 +266,65 @@ ResponseHandle BalancerCommandsSchedulerImpl::_enqueueRequest(WithLock, RequestD
return deferredResponseHandle;
}
-bool BalancerCommandsSchedulerImpl::_canSubmitNewRequests(WithLock) {
- return (!_pendingRequestIds.empty() && MONGO_likely(!pauseBalancerWorkerThread.shouldFail()));
-}
-
-bool BalancerCommandsSchedulerImpl::_deferredCleanupRequired(WithLock) {
- return (!_obsoleteRecoveryDocumentIds.empty() || !_lockedReferencesToRelease.empty());
-}
-
CommandSubmissionResult BalancerCommandsSchedulerImpl::_submit(
- OperationContext* opCtx, const CommandSubmissionHandle& handle) {
- LOGV2(5847203, "Balancer command request submitted for execution", "reqId"_attr = handle.id);
+ OperationContext* opCtx, const CommandSubmissionParameters& params) {
+ LOGV2(5847203, "Balancer command request submitted for execution", "reqId"_attr = params.id);
+ bool distLockTaken = false;
auto executor = Grid::get(opCtx)->getExecutorPool()->getFixedExecutor();
const auto shardWithStatus =
- Grid::get(opCtx)->shardRegistry()->getShard(opCtx, handle.commandInfo->getTarget());
+ Grid::get(opCtx)->shardRegistry()->getShard(opCtx, params.commandInfo->getTarget());
if (!shardWithStatus.isOK()) {
- return CommandSubmissionResult(handle.id, false, shardWithStatus.getStatus());
+ return CommandSubmissionResult(params.id, distLockTaken, shardWithStatus.getStatus());
}
const auto shardHostWithStatus = shardWithStatus.getValue()->getTargeter()->findHost(
opCtx, ReadPreferenceSetting{ReadPreference::PrimaryOnly});
if (!shardHostWithStatus.isOK()) {
- return CommandSubmissionResult(handle.id, false, shardHostWithStatus.getStatus());
+ return CommandSubmissionResult(params.id, distLockTaken, shardHostWithStatus.getStatus());
}
const executor::RemoteCommandRequest remoteCommand(shardHostWithStatus.getValue(),
NamespaceString::kAdminDb.toString(),
- handle.commandInfo->serialise(),
+ params.commandInfo->serialise(),
opCtx);
auto onRemoteResponseReceived =
[this,
- requestId = handle.id](const executor::TaskExecutor::RemoteCommandCallbackArgs& args) {
+ requestId = params.id](const executor::TaskExecutor::RemoteCommandCallbackArgs& args) {
_applyCommandResponse(requestId, args.response);
};
- if (handle.commandInfo->requiresDistributedLock()) {
+ if (params.commandInfo->requiresDistributedLock()) {
Status lockAcquisitionResponse =
- _distributedLocks.acquireFor(opCtx, handle.commandInfo->getNameSpace());
+ _distributedLocks.acquireFor(opCtx, params.commandInfo->getNameSpace());
if (!lockAcquisitionResponse.isOK()) {
- return CommandSubmissionResult(handle.id, false, lockAcquisitionResponse);
+ return CommandSubmissionResult(params.id, distLockTaken, lockAcquisitionResponse);
}
+ distLockTaken = true;
}
- auto remoteCommandHandleWithStatus =
+ auto swRemoteCommandHandle =
executor->scheduleRemoteCommand(remoteCommand, onRemoteResponseReceived);
return (
- remoteCommandHandleWithStatus.isOK()
- ? CommandSubmissionResult(handle.id, true, remoteCommandHandleWithStatus.getValue())
- : CommandSubmissionResult(handle.id, true, remoteCommandHandleWithStatus.getStatus()));
+ swRemoteCommandHandle.isOK()
+ ? CommandSubmissionResult(params.id, distLockTaken, swRemoteCommandHandle.getValue())
+ : CommandSubmissionResult(params.id, distLockTaken, swRemoteCommandHandle.getStatus()));
}
void BalancerCommandsSchedulerImpl::_applySubmissionResult(
WithLock, CommandSubmissionResult&& submissionResult) {
- auto requestToUpdateIt = _incompleteRequests.find(submissionResult.id);
- if (requestToUpdateIt == _incompleteRequests.end()) {
- LOGV2(5847209,
- "Skipping _applySubmissionResult: request already completed/canceled",
- "reqId"_attr = submissionResult.id);
- return;
- }
- /*
- * Rules for processing the outcome of a command submission:
- * If successful,
- * - set the execution context on the RequestData to be able to serve future cancel requests
- * - update the data structure describing running command requests
- * If failed,
- * - store the outcome into the deferred response
- * - remove the RequestData and its persisted info (if any)
- */
- auto& requestToUpdate = requestToUpdateIt->second;
- if (submissionResult.context.isOK()) {
- requestToUpdate.addExecutionContext(std::move(submissionResult.context.getValue()));
- _runningRequestIds.insert(submissionResult.id);
- } else {
- const auto& submittedCommandInfo = requestToUpdate.getCommandInfo();
- if (submissionResult.acquiredDistLock) {
- _lockedReferencesToRelease.emplace_back(submittedCommandInfo.getNameSpace());
- }
- if (submittedCommandInfo.requiresRecoveryOnCrash()) {
- _obsoleteRecoveryDocumentIds.push_back(submissionResult.id);
+ auto submittedRequestIt = _requests.find(submissionResult.id);
+ invariant(submittedRequestIt != _requests.end());
+ auto& submittedRequest = submittedRequestIt->second;
+ auto submissionOutcome = submittedRequest.applySubmissionResult(std::move(submissionResult));
+ if (!submissionOutcome.isOK()) {
+ // The request was resolved as failed at submission time - move it to the completed list.
+ _recentlyCompletedRequestIds.emplace_back(submittedRequestIt->first);
+ if (_state == SchedulerState::Recovering && --_numRequestsToRecover == 0) {
+ LOGV2(5847213, "Balancer scheduler recovery complete. Switching to regular execution");
+ _state = SchedulerState::Running;
}
- requestToUpdate.setOutcome(submissionResult.context.getStatus());
- _incompleteRequests.erase(requestToUpdateIt);
}
}
@@ -362,25 +332,17 @@ void BalancerCommandsSchedulerImpl::_applyCommandResponse(
UUID requestId, const executor::TaskExecutor::ResponseStatus& response) {
{
stdx::lock_guard<Latch> lg(_mutex);
- auto requestToCompleteIt = _incompleteRequests.find(requestId);
- if (_state == SchedulerState::Stopping || _state == SchedulerState::Stopped ||
- requestToCompleteIt == _incompleteRequests.end()) {
+ if (_state == SchedulerState::Stopping || _state == SchedulerState::Stopped) {
// Drop the response - the request is being cancelled in the worker thread.
return;
}
- auto& requestToComplete = requestToCompleteIt->second;
- requestToComplete.setOutcome(response);
- auto& commandInfo = requestToComplete.getCommandInfo();
- if (commandInfo.requiresDistributedLock()) {
- _lockedReferencesToRelease.emplace_back(commandInfo.getNameSpace());
- }
- if (commandInfo.requiresRecoveryOnCrash()) {
- _obsoleteRecoveryDocumentIds.push_back(requestId);
- }
- _runningRequestIds.erase(requestId);
- _incompleteRequests.erase(requestToCompleteIt);
- if (_incompleteRequests.empty() && _state == SchedulerState::Recovering) {
- LOGV2(5847213, "Balancer scheduler recovery complete. Switching to regular execution");
+ auto requestIt = _requests.find(requestId);
+ invariant(requestIt != _requests.end());
+ auto& request = requestIt->second;
+ request.setOutcome(response);
+ _recentlyCompletedRequestIds.emplace_back(request.getId());
+ if (_state == SchedulerState::Recovering && --_numRequestsToRecover == 0) {
+ LOGV2(5847207, "Balancer scheduler recovery complete. Switching to regular execution");
_state = SchedulerState::Running;
}
_stateUpdatedCV.notify_all();
@@ -409,59 +371,43 @@ std::vector<RequestData> BalancerCommandsSchedulerImpl::_loadRequestsToRecover(
dbClient.query(
documentProcessor, NamespaceString::kConfigBalancerCommandsNamespace, BSONObj());
} catch (const DBException& e) {
- LOGV2(5847225, "Failed to load requests to recover", "error"_attr = redact(e));
+ LOGV2(5847215, "Failed to load requests to recover", "error"_attr = redact(e));
}
return requestsToRecover;
}
-void BalancerCommandsSchedulerImpl::_performDeferredCleanup(WithLock, OperationContext* opCtx) {
- auto recoveryDocsDeleted = [this, opCtx] {
- if (_obsoleteRecoveryDocumentIds.empty()) {
- return false;
+void BalancerCommandsSchedulerImpl::_performDeferredCleanup(
+ OperationContext* opCtx, std::vector<RequestData>&& requestsHoldingResources) {
+ std::vector<UUID> persistedRequestsIds;
+ for (const auto& request : requestsHoldingResources) {
+ if (request.holdsDistributedLock()) {
+ _distributedLocks.releaseFor(opCtx, request.getNamespace());
}
- BSONObjBuilder queryBuilder;
- if (_obsoleteRecoveryDocumentIds.size() == 1) {
- _obsoleteRecoveryDocumentIds[0].appendToBuilder(
- &queryBuilder, PersistedBalancerCommand::kRequestIdFieldName);
- } else {
- BSONObjBuilder requestIdClause(
- queryBuilder.subobjStart(PersistedBalancerCommand::kRequestIdFieldName));
- BSONArrayBuilder valuesBuilder(requestIdClause.subarrayStart("$in"));
- for (const auto& requestId : _obsoleteRecoveryDocumentIds) {
- requestId.appendToArrayBuilder(&valuesBuilder);
- }
+ if (request.isRecoverable()) {
+ persistedRequestsIds.emplace_back(request.getId());
}
+ }
- _obsoleteRecoveryDocumentIds.clear();
-
- auto query = queryBuilder.obj();
- DBDirectClient dbClient(opCtx);
-
- auto reply = dbClient.removeAcknowledged(
- NamespaceString::kConfigBalancerCommandsNamespace.toString(), query);
-
- LOGV2(5847211,
- "Clean up of obsolete document info performed",
- "query"_attr = query,
- "reply"_attr = reply);
- return true;
- }();
-
- auto distributedLocksReleased = [this, opCtx] {
- if (_lockedReferencesToRelease.empty()) {
- return false;
- }
- for (const auto& nss : _lockedReferencesToRelease) {
- _distributedLocks.releaseFor(opCtx, nss);
- }
- _lockedReferencesToRelease.clear();
- return true;
- }();
+ if (persistedRequestsIds.empty()) {
+ return;
+ }
+ BSONArrayBuilder idsToRemoveBuilder;
+ for (const auto& requestId : persistedRequestsIds) {
+ requestId.appendToArrayBuilder(&idsToRemoveBuilder);
+ }
+ BSONObjBuilder queryBuilder;
+ queryBuilder.append(PersistedBalancerCommand::kRequestIdFieldName,
+ BSON("$in" << idsToRemoveBuilder.arr()));
- if (recoveryDocsDeleted || distributedLocksReleased) {
- deferredCleanupCompletedCheckpoint.pauseWhileSet();
+ auto query = queryBuilder.obj();
+ DBDirectClient dbClient(opCtx);
+ try {
+ dbClient.remove(NamespaceString::kConfigBalancerCommandsNamespace.toString(), query);
+ } catch (const DBException& e) {
+ LOGV2(5847214, "Failed to remove recovery info", "error"_attr = redact(e));
}
+ deferredCleanupCompletedCheckpoint.pauseWhileSet();
}
void BalancerCommandsSchedulerImpl::_workerThread() {
@@ -473,46 +419,55 @@ void BalancerCommandsSchedulerImpl::_workerThread() {
});
Client::initThread("BalancerCommandsScheduler");
+ bool stopWorkerRequested = false;
stdx::unordered_map<UUID, RequestData, UUID::Hash> requestsToCleanUpOnExit;
LOGV2(5847205, "Balancer scheduler thread started");
- while (true) {
- std::vector<CommandSubmissionHandle> commandsToSubmit;
+ while (!stopWorkerRequested) {
+ std::vector<CommandSubmissionParameters> commandsToSubmit;
+ std::vector<CommandSubmissionResult> submissionResults;
+ std::vector<RequestData> completedRequestsToCleanUp;
+
+ // 1. Check the internal state and plan for the actions to be taken on this round.
{
stdx::unique_lock<Latch> ul(_mutex);
invariant(_state != SchedulerState::Stopped);
_stateUpdatedCV.wait(ul, [this, &ul] {
- return (_state == SchedulerState::Stopping || _canSubmitNewRequests(ul) ||
- _deferredCleanupRequired(ul));
+ return ((!_unsubmittedRequestIds.empty() &&
+ !MONGO_likely(pauseSubmissionsFailPoint.shouldFail())) ||
+ _state == SchedulerState::Stopping ||
+ !_recentlyCompletedRequestIds.empty());
});
- // Completed commands defer the clean up of acquired resources
- {
- auto opCtxHolder = cc().makeOperationContext();
- _performDeferredCleanup(ul, opCtxHolder.get());
+ for (const auto& requestId : _recentlyCompletedRequestIds) {
+ auto it = _requests.find(requestId);
+ completedRequestsToCleanUp.emplace_back(std::move(it->second));
+ _requests.erase(it);
}
+ _recentlyCompletedRequestIds.clear();
if (_state == SchedulerState::Stopping) {
- _runningRequestIds.clear();
- _pendingRequestIds.clear();
- _incompleteRequests.swap(requestsToCleanUpOnExit);
- break;
- }
-
- if (!_canSubmitNewRequests(ul)) {
- continue;
+ // Reset the internal state and hand over the remaining requests for clean-up on exit.
+ _unsubmittedRequestIds.clear();
+ _requests.swap(requestsToCleanUpOnExit);
+ stopWorkerRequested = true;
+ } else {
+ // Pick up new commands to be submitted
+ for (const auto& requestId : _unsubmittedRequestIds) {
+ const auto& requestData = _requests.at(requestId);
+ commandsToSubmit.push_back(requestData.getSubmissionParameters());
+ }
+ _unsubmittedRequestIds.clear();
}
+ }
- // 1. Pick up new requests to be submitted from the pending list
- for (auto pendingRequestId : _pendingRequestIds) {
- const auto& requestData = _incompleteRequests.at(pendingRequestId);
- commandsToSubmit.push_back(requestData.getSubmissionInfo());
- }
- _pendingRequestIds.clear();
+ // 2.a Free any resources acquired by already completed/aborted requests.
+ {
+ auto opCtxHolder = cc().makeOperationContext();
+ _performDeferredCleanup(opCtxHolder.get(), std::move(completedRequestsToCleanUp));
}
- // 2. Serve the picked up requests, submitting their related commands.
- std::vector<CommandSubmissionResult> submissionResults;
+ // 2.b Serve the picked up requests, submitting their related commands.
for (auto& submissionInfo : commandsToSubmit) {
auto opCtxHolder = cc().makeOperationContext();
if (submissionInfo.commandInfo) {
@@ -528,21 +483,12 @@ void BalancerCommandsSchedulerImpl::_workerThread() {
}
// 3. Process the outcome of each submission.
- int numRunningRequests = 0;
- int numPendingRequests = 0;
- {
+ if (!submissionResults.empty()) {
stdx::lock_guard<Latch> lg(_mutex);
for (auto& submissionResult : submissionResults) {
_applySubmissionResult(lg, std::move(submissionResult));
}
- numRunningRequests = _runningRequestIds.size();
- numPendingRequests = _pendingRequestIds.size();
}
- LOGV2_DEBUG(5847207,
- 1,
- "Ending balancer command scheduler round",
- "numRunningRequests"_attr = numRunningRequests,
- "numPendingRequests"_attr = numPendingRequests);
}
// In case of clean exit, cancel all the pending/running command requests
@@ -556,8 +502,7 @@ void BalancerCommandsSchedulerImpl::_workerThread() {
if (cancelHandle) {
executor->cancel(*cancelHandle);
}
- _distributedLocks.releaseFor(opCtxHolder.get(),
- idAndRequest.second.getCommandInfo().getNameSpace());
+ _distributedLocks.releaseFor(opCtxHolder.get(), idAndRequest.second.getNamespace());
}
}
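
Editor's note: the reworked worker loop above reduces the scheduler's bookkeeping to a single request map plus two ID lists - _unsubmittedRequestIds feeds submissions, _recentlyCompletedRequestIds feeds the deferred cleanup - with recovery tracked by a plain counter. Below is a minimal, self-contained sketch of that queueing pattern in standard C++ only; MiniScheduler, Request and the printf calls are illustrative stand-ins, not the MongoDB types or APIs.

#include <condition_variable>
#include <cstdio>
#include <mutex>
#include <thread>
#include <unordered_map>
#include <vector>

struct Request {
    int id;
};

class MiniScheduler {
public:
    void start() {
        _worker = std::thread([this] { _workerLoop(); });
    }

    void enqueue(Request r) {
        std::lock_guard<std::mutex> lg(_mutex);
        const int id = r.id;
        _requests.emplace(id, std::move(r));
        _unsubmittedIds.push_back(id);
        _cv.notify_all();
    }

    void markCompleted(int id) {
        std::lock_guard<std::mutex> lg(_mutex);
        _recentlyCompletedIds.push_back(id);
        _cv.notify_all();
    }

    void stop() {
        {
            std::lock_guard<std::mutex> lg(_mutex);
            _stopping = true;
        }
        _cv.notify_all();
        if (_worker.joinable())
            _worker.join();
    }

private:
    void _workerLoop() {
        bool stopRequested = false;
        while (!stopRequested) {
            std::vector<Request> toCleanUp;
            std::vector<int> toSubmit;
            {
                std::unique_lock<std::mutex> ul(_mutex);
                // Same shape as the new wait predicate: wake on stop, on new
                // submissions, or on completed requests needing cleanup.
                _cv.wait(ul, [this] {
                    return _stopping || !_unsubmittedIds.empty() ||
                           !_recentlyCompletedIds.empty();
                });
                // Completed requests leave the map; their resources get
                // released outside the lock, exactly once.
                for (int id : _recentlyCompletedIds) {
                    auto it = _requests.find(id);
                    toCleanUp.push_back(std::move(it->second));
                    _requests.erase(it);
                }
                _recentlyCompletedIds.clear();
                if (_stopping) {
                    // The real code also cancels whatever is still in the map here.
                    _unsubmittedIds.clear();
                    stopRequested = true;
                } else {
                    toSubmit.swap(_unsubmittedIds);
                }
            }
            for (const Request& r : toCleanUp)
                std::printf("cleaning up request %d\n", r.id);
            for (int id : toSubmit)
                std::printf("submitting request %d\n", id);
        }
    }

    std::mutex _mutex;
    std::condition_variable _cv;
    bool _stopping = false;
    std::thread _worker;
    std::unordered_map<int, Request> _requests;
    std::vector<int> _unsubmittedIds;
    std::vector<int> _recentlyCompletedIds;
};

int main() {
    MiniScheduler scheduler;
    scheduler.start();
    scheduler.enqueue(Request{1});
    scheduler.enqueue(Request{2});
    scheduler.markCompleted(1);  // request 1 is cleaned up on the next round
    scheduler.stop();            // drains pending work and joins the worker
    return 0;
}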
diff --git a/src/mongo/db/s/balancer/balancer_commands_scheduler_impl.h b/src/mongo/db/s/balancer/balancer_commands_scheduler_impl.h
index 79fc7aac793..9a36443236b 100644
--- a/src/mongo/db/s/balancer/balancer_commands_scheduler_impl.h
+++ b/src/mongo/db/s/balancer/balancer_commands_scheduler_impl.h
@@ -580,14 +580,14 @@ private:
};
/**
- * Helper data structure for submitting the shard command associated to a Request to the
- * BalancerCommandsScheduler.
+ * Helper data structure for submitting the remote command associated with a BalancerCommandsScheduler
+ * Request.
*/
-struct CommandSubmissionHandle {
- CommandSubmissionHandle(UUID id, const std::shared_ptr<CommandInfo>& commandInfo)
+struct CommandSubmissionParameters {
+ CommandSubmissionParameters(UUID id, const std::shared_ptr<CommandInfo>& commandInfo)
: id(id), commandInfo(commandInfo) {}
- CommandSubmissionHandle(CommandSubmissionHandle&& rhs)
+ CommandSubmissionParameters(CommandSubmissionParameters&& rhs)
: id(rhs.id), commandInfo(std::move(rhs.commandInfo)) {}
const UUID id;
@@ -614,12 +614,12 @@ struct CommandSubmissionResult {
/**
* The class encapsulating all the properties supporting a request to BalancerCommandsSchedulerImpl
* as it gets created, executed and completed/cancelled.
- * It offers helper methods to generate supporting classes to support the request processing.
*/
class RequestData {
public:
RequestData(UUID id, std::shared_ptr<CommandInfo>&& commandInfo)
: _id(id),
+ _holdingDistLock(false),
_commandInfo(std::move(commandInfo)),
_deferredResponse(std::make_shared<Notification<executor::RemoteCommandResponse>>()),
_executionContext(boost::none) {
@@ -628,6 +628,7 @@ public:
RequestData(RequestData&& rhs)
: _id(rhs._id),
+ _holdingDistLock(rhs._holdingDistLock),
_commandInfo(std::move(rhs._commandInfo)),
_deferredResponse(std::move(rhs._deferredResponse)),
_executionContext(std::move(rhs._executionContext)) {}
@@ -638,16 +639,27 @@ public:
return _id;
}
- const CommandInfo& getCommandInfo() const {
- return *_commandInfo;
+ CommandSubmissionParameters getSubmissionParameters() const {
+ return CommandSubmissionParameters(_id, _commandInfo);
}
- CommandSubmissionHandle getSubmissionInfo() const {
- return CommandSubmissionHandle(_id, _commandInfo);
- }
-
- void addExecutionContext(ExecutionContext&& executionContext) {
- _executionContext = std::move(executionContext);
+ Status applySubmissionResult(CommandSubmissionResult&& submissionResult) {
+ invariant(_id == submissionResult.id);
+ _holdingDistLock = submissionResult.acquiredDistLock;
+ if (!!(*_deferredResponse)) {
+ // The request has already been completed by the time the submission gets processed.
+ // Keep the original outcome and continue the workflow.
+ return Status::OK();
+ }
+ auto submissionStatus = submissionResult.context.getStatus();
+ if (submissionStatus.isOK()) {
+ // store the execution context to be able to serve future cancel requests.
+ _executionContext = std::move(submissionResult.context.getValue());
+ } else {
+ // cascade the submission failure
+ setOutcome(submissionStatus);
+ }
+ return submissionStatus;
}
const boost::optional<executor::TaskExecutor::CallbackHandle>& getExecutionContext() {
@@ -658,6 +670,18 @@ public:
return ResponseHandle(_id, _deferredResponse);
}
+ const NamespaceString& getNamespace() const {
+ return _commandInfo->getNameSpace();
+ }
+
+ const bool holdsDistributedLock() const {
+ return _holdingDistLock;
+ }
+
+ const bool isRecoverable() const {
+ return _commandInfo->requiresRecoveryOnCrash();
+ }
+
void setOutcome(const executor::TaskExecutor::ResponseStatus& response) {
_deferredResponse->set(response);
}
@@ -669,6 +693,8 @@ private:
const UUID _id;
+ bool _holdingDistLock;
+
std::shared_ptr<CommandInfo> _commandInfo;
std::shared_ptr<Notification<executor::RemoteCommandResponse>> _deferredResponse;
@@ -729,11 +755,9 @@ private:
enum class SchedulerState { Recovering, Running, Stopping, Stopped };
// Protects the in-memory state of the Scheduler
+ // (_state, _requests, _unsubmittedRequestIds, _recentlyCompletedRequestIds).
Mutex _mutex = MONGO_MAKE_LATCH("BalancerCommandsSchedulerImpl::_mutex");
- // Ensures that concurrent calls to start() and stop() get serialised
- Mutex _startStopMutex = MONGO_MAKE_LATCH("BalancerCommandsSchedulerImpl::_startStopMutex");
-
SchedulerState _state{SchedulerState::Stopped};
stdx::condition_variable _stateUpdatedCV;
@@ -741,24 +765,21 @@ private:
stdx::thread _workerThreadHandle;
/**
- * Collection of all pending + running requests currently managed by
+ * All requests (unsubmitted, submitted, or completed but not yet cleaned up) managed by
* BalancerCommandsSchedulerImpl, organized by ID.
*/
- stdx::unordered_map<UUID, RequestData, UUID::Hash> _incompleteRequests;
+ stdx::unordered_map<UUID, RequestData, UUID::Hash> _requests;
/**
- * List of request IDs that have not been yet submitted.
+ * List of request IDs that have not yet been submitted for remote execution.
*/
- std::list<UUID> _pendingRequestIds;
+ std::vector<UUID> _unsubmittedRequestIds;
/**
- * List of request IDs that are currently running (submitted, but not yet completed).
+ * List of completed/cancelled request IDs that may still hold synchronisation resources or
+ * persisted state that the scheduler needs to release/clean up.
*/
- stdx::unordered_set<UUID, UUID::Hash> _runningRequestIds;
-
- std::vector<UUID> _obsoleteRecoveryDocumentIds;
-
- std::vector<NamespaceString> _lockedReferencesToRelease;
+ std::vector<UUID> _recentlyCompletedRequestIds;
/**
* Centralised accessor for all the distributed locks required by the Scheduler.
@@ -766,18 +787,22 @@ private:
*/
BalancerDistLocks _distributedLocks;
+ /*
+ * Counter of outstanding requests that were interrupted by a prior step-down/crash event,
+ * and that the scheduler is currently submitting as part of its initial recovery phase.
+ */
+ size_t _numRequestsToRecover{0};
+
ResponseHandle _buildAndEnqueueNewRequest(OperationContext* opCtx,
std::shared_ptr<CommandInfo>&& commandInfo);
ResponseHandle _enqueueRequest(WithLock, RequestData&& request);
- bool _canSubmitNewRequests(WithLock);
-
- bool _deferredCleanupRequired(WithLock);
-
- void _performDeferredCleanup(WithLock, OperationContext* opCtx);
+ void _performDeferredCleanup(OperationContext* opCtx,
+ std::vector<RequestData>&& requestsHoldingResources);
- CommandSubmissionResult _submit(OperationContext* opCtx, const CommandSubmissionHandle& data);
+ CommandSubmissionResult _submit(OperationContext* opCtx,
+ const CommandSubmissionParameters& data);
void _applySubmissionResult(WithLock, CommandSubmissionResult&& submissionResult);
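
Editor's note: the new RequestData::applySubmissionResult() centralises what was previously spread between _applySubmissionResult() and _applyCommandResponse(): the request records whether a distributed lock was taken, keeps an already-set outcome intact, and otherwise either stores the execution context or cascades the submission failure. A rough stand-alone sketch of that contract follows; Status, SubmissionResult and MiniRequest are simplified placeholders, not the real MongoDB classes.

#include <optional>
#include <string>
#include <utility>

struct Status {
    bool ok = true;
    std::string reason;
};

struct SubmissionResult {
    bool acquiredLock = false;
    Status status;                       // outcome of scheduling the remote command
    std::optional<int> executionHandle;  // only meaningful when status.ok is true
};

class MiniRequest {
public:
    // Returns the submission status so the caller can move failed requests
    // straight onto its "recently completed" list.
    Status applySubmissionResult(SubmissionResult&& result) {
        _holdsLock = result.acquiredLock;
        if (_outcome) {
            // Already completed (e.g. cancelled) before the submission was
            // processed: keep the original outcome and report success.
            return Status{};
        }
        if (result.status.ok) {
            _executionHandle = result.executionHandle;  // serves future cancel requests
        } else {
            _outcome = result.status;  // cascade the submission failure
        }
        return result.status;
    }

    bool holdsLock() const {
        return _holdsLock;
    }

private:
    bool _holdsLock = false;
    std::optional<int> _executionHandle;
    std::optional<Status> _outcome;
};

int main() {
    MiniRequest request;
    // A failed submission that still acquired the lock: the failure is recorded as
    // the outcome, and the caller knows the lock must be released during cleanup.
    Status s = request.applySubmissionResult(
        SubmissionResult{true, Status{false, "shard not found"}, std::nullopt});
    return s.ok ? 1 : 0;  // expected: the non-OK submission status is reported back
}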
diff --git a/src/mongo/db/s/balancer/balancer_commands_scheduler_test.cpp b/src/mongo/db/s/balancer/balancer_commands_scheduler_test.cpp
index 51ea5d95995..339ce137003 100644
--- a/src/mongo/db/s/balancer/balancer_commands_scheduler_test.cpp
+++ b/src/mongo/db/s/balancer/balancer_commands_scheduler_test.cpp
@@ -117,11 +117,6 @@ TEST_F(BalancerCommandsSchedulerTest, StartAndStopScheduler) {
_scheduler.stop();
}
-TEST_F(BalancerCommandsSchedulerTest, ResilientToMultipleStarts) {
- _scheduler.start(operationContext());
- _scheduler.start(operationContext());
-}
-
TEST_F(BalancerCommandsSchedulerTest, SuccessfulMoveChunkCommand) {
auto deferredCleanupCompletedCheckpoint =
globalFailPointRegistry().find("deferredCleanupCompletedCheckpoint");
@@ -308,7 +303,7 @@ TEST_F(BalancerCommandsSchedulerTest, CommandFailsWhenSchedulerIsStopped) {
TEST_F(BalancerCommandsSchedulerTest, CommandCanceledIfBalancerStops) {
std::unique_ptr<MoveChunkResponse> resp;
{
- FailPointEnableBlock failPoint("pauseBalancerWorkerThread");
+ FailPointEnableBlock failPoint("pauseSubmissionsFailPoint");
_scheduler.start(operationContext());
ChunkType moveChunk = makeChunk(0, kShardId0);
resp = _scheduler.requestMoveChunk(operationContext(),
@@ -335,7 +330,7 @@ TEST_F(BalancerCommandsSchedulerTest, CommandCanceledIfBalancerStops) {
TEST_F(BalancerCommandsSchedulerTest, MoveChunkCommandGetsPersistedOnDiskWhenRequestIsSubmitted) {
// This prevents the request from being submitted by the scheduler worker thread.
- FailPointEnableBlock failPoint("pauseBalancerWorkerThread");
+ FailPointEnableBlock failPoint("pauseSubmissionsFailPoint");
auto opCtx = operationContext();
_scheduler.start(opCtx);
@@ -374,7 +369,7 @@ TEST_F(BalancerCommandsSchedulerTest, MoveChunkCommandGetsPersistedOnDiskWhenReq
}
TEST_F(BalancerCommandsSchedulerTest, PersistedCommandsAreReissuedWhenRecoveringFromCrash) {
- FailPoint* failpoint = globalFailPointRegistry().find("pauseBalancerWorkerThread");
+ FailPoint* failpoint = globalFailPointRegistry().find("pauseSubmissionsFailPoint");
failpoint->setMode(FailPoint::Mode::alwaysOn);
auto opCtx = operationContext();
_scheduler.start(opCtx);
@@ -431,7 +426,7 @@ TEST_F(BalancerCommandsSchedulerTest, PersistedCommandsAreReissuedWhenRecovering
TEST_F(BalancerCommandsSchedulerTest, DistLockPreventsMoveChunkWithConcurrentDDL) {
OperationContext* opCtx;
- FailPoint* failpoint = globalFailPointRegistry().find("pauseBalancerWorkerThread");
+ FailPoint* failpoint = globalFailPointRegistry().find("pauseSubmissionsFailPoint");
failpoint->setMode(FailPoint::Mode::alwaysOn);
{
_scheduler.start(operationContext());
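
Editor's note: the tests above now pause the scheduler through the renamed pauseSubmissionsFailPoint, which blocks only the worker's submission step rather than the whole thread. A toy equivalent of that pattern, with an RAII guard standing in for FailPointEnableBlock (names are illustrative, not the MongoDB fail point API):

#include <atomic>
#include <cstdio>

std::atomic<bool> pauseSubmissions{false};

// RAII guard standing in for FailPointEnableBlock: submissions stay paused for
// the lifetime of the block.
struct PauseSubmissionsBlock {
    PauseSubmissionsBlock() { pauseSubmissions.store(true); }
    ~PauseSubmissionsBlock() { pauseSubmissions.store(false); }
};

// Analogue of the worker's wait predicate: unsubmitted work proceeds only while
// the pause flag is clear.
bool readyToSubmit(bool hasUnsubmittedRequests) {
    return hasUnsubmittedRequests && !pauseSubmissions.load();
}

int main() {
    {
        PauseSubmissionsBlock pause;  // enqueued requests stay unsubmitted here
        std::printf("paused: readyToSubmit = %d\n", readyToSubmit(true));
    }  // leaving the scope resumes submissions
    std::printf("resumed: readyToSubmit = %d\n", readyToSubmit(true));
    return 0;
}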