author    Paolo Polato <paolo.polato@mongodb.com>            2021-10-05 09:34:48 +0000
committer Evergreen Agent <no-reply@evergreen.mongodb.com>   2021-10-05 10:31:41 +0000
commit    c06be6d51a30e00e3fab6e7baaa95e340cd6352d (patch)
tree      8082baa89fb5b52b03c1f7396e7b77b4c7fea4fe
parent    0b4fd0cc9ab87b0ca59b1046e5ca4e40060988cb (diff)
download  mongo-c06be6d51a30e00e3fab6e7baaa95e340cd6352d.tar.gz
SERVER-59964 Persist data on commands issued by BalancerCommandsScheduler to support crash recovery routine
-rw-r--r--  src/mongo/db/namespace_string.cpp                              |   4
-rw-r--r--  src/mongo/db/namespace_string.h                                |   3
-rw-r--r--  src/mongo/db/s/SConscript                                      |   3
-rw-r--r--  src/mongo/db/s/balancer/balancer_command_document.idl          |  69
-rw-r--r--  src/mongo/db/s/balancer/balancer_commands_scheduler.h          |  17
-rw-r--r--  src/mongo/db/s/balancer/balancer_commands_scheduler_impl.cpp   | 217
-rw-r--r--  src/mongo/db/s/balancer/balancer_commands_scheduler_impl.h     | 136
-rw-r--r--  src/mongo/db/s/balancer/balancer_commands_scheduler_test.cpp   | 251
-rw-r--r--  src/mongo/db/s/config/sharding_catalog_manager.cpp             |  13
9 files changed, 538 insertions(+), 175 deletions(-)
diff --git a/src/mongo/db/namespace_string.cpp b/src/mongo/db/namespace_string.cpp
index b57e1054a0f..45d4f231029 100644
--- a/src/mongo/db/namespace_string.cpp
+++ b/src/mongo/db/namespace_string.cpp
@@ -117,6 +117,10 @@ const NamespaceString NamespaceString::kShardingRenameParticipantsNamespace(
const NamespaceString NamespaceString::kConfigSettingsNamespace(NamespaceString::kConfigDb,
"settings");
+
+const NamespaceString NamespaceString::kConfigBalancerCommandsNamespace(
+ NamespaceString::kConfigDb, "balancerCommandsSchedulerOngoingOperations");
+
const NamespaceString NamespaceString::kVectorClockNamespace(NamespaceString::kConfigDb,
"vectorClock");
diff --git a/src/mongo/db/namespace_string.h b/src/mongo/db/namespace_string.h
index ca1a0bbd4bc..41efc618e8f 100644
--- a/src/mongo/db/namespace_string.h
+++ b/src/mongo/db/namespace_string.h
@@ -169,6 +169,9 @@ public:
// Namespace for balancer settings and default read and write concerns.
static const NamespaceString kConfigSettingsNamespace;
+ // Namespace with information on commands sent by the balancer (through BalancerCommandsScheduler).
+ static const NamespaceString kConfigBalancerCommandsNamespace;
+
// Namespace for vector clock state.
static const NamespaceString kVectorClockNamespace;
diff --git a/src/mongo/db/s/SConscript b/src/mongo/db/s/SConscript
index d9317dfdbf4..39d418721d5 100644
--- a/src/mongo/db/s/SConscript
+++ b/src/mongo/db/s/SConscript
@@ -208,6 +208,7 @@ env.Library(
'balancer/balance_stats.cpp',
'balancer/balancer_chunk_selection_policy_impl.cpp',
'balancer/balancer_chunk_selection_policy.cpp',
+ 'balancer/balancer_command_document.idl',
'balancer/balancer_commands_scheduler_impl.cpp',
'balancer/balancer_policy.cpp',
'balancer/balancer.cpp',
@@ -440,7 +441,6 @@ env.CppUnitTest(
'active_migrations_registry_test.cpp',
'auto_split_vector_test.cpp',
'balancer/balance_stats_test.cpp',
- 'balancer/balancer_commands_scheduler_test.cpp',
'chunk_split_state_driver_test.cpp',
'collection_metadata_filtering_test.cpp',
'collection_metadata_test.cpp',
@@ -535,6 +535,7 @@ env.CppUnitTest(
'balancer/balancer_policy_test.cpp',
'balancer/cluster_statistics_test.cpp',
'balancer/core_options_stub.cpp',
+ 'balancer/balancer_commands_scheduler_test.cpp',
'balancer/migration_manager_test.cpp',
'balancer/migration_test_fixture.cpp',
'balancer/scoped_migration_request_test.cpp',
diff --git a/src/mongo/db/s/balancer/balancer_command_document.idl b/src/mongo/db/s/balancer/balancer_command_document.idl
new file mode 100644
index 00000000000..b4d8e23d752
--- /dev/null
+++ b/src/mongo/db/s/balancer/balancer_command_document.idl
@@ -0,0 +1,69 @@
+# Copyright (C) 2021-present MongoDB, Inc.
+#
+# This program is free software: you can redistribute it and/or modify
+# it under the terms of the Server Side Public License, version 1,
+# as published by MongoDB, Inc.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# Server Side Public License for more details.
+#
+# You should have received a copy of the Server Side Public License
+# along with this program. If not, see
+# <http://www.mongodb.com/licensing/server-side-public-license>.
+#
+# As a special exception, the copyright holders give permission to link the
+# code of portions of this program with the OpenSSL library under certain
+# conditions as described in each individual source file and distribute
+# linked combinations including the program with the OpenSSL library. You
+# must comply with the Server Side Public License in all respects for
+# all of the code used other than as permitted herein. If you modify file(s)
+# with this exception, you may extend this exception to your version of the
+# file(s), but you are not obligated to do so. If you do not wish to do so,
+# delete this exception statement from your version. If you delete this
+# exception statement from all source files in the program, then also delete
+# it in the license file.
+#
+
+# This file defines the format of documents stored in
+# config.balancerCommandsSchedulerOngoingOperations on the config server.
+
+global:
+ cpp_namespace: "mongo"
+
+imports:
+ - "mongo/idl/basic_types.idl"
+ - "mongo/s/sharding_types.idl"
+
+structs:
+ PersistedBalancerCommand:
+ description: "Descriptor of a command issued by BalancerCommandsScheduler,
+ persisted on disk (under NamespaceString::kConfigBalancerCommandsNamespace)
+ to be re-executed in case of server crash."
+ strict: false
+ generate_comparison_operators: false
+ fields:
+ requestId:
+ type: uuid
+ description: "Original RequestID associated to the command
+ when this was issued for the first time. For logging purposes."
+ optional: false
+ remoteCommand:
+ type: object_owned
+ description: "Serialised version of the remote command submitted by the
+ BalancerCommandsScheduler."
+ optional: false
+ target:
+ type: shard_id
+ description: "Id of the shard that has to receive and execute the remote command."
+ optional: false
+ nss:
+ type: namespacestring
+ description: "Namespace string being targeted by the remote command."
+ optional: false
+ requiresDistributedLock:
+ type: bool
+ description: "When true, the safe execution of the remote command requires
+ the distributed lock for the targeted namespace to be taken."
+ optional: false
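For reference, a minimal sketch of how the generated PersistedBalancerCommand type round-trips a recovery document. It only uses calls that appear later in this commit (constructor, toBSON(), parse(), getRequestId()); the namespace, shard id and remote command shown here are placeholder values, not taken from the change itself.

    #include "mongo/db/s/balancer/balancer_command_document_gen.h"

    // Sketch only: placeholder values.
    PersistedBalancerCommand recoveryDoc(UUID::gen(),                             // requestId
                                         BSON("moveChunk" << "testDb.testColl"),  // remoteCommand
                                         ShardId("shard0"),                       // target
                                         NamespaceString("testDb.testColl"),      // nss
                                         true);                                   // requiresDistributedLock

    // Shape of the document persisted under config.balancerCommandsSchedulerOngoingOperations.
    BSONObj onDisk = recoveryDoc.toBSON();

    // During crash recovery the same document is parsed back into the IDL type.
    auto parsed = PersistedBalancerCommand::parse(
        IDLParserErrorContext("PersistedBalancerCommandExample"), onDisk);
    invariant(parsed.getRequestId() == recoveryDoc.getRequestId());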
diff --git a/src/mongo/db/s/balancer/balancer_commands_scheduler.h b/src/mongo/db/s/balancer/balancer_commands_scheduler.h
index d979ba7c800..c749f970f68 100644
--- a/src/mongo/db/s/balancer/balancer_commands_scheduler.h
+++ b/src/mongo/db/s/balancer/balancer_commands_scheduler.h
@@ -88,7 +88,7 @@ class DeferredResponse {
public:
virtual ~DeferredResponse() = default;
- virtual uint32_t getRequestId() const = 0;
+ virtual UUID getRequestId() const = 0;
virtual bool hasFinalised() const = 0;
@@ -129,10 +129,10 @@ public:
virtual ~BalancerCommandsScheduler() = default;
/**
- * Activates the scheduler, which will start accepting and processing
- * request<Command>() invocations.
+ * Triggers an asynchronous self-initialisation of the component,
+ * which will start accepting request<Command>() invocations.
*/
- virtual void start() = 0;
+ virtual void start(OperationContext* opCtx) = 0;
/**
* Stops the scheduler and the processing of any outstanding and incoming request
@@ -141,27 +141,34 @@ public:
virtual void stop() = 0;
virtual std::unique_ptr<MoveChunkResponse> requestMoveChunk(
+ OperationContext* opCtx,
const NamespaceString& nss,
const ChunkType& chunk,
const ShardId& recipient,
const MoveChunkSettings& commandSettings) = 0;
virtual std::unique_ptr<MergeChunksResponse> requestMergeChunks(
- const NamespaceString& nss, const ChunkType& lowerBound, const ChunkType& upperBound) = 0;
+ OperationContext* opCtx,
+ const NamespaceString& nss,
+ const ChunkType& lowerBound,
+ const ChunkType& upperBound) = 0;
virtual std::unique_ptr<SplitVectorResponse> requestSplitVector(
+ OperationContext* opCtx,
const NamespaceString& nss,
const ChunkType& chunk,
const ShardKeyPattern& shardKeyPattern,
const SplitVectorSettings& commandSettings) = 0;
virtual std::unique_ptr<SplitChunkResponse> requestSplitChunk(
+ OperationContext* opCtx,
const NamespaceString& nss,
const ChunkType& chunk,
const ShardKeyPattern& shardKeyPattern,
const std::vector<BSONObj>& splitPoints) = 0;
virtual std::unique_ptr<ChunkDataSizeResponse> requestChunkDataSize(
+ OperationContext* opCtx,
const NamespaceString& nss,
const ChunkType& chunk,
const ShardKeyPattern& shardKeyPattern,
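For context, a hedged usage sketch of the revised interface; it mirrors the unit tests later in this commit, and scheduler, opCtx, nss, chunk1 and chunk2 (two contiguous chunks on the same shard) are assumed to already exist in the caller.

    // start() now takes an OperationContext so that persisted commands can be
    // recovered; each request<Command>() also takes one so the recovery document
    // can be written before the command is enqueued.
    scheduler.start(opCtx);
    auto response = scheduler.requestMergeChunks(opCtx, nss, chunk1, chunk2);
    // Retrieve the outcome of the remote command (same call used by the tests below).
    Status outcome = response->getOutcome();
    scheduler.stop();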
diff --git a/src/mongo/db/s/balancer/balancer_commands_scheduler_impl.cpp b/src/mongo/db/s/balancer/balancer_commands_scheduler_impl.cpp
index 3e0a2451946..602423605b5 100644
--- a/src/mongo/db/s/balancer/balancer_commands_scheduler_impl.cpp
+++ b/src/mongo/db/s/balancer/balancer_commands_scheduler_impl.cpp
@@ -31,6 +31,7 @@
#include "mongo/db/s/balancer/balancer_commands_scheduler_impl.h"
#include "mongo/db/client.h"
+#include "mongo/db/dbdirectclient.h"
#include "mongo/logv2/log.h"
#include "mongo/s/catalog/type_chunk.h"
#include "mongo/s/grid.h"
@@ -77,24 +78,24 @@ BalancerCommandsSchedulerImpl::~BalancerCommandsSchedulerImpl() {
stop();
}
-void BalancerCommandsSchedulerImpl::_setState(WithLock, SchedulerState state) {
- _state = state;
- _stateUpdatedCV.notify_all();
-}
-
-void BalancerCommandsSchedulerImpl::start() {
+void BalancerCommandsSchedulerImpl::start(OperationContext* opCtx) {
LOGV2(5847200, "Balancer command scheduler start requested");
stdx::lock_guard<Latch> lgss(_startStopMutex);
stdx::lock_guard<Latch> lg(_mutex);
- if (_state == SchedulerState::Running) {
+ if (_state == SchedulerState::Recovering || _state == SchedulerState::Running) {
return;
}
- // TODO init _requestIdCounter here based on the stored running requests from a past invocation?
invariant(!_workerThreadHandle.joinable());
_incompleteRequests.reserve(_maxRunningRequests * 10);
_runningRequestIds.reserve(_maxRunningRequests);
- _state = SchedulerState::Running;
+ auto requestsToRecover = _loadRequestsToRecover(opCtx);
+ _state = requestsToRecover.empty() ? SchedulerState::Running : SchedulerState::Recovering;
+
+ for (auto& requestToRecover : requestsToRecover) {
+ _enqueueRequest(lg, std::move(requestToRecover));
+ }
+
_workerThreadHandle = stdx::thread([this] { _workerThread(); });
}
@@ -108,12 +109,14 @@ void BalancerCommandsSchedulerImpl::stop() {
}
invariant(_workerThreadHandle.joinable());
- _setState(lg, SchedulerState::Stopping);
+ _state = SchedulerState::Stopping;
+ _stateUpdatedCV.notify_all();
}
_workerThreadHandle.join();
}
std::unique_ptr<MoveChunkResponse> BalancerCommandsSchedulerImpl::requestMoveChunk(
+ OperationContext* opCtx,
const NamespaceString& nss,
const ChunkType& chunk,
const ShardId& recipient,
@@ -130,17 +133,17 @@ std::unique_ptr<MoveChunkResponse> BalancerCommandsSchedulerImpl::requestMoveChu
commandSettings.forceJumbo,
chunk.getVersion());
- auto requestCollectionInfo = _enqueueNewRequest(std::move(commandInfo));
+ auto requestCollectionInfo = _buildAndEnqueueNewRequest(opCtx, std::move(commandInfo));
return std::make_unique<MoveChunkResponseImpl>(std::move(requestCollectionInfo));
}
std::unique_ptr<MergeChunksResponse> BalancerCommandsSchedulerImpl::requestMergeChunks(
- const NamespaceString& nss, const ChunkType& lowerBound, const ChunkType& upperBound) {
+ OperationContext* opCtx,
+ const NamespaceString& nss,
+ const ChunkType& lowerBound,
+ const ChunkType& upperBound) {
invariant(lowerBound.getShard() == upperBound.getShard());
invariant(lowerBound.getMax().woCompare(upperBound.getMin()) <= 0);
- // TODO why this may fail?
- // invariant(lowerBound.epoch() == upperBound.epoch() &&
- // lowerBound.getVersion().majorVersion() == upperBound.getVersion().majorVersion());
auto commandInfo = std::make_shared<MergeChunksCommandInfo>(
nss,
@@ -150,11 +153,12 @@ std::unique_ptr<MergeChunksResponse> BalancerCommandsSchedulerImpl::requestMerge
lowerBound.getVersion().isOlderThan(upperBound.getVersion()) ? upperBound.getVersion()
: lowerBound.getVersion());
- auto requestCollectionInfo = _enqueueNewRequest(std::move(commandInfo));
+ auto requestCollectionInfo = _buildAndEnqueueNewRequest(opCtx, std::move(commandInfo));
return std::make_unique<MergeChunksResponseImpl>(std::move(requestCollectionInfo));
}
std::unique_ptr<SplitVectorResponse> BalancerCommandsSchedulerImpl::requestSplitVector(
+ OperationContext* opCtx,
const NamespaceString& nss,
const ChunkType& chunk,
const ShardKeyPattern& shardKeyPattern,
@@ -170,11 +174,12 @@ std::unique_ptr<SplitVectorResponse> BalancerCommandsSchedulerImpl::requestSplit
commandSettings.maxChunkSizeBytes,
commandSettings.force);
- auto requestCollectionInfo = _enqueueNewRequest(std::move(commandInfo));
+ auto requestCollectionInfo = _buildAndEnqueueNewRequest(opCtx, std::move(commandInfo));
return std::make_unique<SplitVectorResponseImpl>(std::move(requestCollectionInfo));
}
std::unique_ptr<SplitChunkResponse> BalancerCommandsSchedulerImpl::requestSplitChunk(
+ OperationContext* opCtx,
const NamespaceString& nss,
const ChunkType& chunk,
const ShardKeyPattern& shardKeyPattern,
@@ -188,11 +193,12 @@ std::unique_ptr<SplitChunkResponse> BalancerCommandsSchedulerImpl::requestSplitC
chunk.getVersion(),
splitPoints);
- auto requestCollectionInfo = _enqueueNewRequest(std::move(commandInfo));
+ auto requestCollectionInfo = _buildAndEnqueueNewRequest(opCtx, std::move(commandInfo));
return std::make_unique<SplitChunkResponseImpl>(std::move(requestCollectionInfo));
}
std::unique_ptr<ChunkDataSizeResponse> BalancerCommandsSchedulerImpl::requestChunkDataSize(
+ OperationContext* opCtx,
const NamespaceString& nss,
const ChunkType& chunk,
const ShardKeyPattern& shardKeyPattern,
@@ -205,36 +211,73 @@ std::unique_ptr<ChunkDataSizeResponse> BalancerCommandsSchedulerImpl::requestChu
estimatedValue,
chunk.getVersion());
- auto requestCollectionInfo = _enqueueNewRequest(std::move(commandInfo));
+ auto requestCollectionInfo = _buildAndEnqueueNewRequest(opCtx, std::move(commandInfo));
return std::make_unique<ChunkDataSizeResponseImpl>(std::move(requestCollectionInfo));
}
-ResponseHandle BalancerCommandsSchedulerImpl::_enqueueNewRequest(
- std::shared_ptr<CommandInfo>&& commandInfo) {
- const auto newRequestId = _requestIdCounter.fetchAndAddRelaxed(1);
+ResponseHandle BalancerCommandsSchedulerImpl::_buildAndEnqueueNewRequest(
+ OperationContext* opCtx, std::shared_ptr<CommandInfo>&& commandInfo) {
+ const auto newRequestId = UUID::gen();
LOGV2(5847202,
- "Enqueuing new Balancer command request with id {reqId}. Details: {command} ",
+ "Enqueuing new Balancer command request with id {reqId}. Details: {command}",
"Enqueuing new Balancer command request",
"reqId"_attr = newRequestId,
- "command"_attr = commandInfo->serialise().toString());
+ "command"_attr = commandInfo->serialise().toString(),
+ "recoveryDocRequired"_attr = commandInfo->requiresRecoveryOnCrash());
+
+ if (commandInfo->requiresRecoveryOnCrash()) {
+ DBDirectClient dbClient(opCtx);
+ PersistedBalancerCommand recoveryDoc(newRequestId,
+ commandInfo->serialise(),
+ commandInfo->getTarget(),
+ commandInfo->getNameSpace(),
+ commandInfo->requiresDistributedLock());
+ std::vector<BSONObj> serialisedRecoveryInfo;
+ serialisedRecoveryInfo.emplace_back(recoveryDoc.toBSON());
+ auto reply = dbClient.insertAcknowledged(
+ NamespaceString::kConfigBalancerCommandsNamespace.toString(),
+ serialisedRecoveryInfo,
+ true,
+ WriteConcernOptions::Majority);
+
+ if (auto writeStatus = getStatusFromWriteCommandReply(reply); !writeStatus.isOK()) {
+ LOGV2(5847210,
+ "Failed to persist command document for reqId {reqId}. Error status: {status}",
+ "Failed to persist request command document",
+ "reqId"_attr = newRequestId,
+ "status"_attr = writeStatus);
+ return ResponseHandle(newRequestId, writeStatus);
+ }
+ }
RequestData pendingRequest(newRequestId, std::move(commandInfo));
- auto deferredResponseHandle = pendingRequest.getResponseHandle();
- {
- stdx::lock_guard<Latch> lg(_mutex);
- if (_state == SchedulerState::Running) {
- invariant(_workerThreadHandle.joinable());
- _incompleteRequests.emplace(std::make_pair(newRequestId, std::move(pendingRequest)));
- _pendingRequestIds.push_back(newRequestId);
- _stateUpdatedCV.notify_all();
- } else {
- deferredResponseHandle.handle->set(Status(
- ErrorCodes::CallbackCanceled, "Request rejected - balancer scheduler is stopped"));
- }
+
+ stdx::unique_lock<Latch> ul(_mutex);
+ _stateUpdatedCV.wait(ul, [this] { return _state != SchedulerState::Recovering; });
+
+ return _enqueueRequest(ul, std::move(pendingRequest));
+}
+
+ResponseHandle BalancerCommandsSchedulerImpl::_enqueueRequest(WithLock, RequestData&& request) {
+ auto requestId = request.getId();
+ auto deferredResponseHandle = request.getResponseHandle();
+ if (_state == SchedulerState::Recovering || _state == SchedulerState::Running) {
+ _incompleteRequests.emplace(std::make_pair(requestId, std::move(request)));
+ _pendingRequestIds.push_back(requestId);
+ _stateUpdatedCV.notify_all();
+ } else {
+ deferredResponseHandle.handle->set(Status(
+ ErrorCodes::CallbackCanceled, "Request rejected - balancer scheduler is stopped"));
}
+
return deferredResponseHandle;
}
+bool BalancerCommandsSchedulerImpl::_canSubmitNewRequests(WithLock) {
+ return (!_pendingRequestIds.empty() && _runningRequestIds.size() < _maxRunningRequests &&
+ MONGO_likely(!pauseBalancerWorkerThread.shouldFail()));
+}
+
Status BalancerCommandsSchedulerImpl::_acquireDistLock(OperationContext* opCtx,
NamespaceString nss) {
auto it = _migrationLocks.find(nss);
@@ -304,7 +347,7 @@ CommandSubmissionResult BalancerCommandsSchedulerImpl::_submit(
_applyCommandResponse(opCtx, requestId, args.response);
};
- if (handle.commandInfo->getType() == CommandInfo::Type::kMoveChunk) {
+ if (handle.commandInfo->requiresDistributedLock()) {
Status lockAcquisitionResponse =
_acquireDistLock(opCtx, handle.commandInfo->getNameSpace());
if (!lockAcquisitionResponse.isOK()) {
@@ -336,18 +379,21 @@ void BalancerCommandsSchedulerImpl::_applySubmissionResult(
* - set the execution context on the RequestData to be able to serve future cancel requests
* - update the data structure describing running command requests
* If failed,
- * - persist the outcome into the deferred response
- * - remove the RequestData (and the info on migration)
+ * - store the outcome into the deferred response
+ * - remove the RequestData and its persisted info (if any)
*/
auto& requestToUpdate = requestToUpdateIt->second;
if (submissionResult.context.isOK()) {
requestToUpdate.addExecutionContext(std::move(submissionResult.context.getValue()));
_runningRequestIds.insert(submissionResult.id);
} else {
+ const auto& submittedCommandInfo = requestToUpdate.getCommandInfo();
if (submissionResult.acquiredDistLock) {
- const auto& submittedCommandInfo = requestToUpdate.getCommandInfo();
_releaseDistLock(opCtx, submittedCommandInfo.getNameSpace());
}
+ if (submittedCommandInfo.requiresRecoveryOnCrash()) {
+ _obsoleteRecoveryDocumentIds.push_back(submissionResult.id);
+ }
requestToUpdate.setOutcome(submissionResult.context.getStatus());
_incompleteRequests.erase(requestToUpdateIt);
}
@@ -355,24 +401,31 @@ void BalancerCommandsSchedulerImpl::_applySubmissionResult(
void BalancerCommandsSchedulerImpl::_applyCommandResponse(
OperationContext* opCtx,
- uint32_t requestId,
+ UUID requestId,
const executor::TaskExecutor::ResponseStatus& response) {
{
stdx::lock_guard<Latch> lg(_mutex);
auto requestToCompleteIt = _incompleteRequests.find(requestId);
- if (_state != SchedulerState::Running || requestToCompleteIt == _incompleteRequests.end()) {
+ if (_state == SchedulerState::Stopping || _state == SchedulerState::Stopped ||
+ requestToCompleteIt == _incompleteRequests.end()) {
// Drop the response - the request is being cancelled in the worker thread.
- // (TODO modify to handle stepdown scenarios).
return;
}
auto& requestToComplete = requestToCompleteIt->second;
requestToComplete.setOutcome(response);
auto& commandInfo = requestToComplete.getCommandInfo();
- if (commandInfo.getType() == CommandInfo::Type::kMoveChunk) {
+ if (commandInfo.requiresDistributedLock()) {
_releaseDistLock(opCtx, commandInfo.getNameSpace());
}
+ if (commandInfo.requiresRecoveryOnCrash()) {
+ _obsoleteRecoveryDocumentIds.push_back(requestId);
+ }
_runningRequestIds.erase(requestId);
_incompleteRequests.erase(requestToCompleteIt);
+ if (_incompleteRequests.empty() && _state == SchedulerState::Recovering) {
+ LOGV2(5847213, "Balancer scheduler recovery complete. Switching to regular execution");
+ _state = SchedulerState::Running;
+ }
_stateUpdatedCV.notify_all();
}
LOGV2(5847204,
@@ -382,6 +435,59 @@ void BalancerCommandsSchedulerImpl::_applyCommandResponse(
"response"_attr = response.toString());
}
+std::vector<RequestData> BalancerCommandsSchedulerImpl::_loadRequestsToRecover(
+ OperationContext* opCtx) {
+ std::vector<RequestData> requestsToRecover;
+ auto documentProcessor = [&requestsToRecover](const BSONObj& commandToRecoverDoc) {
+ auto originalCommand = PersistedBalancerCommand::parse(
+ IDLParserErrorContext("BalancerCommandsScheduler"), commandToRecoverDoc);
+ auto recoveryCommand = std::make_shared<RecoveryCommandInfo>(originalCommand);
+ LOGV2(5847212,
+ "Recovered request id {reqId}, which command will be rescheduled. Details: "
+ "{command}",
+ "Command request recovered and set for rescheduling",
+ "reqId"_attr = originalCommand.getRequestId(),
+ "command"_attr = recoveryCommand->serialise());
+ requestsToRecover.emplace_back(originalCommand.getRequestId(), std::move(recoveryCommand));
+ };
+ DBDirectClient dbClient(opCtx);
+ dbClient.query(documentProcessor, NamespaceString::kConfigBalancerCommandsNamespace, BSONObj());
+ return requestsToRecover;
+}
+
+void BalancerCommandsSchedulerImpl::_cleanUpObsoleteRecoveryInfo(WithLock,
+ OperationContext* opCtx) {
+ if (_obsoleteRecoveryDocumentIds.empty()) {
+ return;
+ }
+ BSONObjBuilder queryBuilder;
+ if (_obsoleteRecoveryDocumentIds.size() == 1) {
+ _obsoleteRecoveryDocumentIds[0].appendToBuilder(
+ &queryBuilder, PersistedBalancerCommand::kRequestIdFieldName);
+ } else {
+ BSONObjBuilder requestIdClause(
+ queryBuilder.subobjStart(PersistedBalancerCommand::kRequestIdFieldName));
+ BSONArrayBuilder valuesBuilder(requestIdClause.subarrayStart("$in"));
+ for (const auto& requestId : _obsoleteRecoveryDocumentIds) {
+ requestId.appendToArrayBuilder(&valuesBuilder);
+ }
+ }
+
+ _obsoleteRecoveryDocumentIds.clear();
+
+ auto query = queryBuilder.obj();
+ DBDirectClient dbClient(opCtx);
+
+ auto reply = dbClient.removeAcknowledged(
+ NamespaceString::kConfigBalancerCommandsNamespace.toString(), query);
+
+ LOGV2(5847211,
+ "Clean up of obsolete document info performed with outcome {reply}",
+ "Clean up of obsolete document info performed",
+ "query"_attr = query,
+ "reply"_attr = reply);
+}
+
void BalancerCommandsSchedulerImpl::_workerThread() {
ON_BLOCK_EXIT([this] {
@@ -389,32 +495,38 @@ void BalancerCommandsSchedulerImpl::_workerThread() {
"BalancerCommandsScheduler worker thread failed to release all locks on exit");
LOGV2(5847208, "Leaving balancer command scheduler thread");
stdx::lock_guard<Latch> lg(_mutex);
- _setState(lg, SchedulerState::Stopped);
+ _state = SchedulerState::Stopped;
+ _stateUpdatedCV.notify_all();
});
Client::initThread("BalancerCommandsScheduler");
auto opCtxHolder = cc().makeOperationContext();
- stdx::unordered_map<uint32_t, RequestData> requestsToCleanUpOnExit;
+ stdx::unordered_map<UUID, RequestData, UUID::Hash> requestsToCleanUpOnExit;
LOGV2(5847205, "Balancer scheduler thread started");
while (true) {
std::vector<CommandSubmissionHandle> commandsToSubmit;
{
stdx::unique_lock<Latch> ul(_mutex);
- _stateUpdatedCV.wait(ul, [this] {
- return (_state != SchedulerState::Running ||
- (!_pendingRequestIds.empty() &&
- _runningRequestIds.size() < _maxRunningRequests &&
- MONGO_likely(!pauseBalancerWorkerThread.shouldFail())));
+ invariant(_state != SchedulerState::Stopped);
+ _stateUpdatedCV.wait(ul, [this, &ul] {
+ return (_state == SchedulerState::Stopping || _canSubmitNewRequests(ul) ||
+ !_obsoleteRecoveryDocumentIds.empty());
});
- if (_state != SchedulerState::Running) {
+ _cleanUpObsoleteRecoveryInfo(ul, opCtxHolder.get());
+
+ if (_state == SchedulerState::Stopping) {
_runningRequestIds.clear();
_pendingRequestIds.clear();
_incompleteRequests.swap(requestsToCleanUpOnExit);
break;
}
+ if (!_canSubmitNewRequests(ul)) {
+ continue;
+ }
+
// 1. Pick up new requests to be submitted from the pending list, if possible.
const auto availableSubmissionSlots =
static_cast<size_t>(_maxRunningRequests - _runningRequestIds.size());
@@ -450,6 +562,7 @@ void BalancerCommandsSchedulerImpl::_workerThread() {
}
// In case of clean exit, cancel all the pending/running command requests
+ // (but keep the related descriptor documents to ensure they will be reissued on recovery).
auto executor = Grid::get(opCtxHolder.get())->getExecutorPool()->getFixedExecutor();
for (auto& idAndRequest : requestsToCleanUpOnExit) {
idAndRequest.second.setOutcome(Status(
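Taken together, the changes in this file implement the following crash-recovery flow; the sketch below is a restatement of the code above rather than new API.

    // When the scheduler is (re)started after a crash:
    scheduler.start(opCtx);
    //  - start() loads every document from
    //    config.balancerCommandsSchedulerOngoingOperations (_loadRequestsToRecover),
    //    wraps each one in a RecoveryCommandInfo and enqueues it;
    //  - if any document was found, the state becomes SchedulerState::Recovering,
    //    so new request<Command>() invocations wait until the recovered requests drain;
    //  - as each persisted command completes, its requestId is queued in
    //    _obsoleteRecoveryDocumentIds and the worker thread deletes the matching
    //    document (_cleanUpObsoleteRecoveryInfo);
    //  - once _incompleteRequests is empty, the state switches back to Running.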
diff --git a/src/mongo/db/s/balancer/balancer_commands_scheduler_impl.h b/src/mongo/db/s/balancer/balancer_commands_scheduler_impl.h
index 7dcab8b20fd..6970d5a965c 100644
--- a/src/mongo/db/s/balancer/balancer_commands_scheduler_impl.h
+++ b/src/mongo/db/s/balancer/balancer_commands_scheduler_impl.h
@@ -32,6 +32,7 @@
#include <list>
#include <unordered_map>
+#include "mongo/db/s/balancer/balancer_command_document_gen.h"
#include "mongo/db/s/balancer/balancer_commands_scheduler.h"
#include "mongo/db/s/dist_lock_manager.h"
#include "mongo/db/service_context.h"
@@ -46,13 +47,20 @@ namespace mongo {
* Data structure generated from RequestData to support the creation of SchedulerResponse objects.
*/
struct ResponseHandle {
- ResponseHandle(uint32_t requestId,
+ ResponseHandle(UUID requestId,
const std::shared_ptr<Notification<executor::RemoteCommandResponse>>& handle)
: requestId(requestId), handle(handle) {}
+
+ ResponseHandle(UUID requestId, Status outcome)
+ : requestId(requestId),
+ handle(std::make_shared<Notification<executor::RemoteCommandResponse>>()) {
+ handle->set(outcome);
+ }
+
ResponseHandle(ResponseHandle&& rhs)
: requestId(rhs.requestId), handle(std::move(rhs.handle)) {}
- const uint32_t requestId;
+ const UUID requestId;
const std::shared_ptr<Notification<executor::RemoteCommandResponse>> handle;
};
@@ -69,7 +77,7 @@ public:
~SchedulerResponse() = default;
- uint32_t getRequestId() const {
+ UUID getRequestId() const {
return _requestId;
}
@@ -90,7 +98,7 @@ public:
}
private:
- uint32_t _requestId;
+ UUID _requestId;
std::shared_ptr<Notification<executor::RemoteCommandResponse>> _deferredValue;
};
@@ -104,7 +112,7 @@ public:
MoveChunkResponseImpl(ResponseHandle&& responseHandle)
: SchedulerResponse(std::move(responseHandle)) {}
- uint32_t getRequestId() const override {
+ UUID getRequestId() const override {
return SchedulerResponse::getRequestId();
}
@@ -122,7 +130,7 @@ public:
MergeChunksResponseImpl(ResponseHandle&& responseHandle)
: SchedulerResponse(std::move(responseHandle)) {}
- uint32_t getRequestId() const override {
+ UUID getRequestId() const override {
return SchedulerResponse::getRequestId();
}
@@ -140,7 +148,7 @@ public:
SplitVectorResponseImpl(ResponseHandle&& responseHandle)
: SchedulerResponse(std::move(responseHandle)) {}
- uint32_t getRequestId() const override {
+ UUID getRequestId() const override {
return SchedulerResponse::getRequestId();
}
@@ -171,7 +179,7 @@ public:
SplitChunkResponseImpl(ResponseHandle&& responseHandle)
: SchedulerResponse(std::move(responseHandle)) {}
- uint32_t getRequestId() const override {
+ UUID getRequestId() const override {
return SchedulerResponse::getRequestId();
}
@@ -189,7 +197,7 @@ public:
ChunkDataSizeResponseImpl(ResponseHandle&& responseHandle)
: SchedulerResponse(std::move(responseHandle)) {}
- uint32_t getRequestId() const override {
+ UUID getRequestId() const override {
return SchedulerResponse::getRequestId();
}
@@ -225,17 +233,19 @@ public:
*/
class CommandInfo {
public:
- enum class Type { kMoveChunk, kMergeChunks, kSplitVector, kSplitChunk, kDataSize };
-
- CommandInfo(Type type, const ShardId& targetShardId, const NamespaceString& nss)
- : _type(type), _targetShardId(targetShardId), _nss(nss) {}
+ CommandInfo(const ShardId& targetShardId, const NamespaceString& nss)
+ : _targetShardId(targetShardId), _nss(nss) {}
virtual ~CommandInfo() {}
virtual BSONObj serialise() const = 0;
- Type getType() const {
- return _type;
+ virtual bool requiresRecoveryOnCrash() const {
+ return false;
+ }
+
+ virtual bool requiresDistributedLock() const {
+ return false;
}
const ShardId& getTarget() const {
@@ -247,7 +257,6 @@ public:
}
private:
- Type _type;
ShardId _targetShardId;
NamespaceString _nss;
};
@@ -267,7 +276,7 @@ public:
bool waitForDelete,
MoveChunkRequest::ForceJumbo forceJumbo,
const ChunkVersion& version)
- : CommandInfo(Type::kMoveChunk, origin, nss),
+ : CommandInfo(origin, nss),
_chunkBoundaries(lowerBoundKey, upperBoundKey),
_recipient(recipient),
_version(version),
@@ -291,6 +300,14 @@ public:
return commandBuilder.obj();
}
+ bool requiresRecoveryOnCrash() const override {
+ return true;
+ }
+
+ bool requiresDistributedLock() const override {
+ return true;
+ }
+
private:
ChunkRange _chunkBoundaries;
ShardId _recipient;
@@ -308,7 +325,7 @@ public:
const BSONObj& lowerBoundKey,
const BSONObj& upperBoundKey,
const ChunkVersion& version)
- : CommandInfo(Type::kMergeChunks, shardId, nss),
+ : CommandInfo(shardId, nss),
_lowerBoundKey(lowerBoundKey),
_upperBoundKey(upperBoundKey),
_version(version) {}
@@ -351,7 +368,7 @@ public:
boost::optional<long long> maxChunkObjects,
boost::optional<long long> maxChunkSizeBytes,
bool force)
- : CommandInfo(Type::kSplitVector, shardId, nss),
+ : CommandInfo(shardId, nss),
_shardKeyPattern(shardKeyPattern),
_lowerBoundKey(lowerBoundKey),
_upperBoundKey(upperBoundKey),
@@ -408,7 +425,7 @@ public:
const BSONObj& upperBoundKey,
bool estimatedValue,
const ChunkVersion& version)
- : CommandInfo(Type::kDataSize, shardId, nss),
+ : CommandInfo(shardId, nss),
_shardKeyPattern(shardKeyPattern),
_lowerBoundKey(lowerBoundKey),
_upperBoundKey(upperBoundKey),
@@ -452,7 +469,7 @@ public:
const BSONObj& upperBoundKey,
const ChunkVersion& version,
const std::vector<BSONObj>& splitPoints)
- : CommandInfo(Type::kSplitChunk, shardId, nss),
+ : CommandInfo(shardId, nss),
_shardKeyPattern(shardKeyPattern),
_lowerBoundKey(lowerBoundKey),
_upperBoundKey(upperBoundKey),
@@ -487,18 +504,42 @@ private:
static const std::string kSplitKeys;
};
+class RecoveryCommandInfo : public CommandInfo {
+public:
+ RecoveryCommandInfo(const PersistedBalancerCommand& persistedCommand)
+ : CommandInfo(persistedCommand.getTarget(), persistedCommand.getNss()),
+ _serialisedCommand(persistedCommand.getRemoteCommand()),
+ _requiresDistributedLock(persistedCommand.getRequiresDistributedLock()) {}
+
+ BSONObj serialise() const override {
+ return _serialisedCommand;
+ }
+
+ bool requiresRecoveryOnCrash() const override {
+ return true;
+ }
+
+ bool requiresDistributedLock() const override {
+ return _requiresDistributedLock;
+ }
+
+private:
+ BSONObj _serialisedCommand;
+ bool _requiresDistributedLock;
+};
+
/**
* Helper data structure for submitting the shard command associated to a Request to the
* BalancerCommandsScheduler.
*/
struct CommandSubmissionHandle {
- CommandSubmissionHandle(uint32_t id, const std::shared_ptr<CommandInfo>& commandInfo)
+ CommandSubmissionHandle(UUID id, const std::shared_ptr<CommandInfo>& commandInfo)
: id(id), commandInfo(commandInfo) {}
CommandSubmissionHandle(CommandSubmissionHandle&& rhs)
: id(rhs.id), commandInfo(std::move(rhs.commandInfo)) {}
- const uint32_t id;
+ const UUID id;
const std::shared_ptr<CommandInfo> commandInfo;
};
@@ -509,14 +550,12 @@ using ExecutionContext = executor::TaskExecutor::CallbackHandle;
* Helper data structure for storing the outcome of a Command submission.
*/
struct CommandSubmissionResult {
- CommandSubmissionResult(uint32_t id,
- bool acquiredDistLock,
- StatusWith<ExecutionContext>&& context)
+ CommandSubmissionResult(UUID id, bool acquiredDistLock, StatusWith<ExecutionContext>&& context)
: id(id), acquiredDistLock(acquiredDistLock), context(std::move(context)) {}
CommandSubmissionResult(CommandSubmissionResult&& rhs)
: id(rhs.id), acquiredDistLock(rhs.acquiredDistLock), context(std::move(rhs.context)) {}
CommandSubmissionResult(const CommandSubmissionResult& rhs) = delete;
- uint32_t id;
+ UUID id;
bool acquiredDistLock;
StatusWith<ExecutionContext> context;
};
@@ -528,7 +567,7 @@ struct CommandSubmissionResult {
*/
class RequestData {
public:
- RequestData(uint32_t id, std::shared_ptr<CommandInfo>&& commandInfo)
+ RequestData(UUID id, std::shared_ptr<CommandInfo>&& commandInfo)
: _id(id),
_commandInfo(std::move(commandInfo)),
_deferredResponse(std::make_shared<Notification<executor::RemoteCommandResponse>>()),
@@ -544,7 +583,7 @@ public:
~RequestData() = default;
- uint32_t getId() {
+ UUID getId() const {
return _id;
}
@@ -577,7 +616,7 @@ private:
RequestData(const RequestData& rhs) = delete;
- const uint32_t _id;
+ const UUID _id;
std::shared_ptr<CommandInfo> _commandInfo;
@@ -596,40 +635,45 @@ public:
~BalancerCommandsSchedulerImpl();
- void start() override;
+ void start(OperationContext* opCtx) override;
void stop() override;
std::unique_ptr<MoveChunkResponse> requestMoveChunk(
+ OperationContext* opCtx,
const NamespaceString& nss,
const ChunkType& chunk,
const ShardId& destination,
const MoveChunkSettings& commandSettings) override;
- std::unique_ptr<MergeChunksResponse> requestMergeChunks(const NamespaceString& nss,
+ std::unique_ptr<MergeChunksResponse> requestMergeChunks(OperationContext* opCtx,
+ const NamespaceString& nss,
const ChunkType& lowerBound,
const ChunkType& upperBound) override;
std::unique_ptr<SplitVectorResponse> requestSplitVector(
+ OperationContext* opCtx,
const NamespaceString& nss,
const ChunkType& chunk,
const ShardKeyPattern& shardKeyPattern,
const SplitVectorSettings& commandSettings) override;
std::unique_ptr<SplitChunkResponse> requestSplitChunk(
+ OperationContext* opCtx,
const NamespaceString& nss,
const ChunkType& chunk,
const ShardKeyPattern& shardKeyPattern,
const std::vector<BSONObj>& splitPoints) override;
std::unique_ptr<ChunkDataSizeResponse> requestChunkDataSize(
+ OperationContext* opCtx,
const NamespaceString& nss,
const ChunkType& chunk,
const ShardKeyPattern& shardKeyPattern,
bool estimatedValue) override;
private:
- enum class SchedulerState { Running, Stopping, Stopped };
+ enum class SchedulerState { Recovering, Running, Stopping, Stopped };
static const int32_t _maxRunningRequests{10};
@@ -643,25 +687,22 @@ private:
stdx::thread _workerThreadHandle;
/**
- * Generator of unique Request IDs (within the life scope of BalancerCommandsSchedulerImpl)
- */
- AtomicWord<uint32_t> _requestIdCounter{0};
-
- /**
* Collection of all pending + running requests currently managed by
* BalancerCommandsSchedulerImpl, organized by ID.
*/
- stdx::unordered_map<uint32_t, RequestData> _incompleteRequests;
+ stdx::unordered_map<UUID, RequestData, UUID::Hash> _incompleteRequests;
/**
* List of request IDs that have not been yet submitted.
*/
- std::list<uint32_t> _pendingRequestIds;
+ std::list<UUID> _pendingRequestIds;
/**
* List of request IDs that are currently running (submitted, but not yet completed).
*/
- stdx::unordered_set<uint32_t> _runningRequestIds;
+ stdx::unordered_set<UUID, UUID::Hash> _runningRequestIds;
+
+ std::vector<UUID> _obsoleteRecoveryDocumentIds;
/**
* State to acquire and release DistLocks on a per namespace basis
@@ -674,9 +715,12 @@ private:
};
stdx::unordered_map<NamespaceString, Migrations> _migrationLocks;
- void _setState(WithLock, SchedulerState state);
+ ResponseHandle _buildAndEnqueueNewRequest(OperationContext* opCtx,
+ std::shared_ptr<CommandInfo>&& commandInfo);
- ResponseHandle _enqueueNewRequest(std::shared_ptr<CommandInfo>&& commandInfo);
+ ResponseHandle _enqueueRequest(WithLock, RequestData&& request);
+
+ bool _canSubmitNewRequests(WithLock);
Status _acquireDistLock(OperationContext* opCtx, NamespaceString nss);
@@ -689,9 +733,13 @@ private:
CommandSubmissionResult&& submissionResult);
void _applyCommandResponse(OperationContext* opCtx,
- uint32_t requestId,
+ UUID requestId,
const executor::TaskExecutor::ResponseStatus& response);
+ std::vector<RequestData> _loadRequestsToRecover(OperationContext* opCtx);
+
+ void _cleanUpObsoleteRecoveryInfo(WithLock, OperationContext* opCtx);
+
void _workerThread();
};
diff --git a/src/mongo/db/s/balancer/balancer_commands_scheduler_test.cpp b/src/mongo/db/s/balancer/balancer_commands_scheduler_test.cpp
index 6127d08634b..e449c829eaa 100644
--- a/src/mongo/db/s/balancer/balancer_commands_scheduler_test.cpp
+++ b/src/mongo/db/s/balancer/balancer_commands_scheduler_test.cpp
@@ -29,9 +29,10 @@
#include "mongo/platform/basic.h"
+#include "mongo/client/remote_command_targeter_mock.h"
#include "mongo/db/s/balancer/balancer_commands_scheduler.h"
#include "mongo/db/s/balancer/balancer_commands_scheduler_impl.h"
-#include "mongo/db/s/shard_server_test_fixture.h"
+#include "mongo/db/s/config/config_server_test_fixture.h"
#include "mongo/s/catalog/sharding_catalog_client_mock.h"
namespace mongo {
@@ -39,49 +40,25 @@ namespace {
using unittest::assertGet;
-class BalancerCommandsSchedulerTest : public ShardServerTestFixture {
+class BalancerCommandsSchedulerTest : public ConfigServerTestFixture {
public:
- const std::vector<ShardType> kShardList{ShardType("shard0", "Host0:12345"),
- ShardType("shard1", "Host1:12345")};
- const NamespaceString kNss{"testDb.testColl"};
-
- void setUp() override {
- ShardServerTestFixture::setUp();
- for (auto shardType : kShardList) {
- auto shard = assertGet(
- shardRegistry()->getShard(operationContext(), ShardId(shardType.getName())));
- RemoteCommandTargeterMock::get(shard->getTargeter())
- ->setFindHostReturnValue(HostAndPort::parse(shardType.getHost()));
- }
- }
+ const ShardId kShardId0 = ShardId("shard0");
+ const ShardId kShardId1 = ShardId("shard1");
+ const HostAndPort kShardHost0 = HostAndPort("TestHost0", 12345);
+ const HostAndPort kShardHost1 = HostAndPort("TestHost1", 12346);
- void tearDown() override {
- _scheduler.stop();
- ShardServerTestFixture::tearDown();
- }
+ const std::vector<ShardType> kShardList{
+ ShardType(kShardId0.toString(), kShardHost0.toString()),
+ ShardType(kShardId1.toString(), kShardHost1.toString())};
- std::unique_ptr<ShardingCatalogClient> makeShardingCatalogClient() override {
- class StaticCatalogClient final : public ShardingCatalogClientMock {
- public:
- StaticCatalogClient(const std::vector<ShardType> kShardList) : _shards(kShardList) {}
-
- StatusWith<repl::OpTimeWith<std::vector<ShardType>>> getAllShards(
- OperationContext* opCtx, repl::ReadConcernLevel readConcern) override {
- return repl::OpTimeWith<std::vector<ShardType>>(_shards);
- }
-
- private:
- const std::vector<ShardType> _shards;
- };
- return std::make_unique<StaticCatalogClient>(kShardList);
- }
+ const NamespaceString kNss{"testDb.testColl"};
- ChunkType makeChunk(long long min, std::string shardName) {
+ ChunkType makeChunk(long long min, const ShardId& shardId) {
ChunkType chunk;
chunk.setMin(BSON("x" << min));
chunk.setMax(BSON("x" << min + 10));
chunk.setJumbo(false);
- chunk.setShard(ShardId(shardName));
+ chunk.setShard(shardId);
chunk.setVersion(ChunkVersion(1, 1, OID::gen(), Timestamp(10)));
return chunk;
}
@@ -95,29 +72,65 @@ public:
MoveChunkRequest::ForceJumbo::kDoNotForce);
}
+ std::vector<BSONObj> getPersistedCommandDocuments(OperationContext* opCtx) {
+ auto statusWithPersistedCommandDocs =
+ Grid::get(opCtx)->shardRegistry()->getConfigShard()->exhaustiveFindOnConfig(
+ opCtx,
+ ReadPreferenceSetting{ReadPreference::PrimaryOnly},
+ repl::ReadConcernLevel::kLocalReadConcern,
+ NamespaceString::kConfigBalancerCommandsNamespace,
+ BSONObj(),
+ BSONObj(),
+ boost::none);
+
+ ASSERT_OK(statusWithPersistedCommandDocs.getStatus());
+ return statusWithPersistedCommandDocs.getValue().docs;
+ }
+
+
protected:
+ void setUp() override {
+ setUpAndInitializeConfigDb();
+ setupShards(kShardList);
+ // Scheduler commands are targeted at shards; configure their mock targeters so hosts can be resolved.
+ auto opCtx = operationContext();
+ configureTargeter(opCtx, kShardId0, kShardHost0);
+ configureTargeter(opCtx, kShardId1, kShardHost1);
+ }
+
+ void tearDown() override {
+ _scheduler.stop();
+ ConfigServerTestFixture::tearDown();
+ }
+
+ void configureTargeter(OperationContext* opCtx, ShardId shardId, const HostAndPort& host) {
+ auto targeter = RemoteCommandTargeterMock::get(
+ uassertStatusOK(shardRegistry()->getShard(opCtx, shardId))->getTargeter());
+ targeter->setFindHostReturnValue(host);
+ }
+
BalancerCommandsSchedulerImpl _scheduler;
};
TEST_F(BalancerCommandsSchedulerTest, StartAndStopScheduler) {
- _scheduler.start();
+ _scheduler.start(operationContext());
_scheduler.stop();
}
TEST_F(BalancerCommandsSchedulerTest, ResilientToMultipleStarts) {
- _scheduler.start();
- _scheduler.start();
+ _scheduler.start(operationContext());
+ _scheduler.start(operationContext());
}
TEST_F(BalancerCommandsSchedulerTest, SuccessfulMoveChunkCommand) {
- _scheduler.start();
- ChunkType moveChunk = makeChunk(0, "shard0");
+ _scheduler.start(operationContext());
+ ChunkType moveChunk = makeChunk(0, kShardId0);
auto networkResponseFuture = launchAsync([&]() {
onCommand(
[&](const executor::RemoteCommandRequest& request) { return BSON("ok" << true); });
});
auto resp = _scheduler.requestMoveChunk(
- kNss, moveChunk, ShardId("shard1"), getDefaultMoveChunkSettings());
+ operationContext(), kNss, moveChunk, ShardId(kShardId1), getDefaultMoveChunkSettings());
ASSERT_OK(resp->getOutcome());
networkResponseFuture.default_timed_get();
// Ensure DistLock is released correctly
@@ -133,34 +146,34 @@ TEST_F(BalancerCommandsSchedulerTest, SuccessfulMoveChunkCommand) {
}
TEST_F(BalancerCommandsSchedulerTest, SuccessfulMergeChunkCommand) {
- _scheduler.start();
- ChunkType chunk1 = makeChunk(0, "shard0");
- ChunkType chunk2 = makeChunk(10, "shard0");
+ _scheduler.start(operationContext());
+ ChunkType chunk1 = makeChunk(0, kShardId0);
+ ChunkType chunk2 = makeChunk(10, kShardId0);
auto networkResponseFuture = launchAsync([&]() {
onCommand(
[&](const executor::RemoteCommandRequest& request) { return BSON("ok" << true); });
});
- auto resp = _scheduler.requestMergeChunks(kNss, chunk1, chunk2);
+ auto resp = _scheduler.requestMergeChunks(operationContext(), kNss, chunk1, chunk2);
ASSERT_OK(resp->getOutcome());
networkResponseFuture.default_timed_get();
_scheduler.stop();
}
TEST_F(BalancerCommandsSchedulerTest, MergeChunkNonexistentShard) {
- _scheduler.start();
- ChunkType brokenChunk1 = makeChunk(0, "shard0");
+ _scheduler.start(operationContext());
+ ChunkType brokenChunk1 = makeChunk(0, kShardId0);
brokenChunk1.setShard(ShardId("nonexistent"));
- ChunkType brokenChunk2 = makeChunk(10, "shard0");
+ ChunkType brokenChunk2 = makeChunk(10, kShardId0);
brokenChunk2.setShard(ShardId("nonexistent"));
- auto resp = _scheduler.requestMergeChunks(kNss, brokenChunk1, brokenChunk2);
+ auto resp = _scheduler.requestMergeChunks(operationContext(), kNss, brokenChunk1, brokenChunk2);
auto shardNotFoundError = Status{ErrorCodes::ShardNotFound, "Shard nonexistent not found"};
ASSERT_EQ(resp->getOutcome(), shardNotFoundError);
_scheduler.stop();
}
TEST_F(BalancerCommandsSchedulerTest, SuccessfulSplitVectorCommand) {
- _scheduler.start();
- ChunkType splitChunk = makeChunk(0, "shard0");
+ _scheduler.start(operationContext());
+ ChunkType splitChunk = makeChunk(0, kShardId0);
BSONObjBuilder splitChunkResponse;
splitChunkResponse.append("ok", "1");
BSONArrayBuilder splitKeys(splitChunkResponse.subarrayStart("splitKeys"));
@@ -171,8 +184,11 @@ TEST_F(BalancerCommandsSchedulerTest, SuccessfulSplitVectorCommand) {
return splitChunkResponse.obj();
});
});
- auto resp = _scheduler.requestSplitVector(
- kNss, splitChunk, ShardKeyPattern(BSON("x" << 1)), SplitVectorSettings());
+ auto resp = _scheduler.requestSplitVector(operationContext(),
+ kNss,
+ splitChunk,
+ ShardKeyPattern(BSON("x" << 1)),
+ SplitVectorSettings());
ASSERT_OK(resp->getOutcome());
ASSERT_OK(resp->getSplitKeys().getStatus());
ASSERT_EQ(resp->getSplitKeys().getValue().size(), 1);
@@ -182,22 +198,25 @@ TEST_F(BalancerCommandsSchedulerTest, SuccessfulSplitVectorCommand) {
}
TEST_F(BalancerCommandsSchedulerTest, SuccessfulSplitChunkCommand) {
- _scheduler.start();
- ChunkType splitChunk = makeChunk(0, "shard0");
+ _scheduler.start(operationContext());
+ ChunkType splitChunk = makeChunk(0, kShardId0);
auto networkResponseFuture = launchAsync([&]() {
onCommand(
[&](const executor::RemoteCommandRequest& request) { return BSON("ok" << true); });
});
- auto resp = _scheduler.requestSplitChunk(
- kNss, splitChunk, ShardKeyPattern(BSON("x" << 1)), std::vector<BSONObj>{BSON("x" << 5)});
+ auto resp = _scheduler.requestSplitChunk(operationContext(),
+ kNss,
+ splitChunk,
+ ShardKeyPattern(BSON("x" << 1)),
+ std::vector<BSONObj>{BSON("x" << 5)});
ASSERT_OK(resp->getOutcome());
networkResponseFuture.default_timed_get();
_scheduler.stop();
}
TEST_F(BalancerCommandsSchedulerTest, SuccessfulRequestChunkDataSizeCommand) {
- _scheduler.start();
- ChunkType chunk = makeChunk(0, "shard0");
+ _scheduler.start(operationContext());
+ ChunkType chunk = makeChunk(0, kShardId0);
BSONObjBuilder chunkSizeResponse;
chunkSizeResponse.append("ok", "1");
chunkSizeResponse.append("size", 156);
@@ -206,8 +225,8 @@ TEST_F(BalancerCommandsSchedulerTest, SuccessfulRequestChunkDataSizeCommand) {
onCommand(
[&](const executor::RemoteCommandRequest& request) { return chunkSizeResponse.obj(); });
});
- auto resp =
- _scheduler.requestChunkDataSize(kNss, chunk, ShardKeyPattern(BSON("x" << 1)), false);
+ auto resp = _scheduler.requestChunkDataSize(
+ operationContext(), kNss, chunk, ShardKeyPattern(BSON("x" << 1)), false);
ASSERT_OK(resp->getOutcome());
ASSERT_OK(resp->getSize().getStatus());
ASSERT_EQ(resp->getSize().getValue(), 156);
@@ -218,14 +237,14 @@ TEST_F(BalancerCommandsSchedulerTest, SuccessfulRequestChunkDataSizeCommand) {
}
TEST_F(BalancerCommandsSchedulerTest, CommandFailsWhenNetworkReturnsError) {
- _scheduler.start();
- ChunkType moveChunk = makeChunk(0, "shard0");
+ _scheduler.start(operationContext());
+ ChunkType moveChunk = makeChunk(0, kShardId0);
auto timeoutError = Status{ErrorCodes::NetworkTimeout, "Mock error: network timed out"};
auto networkResponseFuture = launchAsync([&]() {
onCommand([&](const executor::RemoteCommandRequest& request) { return timeoutError; });
});
auto resp = _scheduler.requestMoveChunk(
- kNss, moveChunk, ShardId("shard1"), getDefaultMoveChunkSettings());
+ operationContext(), kNss, moveChunk, ShardId(kShardId1), getDefaultMoveChunkSettings());
ASSERT_EQUALS(resp->getOutcome(), timeoutError);
networkResponseFuture.default_timed_get();
// Ensure DistLock is released correctly
@@ -241,9 +260,9 @@ TEST_F(BalancerCommandsSchedulerTest, CommandFailsWhenNetworkReturnsError) {
}
TEST_F(BalancerCommandsSchedulerTest, CommandFailsWhenSchedulerIsStopped) {
- ChunkType moveChunk = makeChunk(0, "shard0");
+ ChunkType moveChunk = makeChunk(0, kShardId0);
auto resp = _scheduler.requestMoveChunk(
- kNss, moveChunk, ShardId("shard1"), getDefaultMoveChunkSettings());
+ operationContext(), kNss, moveChunk, ShardId(kShardId1), getDefaultMoveChunkSettings());
ASSERT_EQUALS(
resp->getOutcome(),
Status(ErrorCodes::CallbackCanceled, "Request rejected - balancer scheduler is stopped"));
@@ -262,10 +281,10 @@ TEST_F(BalancerCommandsSchedulerTest, CommandCanceledIfBalancerStops) {
std::unique_ptr<MoveChunkResponse> resp;
{
FailPointEnableBlock failPoint("pauseBalancerWorkerThread");
- _scheduler.start();
- ChunkType moveChunk = makeChunk(0, "shard0");
+ _scheduler.start(operationContext());
+ ChunkType moveChunk = makeChunk(0, kShardId0);
resp = _scheduler.requestMoveChunk(
- kNss, moveChunk, ShardId("shard1"), getDefaultMoveChunkSettings());
+ operationContext(), kNss, moveChunk, ShardId(kShardId1), getDefaultMoveChunkSettings());
_scheduler.stop();
}
ASSERT_EQUALS(
@@ -282,12 +301,98 @@ TEST_F(BalancerCommandsSchedulerTest, CommandCanceledIfBalancerStops) {
}
}
+TEST_F(BalancerCommandsSchedulerTest, MoveChunkCommandGetsPersistedOnDiskWhenRequestIsSubmitted) {
+ // This prevents the request from being submitted by the scheduler worker thread.
+ FailPointEnableBlock failPoint("pauseBalancerWorkerThread");
+
+ auto opCtx = operationContext();
+ _scheduler.start(opCtx);
+ ChunkType moveChunk = makeChunk(0, kShardId0);
+ auto requestSettings = getDefaultMoveChunkSettings();
+
+ auto deferredResponse = _scheduler.requestMoveChunk(
+ operationContext(), kNss, moveChunk, ShardId(kShardId1), requestSettings);
+
+ // The command is persisted...
+ auto persistedCommandDocs = getPersistedCommandDocuments(opCtx);
+ ASSERT_EQUALS(1, persistedCommandDocs.size());
+ auto persistedCommand = PersistedBalancerCommand::parse(
+ IDLParserErrorContext("BalancerCommandsSchedulerTest"), persistedCommandDocs[0]);
+ // ... with the expected info.
+ ASSERT_EQ(deferredResponse->getRequestId(), persistedCommand.getRequestId());
+ ASSERT_EQ(kNss, persistedCommand.getNss());
+ ASSERT_EQ(moveChunk.getShard(), persistedCommand.getTarget());
+ ASSERT_TRUE(persistedCommand.getRequiresDistributedLock());
+ auto originalCommandInfo = MoveChunkCommandInfo(kNss,
+ moveChunk.getShard(),
+ kShardId1,
+ moveChunk.getMin(),
+ moveChunk.getMax(),
+ requestSettings.maxChunkSizeBytes,
+ requestSettings.secondaryThrottle,
+ requestSettings.waitForDelete,
+ requestSettings.forceJumbo,
+ moveChunk.getVersion());
+ ASSERT_BSONOBJ_EQ(originalCommandInfo.serialise(), persistedCommand.getRemoteCommand());
+}
+
+TEST_F(BalancerCommandsSchedulerTest, PersistedCommandsAreReissuedWhenRecoveringFromCrash) {
+ FailPoint* failpoint = globalFailPointRegistry().find("pauseBalancerWorkerThread");
+ failpoint->setMode(FailPoint::Mode::alwaysOn);
+ auto opCtx = operationContext();
+ _scheduler.start(opCtx);
+ ChunkType moveChunk = makeChunk(0, kShardId0);
+ auto requestSettings = getDefaultMoveChunkSettings();
+ auto networkResponseFuture = launchAsync([&]() {
+ onCommand([&](const executor::RemoteCommandRequest& request) {
+ auto originalCommandInfo = MoveChunkCommandInfo(kNss,
+ moveChunk.getShard(),
+ kShardId1,
+ moveChunk.getMin(),
+ moveChunk.getMax(),
+ requestSettings.maxChunkSizeBytes,
+ requestSettings.secondaryThrottle,
+ requestSettings.waitForDelete,
+ requestSettings.forceJumbo,
+ moveChunk.getVersion());
+ // 4. ... Which is inspected here.
+ ASSERT_BSONOBJ_EQ(originalCommandInfo.serialise(), request.cmdObj);
+
+ return BSON("ok" << true);
+ });
+ });
+
+ auto resp = _scheduler.requestMoveChunk(
+ operationContext(), kNss, moveChunk, ShardId(kShardId1), getDefaultMoveChunkSettings());
+ _scheduler.stop();
+ failpoint->setMode(FailPoint::Mode::off);
+
+ // 1. The original submission is expected to fail...
+ ASSERT_EQUALS(
+ resp->getOutcome(),
+ Status(ErrorCodes::CallbackCanceled, "Request cancelled - balancer scheduler is stopping"));
+
+ // 2. ... And a recovery document to be persisted
+ auto persistedCommandDocs = getPersistedCommandDocuments(operationContext());
+ ASSERT_EQUALS(1, persistedCommandDocs.size());
+
+ // 3. After restarting, the persisted document should eventually trigger a remote execution...
+ _scheduler.start(opCtx);
+ networkResponseFuture.default_timed_get();
+
+ // 5. Once the recovery is complete, no persisted documents should remain
+ // (stop() is invoked to ensure that the observed state is stable).
+ _scheduler.stop();
+ persistedCommandDocs = getPersistedCommandDocuments(operationContext());
+ ASSERT_EQUALS(0, persistedCommandDocs.size());
+}
+
TEST_F(BalancerCommandsSchedulerTest, DistLockPreventsMoveChunkWithConcurrentDDL) {
OperationContext* opCtx;
FailPoint* failpoint = globalFailPointRegistry().find("pauseBalancerWorkerThread");
failpoint->setMode(FailPoint::Mode::alwaysOn);
{
- _scheduler.start();
+ _scheduler.start(operationContext());
opCtx = Client::getCurrent()->getOperationContext();
const std::string whyMessage(str::stream()
<< "Test acquisition of distLock for " << kNss.ns());
@@ -295,9 +400,9 @@ TEST_F(BalancerCommandsSchedulerTest, DistLockPreventsMoveChunkWithConcurrentDDL
opCtx, kNss.ns(), whyMessage, DistLockManager::kSingleLockAttemptTimeout);
ASSERT_OK(scopedDistLock.getStatus());
failpoint->setMode(FailPoint::Mode::off);
- ChunkType moveChunk = makeChunk(0, "shard0");
+ ChunkType moveChunk = makeChunk(0, kShardId0);
auto resp = _scheduler.requestMoveChunk(
- kNss, moveChunk, ShardId("shard1"), getDefaultMoveChunkSettings());
+ operationContext(), kNss, moveChunk, ShardId(kShardId1), getDefaultMoveChunkSettings());
ASSERT_EQ(
resp->getOutcome(),
Status(ErrorCodes::LockBusy, "Failed to acquire dist lock testDb.testColl locally"));
diff --git a/src/mongo/db/s/config/sharding_catalog_manager.cpp b/src/mongo/db/s/config/sharding_catalog_manager.cpp
index 188279a1b7c..c3bdca61a81 100644
--- a/src/mongo/db/s/config/sharding_catalog_manager.cpp
+++ b/src/mongo/db/s/config/sharding_catalog_manager.cpp
@@ -45,6 +45,7 @@
#include "mongo/db/ops/write_ops.h"
#include "mongo/db/query/query_request_helper.h"
#include "mongo/db/repl/repl_client_info.h"
+#include "mongo/db/s/balancer/balancer_command_document_gen.h"
#include "mongo/db/s/balancer/type_migration.h"
#include "mongo/db/s/sharding_util.h"
#include "mongo/db/s/type_lockpings.h"
@@ -427,6 +428,18 @@ Status ShardingCatalogManager::_initConfigIndexes(OperationContext* opCtx) {
return result.withContext("couldn't create ns_1_min_1 index on config.migrations");
}
+ result =
+ configShard->createIndexOnConfig(opCtx,
+ NamespaceString::kConfigBalancerCommandsNamespace,
+ BSON(PersistedBalancerCommand::kRequestIdFieldName << 1),
+ unique);
+ if (!result.isOK()) {
+ return result.withContext(
+ "couldn't create requestId_1 index on "
+ "config.balancerCommandsSchedulerOngoingOperations");
+ }
+
+
result = configShard->createIndexOnConfig(
opCtx, ShardType::ConfigNS, BSON(ShardType::host() << 1), unique);
if (!result.isOK()) {