summaryrefslogtreecommitdiff
path: root/src/mongo
diff options
context:
space:
mode:
authorPaolo Polato <paolo.polato@mongodb.com>2022-03-01 10:52:15 +0000
committerEvergreen Agent <no-reply@evergreen.mongodb.com>2022-03-01 11:21:25 +0000
commit07d0be8c575c796065fdc18d05723bd92f66c8f2 (patch)
tree5922856c22cc8bfdbc03a6b3e3f2f0bc3b50d25f /src/mongo
parent8c85963c5e8e6cec0ba996de80e3cc7aa5a6b39d (diff)
downloadmongo-07d0be8c575c796065fdc18d05723bd92f66c8f2.tar.gz
SERVER-60920 BalancerCommandsScheduler completes every outstanding command before stopping
Diffstat (limited to 'src/mongo')
-rw-r--r--src/mongo/db/s/balancer/balancer_commands_scheduler_impl.cpp80
-rw-r--r--src/mongo/db/s/balancer/balancer_commands_scheduler_impl.h36
-rw-r--r--src/mongo/db/s/balancer/balancer_commands_scheduler_test.cpp2
3 files changed, 50 insertions, 68 deletions
diff --git a/src/mongo/db/s/balancer/balancer_commands_scheduler_impl.cpp b/src/mongo/db/s/balancer/balancer_commands_scheduler_impl.cpp
index e62202236ba..40ed0cc032f 100644
--- a/src/mongo/db/s/balancer/balancer_commands_scheduler_impl.cpp
+++ b/src/mongo/db/s/balancer/balancer_commands_scheduler_impl.cpp
@@ -404,10 +404,7 @@ CommandSubmissionResult BalancerCommandsSchedulerImpl::_submit(
auto swRemoteCommandHandle =
_executor->scheduleRemoteCommand(remoteCommand, onRemoteResponseReceived);
- return (
- swRemoteCommandHandle.isOK()
- ? CommandSubmissionResult(params.id, distLockTaken, swRemoteCommandHandle.getValue())
- : CommandSubmissionResult(params.id, distLockTaken, swRemoteCommandHandle.getStatus()));
+ return CommandSubmissionResult(params.id, distLockTaken, swRemoteCommandHandle.getStatus());
}
void BalancerCommandsSchedulerImpl::_applySubmissionResult(
@@ -430,10 +427,7 @@ void BalancerCommandsSchedulerImpl::_applyCommandResponse(
UUID requestId, const executor::RemoteCommandResponse& response) {
{
stdx::lock_guard<Latch> lg(_mutex);
- if (_state == SchedulerState::Stopping || _state == SchedulerState::Stopped) {
- // Drop the response - the request is being cancelled in the worker thread.
- return;
- }
+ invariant(_state != SchedulerState::Stopped);
auto requestIt = _requests.find(requestId);
invariant(requestIt != _requests.end());
auto& request = requestIt->second;
@@ -453,9 +447,14 @@ void BalancerCommandsSchedulerImpl::_applyCommandResponse(
}
void BalancerCommandsSchedulerImpl::_performDeferredCleanup(
- OperationContext* opCtx, std::vector<RequestData>&& requestsHoldingResources) {
+ OperationContext* opCtx,
+ const stdx::unordered_map<UUID, RequestData, UUID::Hash>& requestsHoldingResources) {
+ if (requestsHoldingResources.empty()) {
+ return;
+ }
+
DBDirectClient dbClient(opCtx);
- for (const auto& request : requestsHoldingResources) {
+ for (const auto& [_, request] : requestsHoldingResources) {
if (request.holdsDistributedLock()) {
_distributedLocks.releaseFor(opCtx, request.getNamespace());
}
@@ -463,9 +462,8 @@ void BalancerCommandsSchedulerImpl::_performDeferredCleanup(
deletePersistedRecoveryInfo(dbClient, request.getCommandInfo());
}
}
- if (!requestsHoldingResources.empty()) {
- deferredCleanupCompletedCheckpoint.pauseWhileSet();
- }
+
+ deferredCleanupCompletedCheckpoint.pauseWhileSet();
}
void BalancerCommandsSchedulerImpl::_workerThread() {
@@ -478,19 +476,18 @@ void BalancerCommandsSchedulerImpl::_workerThread() {
Client::initThread("BalancerCommandsScheduler");
bool stopWorkerRequested = false;
- stdx::unordered_map<UUID, RequestData, UUID::Hash> requestsToCleanUpOnExit;
LOGV2(5847205, "Balancer scheduler thread started");
while (!stopWorkerRequested) {
std::vector<CommandSubmissionParameters> commandsToSubmit;
std::vector<CommandSubmissionResult> submissionResults;
- std::vector<RequestData> completedRequestsToCleanUp;
+ stdx::unordered_map<UUID, RequestData, UUID::Hash> completedRequestsToCleanUp;
// 1. Check the internal state and plan for the actions to be taken on this round.
{
stdx::unique_lock<Latch> ul(_mutex);
invariant(_state != SchedulerState::Stopped);
- _stateUpdatedCV.wait(ul, [this, &ul] {
+ _stateUpdatedCV.wait(ul, [this] {
return ((!_unsubmittedRequestIds.empty() &&
!MONGO_likely(pauseSubmissionsFailPoint.shouldFail())) ||
_state == SchedulerState::Stopping ||
@@ -499,30 +496,32 @@ void BalancerCommandsSchedulerImpl::_workerThread() {
for (const auto& requestId : _recentlyCompletedRequestIds) {
auto it = _requests.find(requestId);
- completedRequestsToCleanUp.emplace_back(std::move(it->second));
+ completedRequestsToCleanUp.emplace(it->first, std::move(it->second));
_requests.erase(it);
}
_recentlyCompletedRequestIds.clear();
- if (_state == SchedulerState::Stopping) {
- // Reset the internal state and prepare to leave
- _unsubmittedRequestIds.clear();
- _requests.swap(requestsToCleanUpOnExit);
- stopWorkerRequested = true;
- } else {
- // Pick up new commands to be submitted
- for (const auto& requestId : _unsubmittedRequestIds) {
- const auto& requestData = _requests.at(requestId);
+ for (const auto& requestId : _unsubmittedRequestIds) {
+ auto& requestData = _requests.at(requestId);
+ if (_state != SchedulerState::Stopping) {
commandsToSubmit.push_back(requestData.getSubmissionParameters());
+ } else {
+ requestData.setOutcome(
+ Status(ErrorCodes::BalancerInterrupted,
+ "Request cancelled - balancer scheduler is stopping"));
+ completedRequestsToCleanUp.emplace(requestId, std::move(requestData));
+ _requests.erase(requestId);
}
- _unsubmittedRequestIds.clear();
}
+ _unsubmittedRequestIds.clear();
+ stopWorkerRequested = _state == SchedulerState::Stopping;
}
// 2.a Free any resource acquired by already completed/aborted requests.
{
auto opCtxHolder = cc().makeOperationContext();
- _performDeferredCleanup(opCtxHolder.get(), std::move(completedRequestsToCleanUp));
+ _performDeferredCleanup(opCtxHolder.get(), completedRequestsToCleanUp);
+ completedRequestsToCleanUp.clear();
}
// 2.b Serve the picked up requests, submitting their related commands.
@@ -532,11 +531,11 @@ void BalancerCommandsSchedulerImpl::_workerThread() {
submissionInfo.commandInfo.get()->attachOperationMetadataTo(opCtxHolder.get());
}
submissionResults.push_back(_submit(opCtxHolder.get(), submissionInfo));
- if (!submissionResults.back().context.isOK()) {
+ if (!submissionResults.back().outcome.isOK()) {
LOGV2(5847206,
"Submission for scheduler command request failed",
"reqId"_attr = submissionResults.back().id,
- "cause"_attr = submissionResults.back().context.getStatus());
+ "cause"_attr = submissionResults.back().outcome);
}
}
@@ -548,18 +547,15 @@ void BalancerCommandsSchedulerImpl::_workerThread() {
}
}
}
-
- // In case of clean exit, cancel all the pending/running command requests
- // (but keep the related descriptor documents to ensure they will be reissued on recovery).
- auto opCtxHolder = cc().makeOperationContext();
- for (auto& idAndRequest : requestsToCleanUpOnExit) {
- idAndRequest.second.setOutcome(Status(
- ErrorCodes::BalancerInterrupted, "Request cancelled - balancer scheduler is stopping"));
- const auto& cancelHandle = idAndRequest.second.getExecutionContext();
- if (cancelHandle) {
- _executor->cancel(*cancelHandle);
- }
- _distributedLocks.releaseFor(opCtxHolder.get(), idAndRequest.second.getNamespace());
+ // Wait for each outstanding command to complete, clean out its resources and leave.
+ {
+ stdx::unique_lock<Latch> ul(_mutex);
+ _stateUpdatedCV.wait(
+ ul, [this] { return (_requests.size() == _recentlyCompletedRequestIds.size()); });
+ auto opCtxHolder = cc().makeOperationContext();
+ _performDeferredCleanup(opCtxHolder.get(), _requests);
+ _requests.clear();
+ _recentlyCompletedRequestIds.clear();
}
}
diff --git a/src/mongo/db/s/balancer/balancer_commands_scheduler_impl.h b/src/mongo/db/s/balancer/balancer_commands_scheduler_impl.h
index e127767a69c..536687848c0 100644
--- a/src/mongo/db/s/balancer/balancer_commands_scheduler_impl.h
+++ b/src/mongo/db/s/balancer/balancer_commands_scheduler_impl.h
@@ -399,21 +399,17 @@ struct CommandSubmissionParameters {
const std::shared_ptr<CommandInfo> commandInfo;
};
-
-using ExecutionContext = executor::TaskExecutor::CallbackHandle;
-
/**
* Helper data structure for storing the outcome of a Command submission.
*/
struct CommandSubmissionResult {
- CommandSubmissionResult(UUID id, bool acquiredDistLock, StatusWith<ExecutionContext>&& context)
- : id(id), acquiredDistLock(acquiredDistLock), context(std::move(context)) {}
- CommandSubmissionResult(CommandSubmissionResult&& rhs)
- : id(rhs.id), acquiredDistLock(rhs.acquiredDistLock), context(std::move(rhs.context)) {}
+ CommandSubmissionResult(UUID id, bool acquiredDistLock, const Status& outcome)
+ : id(id), acquiredDistLock(acquiredDistLock), outcome(outcome) {}
+ CommandSubmissionResult(CommandSubmissionResult&& rhs) = default;
CommandSubmissionResult(const CommandSubmissionResult& rhs) = delete;
UUID id;
bool acquiredDistLock;
- StatusWith<ExecutionContext> context;
+ Status outcome;
};
/**
@@ -427,8 +423,7 @@ public:
_completedOrAborted(false),
_holdingDistLock(false),
_commandInfo(std::move(commandInfo)),
- _responsePromise{NonNullPromiseTag{}},
- _executionContext(boost::none) {
+ _responsePromise{NonNullPromiseTag{}} {
invariant(_commandInfo);
}
@@ -437,8 +432,7 @@ public:
_completedOrAborted(rhs._completedOrAborted),
_holdingDistLock(rhs._holdingDistLock),
_commandInfo(std::move(rhs._commandInfo)),
- _responsePromise(std::move(rhs._responsePromise)),
- _executionContext(std::move(rhs._executionContext)) {}
+ _responsePromise(std::move(rhs._responsePromise)) {}
~RequestData() = default;
@@ -458,21 +452,14 @@ public:
// Keep the original outcome and continue the workflow.
return Status::OK();
}
- auto submissionStatus = submissionResult.context.getStatus();
- if (submissionStatus.isOK()) {
- // store the execution context to be able to serve future cancel requests.
- _executionContext = std::move(submissionResult.context.getValue());
- } else {
+ const auto& submissionStatus = submissionResult.outcome;
+ if (!submissionStatus.isOK()) {
// cascade the submission failure
setOutcome(submissionStatus);
}
return submissionStatus;
}
- const boost::optional<executor::TaskExecutor::CallbackHandle>& getExecutionContext() {
- return _executionContext;
- }
-
const CommandInfo& getCommandInfo() const {
return *_commandInfo;
}
@@ -512,8 +499,6 @@ private:
std::shared_ptr<CommandInfo> _commandInfo;
Promise<executor::RemoteCommandResponse> _responsePromise;
-
- boost::optional<ExecutionContext> _executionContext;
};
/**
@@ -618,8 +603,9 @@ private:
void _enqueueRequest(WithLock, RequestData&& request);
- void _performDeferredCleanup(OperationContext* opCtx,
- std::vector<RequestData>&& requestsHoldingResources);
+ void _performDeferredCleanup(
+ OperationContext* opCtx,
+ const stdx::unordered_map<UUID, RequestData, UUID::Hash>& requestsHoldingResources);
CommandSubmissionResult _submit(OperationContext* opCtx,
const CommandSubmissionParameters& data);
diff --git a/src/mongo/db/s/balancer/balancer_commands_scheduler_test.cpp b/src/mongo/db/s/balancer/balancer_commands_scheduler_test.cpp
index f65638ecb22..18c5dc086a1 100644
--- a/src/mongo/db/s/balancer/balancer_commands_scheduler_test.cpp
+++ b/src/mongo/db/s/balancer/balancer_commands_scheduler_test.cpp
@@ -323,7 +323,7 @@ TEST_F(BalancerCommandsSchedulerTest, CommandFailsWhenSchedulerIsStopped) {
}
}
-TEST_F(BalancerCommandsSchedulerTest, CommandCanceledIfBalancerStops) {
+TEST_F(BalancerCommandsSchedulerTest, CommandCanceledIfUnsubmittedBeforeBalancerStops) {
SemiFuture<void> futureResponse;
{
FailPointEnableBlock failPoint("pauseSubmissionsFailPoint");